简单来说,rabbitMQ主要解决的就是三个问题:

解耦/异步/削峰

当然这也是所有消息中间件的特点

img

下载:

windows

https://blog.csdn.net/zhm3023/article/details/82217222

教程参考官方文档:rabbitMQ tutorials

网址 https://www.rabbitmq.com/tutorials

常见名词解释:

  • Broker:简单来说就是消息队列服务器实体。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  • producer:消息生产者,就是投递消息的程序。
  • consumer:消息消费者,就是接受消息的程序。
  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

教程1:简单一对一

生产者与消费者通过队列(queue)进行信息的传输

image-20200814175736145

生产者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(
    queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

教程2:work工作队列

image-20200815111838307

把任务均匀分给别人,按照worker的数量完全平均分配

image-20200815111954847

这时会有一个问题,如果分配任务的对象死了,那传递的消息就也消失了无法找回。

我们把auto_ack=True的选项删除,在加上ch.basic_ack(delivery_tag = method.delivery_tag)开始手动消息确认,这样如果任务死了,会紧接着传递给下一个人

image-20200815112613770

但是RabbitMQ退出或崩溃时,它将忘记队列和消息,为了确保该队列将在RabbitMQ节点重启后继续存在,需要将其声明为持久

channel.queue_declare(queue='hello', durable=True)

为了更公平的派遣,而非仅仅是从数量上的一个一个平均分配,更要考虑被分配任务的机器是否正在工作/有空闲的时间

image-20200815113412715

为了解决这个问题,我们可以将Channel#basic_qos通道方法与 prefetch_count = 1设置一起使用。这使用basic.qos协议方法来告诉RabbitMQ一次不向工作人员提供多个消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给尚不繁忙的下一个工作人员。

channel.basic_qos(prefetch_count=1)

image-20200815113321909

new_task.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

worker.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

教程3:发布与订阅

RabbitMQ消息传递模型的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列,交换机必须确切知道如何处理收到的消息。

image-20200815114302592

有几种交换类型可用:direct,topic,headers 和fanout,这里主要讨论fanout扇区,我们创建一个exchange称之为log

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

扇出交换指的是将接收到的所有消息广播到它知道的所有队列中

无论何时连接到Rabbit,我们都需要一个全新的空队列。为此,我们可以创建一个具有随机名称的队列,或者甚至更好-让服务器为我们选择一个随机队列名称。我们可以通过为queue_declare提供空的queue参数来做到这一点

result = channel.queue_declare(queue='')

result.method.queue包含一个随机队列名称,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

其次,一旦使用方连接关闭,则应删除队列。有一个exclusive标志:

result = channel.queue_declare(queue='', exclusive=True)

我们已经创建了一个扇出交换和一个队列,现在我们需要告诉交换机将消息发送到我们的队列,交换和队列之间的关系称为绑定 binding

image-20200815120339203

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

image-20200815135453038

exchange里的log

image-20200815135559756

两个绑定binding的queue队列

image-20200815135709463

在发送消息结束之后,两个队列会自动取消

image-20200815135917845

receive_logs.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

emit_log.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

教程4:更加复杂的订阅系统

在教程三的基础上增加功能,使仅订阅消息的子集成为可能

Direct exchange直接交换

上一教程中的日志系统将所有消息广播给所有使用者,想要扩展它以允许根据邮件的严重性过滤邮件。例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。

我们使用的是fanout,它并没有给我们太大的灵活性-它只能进行无意识的广播。

我们将使用direct交换。direct交换背后的路由算法很简单-消息进入其绑定密钥与消息的路由密钥完全匹配的队列 。

image-20200815141748835

在此设置中,我们可以看到绑定了两个队列的direct交换X。第一个队列由绑定键orange绑定,第二个队列有两个绑定,一个绑定键为black,另一个绑定为green。在这样的设置中,将使用路由键orange将要发布到交换机的消息路由到队列Q1。路由键为blackgreen的消息将转到Q2。所有其他消息将被丢弃。

Multiple bindings多重绑定

image-20200815142309335

用相同的绑定密钥绑定多个队列是完全可行的。如上图,我们可以使用绑定键blackXQ1.Q2之间添加绑定。在这种情况下,直接交换的行为类似于sanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给 Q1Q2

将以上的运用在我们的信息系统之中

image-20200815144536340

emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

receive_logs_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

image-20200815145732667

教程5 更复杂的topic交换

direct虽然相比sanout有了更加好的功能性,但仍有局限性,无法基于多个条件进行路由。

例如我们可能只想听听来自’cron’的严重错误,也可以听听’kern’的所有日志,为了实现类似的功能我们就需要topic

topic exchange

使用topic的exchange不能具有任意的routing_key,它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。比如:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由关键字中可以包含任意多个单词,最多255个字节。

topic的逻辑和direct很像,使用特定路由密钥发送的消息将被传递到所有与匹配的绑定密钥绑定的队列。但是,绑定键有两个重要的特殊情况:

  • *(star)可以代替一个单词。
  • #(hash)可以替代零个或多个单词。

我们可以使用以下示例进行理解:

image-20200818145608244