本文主要介绍AsyncMessenger的代码框架结构和主要使用到的
上图表示Ceph的AsyncMessenger模块中各个关键类之间的联系。在AsyncMessenger模块中用到的类主要有14个,下面逐一介绍每个类的作用,以及其中包含的主要成员变量和方法。
1、 AsyncMessenger类、SimplePolicyMessenger类和Messenger类
AsyncMessenger类、SimplePolicyMessenger类和Messenger类三者是继承与被继承的关系,Messenger是一个抽象的消息管理器,其主要接口在派生类AsyncMessenger中实现,SimplePolicyMessenger类则是对消息管理器的一些连接的策略进行定义的设置,AsyncMessenger中定义和实现了消息管理器的相关成员变量以及方法。
一个AsyncMessenger实例的关键成员变量以及类方法如下表所示(包括该类继承的父类成员变量以及类方法)。AsyncMessenger包含一个WorkerPool对象、一个Processor实例,以及3个AsyncConnectionRef对象列表和1个ConnectionRef对象列表。
AsyncMessenger类中的成员变量:
成员变量名 | 返回值类型 | 描述 |
*pool | WorkerPool | 通过pool->get_worker()从线程池中获取工作线程来进行工作 |
processor | Processor | Processor实例,主要用来监听连接,绑定socket,接受连接请求等,相当于AsyncMessenger的处理中心 |
listen_sd | int | 定义的监听套接字 |
conns | ceph::unordered_map(entity_addr_t, AsyncConnectionRef) | 地址和连接的map表,创建一个新的连接时将连接和和地址信息加入到这个map表中,在发送消息时先根据地址对这个map进行一次查找,如果找到了返回连接,如果没有找到创建一个新的连接。 |
accepting_conns | set(AsyncConnectionRef) | 接收连接的集合,这个集合主要存放那些已经接收的连接。 |
deleted_conns | set(AsyncConnectionRef) | 已经关闭并且需要清理的连接的集合 |
local_connection | ConnectionRef | 本地连接的计数器 |
did_bind | bool | 初始值为false,绑定地址后置为true,stop的时候再次置为false |
AsynsMessenger类中的成员方法:
成员方法名 | 返回值类型 | 描述 |
bind (const entity_addr_t& bind_addr) | int | 绑定套接字,具体绑定过程是由Processor的bind()函数完成的 |
start() | int | 注册一个AsyncMessenger的实例后,启动这个实例,具体执行过程是WokerPool的start()函数完成的。 |
wait() | void | 等待停止的信号,如果收到停止的信息后,调用Processor的stop()函数,然后将did_bind置为false,最后删除建立的连接 |
send_message (Message *m, const entity_inst_t& dest) | int | 加了一个锁,然后调用_send_message(m, dest),将消息发送到目的地址 |
get_connection (const entity_inst_t& dest) | ConnectionRef | 函数用来建立连接,判定是否为本地连接,否则再继续查找连接是否已经存在,如果不存在再创建一个连接 |
ready() | void | 注册的AsyncMessenger已经准备好了,启动事件处理中心,开始工作,启动工作线程 |
create_connect(const entity_addr_t& addr, int type) | AsyncConnectionRef | create一个连接并将其加到conns中 |
submit_message(Message *m, AsyncConnectionRef con,const entity_addr_t& dest_addr, int dest_type) | void | 发送消息的时候会用到,根据目的地址判断需要发送消息的连接是否存在,以及连接是否是本地连接,如果是本地连接,直接对消息进行dispatch,如果连接不存在,需要根据消息类型创建新的连接 |
_send_message(Message *m, const entity_inst_t& dest) | int | 从连接中查找目的地址,然后调用submit_message()发送消息 |
add_accept(int sd) | AsyncConnectionRef | 新建一个连接,然后将其加入到accepting_conns中 |
2、 Processor类、WorkerPool类和Worker类
-
Processor类相当于AsyncMessenger模块中的一个处理器,AsyncMessenger需要完成的很多操作(start、ready、bind等)都是通过Processor来完成的。当Messenger完成地址绑定后,Processor启动,然后监听即将到来的连接。就是说AsyncMessenger模块的一些启动、绑定、就绪等操作是在Processor相应操作的基础上封装的。
-
Processor类定义了一个AsyncMessenger的对象,一个NetHandler的实例,一个Worker对象。
Processor类的成员变量(方法):
成员变量(方法)名 | 返回值类型 | 描述 |
*msgr | AsyncMessenger | AsyncMessenger的指针实例,用于调用AsyncMessenger中的成员变量(方法),用的最多的还是绑定时获取的地址信息等。 |
net | NetHandler | 绑定套接字后将其设置为非阻塞,然后这是套接字选项。 |
*worker | Worker | 工作线程 |
listen_sd | int | 获取套接口描述字的值,非负表示套接字创建成功,-1表示出错 |
nonce | uint64_t | 构造函数中用于entity_addr_t的唯一标识ID |
bind(const entity_addr_t &bind_addr, const set& avoid_ports) | int | 执行绑定套接字的具体过程 |
start(Worker *w) | int | 执行消息模块的start,具体就是启动线程,让其处于工作状态 |
accept() | void | 建立连接的过程,如果连接建立成功,则通过add_accept()函数将连接加入到accepting_conns集合中 |
stop() | void | 关闭套接字 |
rebind(const set& avoid_port) | int | 如果第一次没有绑定成功或者其它原因导致的绑定失败,执行重新绑定 |
-
WorkerPool类是一个线程池,主要作用是创建worker线程,然后将其放入自己的worker容器中,每次创建worker线程的时候根据配置文件的参数ms_async_op_threads来指定worker线程的数量,创建是在WorkerPool的构造函数中进行的。
-
在WorkerPool类中定义了一个worker集合,用于存放worker线程,还定义了一个coreids,用户存放cpu id的集合,提供指定cpu运行单个线程的作用。在配置文件中有一个参数是ms_async_affinity_cores,将创建的worker绑定到指定的cpu core上。如果创建了2个线程,绑定的cpu core是0、1,默认的ms_async_affinity_cores值是空的,即使用全部的cpu资源,如果cpu的资源不够用的时候可以将worker指定cpu core。
WorkerPool类的成员变量(方法)
成员变量(方法)名 | 返回值类型 | 描述 |
coreids | vector | 用于存放CPU id的集合 |
WorkerPool(CephContext *c) | 构造函数 | WorkerPool的构造函数,根据ms_async_op_threads的值创建相应数量的worker线程,同时完成worker和cpu core的绑定。 |
start() | void | 创建worker集合中的worker线程,启动线程开始工作 |
*get_worker() | Worker | 获取worker集合中的worker线程 |
get_cpuid(int id) | int | 获取cpu的id |
workers | Worker* | Worker线程的集合,WorkerPool在构造函数中创建的worker线程放入到这个集合中 |
- Worker类是具体的工作线程,Worker线程的主要工作是一个循环,调用epoll_wait获取需要处理的事件,用循环来处理这个事件,当外部有操作时,比如读取消息,注册一个回调类,创建一个文件事件,然后启动回调操作即可处理请求。消息模块启动时,用一个线程在Worker类中定义了一个WorkerPool对象,一个EventCenter的实例。
Worker类的成员变量(方法)
成员变量(方法)名 | 返回值类型 | 描述 |
*pool | WorkerPool | WorkerPool的实例,在entry()函数中用于获取cpu的id |
done | bool | 如果线程的工作完成置为true,否则false |
center | EventCenter | EventCenter的实例,在Worker的构造函数中执行EventCenter的初始化工作 |
*entry() | void | 工作线程的入口函数,启动一个while循环来执行事件的处理,在整个消息模块中就使用了这一个工作线程 |
stop() | void | 将done置为true,然后调用EventCenter的wakeup函数,即停止socket工作 |
3、 AsyncConnection类
-
AsyncConnection是整个Async消息模块的核心,连接的创建和删除、数据的读写指令、连接的重建、消息的处理等都是在这个类中进行的。本小节重点分析了其中的重要成员变量和24个成员函数。
AsyncConnection类的成员变量
成员变量名 | 返回值类型 | 描述 |
*async_msgr | AsyncMessenger | AsyncMessenger对象,调用一些环境变量等 |
out_q | map(int, list(pair(bufferlist, Message*)) ) | 存放消息和消息map信息的数据结构 |
sent | list(Message*) | 存放那些需要发送的消息 |
local_messages | list(Message*) | 存放本地传输的消息 |
outcoming_bl | bufferlist | 临时存放消息的bl |
read_handler | EventCallbackRef | 处理读请求的回调指令 |
write_handler | EventCallbackRef | 处理写请求的回调指令 |
connect_handler | EventCallbackRef | 处理连接请求的回调指令 |
local_deliver_handler | EventCallbackRef | 处理本地连接请求的回调指令 |
data_buf | bufferlist | 存放数据的bl |
data_blp | bufferlist::iterator | data_buf的指针 |
front, middle, data | bufferlist | 头部,中间部分和数据部分 |
connect_msg | ceph_msg_connect | 消息连接 |
net | NetHandler | NetHandler的实例,处理网络连接 |
*center | EventCenter | EventCenter的对象,用来调用事件中心的操作 |
*recv_buf | char | 用于从套接字中接收消息的buf |
AsyncConnection类的成员方法
编号 | 成员方法名 | 返回值类型 | 描述 |
1 | do_sendmsg(struct msghdr &msg, int len, bool more) | int | 返回的是需要被发送的消息的长度 |
2 | try_send(bufferlist &bl, bool send=true) | int | 加上一个write_lock,然后调用_try_send来真正发送消息 |
3 | _try_send(bufferlist &bl, bool send=true) | int | 如果send的值为false,会将bl添加到send buffer中,这么做的目的是避免messenger线程外发生错误 |
4 | prepare_send_message(uint64_t features, Message *m, bufferlist &bl) | void | 将m中的数据encode和copy到bl中 |
5 | read_until(uint64_t needed, char *p) | int | 循环读,调用read_bulk,如果r的值不为0,一直循环下去 |
6 | _process_connection() | int | 处理连接,根据不同的state状态执行不同的操作,关键点是state的值不同 |
7 | _connect() | void | 首先将STATE_CONNECTING的值赋给state,然后调用dispatch_event_external将read_handler事件添加到external_events集合中 |
8 | _stop() | void | 注销连接,然后将STATE_CLOSED赋给state,关闭套接字,清理事件 |
9 | handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r) | int | 根据reply.tag值的不同执行不同的操作 |
10 | handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl) | int | 处理消息的连接,如果成功则接收这个连接 |
11 | discard_out_queue() | void | 清除AsyncConnection的消息队列 |
12 | requeue_sent() | void | 重新将send队列入队 |
13 | handle_ack(uint64_t seq) | void | 处理确认信息,删除send队列中的message |
14 | write_message(Message *m, bufferlist& bl) | int | 将消息写到complete_bl中,调用_try_send发送消息 |
15 | _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &replybufferlist authorizer_reply) | int | 有一个bufferlist结构的reply_bl,调用try_send将reply_bl发送出去 |
16 | is_queued() | bool | 判断是否入队列,主要是out_q和outcoming_bl这两个队列 |
17 | shutdown_socket() | void | 关闭套接字 |
18 | connect(const entity_addr_t& addr, int type) | void | 在AsyncConnection第一次构造的时候使用,然后调用_connect()函数 |
19 | accept(int sd) | void | 将state的值设置为STATE_ACCEPTING,然后调用create_file_event函数创建文件事件,调用dispatch_event_external函数将回调指令分发出去 |
20 | send_message(Message *m) | int | 一般需要发送消息的时候都会调用这个函数进行具体的发送操作,在此之前已经完成了连接 |
21 | handle_write() | void | 使用一个while循环调用write_message将data写入到m中 |
22 | process() | void | 还是根据不同的state值做不同的处理 |
23 | local_deliver() | void | 这个函数主要用来处理本地的消息传递 |
24 | cleanup_handler() | void | 清理事件处理助手,将其重置 |
4、 EventCenter类和EventCallback类
- AsyncMessenger消息模块是基于epoll的事件驱动方式,取代了之间每个连接需要建立一个Pipe,然后创建两个线程,一个用来处理消息的接收,另一个用来处理消息的发送,其它操作也是借助线程的方式。不同于SimpleMessenger消息模块,AsyncMessenger消息模块使用了事件,所以需要一个处理事件的数据结构,即EventCenter,用一个线程专门用来循环处理,所有的操作都是通过回调函数来进行的,避免了大量线程的使用。在EventCenter定义了一个FileEvent的数据结构和一个TimeEvent的数据结构,大部分事件都是FileEvent。下面介绍EventCenter中的主要成员变量和方法。
成员变量(方法)名 | 返回值类型 | 描述 |
FileEvent | struct | 文件事件类 |
TimeEvent | struct | 时间事件类 |
external_events | deque(EventCallbackRef) | 用于存放外部事件的队列 |
*file_events | FileEvent | FileEvent的实例 |
*driver | EventDriver | EventDriver的实例 |
time_events | map(utime_t, list(TimeEvent)) | 事件事件的容器 |
net | NetHandler | NetHandler的实例 |
process_time_events() | int | 处理时间事件 |
*_get_file_event(int fd) | FileEvent | 获取文件事件 |
init(int nevent) | int | 根据不同的宏创建不同的事件处理器;调用create_file_event创建事件。 |
create_file_event(int fd, int mask, EventCallbackRef ctxt) | int | 根据fd和mask创建文件事件,调用add_event函数将创建的事件加入到事件处理器中去处理 |
create_time_event(uint64_t milliseconds, EventCallbackRef ctxt) | uint64_t | 创建time event,然后将其加入到time_events中 |
delete_file_event(int fd, int mask) | void | 删除file event |
delete_time_event(uint64_t id) | void | 删除time event |
process_events(int timeout_microseconds) | int | 如果事件是read_cb或者write_cb则调用相应的回调函数来进行处理(由do_request函数来完成);如果不是这两种事件,则将external_events队列中的事件取出放入cur_process中,调用一个while一个循环来处理。 |
dispatch_event_external(EventCallbackRef e) | void | 将创建的外部事件放入external_events队列中 |
- EventCallback是一个接口类,根据操作不同会定义其子类,使用方式用一个虚函数do_request()来回调处理不同的事件,具体的处理是在do_request()中进行的。
5、 EventDriver类、EpollDriver类、KqueueDriver类和SelectDriver类
- 事件中心相当于一个事件处理的容器,它本身并不真正去处理事件,通过回调函数的方式来完成事件的处理。同样,如何获取需要处理的事件也不是事件中心来完成的,它只负责处理,具体对需要处理的事件的获取是通过EventDriver来完成的,EventDriver是一个接口类,其实现主要是由EpollDriver、KqueueDriver和SelectDriver三个类操作的。Ceph支持多种操作系统的使用,如果使用的是Linux操作系统,使用EpollDriver,如果是BSD,使用KqueueDriver,如果都不是的情况下再使用SelectDriver(系统定义为最坏状况下)。事件驱动的执行主要依赖于epoll的方式,其中主要有三个函数:epoll_create(在epoll文件系统建立了个file节点,并开辟epoll自己的内核高速cache区,建立红黑树,分配好想要的size的内存对象,建立一个list链表,用于存储准备就绪的事件); epoll_ctl(把要监听的socket放到对应的红黑树上,给内核中断处理程序注册一个回调函数,通知内核,如果这个句柄的数据到了,就把它放到就绪列表);epoll_wait(观察就绪列表里面有没有数据,并进行提取和清空就绪列表,非常高效)。由于本项目运行在Linux中,所以下面以EpollDriver为例对Ceph的底层事件驱动进行描述。
EpollDriver的成员变量(方法)
成员变量(方法)名 | 返回值类型 | 描述 |
epfd | int | epoll的文件描述符 |
*events | struct epoll_event | epoll_event的一个对象 |
size | int | 在执行初始化时获取文件数量 |
init(int nevent) | int | 执行EpollDriver的初始化,主要是调用epoll_create,建立epoll对象 |
add_event(int fd, int cur_mask, int add_mask) | int | 根据事件的mask执行不同的操作,如果是EVENT_READABLE,表示对应的文件描述符可读,如果是EVENT_WRITABLE,表示文件描述符可写,然后调用epoll_ctl添加事件 |
del_event(int fd, int cur_mask, int del_mask) | int | 调用epoll_ctl执行事件的修改或者删除 |
resize_events(int newsize) | int | 清空事件数量 |
event_wait(vector &fired_events, struct timeval *tp) | int | 调用epoll_wait循环处理事件 |
6、NetHandler类
- NetHandler是AsyncMessenger模块中用于网络处理的类,其中定义了6个关键成员方法,其中的NetHandler::generic_connect()是每个连接都需要使用到的,创建socket、将socket设置为非阻塞、设置socket选项等都是经常使用的方法,下表对其详细分析。
成员方法名 | 返回值类型 | 描述 |
create_socket(int domain, bool reuse_addr=false) | int | 创建socket |
generic_connect(const entity_addr_t& addr, bool nonblock) | int | 通信双方通过该函数产生连接,首先调用create_socket()创建一个socket,然后将创建的socket设置为非阻塞,完成以后调用系统socket:: connect()建立连接 |
set_nonblock(int sd) | int | 将Socket设置为非阻塞的 |
set_socket_options(int sd) | void | 调用系统的socket::setsockopt函数,设置套接字的一些关键选项 |
connect(const entity_addr_t &addr) | int | 对NetHandler::generic_connect()进行了一个简单的封装 |
nonblock_connect(const entity_addr_t &addr) | int | 接口函数,设置非阻塞的连接 |
上文描述了AsyncMessenger基本数据结构及框架,下一章描述代码流程。