RabbitMQ 消息队列
PY
threading Queue
进程Queue 父进程与子进程,或同一父进程下的多个子进程进行交互
缺点:两个不同Python文件不能通过上面两个Queue进行交互
erlong
基于这个语言创建的一种中间商
win中需要先安装erlong才能使用
rabbitmq_server start
安装 Python module
pip install pika
or
easy_install pika
or
源码
rabbit 默认端口15672
查看当前时刻的队列数
rabbitmqctl.bat list_queue
exchange
在定义的时候就是有类型的,决定到底哪些queue符合条件,可以接受消息
fanout:所有bind到此exchange的queue都可以收到消息
direct:通过routingkey和exchange决定唯一的queue可以接受消息
topic: 所有符合routingkey(此时可以是一个表达式)的routingkey所bind的queue都可以接受消息
表达式符号说明:
# 代表一个或多个字符 * 代表任何字符
RPC
remote procedure call 双向传输,指令<-------->指令执行结果
实现方法: 创建两个队列,一个队列收指令,一个队列发送执行结果
用rabbitmq实现简单的生产者消费者模型
1) rabbit_producer.py
# Author : Xuefeng import pika connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() # create the queue, the name of queue is "hello" # durable=True can make the queue be exist, although the service have stopped before. channel.queue_declare(queue="hello", durable=True) # n RabbitMQ a message can never be sent directly to queue,it always need to go through channel.basic_publish(exchange = " ", routing_key = "hello", body = "Hello world!", properties = pika.BasicPropreties( delivery_mode=2, # make the message persistence ) ) print("[x] sent 'Hello world!'") connection.close()
2) rabbit_consumer.py
# Author : Xuefeng import pika connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() channel.queue_declare(queue="hello", durable=True) def callback(ch, method, properties, body): ''' Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ''' print("------>", ch, method, properties ) print("[x] Recieved %r" % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag) # follow is for consumer to auto change with the ability channel.basic_qos(profetch_count=1) # no_ack = True represent that the message cannot be transfor to next consumer, # when the current consumer is stop by accident. channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = "hello", no_ack = True) print("[*] Waiting for messages. To Exit press CTRL+C") channel.start_consuming()
用rabbitmq中的fanout模式实现广播模式
1) fanout_rabbit_publish.py
# Author : Xuefeng import pika import sys # 广播模式: # 生产者发送一条消息,所有的开通链接的消费者都可以接收到消息 connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() channel.exchange_declare(exchange="logs", type="fanout") message = ' '.join(sys.argv[1:]) or "info:Hello world!" channel.basic_publish( exchange="logs", routing_key="", body=message ) print("[x] Send %r" % message) connection.close()
2) fanout_rabbit_consumer.py
# Author : Xuefeng import pika import sys connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() # exclusive 排他,唯一的 随机生成queue result = channel.queue_declare(exclusive=True) queue_name = result.method.queue print("Random queue name:", queue_name) channel.queue_bind(exchange="logs", queue=queue_name) def callback(ch, method, properties, body): ''' Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ''' print("------>", ch, method, properties ) print("[x] Recieved %r" % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag) # no_ack = True represent that the message cannot be transfor to next consumer, # when the current consumer is stop by accident. channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = "hello", no_ack = True) print("[*] Waiting for messages. To Exit press CTRL+C") channel.start_consuming()
用rabbitmq中的direct模式实现消息过滤模式
1) direct_rabbit_publisher.py
# Author : Xuefeng import pika import sys # 消息过滤模式: # 生产者发送一条消息,通过severity优先级来确定是否可以接收到消息 connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() channel.exchange_declare(exchange="direct_logs", type="direct") severity = sys.argv[1] if len(sys.argv) > 1 else "info" message = ' '.join(sys.argv[2:]) or "info:Hello world!" channel.basic_publish( exchange="direct_logs", routing_key=severity, body=message ) print("[x] Send %r:%r" % (severity, message)) connection.close()
2) direct_rabbit_consumer.py
# Author : Xuefeng import pika import sys connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() channel.exchange_declare(exchange="direct_logs", type="direct") # exclusive 排他,唯一的 随机生成queue result = channel.queue_declare(exclusive=True) queue_name = result.method.queue print("Random queue name:", queue_name) severities = sys.argv[1:] if not severities: sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): ''' Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ''' print("------>", ch, method, properties ) print("[x] Recieved %r" % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag) # no_ack = True represent that the message cannot be transfor to next consumer, # when the current consumer is stop by accident. channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = "hello", no_ack = True) print("[*] Waiting for messages. To Exit press CTRL+C") channel.start_consuming()
用rabbitmq中的topic模式实现细致消息过滤模式
1) topic_rabbit_publisher.py
# Author : Xuefeng import pika import sys # 消息细致过滤模式: # 生产者发送一条消息,通过运行脚本 *.info 等确定接收消息类型进行对应接收 connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() channel.exchange_declare(exchange="topic_logs", type="topic") binding_key = sys.argv[1] if len(sys.argv) > 1 else "info" message = ' '.join(sys.argv[2:]) or "info:Hello world!" channel.basic_publish( exchange="topic_logs", routing_key=binding_key, body=message ) print("[x] Send %r:%r" % (binding_key, message)) connection.close()
2) topic_rabbit_consumer.py
# Author : Xuefeng import pika import sys connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() channel.exchange_declare(exchange="topic_logs", type="topic") # exclusive 排他,唯一的 随机生成queue result = channel.queue_declare(exclusive=True) queue_name = result.method.queue print("Random queue name:", queue_name) binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange="topic_logs", queue=queue_name, routing_key=binding_key) def callback(ch, method, properties, body): ''' Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ''' print("------>", ch, method, properties) print("[x] Recieved %r" % body) # ack by ourself ch.basic_ack(delivery_tag=method.delivery_tag) # no_ack = True represent that the message cannot be transfor to next consumer, # when the current consumer is stop by accident. channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue="hello", no_ack=True) print("[*] Waiting for messages. To Exit press CTRL+C") channel.start_consuming()
用rabbitmq实现rpc操作
1) Rpc_rabbit_client.py
# Author : Xuefeng import pika import time import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost")) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 随机的生成一个接收命令执行结果的队列 self.channel.basic_consume(self.on_response, # 只要收到消息就调用 no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange="", routing_key="rpc_queue", properties=pika.BasicPropreties( rely_to=self.callback_queue, correlation_id=self.corr_id # 通过随机生成的ID来验证指令执行结果与指令的匹配性 ), body=str(n) ) while self.response is None: self.connection.process_data_events() # 非阻塞版的start_consume,有没有消息都继续 print("no message...") time.sleep(0.5) return int(self.response) fibonacci_rcp = FibonacciRpcClient() print("[x] Requesting fib(30)") response = fibonacci_rcp.call(30) print("[x] Rec %r" % response)
2) Rpc_rabbit_server.py
# Author : Xuefeng import pika import sys connection = pika.BlockingConnection(pika.Connection.Parameters( "localhost" )) # statement a channel channel = connection.channel() channel.queue_declare(queue="rpc_queue") def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1)+fib(n-2) def on_request(ch, method, props, body): n = int(body) print("[.] fib(%s)" % n) response = fib(n) ch.basic_publish( exchange="", routing_key=props.rely_to, properties=pika.BasicPropreties(correlation_id= props.correlation), body = str(body) ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue="rpc_queue") print("[x] Awaiting RPC requests") channel.start_consumeing() channel.exchange_declare(exchange="direct_logs", type="direct") # exclusive 排他,唯一的 随机生成queue result = channel.queue_declare(exclusive=True) queue_name = result.method.queue print("Random queue name:", queue_name) severities = sys.argv[1:]
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]