AsyncMessenger数据结构分析

1680阅读 0评论2017-01-24 xiong9937
分类:服务器与存储

本文主要介绍AsyncMessenger的代码框架结构和主要使用到的

这里写图片描述

上图表示Ceph的AsyncMessenger模块中各个关键类之间的联系。在AsyncMessenger模块中用到的类主要有14个,下面逐一介绍每个类的作用,以及其中包含的主要成员变量和方法。


1、 AsyncMessenger类、SimplePolicyMessenger类和Messenger类

AsyncMessenger类、SimplePolicyMessenger类和Messenger类三者是继承与被继承的关系,Messenger是一个抽象的消息管理器,其主要接口在派生类AsyncMessenger中实现,SimplePolicyMessenger类则是对消息管理器的一些连接的策略进行定义的设置,AsyncMessenger中定义和实现了消息管理器的相关成员变量以及方法。

一个AsyncMessenger实例的关键成员变量以及类方法如下表所示(包括该类继承的父类成员变量以及类方法)。AsyncMessenger包含一个WorkerPool对象、一个Processor实例,以及3个AsyncConnectionRef对象列表和1个ConnectionRef对象列表。


AsyncMessenger类中的成员变量:

成员变量名 返回值类型 描述
*pool WorkerPool 通过pool->get_worker()从线程池中获取工作线程来进行工作
processor Processor Processor实例,主要用来监听连接,绑定socket,接受连接请求等,相当于AsyncMessenger的处理中心
listen_sd int 定义的监听套接字
conns ceph::unordered_map(entity_addr_t, AsyncConnectionRef) 地址和连接的map表,创建一个新的连接时将连接和和地址信息加入到这个map表中,在发送消息时先根据地址对这个map进行一次查找,如果找到了返回连接,如果没有找到创建一个新的连接。
accepting_conns set(AsyncConnectionRef) 接收连接的集合,这个集合主要存放那些已经接收的连接。
deleted_conns set(AsyncConnectionRef) 已经关闭并且需要清理的连接的集合
local_connection ConnectionRef 本地连接的计数器
did_bind bool 初始值为false,绑定地址后置为true,stop的时候再次置为false

AsynsMessenger类中的成员方法:

成员方法名 返回值类型 描述
bind (const entity_addr_t& bind_addr) int 绑定套接字,具体绑定过程是由Processor的bind()函数完成的
start() int 注册一个AsyncMessenger的实例后,启动这个实例,具体执行过程是WokerPool的start()函数完成的。
wait() void 等待停止的信号,如果收到停止的信息后,调用Processor的stop()函数,然后将did_bind置为false,最后删除建立的连接
send_message (Message *m, const entity_inst_t& dest) int 加了一个锁,然后调用_send_message(m, dest),将消息发送到目的地址
get_connection (const entity_inst_t& dest) ConnectionRef 函数用来建立连接,判定是否为本地连接,否则再继续查找连接是否已经存在,如果不存在再创建一个连接
ready() void 注册的AsyncMessenger已经准备好了,启动事件处理中心,开始工作,启动工作线程
create_connect(const entity_addr_t& addr, int type) AsyncConnectionRef create一个连接并将其加到conns中
submit_message(Message *m, AsyncConnectionRef con,const entity_addr_t& dest_addr, int dest_type) void 发送消息的时候会用到,根据目的地址判断需要发送消息的连接是否存在,以及连接是否是本地连接,如果是本地连接,直接对消息进行dispatch,如果连接不存在,需要根据消息类型创建新的连接
_send_message(Message *m, const entity_inst_t& dest) int 从连接中查找目的地址,然后调用submit_message()发送消息
add_accept(int sd) AsyncConnectionRef 新建一个连接,然后将其加入到accepting_conns中

2、 Processor类、WorkerPool类和Worker类


Processor类的成员变量(方法):

成员变量(方法)名 返回值类型 描述
*msgr AsyncMessenger AsyncMessenger的指针实例,用于调用AsyncMessenger中的成员变量(方法),用的最多的还是绑定时获取的地址信息等。
net NetHandler 绑定套接字后将其设置为非阻塞,然后这是套接字选项。
*worker Worker 工作线程
listen_sd int 获取套接口描述字的值,非负表示套接字创建成功,-1表示出错
nonce uint64_t 构造函数中用于entity_addr_t的唯一标识ID
bind(const entity_addr_t &bind_addr, const set& avoid_ports) int 执行绑定套接字的具体过程
start(Worker *w) int 执行消息模块的start,具体就是启动线程,让其处于工作状态
accept() void 建立连接的过程,如果连接建立成功,则通过add_accept()函数将连接加入到accepting_conns集合中
stop() void 关闭套接字
rebind(const set& avoid_port) int 如果第一次没有绑定成功或者其它原因导致的绑定失败,执行重新绑定

WorkerPool类的成员变量(方法)

成员变量(方法)名 返回值类型 描述
coreids vector 用于存放CPU id的集合
WorkerPool(CephContext *c) 构造函数 WorkerPool的构造函数,根据ms_async_op_threads的值创建相应数量的worker线程,同时完成worker和cpu core的绑定。
start() void 创建worker集合中的worker线程,启动线程开始工作
*get_worker() Worker 获取worker集合中的worker线程
get_cpuid(int id) int 获取cpu的id
workers Worker* Worker线程的集合,WorkerPool在构造函数中创建的worker线程放入到这个集合中

Worker类的成员变量(方法)

成员变量(方法)名 返回值类型 描述
*pool WorkerPool WorkerPool的实例,在entry()函数中用于获取cpu的id
done bool 如果线程的工作完成置为true,否则false
center EventCenter EventCenter的实例,在Worker的构造函数中执行EventCenter的初始化工作
*entry() void 工作线程的入口函数,启动一个while循环来执行事件的处理,在整个消息模块中就使用了这一个工作线程
stop() void 将done置为true,然后调用EventCenter的wakeup函数,即停止socket工作

3、 AsyncConnection类

成员变量名 返回值类型 描述
*async_msgr AsyncMessenger AsyncMessenger对象,调用一些环境变量等
out_q map(int, list(pair(bufferlist, Message*)) ) 存放消息和消息map信息的数据结构
sent list(Message*) 存放那些需要发送的消息
local_messages list(Message*) 存放本地传输的消息
outcoming_bl bufferlist 临时存放消息的bl
read_handler EventCallbackRef 处理读请求的回调指令
write_handler EventCallbackRef 处理写请求的回调指令
connect_handler EventCallbackRef 处理连接请求的回调指令
local_deliver_handler EventCallbackRef 处理本地连接请求的回调指令
data_buf bufferlist 存放数据的bl
data_blp bufferlist::iterator data_buf的指针
front, middle, data bufferlist 头部,中间部分和数据部分
connect_msg ceph_msg_connect 消息连接
net NetHandler NetHandler的实例,处理网络连接
*center EventCenter EventCenter的对象,用来调用事件中心的操作
*recv_buf char 用于从套接字中接收消息的buf

AsyncConnection类的成员方法

编号 成员方法名 返回值类型 描述
1 do_sendmsg(struct msghdr &msg, int len, bool more) int 返回的是需要被发送的消息的长度
2 try_send(bufferlist &bl, bool send=true) int 加上一个write_lock,然后调用_try_send来真正发送消息
3 _try_send(bufferlist &bl, bool send=true) int 如果send的值为false,会将bl添加到send buffer中,这么做的目的是避免messenger线程外发生错误
4 prepare_send_message(uint64_t features, Message *m, bufferlist &bl) void 将m中的数据encode和copy到bl中
5 read_until(uint64_t needed, char *p) int 循环读,调用read_bulk,如果r的值不为0,一直循环下去
6 _process_connection() int 处理连接,根据不同的state状态执行不同的操作,关键点是state的值不同
7 _connect() void 首先将STATE_CONNECTING的值赋给state,然后调用dispatch_event_external将read_handler事件添加到external_events集合中
8 _stop() void 注销连接,然后将STATE_CLOSED赋给state,关闭套接字,清理事件
9 handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r) int 根据reply.tag值的不同执行不同的操作
10 handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl) int 处理消息的连接,如果成功则接收这个连接
11 discard_out_queue() void 清除AsyncConnection的消息队列
12 requeue_sent() void 重新将send队列入队
13 handle_ack(uint64_t seq) void 处理确认信息,删除send队列中的message
14 write_message(Message *m, bufferlist& bl) int 将消息写到complete_bl中,调用_try_send发送消息
15 _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &replybufferlist authorizer_reply) int 有一个bufferlist结构的reply_bl,调用try_send将reply_bl发送出去
16 is_queued() bool 判断是否入队列,主要是out_q和outcoming_bl这两个队列
17 shutdown_socket() void 关闭套接字
18 connect(const entity_addr_t& addr, int type) void 在AsyncConnection第一次构造的时候使用,然后调用_connect()函数
19 accept(int sd) void 将state的值设置为STATE_ACCEPTING,然后调用create_file_event函数创建文件事件,调用dispatch_event_external函数将回调指令分发出去
20 send_message(Message *m) int 一般需要发送消息的时候都会调用这个函数进行具体的发送操作,在此之前已经完成了连接
21 handle_write() void 使用一个while循环调用write_message将data写入到m中
22 process() void 还是根据不同的state值做不同的处理
23 local_deliver() void 这个函数主要用来处理本地的消息传递
24 cleanup_handler() void 清理事件处理助手,将其重置

4、 EventCenter类和EventCallback类

成员变量(方法)名 返回值类型 描述
FileEvent struct 文件事件类
TimeEvent struct 时间事件类
external_events deque(EventCallbackRef) 用于存放外部事件的队列
*file_events FileEvent FileEvent的实例
*driver EventDriver EventDriver的实例
time_events map(utime_t, list(TimeEvent)) 事件事件的容器
net NetHandler NetHandler的实例
process_time_events() int 处理时间事件
*_get_file_event(int fd) FileEvent 获取文件事件
init(int nevent) int 根据不同的宏创建不同的事件处理器;调用create_file_event创建事件。
create_file_event(int fd, int mask, EventCallbackRef ctxt) int 根据fd和mask创建文件事件,调用add_event函数将创建的事件加入到事件处理器中去处理
create_time_event(uint64_t milliseconds, EventCallbackRef ctxt) uint64_t 创建time event,然后将其加入到time_events中
delete_file_event(int fd, int mask) void 删除file event
delete_time_event(uint64_t id) void 删除time event
process_events(int timeout_microseconds) int 如果事件是read_cb或者write_cb则调用相应的回调函数来进行处理(由do_request函数来完成);如果不是这两种事件,则将external_events队列中的事件取出放入cur_process中,调用一个while一个循环来处理。
dispatch_event_external(EventCallbackRef e) void 将创建的外部事件放入external_events队列中

5、 EventDriver类、EpollDriver类、KqueueDriver类和SelectDriver类

EpollDriver的成员变量(方法)

成员变量(方法)名 返回值类型 描述
epfd int epoll的文件描述符
*events struct epoll_event epoll_event的一个对象
size int 在执行初始化时获取文件数量
init(int nevent) int 执行EpollDriver的初始化,主要是调用epoll_create,建立epoll对象
add_event(int fd, int cur_mask, int add_mask) int 根据事件的mask执行不同的操作,如果是EVENT_READABLE,表示对应的文件描述符可读,如果是EVENT_WRITABLE,表示文件描述符可写,然后调用epoll_ctl添加事件
del_event(int fd, int cur_mask, int del_mask) int 调用epoll_ctl执行事件的修改或者删除
resize_events(int newsize) int 清空事件数量
event_wait(vector &fired_events, struct timeval *tp) int 调用epoll_wait循环处理事件

6、NetHandler类

成员方法名 返回值类型 描述
create_socket(int domain, bool reuse_addr=false) int 创建socket
generic_connect(const entity_addr_t& addr, bool nonblock) int 通信双方通过该函数产生连接,首先调用create_socket()创建一个socket,然后将创建的socket设置为非阻塞,完成以后调用系统socket:: connect()建立连接
set_nonblock(int sd) int 将Socket设置为非阻塞的
set_socket_options(int sd) void 调用系统的socket::setsockopt函数,设置套接字的一些关键选项
connect(const entity_addr_t &addr) int 对NetHandler::generic_connect()进行了一个简单的封装
nonblock_connect(const entity_addr_t &addr) int 接口函数,设置非阻塞的连接

上文描述了AsyncMessenger基本数据结构及框架,下一章描述代码流程。

上一篇:Ceph网络模块基本结构
下一篇:SimpleMessenger数据结构及代码流程分析