Ceph Source Code Analysis: The Log Implementation

2015-05-30 Bean_lee
Category: Cloud Computing


Every large project needs a logging design. Logs are an essential debugging tool and an excellent entry point for study: following the log paths lets a newcomer understand the code quickly, and analyzing log output helps engineers pinpoint problems.


Below we trace the ceph-mon executable to see how logging is implemented in Ceph.

During initialization, ceph_mon calls global_init, which at the very beginning calls the common_preinit function.

This creates an important data structure: the CephContext cct.

    CephContext::CephContext(uint32_t module_type_)
      : nref(1),
        _conf(new md_config_t()),
        _log(NULL),
        _module_type(module_type_),
        _service_thread(NULL),
        _log_obs(NULL),
        _admin_socket(NULL),
        _perf_counters_collection(NULL),
        _perf_counters_conf_obs(NULL),
        _heartbeat_map(NULL),
        _crypto_none(NULL),
        _crypto_aes(NULL)
    {
      pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);

      _log = new ceph::log::Log(&_conf->subsys);
      _log->start();

      _log_obs = new LogObs(_log);
      _conf->add_observer(_log_obs);

      _perf_counters_collection = new PerfCountersCollection(this);
      _admin_socket = new AdminSocket(this);
      _heartbeat_map = new HeartbeatMap(this);

      _admin_hook = new CephContextHook(this);
      _admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
      _admin_socket->register_command("1", "1", _admin_hook, "");
      _admin_socket->register_command("perf dump", "perf dump", _admin_hook, "dump perfcounters value");
      _admin_socket->register_command("perfcounters_schema", "perfcounters_schema", _admin_hook, "");
      _admin_socket->register_command("2", "2", _admin_hook, "");
      _admin_socket->register_command("perf schema", "perf schema", _admin_hook, "dump perfcounters schema");
      _admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");
      _admin_socket->register_command("config set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set <var> <val> [<val> ...]: set a config variable");
      _admin_socket->register_command("config get", "config get name=var,type=CephString", _admin_hook, "config get <var>: get the config value");
      _admin_socket->register_command("log flush", "log flush", _admin_hook, "flush log entries to log file");
      _admin_socket->register_command("log dump", "log dump", _admin_hook, "dump recent log entries to log file");
      _admin_socket->register_command("log reopen", "log reopen", _admin_hook, "reopen log file");

      _crypto_none = new CryptoNone;
      _crypto_aes = new CryptoAES;
    }
This function is short, but by no means simple. This article focuses on the Log thread.
 
    _log = new ceph::log::Log(&_conf->subsys);
    _log->start();

First the member variable _log is created. The new expression constructs a Log instance, which is essentially initialization:
1 initialize the spinlock m_lock
2 initialize the mutexes m_flush_mutex and m_queue_mutex
3 initialize the condition variables m_cond_loggers and m_cond_flusher

    Log::Log(SubsystemMap *s)
      : m_indirect_this(NULL),
        m_subs(s),
        m_new(), m_recent(),
        m_fd(-1),
        m_syslog_log(-2), m_syslog_crash(-2),
        m_stderr_log(1), m_stderr_crash(-1),
        m_stop(false),
        m_max_new(DEFAULT_MAX_NEW),
        m_max_recent(DEFAULT_MAX_RECENT)
    {
      int ret;

      ret = pthread_spin_init(&m_lock, PTHREAD_PROCESS_SHARED);
      assert(ret == 0);

      ret = pthread_mutex_init(&m_flush_mutex, NULL);
      assert(ret == 0);

      ret = pthread_mutex_init(&m_queue_mutex, NULL);
      assert(ret == 0);

      ret = pthread_cond_init(&m_cond_loggers, NULL);
      assert(ret == 0);

      ret = pthread_cond_init(&m_cond_flusher, NULL);
      assert(ret == 0);

      // kludge for prealloc testing
      if (false)
        for (int i=0; i < PREALLOC; i++)
          m_recent.enqueue(new Entry);
    }

Next, the log thread is started:
 
    void Log::start()
    {
      assert(!is_started());
      pthread_mutex_lock(&m_queue_mutex);
      m_stop = false;
      pthread_mutex_unlock(&m_queue_mutex);
      create();
    }

The key call is create(). Where does this create come from? It is the create method of the Thread base class:

    int Thread::try_create(size_t stacksize)
    {
      pthread_attr_t *thread_attr = NULL;
      stacksize &= CEPH_PAGE_MASK;  // must be multiple of page
      if (stacksize) {
        thread_attr = (pthread_attr_t*) malloc(sizeof(pthread_attr_t));
        if (!thread_attr)
          return -ENOMEM;
        pthread_attr_init(thread_attr);
        pthread_attr_setstacksize(thread_attr, stacksize);
      }
      int r;
      // The child thread will inherit our signal mask. Set our signal mask to
      // the set of signals we want to block. (It's ok to block more signals
      // than usual for a little while -- they will just be delivered to
      // another thread or delivered to this thread later.)
      sigset_t old_sigset;
      if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
        block_signals(NULL, &old_sigset);
      }
      else {
        int to_block[] = { SIGPIPE , 0 };
        block_signals(to_block, &old_sigset);
      }
      r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
      restore_sigset(&old_sigset);
      if (thread_attr)
        free(thread_attr);
      return r;
    }

    void Thread::create(size_t stacksize)
    {
      int ret = try_create(stacksize);
      if (ret != 0) {
        char buf[256];
        snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
                 "failed with error %d", ret);
        dout_emergency(buf);
        assert(ret == 0);
      }
    }
This method creates the thread and does the following:
1 optionally set the thread stack size
2 for daemon processes, block SIGPIPE before creating the thread
3 call pthread_create to create the thread

Wait a moment: which function does the new thread execute? What exactly is _entry_func?

    void *Thread::_entry_func(void *arg) {
      void *r = ((Thread*)arg)->entry();
      return r;
    }
In other words, the object whose entry() the thread will run is passed as the fourth argument of pthread_create:

    namespace ceph {
    namespace log {
    class Log : private Thread
    {
      Log **m_indirect_this;
      SubsystemMap *m_subs;

      pthread_spinlock_t m_lock;
      pthread_mutex_t m_queue_mutex;
      pthread_mutex_t m_flush_mutex;
      pthread_cond_t m_cond_loggers;
      pthread_cond_t m_cond_flusher;
      EntryQueue m_new;     ///< new entries
      EntryQueue m_recent;  ///< recent (less new) entries we've already written at low detail
      std::string m_log_file;
      int m_fd;
      int m_syslog_log, m_syslog_crash;
      int m_stderr_log, m_stderr_crash;
      bool m_stop;
      int m_max_new, m_max_recent;
      void *entry();
      void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);
      void _log_message(const char *s, bool crash);
    public:
      Log(SubsystemMap *s);
      virtual ~Log();
      void set_flush_on_exit();
      void set_max_new(int n);
      void set_max_recent(int n);
      void set_log_file(std::string fn);
      void reopen_log_file();
      void flush();
      void dump_recent();
      void set_syslog_level(int log, int crash);
      void set_stderr_level(int log, int crash);
      Entry *create_entry(int level, int subsys);
      void submit_entry(Entry *e);
      void start();
      void stop();
    };
    }
    }
So the Log class has a member function entry(), which is what the thread executes:

    void *Log::entry()
    {
      pthread_mutex_lock(&m_queue_mutex);
      while (!m_stop) {
        if (!m_new.empty()) {
          pthread_mutex_unlock(&m_queue_mutex);
          flush();
          pthread_mutex_lock(&m_queue_mutex);
          continue;
        }
        pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);
      }
      pthread_mutex_unlock(&m_queue_mutex);
      flush();
      return NULL;
    }
This is what the log thread does:
1 m_stop controls whether the thread terminates
2 if the m_new queue is not empty, it calls flush(), which writes the entries out
3 if the queue is empty, it waits on the condition variable; when a new entry arrives on the queue, the thread is notified

Setting aside for now how the thread is controlled and who notifies it of new entries, let's look directly at its main work. flush() does the following:

    void Log::flush()
    {
      pthread_mutex_lock(&m_flush_mutex);
      pthread_mutex_lock(&m_queue_mutex);
      EntryQueue t;
      t.swap(m_new);
      pthread_cond_broadcast(&m_cond_loggers);
      pthread_mutex_unlock(&m_queue_mutex);
      _flush(&t, &m_recent, false);

      // trim: m_recent has a maximum size; once it is exceeded, the oldest
      // entries are removed from the queue and their memory is freed
      while (m_recent.m_len > m_max_recent) {
        delete m_recent.dequeue();
      }
      pthread_mutex_unlock(&m_flush_mutex);
    }
First a temporary queue t is created, and swap() hands everything in m_new over to it. "Handing over" just means transferring the pointers to the queue contents into the temporary queue, after which m_new's head and tail pointers are set to NULL.
m_new is thus an empty queue again; after obligingly broadcasting to the other threads, flush() can release the m_queue_mutex.

The swap operation itself is simple: it just exchanges the two queues' pointers:

    void swap(EntryQueue& other) {
      int len = m_len;
      struct Entry *h = m_head, *t = m_tail;
      m_len = other.m_len;
      m_head = other.m_head;
      m_tail = other.m_tail;
      other.m_len = len;
      other.m_head = h;
      other.m_tail = t;
    }

The main work of writing the log is then delegated to the _flush function.
As the code shows, some entries are written to the log file, some to syslog, and some to stderr.
Log entries also carry a priority, so should_log gates what is actually written.

    void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
    {
      Entry *e;
      char buf[80];
      while ((e = t->dequeue()) != NULL) {
        unsigned sub = e->m_subsys;
        bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio;
        bool do_fd = m_fd >= 0 && should_log;
        bool do_syslog = m_syslog_crash >= e->m_prio && should_log;
        bool do_stderr = m_stderr_crash >= e->m_prio && should_log;
        if (do_fd || do_syslog || do_stderr) {
          int buflen = 0;
          if (crash)
            buflen += snprintf(buf, sizeof(buf), "%6d> ", -t->m_len);
          buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
          buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
                             (unsigned long)e->m_thread, e->m_prio);
          // FIXME: this is slow
          string s = e->get_str();
          if (do_fd) {
            int r = safe_write(m_fd, buf, buflen);
            if (r >= 0)
              r = safe_write(m_fd, s.data(), s.size());
            if (r >= 0)
              r = write(m_fd, "\n", 1);
            if (r < 0)
              cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
          }
          if (do_syslog) {
            syslog(LOG_USER, "%s%s", buf, s.c_str());
          }
          if (do_stderr) {
            cerr << buf << s << std::endl;
          }
        }
        requeue->enqueue(e);
      }
    }
Next come the common prefix fields; a log always records when each message happened:

    buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);

m_stamp is of type utime_t, and its sprintf is implemented as follows:

    int sprintf(char *out, int outlen) const {
      struct tm bdt;
      time_t tt = sec();
      localtime_r(&tt, &bdt);

      return snprintf(out, outlen,
                      "%04d-%02d-%02d %02d:%02d:%02d.%06ld",
                      bdt.tm_year + 1900, bdt.tm_mon + 1, bdt.tm_mday,
                      bdt.tm_hour, bdt.tm_min, bdt.tm_sec, usec());
    }

The log prefix therefore looks like:

    2015-05-29 10:20:45.076105
The next fields are the thread ID and the message priority. The thread ID here is not the scheduler's ID (the kernel TID) but the value returned by pthread_self:

    buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
                       (unsigned long)e->m_thread, e->m_prio);
ceph-mon is a multi-threaded program:
    root@test3:/var/log/ceph# pidof ceph-mon
    138812
    root@test3:/var/log/ceph#
    root@test3:/var/log/ceph# ll /proc/138812/task
    total 0
    dr-xr-xr-x 23 root root 0 May 29 16:17 ./
    dr-xr-xr-x 9 root root 0 May 29 16:11 ../
    dr-xr-xr-x 6 root root 0 May 29 16:17 138812/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138813/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138814/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138815/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138816/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138817/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138818/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138819/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138820/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138821/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138822/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138823/
    dr-xr-xr-x 6 root root 0 May 29 16:17 138824/
    dr-xr-xr-x 6 root root 0 May 29 16:17 139643/
    dr-xr-xr-x 6 root root 0 May 29 16:17 139895/
    dr-xr-xr-x 6 root root 0 May 29 16:17 139896/
    dr-xr-xr-x 6 root root 0 May 29 16:17 143413/
    dr-xr-xr-x 6 root root 0 May 30 16:47 78042/
    dr-xr-xr-x 6 root root 0 May 30 16:47 78044/
    dr-xr-xr-x 6 root root 0 May 30 17:08 90496/
    dr-xr-xr-x 6 root root 0 May 30 17:08 90497/
So we can pick a log line and examine the prefix of a Ceph log entry:

    2015-05-29 10:20:45.076105 7fa1c6a1a700 0 mon.jnqfg@2(peon).data_health(30) update_stats avail 92% total 95990980 used 2183384 avail 88908400

It is worth noting that Ceph maintains the m_recent queue, into which every message is placed, even low-priority entries that are never printed to the log file.
This is what the enqueue call in the _flush function does.

Of course, the queue has a size limit; otherwise it would grow without bound and exhaust memory. The default limit is 10000.
    while (m_recent.m_len > m_max_recent) {
      delete m_recent.dequeue();
    }

Ceph provides a way to view these recent entries: the log dump command.

    ceph daemon /var/run/ceph/ceph-mon.*asok log dump
    {}

On the surface nothing is printed, but in fact the recent entries were dumped into the log file. In our example, the log file now contains:

    v2 ==== 42+0+0 (3105867923 0 0) 0x48d6bc0 con 0x3d5c9a0
       -13> 2015-05-30 17:23:06.772212 7f5856cac700 10 mon.jnqfg@2(peon) e3 handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
       -12> 2015-05-30 17:23:06.772217 7f5856cac700 10 mon.jnqfg@2(peon) e3 check_sub monmap next 4 have 3
       -11> 2015-05-30 17:23:06.772222 7f5856cac700 10 mon.jnqfg@2(peon).osd e260 check_sub 0x631bdc0 next 0 (onetime)
       -10> 2015-05-30 17:23:06.773223 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260 src has 1..260) v3 -- ?+0 0x4230b40
        -9> 2015-05-30 17:23:06.773239 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663 10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0 0x48d6680
        -8> 2015-05-30 17:23:06.773586 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663 10.16.20.181:0/1138587 4 ==== mon_subscribe({monmap=4+,osdmap=0}) v2 ==== 42+0+0 (3105867923 0 0) 0x48d5180 con 0x3d5c9a0
        -7> 2015-05-30 17:23:06.773607 7f5856cac700 10 mon.jnqfg@2(peon) e3 handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
        -6> 2015-05-30 17:23:06.773613 7f5856cac700 10 mon.jnqfg@2(peon) e3 check_sub monmap next 4 have 3
        -5> 2015-05-30 17:23:06.773620 7f5856cac700 10 mon.jnqfg@2(peon).osd e260 check_sub 0x631a7c0 next 0 (onetime)
        -4> 2015-05-30 17:23:06.774626 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260 src has 1..260) v3 -- ?+0 0x4232ac0
        -3> 2015-05-30 17:23:06.774641 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663 10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0 0x48d6bc0
        -2> 2015-05-30 17:23:06.775345 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663 10.16.20.181:0/1138587 5 ==== mon_command({"prefix": "get_command_descriptions"} v 0) v1 ==== 80+0+0 (3374501561 0 0) 0x477f0e0 con 0x3d5c9a0
        -1> 2015-05-30 17:23:06.776789 7f5856cac700 1 -- 10.16.20.183:6789/0 --> 10.16.20.181:0/1138587 -- mon_command_ack([{"prefix": "get_command_descriptions"}]=0 v0) v1 -- ?+24689 0x3e62760 con 0x3d5c9a0
         0> 2015-05-30 17:23:06.821389 7f5859662700 1 do_command 'log dump'

The interesting question is how `ceph daemon /var/run/ceph/ceph-mon.*asok log dump` causes the log to be dumped to the log file. That involves the admin socket mechanism.
The admin socket is not a novel mechanism; it is a very common design.

Many daemons need a way to accept user commands at runtime. A program should not be designed as a black box; on the contrary, it should offer means for users either to influence the running process (e.g. change configuration options) or to retrieve information about its runtime state.

The admin socket mechanism is the topic of the next article.

