任何一个成熟的项目,必须要提供出接口,就像探针一样,可以让我们探查进程内部的运行情况,进程不能是一个黑盒子。对于ceph而言,Admin Socket 提供了该功能。Admin Socket 不仅可以查看当前的配置,进程运行的状态,而且,还可以修改配置,获取log等。
下面help可以看出,ceph给出了很多命令,来了解ceph内部的运行情况。
-
root@test3:~# ceph daemon /var/run/ceph/ceph-osd.4.asok help
-
{ "config get": "config get <field>: get the config value",
-
"config set": "config set <field> <val> [<val> ...]: set a config variable",
-
"config show": "dump current config settings",
-
"dump_blacklist": "dump blacklisted clients and times",
-
"dump_historic_ops": "show slowest recent ops",
-
"dump_op_pq_state": "dump op priority queue state",
-
"dump_ops_in_flight": "show the ops currently in flight",
-
"dump_watchers": "show clients which have active watches, and on which objects",
-
"get_command_descriptions": "list available commands",
-
"getomap": "output entire object map",
-
"git_version": "get git sha1",
-
"help": "list available commands",
-
"injectdataerr": "inject data error into omap",
-
"injectmdataerr": "inject metadata error",
-
"log dump": "dump recent log entries to log file",
-
"log flush": "flush log entries to log file",
-
"log reopen": "reopen log file",
-
"perf dump": "dump perfcounters value",
-
"perf schema": "dump perfcounters schema",
-
"rmomapkey": "remove omap key",
-
"setomapheader": "set omap header",
-
"setomapval": "set omap key",
-
"truncobj": "truncate object to length",
-
"version": "get ceph version"}
-
root@test3:~# ceph daemon /var/run/ceph/ceph-mon.*.asok help
-
{ "add_bootstrap_peer_hint": "add peer address as potential bootstrap peer for cluster bringup",
-
"config get": "config get <field>: get the config value",
-
"config set": "config set <field> <val> [<val> ...]: set a config variable",
-
"config show": "dump current config settings",
-
"get_command_descriptions": "list available commands",
-
"git_version": "get git sha1",
-
"help": "list available commands",
-
"log dump": "dump recent log entries to log file",
-
"log flush": "flush log entries to log file",
-
"log reopen": "reopen log file",
-
"mon_status": "show current monitor status",
-
"perf dump": "dump perfcounters value",
-
"perf schema": "dump perfcounters schema",
-
"quorum_status": "show current quorum status",
-
"sync_force": "force sync of and clear monitor store",
-
"version": "get ceph version"}
-
root@test3:~# ceph daemon /var/run/ceph/ceph-mds.*.asok help
-
{ "config get": "config get <field>: get the config value",
-
"config set": "config set <field> <val> [<val> ...]: set a config variable",
-
"config show": "dump current config settings",
-
"get_command_descriptions": "list available commands",
-
"git_version": "get git sha1",
-
"help": "list available commands",
-
"log dump": "dump recent log entries to log file",
-
"log flush": "flush log entries to log file",
-
"log reopen": "reopen log file",
-
"objecter_requests": "show in-progress osd requests",
-
"perf dump": "dump perfcounters value",
-
"perf schema": "dump perfcounters schema",
- "version": "get ceph version"}
比如可以查看 ceph的各个模块的当前配置
-
ceph daemon /var/run/ceph/ceph-mds.*.asok config show
-
ceph daemon /var/run/ceph/ceph-mon.*.asok config show
- ceph daemon /var/run/ceph/ceph-osd.4.asok config show
CephContext中会创建一个AdminSocket对象,该对象本质上是一个线程。ceph-mon/ceph-osd/ceph-mds这些进程都会创建一个AdminSocket线程,负责响应用户的探查命令。
从上面的OSD MON MDS help的不同输出可以看出,他们支持的命令有共同的,也有各自独立的。
首先是共同支持的命令有:
- config show 显示所有的配置项
- config get 获取某个配置项
- config set 设置某个配置项
- log flush 将log 刷入日志文件
- log dump 将最近的若干笔log刷入到log文件
- log reopen 重新打开log文件
- perf dump 输出统计信息
- perf schema 输出统计信息的类型
- version 版本信息
- git_version : git 版本信息
-
_admin_hook = new CephContextHook(this);
-
_admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
-
_admin_socket->register_command("1", "1", _admin_hook, "");
-
_admin_socket->register_command("perf dump", "perf dump", _admin_hook, "dump perfcounters value");
-
_admin_socket->register_command("perfcounters_schema", "perfcounters_schema", _admin_hook, "");
-
_admin_socket->register_command("2", "2", _admin_hook, "");
-
_admin_socket->register_command("perf schema", "perf schema", _admin_hook, "dump perfcounters schema");
-
_admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");
-
_admin_socket->register_command("config set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set
[ ...]: set a config variable"
-
_admin_socket->register_command("config get", "config get name=var,type=CephString", _admin_hook, "config get
: get the config value" );
-
_admin_socket->register_command("log flush", "log flush", _admin_hook, "flush log entries to log file");
-
_admin_socket->register_command("log dump", "log dump", _admin_hook, "dump recent log entries to log file");
- _admin_socket->register_command("log reopen", "log reopen", _admin_hook, "reopen log file")
首先创建了一个CephContextHook对象,注册命令时该Hook作为参数传给register_command。register_command比较简单,就是建立了command字符串和Hook的关联。这个关联有何作用,后面会分析到,暂时按下不表。
-
int AdminSocket::register_command(std::string command, std::string cmddesc, AdminSocketHook *hook, std::string help)
-
{
-
int ret;
-
m_lock.Lock();
-
if (m_hooks.count(command)) {
-
ldout(m_cct, 5) << "register_command " << command << " hook " << hook << " EEXIST" << dendl;
-
ret = -EEXIST;
-
} else {
-
ldout(m_cct, 5) << "register_command " << command << " hook " << hook << dendl;
-
m_hooks[command] = hook;
-
m_descs[command] = cmddesc;
-
m_help[command] = help;
-
ret = 0;
-
}
-
m_lock.Unlock();
-
return ret;
-
}
从上面注册部分的代码我们可以知道,下面每组中的三个命令是等效的。
-
ceph daemon /var/run/ceph/ceph-mds.bfudz.asok perf dump
-
ceph daemon /var/run/ceph/ceph-mds.bfudz.asok perfcounters_dump
- ceph daemon /var/run/ceph/ceph-mds.bfudz.asok 1
-
ceph daemon /var/run/ceph/ceph-mds.bfudz.asok perf schema
-
ceph daemon /var/run/ceph/ceph-mds.bfudz.asok perfcounters_schema
- ceph daemon /var/run/ceph/ceph-mds.bfudz.asok 2
这个AdminSocket对应的线程,是在common_init_finish函数中创建的:common_init_finish调用CephContext::start_service_thread,后者再调用AdminSocket::init,最终由init中的create()创建线程。
-
/**
 * Final common initialisation step: initialise crypto, start the context's
 * service thread and, if the "lockdep" option is set, enable lockdep.
 *
 * @param cct  context to finish initialising
 */
void common_init_finish(CephContext *cct)
{
  ceph::crypto::init(cct);
  cct->start_service_thread();

  // Nothing more to do unless lockdep was requested in the configuration.
  if (!cct->_conf->lockdep)
    return;

  g_lockdep = true;
  ldout(cct,0) << "lockdep is enabled" << dendl;
  lockdep_register_ceph_context(cct);
}
-
void CephContext::start_service_thread()
-
{
-
pthread_spin_lock(&_service_thread_lock);
-
if (_service_thread) {
-
pthread_spin_unlock(&_service_thread_lock);
-
return;
-
}
-
_service_thread = new CephContextServiceThread(this);
-
_service_thread->create();
-
pthread_spin_unlock(&_service_thread_lock);
-
// make logs flush on_exit()
-
if (_conf->log_flush_on_exit)
-
_log->set_flush_on_exit();
-
// Trigger callbacks on any config observers that were waiting for
-
// it to become safe to start threads.
-
_conf->set_val("internal_safe_to_start_threads", "true");
-
_conf->call_all_observers();
-
// start admin socket
-
if (_conf->admin_socket.length())
-
_admin_socket->init(_conf->admin_socket);
- }
接下来可以分析下admin_socket的初始化函数init:
-
bool AdminSocket::init(const std::string &path)
-
{
-
ldout(m_cct, 5) << "init " << path << dendl;
-
/* Set up things for the new thread */
-
std::string err;
-
int pipe_rd = -1, pipe_wr = -1;
-
err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
-
if (!err.empty()) {
-
lderr(m_cct) << "AdminSocketConfigObs::init: error: " << err << dendl;
-
return false;
-
}
-
int sock_fd;
-
err = bind_and_listen(path, &sock_fd);
-
if (!err.empty()) {
-
lderr(m_cct) << "AdminSocketConfigObs::init: failed: " << err << dendl;
-
close(pipe_rd);
-
close(pipe_wr);
-
return false;
-
}
-
/* Create new thread */
-
m_sock_fd = sock_fd;
-
m_shutdown_rd_fd = pipe_rd;
-
m_shutdown_wr_fd = pipe_wr;
-
m_path = path;
-
m_version_hook = new VersionHook;
-
register_command("0", "0", m_version_hook, "");
-
register_command("version", "version", m_version_hook, "get ceph version");
-
register_command("git_version", "git_version", m_version_hook, "get git sha1");
-
m_help_hook = new HelpHook(this);
-
register_command("help", "help", m_help_hook, "list available commands");
-
m_getdescs_hook = new GetdescsHook(this);
-
register_command("get_command_descriptions", "get_command_descriptions",
-
m_getdescs_hook, "list available commands");
-
create();
-
add_cleanup_file(m_path.c_str());
-
return true;
- }
首先是创建了管道,读取端的文件描述符记录在m_shutdown_rd_fd中,写入端的文件描述符记录在m_shutdown_wr_fd中。
从变量名字也可以看出,这对文件描述符的作用是传递关闭信息。因为AdminSocket线程一旦创建,就必须有办法通知该线程及时退出。
退出通知会写入管道的写入端,而线程通过多路复用接口监听读取端,一旦发现m_shutdown_rd_fd可读,线程就知道可以退出了。
退出的具体流程暂且按下不表。
AdminSocket最重要的是监听发过来的请求,它是用socket来实现的,初始化在bind_and_listen 函数:
-
std::string AdminSocket::bind_and_listen(const std::string &sock_path, int *fd)
-
{
-
ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
-
-
struct sockaddr_un address;
-
if (sock_path.size() > sizeof(address.sun_path) - 1) {
-
ostringstream oss;
-
oss << "AdminSocket::bind_and_listen: "
-
<< "The UNIX domain socket path " << sock_path << " is too long! The "
-
<< "maximum length on this system is "
-
<< (sizeof(address.sun_path) - 1);
-
return oss.str();
-
}
-
int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
-
if (sock_fd < 0) {
-
int err = errno;
-
ostringstream oss;
-
oss << "AdminSocket::bind_and_listen: "
-
<< "failed to create socket: " << cpp_strerror(err);
-
return oss.str();
-
}
-
int r = fcntl(sock_fd, F_SETFD, FD_CLOEXEC);
-
if (r < 0) {
-
r = errno;
-
TEMP_FAILURE_RETRY(::close(sock_fd));
-
ostringstream oss;
-
oss << "AdminSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r);
-
return oss.str();
-
}
-
memset(&address, 0, sizeof(struct sockaddr_un));
-
address.sun_family = AF_UNIX;
-
snprintf(address.sun_path, sizeof(address.sun_path),
-
"%s", sock_path.c_str());
-
if (bind(sock_fd, (struct sockaddr*)&address,
-
sizeof(struct sockaddr_un)) != 0) {
-
int err = errno;
-
if (err == EADDRINUSE) {
-
AdminSocketClient client(sock_path);
-
bool ok;
-
client.ping(&ok);
-
if (ok) {
-
ldout(m_cct, 20) << "socket " << sock_path << " is in use" << dendl;
-
err = EEXIST;
-
} else {
-
ldout(m_cct, 20) << "unlink stale file " << sock_path << dendl;
-
TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
-
if (bind(sock_fd, (struct sockaddr*)&address,
-
sizeof(struct sockaddr_un)) == 0) {
-
err = 0;
-
} else {
-
err = errno;
-
}
-
}
-
}
-
if (err != 0) {
-
ostringstream oss;
-
oss << "AdminSocket::bind_and_listen: "
-
<< "failed to bind the UNIX domain socket to '" << sock_path
-
<< "': " << cpp_strerror(err);
-
close(sock_fd);
-
return oss.str();
-
}
-
}
-
if (listen(sock_fd, 5) != 0) {
-
int err = errno;
-
ostringstream oss;
-
oss << "AdminSocket::bind_and_listen: "
-
<< "failed to listen to socket: " << cpp_strerror(err);
-
close(sock_fd);
-
TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
-
return oss.str();
-
}
-
*fd = sock_fd;
-
return "";
- }
这个函数做的事情并不难理解,做的事情比较老套:
1 创建一个socket,该函数需要一个入参,指定socket路径名:
2 bind
3 listen
而传入的路径名是这个
-
root@test3:/var/run/ceph# ceph daemon /var/run/ceph/ceph-mds.bfudz.asok config get admin_socket
- { "admin_socket": "\/var\/run\/ceph\/ceph-mds.bfudz.asok"}
-
// One VersionHook instance is registered under three command names:
// "0", "version" and "git_version".
m_version_hook = new VersionHook;
-
register_command("0", "0", m_version_hook, "");
-
register_command("version", "version", m_version_hook, "get ceph version");
-
register_command("git_version", "git_version", m_version_hook, "get git sha1");
-
// HelpHook is constructed with this AdminSocket and registered as "help".
m_help_hook = new HelpHook(this);
-
register_command("help", "help", m_help_hook, "list available commands");
-
// GetdescsHook is likewise constructed with this AdminSocket and registered
// as "get_command_descriptions".
m_getdescs_hook = new GetdescsHook(this);
-
register_command("get_command_descriptions", "get_command_descriptions",
- m_getdescs_hook, "list available commands");
-
root@test3:/var/run/ceph# ceph daemon /var/run/ceph/ceph-mds.bfudz.asok version
-
{"version":"0.67.9-222-g014b35f"}
-
root@test3:/var/run/ceph# ceph daemon /var/run/ceph/ceph-mds.bfudz.asok git_version
-
{"git_version":"014b35fc1ee0a1ad1f699a3705f3481a88614d36"}
- root@test3:/var/run/ceph#
init函数最后调用了create函数。create函数是老朋友,前面分析Log的时候已经提到,对于Thread这个类,做的事情无非就是创建线程。关键内容是,线程执行的函数是哪个?
和Log一样,是entry函数。AdminSocket类也有entry函数,该函数是AdminSocket 对应线程指定的函数:
-
void* AdminSocket::entry()
-
{
-
ldout(m_cct, 5) << "entry start" << dendl;
-
while (true) {
-
struct pollfd fds[2];
-
memset(fds, 0, sizeof(fds));
-
fds[0].fd = m_sock_fd;
-
fds[0].events = POLLIN | POLLRDBAND;
-
fds[1].fd = m_shutdown_rd_fd;
-
fds[1].events = POLLIN | POLLRDBAND;
-
int ret = poll(fds, 2, -1);
-
if (ret < 0) {
-
int err = errno;
-
if (err == EINTR) {
-
continue;
-
}
-
lderr(m_cct) << "AdminSocket: poll(2) error: '"
-
<< cpp_strerror(err) << dendl;
-
return PFL_FAIL;
-
}
-
if (fds[0].revents & POLLIN) {
-
// Send out some data
-
do_accept();
-
}
-
if (fds[1].revents & POLLIN) {
-
// Parent wants us to shut down
-
return PFL_SUCCESS;
-
}
-
}
-
ldout(m_cct, 5) << "entry exit" << dendl;
- }
这个线程函数比较简单,它监听socket fd和管道的读取端。
1 管道的读取端负责管理何时退出
2 socket fd 负责监听用户发过来的指令。
处理用户发过来的命令,是do_accept函数干的事情:
-
bool AdminSocket::do_accept()
-
{
-
struct sockaddr_un address;
-
socklen_t address_length = sizeof(address);
-
ldout(m_cct, 30) << "AdminSocket: calling accept" << dendl;
-
int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
-
&address_length);
-
ldout(m_cct, 30) << "AdminSocket: finished accept" << dendl;
-
if (connection_fd < 0) {
-
int err = errno;
-
lderr(m_cct) << "AdminSocket: do_accept error: '"
-
<< cpp_strerror(err) << dendl;
-
return false;
-
}
-
-
char cmd[1024];
-
int pos = 0;
-
string c;
-
while (1) {
-
int ret = safe_read(connection_fd, &cmd[pos], 1);
-
if (ret <= 0) {
-
lderr(m_cct) << "AdminSocket: error reading request code: "
-
<< cpp_strerror(ret) << dendl;
-
close(connection_fd);
-
return false;
-
}
-
//ldout(m_cct, 0) << "AdminSocket read byte " << (int)cmd[pos] << " pos " << pos << dendl;
-
if (cmd[0] == '\0') {
-
// old protocol: __be32
-
if (pos == 3 && cmd[0] == '\0') {
-
switch (cmd[3]) {
-
case 0:
-
c = "0";
-
break;
-
case 1:
-
c = "perfcounters_dump";
-
break;
-
case 2:
-
c = "perfcounters_schema";
-
break;
-
default:
-
c = "foo";
-
break;
-
}
-
break;
-
}
-
} else {
-
// new protocol: null or \n terminated string
-
if (cmd[pos] == '\n' || cmd[pos] == '\0') {
-
cmd[pos] = '\0';
-
c = cmd;
-
break;
-
}
-
}
-
pos++;
-
}
-
-
bool rval = false;
-
-
map<string, cmd_vartype> cmdmap;
-
string format;
-
vector<string> cmdvec;
-
stringstream errss;
-
cmdvec.push_back(cmd);
-
if (!cmdmap_from_json(cmdvec, &cmdmap, errss)) {
-
ldout(m_cct, 0) << "AdminSocket: " << errss << dendl;
-
return false;
-
}
-
cmd_getval(m_cct, cmdmap, "format", format);
-
if (format != "json" && format != "json-pretty" &&
-
format != "xml" && format != "xml-pretty")
-
format = "json-pretty";
-
cmd_getval(m_cct, cmdmap, "prefix", c);
-
-
string firstword;
-
if (c.find(" ") == string::npos)
-
firstword = c;
-
else
-
firstword = c.substr(0, c.find(" "));
-
-
m_lock.Lock();
-
map<string,AdminSocketHook*>::iterator p;
-
string match = c;
-
while (match.size()) {
-
p = m_hooks.find(match);
-
if (p != m_hooks.end())
-
break;
-
-
// drop right-most word
-
size_t pos = match.rfind(' ');
-
if (pos == std::string::npos) {
-
match.clear(); // we fail
-
break;
-
} else {
-
match.resize(pos);
-
}
-
}
-
-
bufferlist out;
-
if (p == m_hooks.end()) {
-
lderr(m_cct) << "AdminSocket: request '" << c << "' not defined" << dendl;
-
} else {
-
string args;
-
if (match != c)
-
args = c.substr(match.length() + 1);
-
bool success = p->second->call(match, cmdmap, format, out);
-
if (!success) {
-
ldout(m_cct, 0) << "AdminSocket: request '" << match << "' args '" << args
-
<< "' to " << p->second << " failed" << dendl;
-
out.append("failed");
-
} else {
-
ldout(m_cct, 5) << "AdminSocket: request '" << match << "' '" << args
-
<< "' to " << p->second
-
<< " returned " << out.length() << " bytes" << dendl;
-
}
-
uint32_t len = htonl(out.length());
-
int ret = safe_write(connection_fd, &len, sizeof(len));
-
if (ret < 0) {
-
lderr(m_cct) << "AdminSocket: error writing response length "
-
<< cpp_strerror(ret) << dendl;
-
} else {
-
if (out.write_fd(connection_fd) >= 0)
-
rval = true;
-
}
-
}
-
m_lock.Unlock();
-
-
TEMP_FAILURE_RETRY(close(connection_fd));
-
return rval;
- }
这个函数有点长,但是并不复杂。简单说,如果有个client尝试connect,该线程的poll就会感知到,然后进入do_accept函数。
do_accept首先执行accept,和client搭上线,然后开始通信。
safe_read负责读取客户发过来的指令。前面已经提到过,AdminSocket支持的命令是有限的,都已事先注册过了。
如果client发来的指令是注册过的指令,就见招拆招,返回相应的结果给客户端。
每一个命令的字符串,都是和一个AdminSocketHook 的类型关联的,但是一个AdminSocketHook可以对应多个command
- std::map<std::string,AdminSocketHook*> m_hooks
见招拆招的函数,就记录在对应的Hook上:
-
class CephContextHook : public AdminSocketHook {
-
CephContext *m_cct;
-
public:
-
CephContextHook(CephContext *cct) : m_cct(cct) {}
-
bool call(std::string command, cmdmap_t& cmdmap, std::string format,
-
bufferlist& out) {
-
m_cct->do_command(command, cmdmap, format, &out);
-
return true;
-
}
- };
do_accept函数中 p->second->call(match, cmdmap, format, out) 这一行,具体的实现就是对应Hook的call函数;对于CephContextHook,就是这个类的call方法。
CephContextHook的call方法只是转调CephContext的do_command函数,下面我们看下do_command:
-
void CephContext::do_command(std::string command, cmdmap_t& cmdmap,
-
std::string format, bufferlist *out)
-
{
-
Formatter *f = new_formatter(format);
-
if (!f)
-
f = new_formatter("json-pretty");
-
stringstream ss;
-
for (cmdmap_t::iterator it = cmdmap.begin(); it != cmdmap.end(); ++it) {
-
if (it->first != "prefix") {
-
ss << it->first << ":" << cmd_vartype_stringify(it->second) << " ";
-
}
-
}
-
lgeneric_dout(this, 1) << "do_command '" << command << "' '"
-
<< ss.str() << dendl;
-
if (command == "perfcounters_dump" || command == "1" ||
-
command == "perf dump") {
-
_perf_counters_collection->dump_formatted(f, false);
-
}
-
else if (command == "perfcounters_schema" || command == "2" ||
-
command == "perf schema") {
-
_perf_counters_collection->dump_formatted(f, true);
-
}
-
else {
-
f->open_object_section(command.c_str());
-
if (command == "config show") {
-
_conf->show_config(f);
-
}
-
else if (command == "config set") {
-
std::string var;
-
std::vector<std::string> val;
-
-
if (!(cmd_getval(this, cmdmap, "var", var)) ||
-
!(cmd_getval(this, cmdmap, "val", val))) {
-
f->dump_string("error", "syntax error: 'config set
'" );
-
} else {
-
// val may be multiple words
-
string valstr = str_join(val, " ");
-
int r = _conf->set_val(var.c_str(), valstr.c_str());
-
if (r < 0) {
-
f->dump_stream("error") << "error setting '" << var << "' to '" << valstr << "': " << cpp_strerror(r);
-
} else {
-
ostringstream ss;
-
_conf->apply_changes(&ss);
-
f->dump_string("success", ss.str());
-
}
-
}
-
} else if (command == "config get") {
-
std::string var;
-
if (!cmd_getval(this, cmdmap, "var", var)) {
-
f->dump_string("error", "syntax error: 'config get '");
-
} else {
-
char buf[4096];
-
memset(buf, 0, sizeof(buf));
-
char *tmp = buf;
-
int r = _conf->get_val(var.c_str(), &tmp, sizeof(buf));
-
if (r < 0) {
-
f->dump_stream("error") << "error getting '" << var << "': " << cpp_strerror(r);
-
} else {
-
f->dump_string(var.c_str(), buf);
-
}
-
}
-
} else if (command == "log flush") {
-
_log->flush();
-
}
-
else if (command == "log dump") {
-
_log->dump_recent();
-
}
-
else if (command == "log reopen") {
-
_log->reopen_log_file();
-
}
-
else {
-
assert(0 == "registered under wrong command?");
-
}
-
f->close_section();
-
}
-
f->flush(*out);
-
delete f;
-
lgeneric_dout(this, 1) << "do_command '" << command << "' '" << ss.str()
-
<< "result is " << out->length() << " bytes" << dendl;
- };