redis中的消息列队总结

安装环境

mac上可以先安装一个cli端。然后下载docker并运行。

如果没有配置文件、可以先启动容器并且进入容器cp出来

或者使用

查看版本

商品服务举例图

先进入的先出。

Redis 列表(List)

lrange key 0 -1 获取列表左右元素

命令:LPUSH 、将一个或多个值插入到列表头部

语法格式:LPUSH key value1 [value2]

➜ vagrant_openresty_swoole redis-cli -h 127.0.0.1
127.0.0.1:6379> lpush order id001 id002
(integer) 2
127.0.0.1:6379> lrange order 0 -1
1) “id002”
2) “id001”
127.0.0.1:6379>

命令: RPOP 移出并获取列表的第一个元素

语法格式:RPOP key

127.0.0.1:6379> rpop order
“id001”
127.0.0.1:6379> lrange order 0 -1
1) “id002”
127.0.0.1:6379>

命令:BRPOP

移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

语法格式:BRPOP key1 [key2 ] timeout

启动N个服务、死循环同事取列队的订单信息

如果出现断电等是事情、那么会存在丢失数据的行为。可以在设置个备份list来解决、

例子:N1死循环服务、取出一个数据、放到N1备份list里面

用到的命令:Brpoplpush

 Brpoplpush 命令从列表中取出最后一个元素,并插入到另外一个列表的头部; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

代码例子:

延迟列队? 比方说新闻的延迟发布

利用redis的有序集合(sorted set) 用分值来存时间戳、进行排序

用zadd添加 、可以是一张新闻表的id也和是表和id组合取出来进行切割

ZREVRANGEBYSCORE newsList +inf -inf 倒序排列

PHP代码可以这样写

也可以做订单指定时间未支付关闭订单

结合列队实现熔断器

条件、适用于php-fpm下

熔断器是对电器起到了保护作用。

熔断器的功能、常用语rpc服务。

那么问题来了!假设有个不断运行的主程序?在某个点调用函数A假设函数A抛出了异常。 —那么明知道A函数有异常?那么是否反复去调用能呢?

答:会、因为程序不是人脑,没有那么灵活,除非修改源码。

熔断器上场

  1. 主程序不直接调 A,而是让熔断器调用A
  2. A如果有异常,熔断器记录
  3. 超过一定次数,则熔断器打开,下次则调用备用函数

代码例子

整体的原理、是代理模式。外面包裹一个try_catch运行。错误进行捕获。redis进行计数。

更新到达一定的次数、直接调用降级函数。

熔断器的三种状态

1、关。调用原函数,失败后计数器+1,并返回降级函数,到达一定阀值后,进入”开”状态

2、开。也就是直接返回降级函数,不调用原函数,但是我们也要给失败函数一些机会,于是出现搬开状态,不过需要设定一个定时器,一定时间后才进入半开状态

3、搬开状态。处理搬开状态是熔断器的核心、1、开状态下、一定时间后,自动进入半开状态。2、调用函数时 有一定数量的调用是进入的降级函数,有一定数量进入的是真实函数调用

半开状态、简单代码例子

state.php

在半开状态下、随机执行 原函数和降级函数。当执行一定次数成功原函数熔断器恢复关状态。

0为关3为开-3为半开

如下代码例子

CiruitBreaker.php

swith.php循环监听

调用test.php

Redis中的Stream类型

文档地址:http://www.redis.cn/commands/xadd.html

XADD key ID field string [field string …]

id 如果指定的ID参数是字符*(星号ASCII字符),XADD命令会自动为您生成一个唯一的ID。 但是,也可以指定

XRANGE key start end [COUNT count]

-号为最小值 +号为最大值

XREAD

语法:[COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

读取一条

还有阻塞读取 block后面跟毫秒级时间戳

目前的状态是:谁都能过来取、但是没有区分状态。但是看不出来状态,是否被消费了,所以redis添加了“组”功能。来进行查看。

消费组

消息列队,不仅仅处理一个服务,所以有多个“组”,消费者进入组里进行消费。组是和stream 进行关联的。

xgroup 命令 来创建组

0从起始点消费 $从尾部开始消费

添加

查看消息列队信息

查看组信息

消费者读取的话需要用xreadgroup命令

xreadgroup

现在 在使用xinfo groups newUsers

xreadgroup group users qidong count 10 streams newUsers

把>替换成0 就是读取pending里面去取

通过xack命令来通知redis服务器词条消息消费完成

结果剩下3条

XPENDING 命令 去读取未处理的消息

127.0.0.1:6379> xreadgroup group users c1 count 1 streams newUsers >

用xrange命令查看id的内容

xclaim 消息转发、多个消费者其中之1有出现故障的、把消息转给其他

然后使用命令后在查看未消费信息的时候、发现时间间隔啥的变短了

发表评论