In this blog post, I will discuss using Redis as a message queue with the
Stream type. Redis introduced the Stream data type in version 5.0, and it is
designed specifically for message queues. There are many other message
queues, such as RabbitMQ and Kafka, but here we will only discuss how to use
Redis as a message queue.
In a distributed system, when two components communicate through a message
queue, the component that sends a message to the queue is called the
producer, and the component that reads the message and processes it is
called the consumer.
A message queue mainly solves the problem of producers and consumers
processing messages at different speeds. It is often indispensable
middleware at large companies. Before 5.0, Redis already had a message queue
function based on publish/subscribe (pub/sub).
The pub/sub approach has a disadvantage: when Redis goes down, the network
disconnects, etc., messages are discarded. Redis Stream, however, provides
message persistence and master-replica replication, lets any client access
the data at any time, remembers the position each client has read up to, and
ensures that messages are not lost.
Redis Message Queue Commands
1. XADD
XADD infotipsnews * 1 hello
XADD inserts a message into a message queue (in this example, the queue is
named infotipsnews). A message consists of field-value pairs; here the field
is 1 and the value is “hello”. The “*” after infotipsnews tells Redis to
auto-generate an ID that is unique within the stream.
An example of such an ID is 1631288930852-0. The first half, 1631288930852,
is the UNIX time of the server in milliseconds, and the second half, 0, is a
sequence number that distinguishes messages inserted in the same
millisecond.
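The two halves of an ID can be pulled apart with plain string handling. The following is a minimal sketch in Python; the helper names `parse_stream_id` and `stream_id_time` are made up for this post and are not part of Redis or any client library.

```python
from datetime import datetime, timedelta, timezone


def parse_stream_id(entry_id: str) -> tuple[int, int]:
    """Split a stream entry ID of the form '<milliseconds>-<sequence>'."""
    ms_text, seq_text = entry_id.split("-", 1)
    return int(ms_text), int(seq_text)


def stream_id_time(entry_id: str) -> datetime:
    """Return the UTC time encoded in the first half of the ID."""
    ms, _ = parse_stream_id(entry_id)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(milliseconds=ms)


ms, seq = parse_stream_id("1631288930852-0")
print(ms, seq)
print(stream_id_time("1631288930852-0").isoformat())
```

Comparing the parsed `(milliseconds, sequence)` tuples also gives the order in which messages were inserted.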
2. XTRIM
XTRIM infotipsnews maxlen 100
XTRIM removes older entries from the message queue according to a strategy
such as MAXLEN or MINID. With MAXLEN, once the stream exceeds the maximum
length, the oldest messages are deleted. In the above example, if the stream
holds more than 100 messages, the oldest ones are deleted until 100 remain.
Because of the internal implementation of the stream, enforcing an exact
upper limit consumes more resources, so we generally use an approximate
limit instead: XTRIM infotipsnews maxlen ~ 100. This means the length may be
somewhat more than 100, and it is up to Redis to decide when to truncate.
3. XLEN
XLEN infotipsnews
XLEN returns the number of entries in the stream. In the above example, it
returns the length of the message queue infotipsnews.
4. XDEL
XDEL infotipsnews 1631288930852-0
XDEL removes specific entries from the message queue. In the above example,
the command deletes the message with ID 1631288930852-0 from the message
queue infotipsnews.
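The commands covered so far can also be issued from application code. Here is a rough sketch in Python, assuming `client` is a redis-py client such as `redis.Redis(decode_responses=True)`; the helper names `publish_capped` and `delete_entry` are hypothetical and only illustrate how the calls fit together.

```python
def publish_capped(client, stream, fields, max_len=100):
    """Add one entry, trim the stream, and report its length.

    `client` is assumed to be a redis-py client; this helper is
    hypothetical and not part of any library.
    """
    # XADD <stream> * field value ...
    entry_id = client.xadd(stream, fields)
    # XTRIM <stream> MAXLEN ~ <max_len> (approximate trimming)
    client.xtrim(stream, maxlen=max_len, approximate=True)
    # XLEN <stream>
    return entry_id, client.xlen(stream)


def delete_entry(client, stream, entry_id):
    """XDEL a single entry; True if Redis reported exactly one deletion."""
    return client.xdel(stream, entry_id) == 1
```

With a real connection, a call would look like `publish_capped(client, "infotipsnews", {"1": "hello"})`.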
5. XRANGE
XRANGE infotipsnews - +
XRANGE returns the messages whose IDs fall within a given range. “-”
represents the smallest possible ID and “+” the largest, so the above example
returns every message in infotipsnews, from oldest to newest.
6. XREAD
XREAD block 10000 streams infotipsnews $
XREAD reads messages from one or more streams. “$” means to read only
messages that arrive after the command is issued, and “block 10000” sets the
blocking time in milliseconds, i.e. 10s. In the above example, if no message
arrives, XREAD blocks for 10s and then returns nil. If a message arrives
within 10s, that message is returned.
7. XGROUP
XGROUP CREATE infotipsnews mygroup 0
XGROUP manages consumer groups: it can create a consumer group, destroy a
consumer group, delete a specific consumer, and so on. In the above example, I
created a consumer group mygroup for the message queue infotipsnews; 0 means
the group starts reading from the very beginning of the stream.
To destroy the consumer group, execute XGROUP DESTROY infotipsnews mygroup.
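Creating a group that already exists makes Redis reply with a BUSYGROUP error, so startup code often treats that case as success. The sketch below assumes `client` is a redis-py client; `ensure_group` is a hypothetical helper, and the MKSTREAM option (which creates the stream if it is missing) goes beyond the command shown above. Real code would more likely catch `redis.exceptions.ResponseError` than a bare `Exception`.

```python
def ensure_group(client, stream, group, start_id="0"):
    """Create the consumer group unless it already exists.

    Returns True if the group was created, False if it was already there.
    `client` is assumed to be a redis-py client; the helper is hypothetical.
    """
    try:
        # XGROUP CREATE <stream> <group> <start_id> MKSTREAM
        client.xgroup_create(stream, group, id=start_id, mkstream=True)
        return True
    except Exception as exc:
        # Redis answers with a BUSYGROUP error when the group name is taken.
        if "BUSYGROUP" in str(exc):
            return False
        raise
```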
8. XREADGROUP
XREADGROUP group mygroup consumer1 streams infotipsnews >
XREADGROUP is a special version of XREAD with support for consumer groups. In
the above example, consumer1 in the consumer group mygroup reads messages
from the message queue infotipsnews, where “>” means to read only messages
that have not yet been delivered to any consumer in the group.
Note that once a message has been read by one consumer in the consumer group,
it is no longer delivered to the other consumers in that group. The purpose of
consumer groups is to let multiple consumers in a group share the work of
reading messages.
Therefore, we usually let each consumer read a portion of the messages so that
the read load is evenly distributed among the consumers.
9. XPENDING
XPENDING infotipsnews mygroup
To ensure that consumers can still obtain unprocessed messages after a
failure and restart, Streams automatically keeps an internal queue of the
messages read by each consumer in the consumer group until the consumer uses
the XACK command to notify Streams that the message has been processed. After
a consumer restarts, you can use the XPENDING command to view the messages
that have been read but not yet acknowledged.
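The bookkeeping described here can be pictured with a small in-memory toy. This is only an illustration of the behaviour (one consumer per message, pending until acknowledged); it is not how Redis implements it, and the class `ToyGroup` is invented for this post.

```python
from collections import deque


class ToyGroup:
    """Toy model of consumer-group delivery; NOT Redis's implementation."""

    def __init__(self):
        self.undelivered = deque()  # entries no consumer has read yet
        self.pending = {}           # entry_id -> consumer that read it

    def add(self, entry_id):
        self.undelivered.append(entry_id)

    def read(self, consumer):
        """Like XREADGROUP with '>': give the next entry to one consumer."""
        if not self.undelivered:
            return None
        entry_id = self.undelivered.popleft()
        self.pending[entry_id] = consumer
        return entry_id

    def ack(self, entry_id):
        """Like XACK: remove the entry from the pending list."""
        return self.pending.pop(entry_id, None) is not None


group = ToyGroup()
group.add("1-0")
group.add("2-0")
first = group.read("consumer1")
second = group.read("consumer2")
group.ack(first)
print(group.pending)  # what an XPENDING-style query would still report
```

Each entry goes to exactly one consumer, and only the entry that was never acknowledged stays in `pending`.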
10. XACK
XACK infotipsnews mygroup 1631289246997-0
It means that the consumer group mygroup has confirmed that it has processed the message with ID 1631289246997-0 in the message queue infotipsnews.
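Put together, one pass of a consumer might read with XREADGROUP, process each message, and acknowledge it with XACK. The following is a minimal sketch, assuming `client` is a redis-py client using the default reply format (a list of `[stream, entries]` pairs); `consume_once` and `handler` are hypothetical names.

```python
def consume_once(client, stream, group, consumer, handler, block_ms=10000):
    """Read new entries for this consumer, process them, and XACK each one.

    Returns the number of acknowledged entries. `client` is assumed to be
    a redis-py client; `handler` is any callable taking (entry_id, fields).
    """
    # ">" asks for entries never delivered to any consumer in the group.
    reply = client.xreadgroup(
        group, consumer, {stream: ">"}, count=10, block=block_ms
    )
    acked = 0
    for _stream_name, entries in reply or []:
        for entry_id, fields in entries:
            handler(entry_id, fields)
            # Acknowledge only after the handler returned without raising.
            acked += client.xack(stream, group, entry_id)
    return acked
```

If the handler raises, the message is not acknowledged and stays visible through XPENDING.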
So far, we have covered how to use the Stream type to implement a message queue.
Why do we use Redis as a message queue
A common view is that message queues call for dedicated middleware such as Kafka or RabbitMQ, and that Redis is better suited to caching. In fact, I think the choice of technology depends on the application scenario you are facing.
If your message volume is not large and you are not sensitive to data loss, then using Redis as a message queue is a good option. After all, compared with professional messaging systems such as Kafka, Redis is more lightweight and has lower maintenance costs.