作者:gfree.wind@gmail.com
博客:blog.focus-linux.net linuxfocus.blog.chinaunix.net
博客:blog.focus-linux.net linuxfocus.blog.chinaunix.net
本文的copyleft归gfree.wind@gmail.com所有,使用GPL发布,可以自由拷贝,转载。但转载请保持文档的完整性,注明原作者及原链接,严禁用于任何商业用途。
======================================================================================================
有一段时间没有更新了,开始看《深入理解Linux内核架构》,感觉写得很不错。不过暂时还是在学习阶段,所以没有什么可写的。为了保持自己学习的动力,今天继续学习一下zeromq吧。
上次看到了zmq_init中的reaper->start ();以及其对应的工作线程,今天就继续后面的代码。
- zmq::ctx_t::ctx_t (uint32_t io_threads_) :
-
tag (0xbadcafe0),
-
terminating (false)
-
{
。。。。。。
-
// Create I/O thread objects and launch them.
- /*
- 为什么i从2开始呢?这里的i是为slot的索引,在前面的代码中slot[0]为zmq_term对应的mailbox,即&term_ mailbox ,slot[1]为reaper线程对应的mailbox。所以I/O线程的mailbox的索引i只能从2开始。
这里创建并启动了I/O线程,并加入到slots中。
- */
-
for (uint32_t i = 2; i != io_threads_ + 2; i++) {
-
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
-
alloc_assert (io_thread);
-
io_threads.push_back (io_thread);
-
slots [i] = io_thread->get_mailbox ();
-
io_thread->start ();
-
}
// In the unused part of the slot array, create a list of empty slots.
/* 如注释所言,保存空的slot 索引*/
for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) io_threads_ + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
// Create the logging infrastructure.
log_socket = create_socket (ZMQ_PUB);
zmq_assert (log_socket);
rc = log_socket->bind ("sys://log");
zmq_assert (rc == 0);
- }
先看一下I/O thread的工作线程:
- void zmq::epoll_t::loop ()
-
{
-
epoll_event ev_buf [max_io_events];
-
-
while (!stopping) {
-
-
// Execute any due timers.
-
int timeout = (int) execute_timers ();
-
-
// Wait for events.
-
int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events,
-
timeout ? timeout : -1);
-
if (n == -1 && errno == EINTR)
-
continue;
-
errno_assert (n != -1);
-
-
for (int i = 0; i < n; i ++) {
-
poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
-
-
if (pe->fd == retired_fd)
-
continue;
-
if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
-
pe->events->in_event ();
-
if (pe->fd == retired_fd)
-
continue;
-
if (ev_buf [i].events & EPOLLOUT)
-
pe->events->out_event ();
-
if (pe->fd == retired_fd)
-
continue;
-
if (ev_buf [i].events & EPOLLIN)
-
pe->events->in_event ();
-
}
-
-
// Destroy retired event sources.
-
for (retired_t::iterator it = retired.begin (); it != retired.end ();
-
++it)
-
delete *it;
-
retired.clear ();
-
}
- }
这个工作流程和机制和reaper线程的工作函数zmq::select_t::loop很相似。
1. 先执行到期的timer,并计算出合适的timeout;
2. 使用epoll监视事件,并根据事件类型执行不同的操作;
3. 销毁过期的poller对象;
再看一下bind的代码,这个zmq_init基本上就结束了。
- int zmq::socket_base_t::bind (const char *addr_)
-
{
-
if (unlikely (ctx_terminated)) {
-
errno = ETERM;
-
return -1;
-
}
-
-
// Parse addr_ string.
-
std::string protocol;
-
std::string address;
- /*
- 按照zeromq的格式即protocol://address解析uri
- 得到protocol和address
- */
-
int rc = parse_uri (addr_, protocol, address);
-
if (rc != 0)
-
return -1;
// 检测是否为zeromq支持的协议类型
-
rc = check_protocol (protocol);
-
if (rc != 0)
-
return -1;
-
-
if (protocol == "inproc" || protocol == "sys") {
- /*
- 为本机的一个进程间的通信,即线程间通信.
- 因为使用内存间的直接通信,所以无需使用I/O thread
- */
-
endpoint_t endpoint = {this, options};
-
return register_endpoint (addr_, endpoint);
-
}
-
-
if (protocol == "tcp" || protocol == "ipc") {
-
-
// Choose I/O thread to run the listerner in.
- /* 根据亲和性以及线程的负载load挑选一个合适的I/O线程 */
-
io_thread_t *io_thread = choose_io_thread (options.affinity);
-
if (!io_thread) {
-
errno = EMTHREAD;
-
return -1;
-
}
-
-
// Create and run the listener.
- /* 根据协议和地址,创建一个listener,并将其bind到一个I/O线程。*/
-
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
-
io_thread, this, options);
-
alloc_assert (listener);
-
int rc = listener->set_address (protocol.c_str(), address.c_str ());
-
if (rc != 0) {
-
delete listener;
-
return -1;
-
}
-
launch_child (listener);
-
-
return 0;
-
}
-
-
if (protocol == "pgm" || protocol == "epgm") {
-
-
// For convenience's sake, bind can be used interchageable with
-
// connect for PGM and EPGM transports.
-
return connect (addr_);
-
}
-
-
zmq_assert (false);
-
return -1;
- }
看到bind的实现,总算到了zeromq有点意思的地方了。今天对于bind看得很粗略,下次会仔细的学习一下bind的代码——特别是protocol==tcp和ipc