安装客户端python-pika
python-pika.spec redhat 6
点击(此处)折叠或打开
- %{!?python_sitelib: %global python_sitelib %(%{__python} -c "from distutils.sysconfig import get_python_lib; print get_python_lib()")}
- %define short_name pika
- Name: python-%{short_name}
- Version: 0.10.0
- Release: 0%{?dist}
- Summary: AMQP 0-9-1 client library for Python
- Group: Development/Libraries
- License: MPLv1.1 or GPLv2
- URL: %{short_name}/%{short_name}
- # The tarball comes from here:
- # %{short_name}/%{short_name}/tarball/v%{version}
- # GitHub has layers of redirection and renames that make this a troublesome
- # URL to include directly.
- Source0: %{short_name}-%{version}.tar.gz
- BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
- BuildArch: noarch
- BuildRequires: python-setuptools
- BuildRequires: python-devel
- Requires: python-pyev
- Requires: python-tornado
- Requires: python-twisted
- Requires: python >= 2.6.5
- %description
- Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that
- tries to stay fairly independent of the underlying network support
- library.
- %prep
- %setup -q -n %{short_name}-%{version}
- %build
- %{__python} setup.py build
- %install
- %{__rm} -rf %{buildroot}
- %{__python} setup.py install -O1 --skip-build --root %{buildroot}
- install -D -m 644 LICENSE %{buildroot}%{_docdir}/%{name}-%{version}
- install -D -m 644 README.rst %{buildroot}%{_docdir}/%{name}-%{version}
- install -D -m 644 PKG-INFO %{buildroot}%{_docdir}/%{name}-%{version}
- %clean
- %{__rm} -rf %{buildroot}
- %files
- %defattr(-,root,root,-)
- %dir %{python_sitelib}/%{short_name}*
- %{python_sitelib}/%{short_name}*/*
- %doc README.rst
- %doc LICENSE
- %doc PKG-INFO
- %changelog
- * Sat Aug 20 2016 gcy - 0.10.0
- - update version 0.10.0
- * Tue Dec 13 2011 Daniel Aharon - 0.9.5-2
- - Patch pika/adapters/blocking_connection.py
- * Sun Apr 3 2011 Ilia Cheishvili - 0.9.5-1
- - Upgrade to version 0.9.5
- * Sun Mar 6 2011 Ilia Cheishvili - 0.9.4-1
- - Upgrade to version 0.9.4
- * Sat Feb 19 2011 Ilia Cheishvili - 0.9.3-1
- - Upgrade to version 0.9.3
- * Sat Oct 2 2010 Ilia Cheishvili - 0.5.2-1
- - Initial Package
安装服务端rabbitmq-server
rabbitmq配置(rpm包自己搞定)
cat /etc/rabbitmq/rabbitmq-env.conf
# 文件位置
RABBITMQ_MNESIA_BASE=/data/rabbitmq_mnesia
# 监听IP 端口
RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
RABBITMQ_NODE_PORT=5673
如果上述配置无效,自己去init脚本里看怎么读配置文件的
rabbitmqctl add_vhost 添加一个vhost
rabbitmqctl add_user 添加一个用户
添加完就能用了
接下来是如何使用消息队列
先搞清楚几个比较重要的概念
1、什么是消息队列, 什么是Producer、Exchange、Queue、Consumer、Topic、Fanout、Routing key
2、no_ack
http://my.oschina.net/moooofly/blog/143883
3、流量控制
(Fair dispatch 公平分发 部分)
http://my.oschina.net/hncscwc/blog/195560
简介:默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。
那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?
通过 basic.qos 方法设置 prefetch_count=1 。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。
换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
现在上简单代码,直接用pika
点击(此处)折叠或打开
- 
				#!/usr/bin/python
 
- 
				# -*- coding: UTF-8 -*-
 
- 
				
 
- 
				import pika
 
- 
				from pika import PlainCredentials
 
- 
				from pika import ConnectionParameters
 
- 
				
 
- 
				
 
- 
				def main():
 
- 
				    user = 'phptest'
 
- 
				    passwd = 'phptest'
 
- 
				    vhost = 'phptest'
 
- 
				    ip = '127.0.0.1'
 
- 
				    port = 5673
 
- 
				    identified = PlainCredentials(user, passwd)
 
- 
				    paarameters = ConnectionParameters(ip, port, vhost, identified)
 
- 
				    connection = pika.BlockingConnection(paarameters)
 
- 
				    channel = connection.channel()
 
- 
				    print 'connect success'
 
- 
				    channel.exchange_declare(exchange='gcy2', auto_delete=True)
 
- 
				    #channel._delivery_confirmation = 1
 
- 
				    print 'start send data'
 
- 
				    channel.basic_publish(exchange='gcy2',routing_key='a', body='wtffffff1')
 
- 
				    channel.basic_publish(exchange='gcy2',routing_key='a', body='wtffffff2')
 
- 
				    print 'end'
 
- 
				
 
- 
				
 
- 
				if __name__ == '__main__':
 
- main()
发送者部分非常简单
账号密码用PlainCredentials类封装
连接的参数用ConnectionParameters封装, rabbitmq的vhost相当于mysql实例一个库,用来互相隔离权限范围的
BlockingConnection类就是一个rabbitmq的连接,如果要用多连接的话,pika有一个pika-pool的库,在openstack里,server端是单连接的,只有发送端才用到了多连接。
不要看BlockingConnection是block打头就以为是block的了,实际上blockconnection是一个分装好的上层类,实际会调用下面的select pool epoll 甚至event。一般都直接使用BlockingConnection,openstack就是用的BlockingConnection(linux上会以epoll来处理socket数据,下次详细讲openstack的rpc通信的时候会详细说明)
channel这个玩意比较蛋碎,之前我看了很久就是为了看明白为什么不直接用connection还要在connection上封一层channel,后来大致明白,其实就是为了不多建立多个connection也能做隔离
数据实际是从connection获取到后分发到channel
生成channel后, 声明一个叫gcy2的 交换机(exchange), 默认的exchange_type是direct,即单点的,
然后发送数据 到gcy2 这个exchange, 接收者的routing_key是a
发送者代码完结
接收者会稍微复杂一些
点击(此处)折叠或打开
- 
				def main():
 
- 
				    user = 'phptest'
 
- 
				    passwd = 'phptest'
 
- 
				    vhost = 'phptest'
 
- 
				    ip = '127.0.0.1'
 
- 
				    port = 5673
 
- 
				    identified = PlainCredentials(user, passwd)
 
- 
				    paarameters = ConnectionParameters(ip, port, vhost, identified)
 
- 
				    connection = pika.BlockingConnection(paarameters)
 
- 
				    channel = connection.channel()
 
- 
				    print 'connect success'
 
- 
				    channel.queue_declare(queue='myqeueu')
 
- 
				    channel.queue_bind(queue='myqeueu', exchange='gcy2', routing_key='a')
 
- 
				    get_list = []
 
- 
				    def callback(ch, method, properties, body):
 
- 
				        print 'get body %s ' % body,
 
- 
				        get_list.append([method.delivery_tag, body])
 
- 
				        print method.consumer_tag
 
- 
				        #ch.basic_ack(method.delivery_tag)
 
- 
				        #ch.stop_consuming()
 
- 
				    channel.basic_qos(prefetch_count=5)
 
- 
				    tag1 = channel.basic_consume(callback, queue='myqeueu', no_ack=True)
 
- 
				    tag2 = channel.basic_consume(callback, queue='myqeueu', no_ack=False)
 
- 
				    print tag1
 
- 
				    print tag2
 
- 
				    def get_data():
 
- 
				        while 1:
 
- 
				            last_queue_size = len(get_list)
 
- 
				            if last_queue_size >= 5:
 
- 
				                ret = get_list[:5]
 
- 
				                del get_list[:5]
 
- 
				                return ret
 
- 
				            else:
 
- 
				                connection.process_data_events()
 
- 
				                if last_queue_size == len(get_list):
 
- 
				                    ret = get_list[:5]
 
- 
				                    del get_list[:5]
 
- 
				                    return ret
 
- 
				    while 1:
 
- 
				        ret = get_data()
 
- 
				        if ret:
 
- 
				            print 'fucked ',
 
- 
				            print ret[0][1]
 
- 
				            #channel.basic_ack(ret[0][0])
 
- 
				    print 'end'
 
- 
				    connection.process_data_events()
 
- 
				    connection.close()
 
- 
				
 
- 
				
 
- 
				if __name__ == '__main__':
 
- main()
上面没有使用网上常用的start_consuming()写法,这里的写法模仿了openstack的pika驱动的写法
上述代码的写法是有问题的,我们先一步步说明再解释错误
