GearmanWorker的多进程实现

1470阅读 0评论2017-11-27 andersonyan
分类:云计算

前言

因为项目原因,我们选择了gearman作为任务委派的中间件,但原生的Python扩展包只支持单进程。在将其改造成自适应多进程的过程中,我在实现方式上走进了一些误区,故在此记录踩过的坑以及目前的最优解决方案。

实现思路

实现方式

  1. 主进程接收任务,子进程处理任务。以一个主进程作为任务委派的接收进程,接收到任务后将任务分派给子进程进行处理,处理完成后由该子进程直接返回任务结果给gearman。
  2. 多进程接收并处理任务。批量fork多个子进程注册任务,子进程间互不影响,各自完成接收、处理任务的过程。

先说说第一种实现方式的优缺点

优点:

缺点:

再来说说第二种实现方式的优缺点

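优点:
  - 子进程之间互不影响,各自独立完成接收和处理任务的过程。

缺点:
  - 主进程退出(尤其是异常退出)后,子进程可能残留并继续运行,难以统一清理;
  - 难以从外部通知各子进程中的GearmanWorker正常退出。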

最终选择第二种方案,并针对其缺点采用以下解决办法:

  1. 利用PID文件记录每个子进程的pid,确保主进程退出后仍能根据PID文件结束残留的子进程。
  2. 利用Redis的发布订阅模式实现GearmanWorker的正常退出。

client 端代码: test_multi_gearman_worker.py

  1. #!/usr/bin/env python
  2. from gearman import GearmanClient

  3. gearman_client = GearmanClient(['10.10.13.5:4730'])


  4. new_jobs = [
  5.     dict(task='test_multi_gearman_worker', data='job-0.mp4'),
  6.     dict(task='test_multi_gearman_worker', data='job-1.mp4'),
  7.     dict(task='test_multi_gearman_worker', data='job-2.mp4'),
  8.     dict(task='test_multi_gearman_worker', data='job-3.mp4'),
  9.     dict(task='test_multi_gearman_worker', data='job-4.mp4'),
  10.     dict(task='test_multi_gearman_worker', data='job-5.mp4'),
  11.     dict(task='test_multi_gearman_worker', data='job-6.mp4'),
  12.     dict(task='test_multi_gearman_worker', data='job-7.mp4'),
  13.     dict(task='test_multi_gearman_worker', data='job-8.mp4'),
  14.     dict(task='test_multi_gearman_worker', data='job-9.mp4'),
  15.     dict(task='test_multi_gearman_worker', data='job-10.mp4'),
  16. ]


  17. completed_requests = gearman_client.submit_multiple_jobs(new_jobs)
  18. for current_request in completed_requests:
  19.     print (current_request.result)


  20. print ("Game over!")

worker端代码: multi-gearman-worker.py

  1. #!/usr/bin/env python

  2. import os
  3. import string
  4. import signal
  5. import threading
  6. import subprocess
  7. import multiprocessing

  8. import redis
  9. from gearman.worker import GearmanWorker, POLL_TIMEOUT_IN_SECONDS

  10. WORKER_PROCESS_PID = '/tmp/multi_gearman_worker.pid'


  11. class MultiGearmanWorker(GearmanWorker):
  12.     """ multi-process gearman worker """
  13.     def __init__(self, host_list=None, redis_host=None, redis_port=None, pid=WORKER_PROCESS_PID):
  14.         super(MultiGearmanWorker, self).__init__(host_list=host_list)
  15.         self.redis_host = redis_host
  16.         self.redis_port = redis_port
  17.         self.pid = pid

  18.     def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS, process=multiprocessing.cpu_count()):
  19.         """
  20.         start working, blocking process first
  21.         :param poll_timeout: int , the connection time of gearman
  22.         :param process: int , the number of process for working , the default is the number of cpu-core
  23.         :return:
  24.         """
  25.         print('Clear last process.')
  26.         self.gearman_worker_exit()
  27.         print('Ready to start %d process for work.' % process)
  28.         gm_poll = multiprocessing.Pool(process)
  29.         for x in range(0, process):
  30.             print('start %d child process', x)
  31.             gm_poll.apply_async(gearman_work, (self, poll_timeout, self.pid))

  32.         gm_poll.close()
  33.         gm_poll.join()

  34.         # Delete the PID file if all sub-process exit normally
  35.         if os.path.isfile(self.pid):
  36.             os.remove(self.pid)

  37.         print('Multi gearman worker exit.')


  38.     def gearman_worker_exit(self):
  39.         """ Terminate sub-process """
  40.         if not os.path.isfile(self.pid):
  41.             return True

  42.         with open(self.pid, 'r+') as f:
  43.             for pid in f.readlines():
  44.                 pid = int(pid)
  45.                 try:
  46.                     os.kill(pid, signal.SIGKILL)
  47.                     print('Kill process %d.' % pid)
  48.                 except OSError:
  49.                     print('Process %d not exists' % pid)
  50.                     continue

  51.         os.remove(self.pid)
  52.         print('Remove process pid file.')
  53.         return True


  54. #The gearman job switch identifier used by the child process
  55. GEARMAN_CONTINUE_WORK = True


  56. def gearman_work(gm_worker, poll_timeout=POLL_TIMEOUT_IN_SECONDS, pid=WORKER_PROCESS_PID):
  57.     """ Open gearman;s worker in multiple processes """
  58.     try:
  59.         """ Record the child process pid,
  60.              so that the main process is cleared by the supervisor
  61.         to clear the child process did not exit the last time
  62.         """
  63.         with open(pid, 'a+') as f:
  64.             f.write("%d%s" % (os.getpid(), os.linesep))

  65.         print('Child process start for work.')
  66.         continue_working = True
  67.         worker_connections = []
  68.         d = threading.Thread(name='monitor', target=gearman_monitor,
  69.                              args=(gm_worker.redis_host, gm_worker.redis_port))
  70.         d.start()

  71.         def continue_while_connections_alive(any_activity):
  72.             return gm_worker.after_poll(any_activity)

  73.         # Shuffle our connections after the poll timeout
  74.         while continue_working and GEARMAN_CONTINUE_WORK:
  75.             worker_connections = gm_worker.establish_worker_connections()
  76.             continue_working = gm_worker.poll_connections_until_stopped(
  77.                 worker_connections, continue_while_connections_alive, timeout=poll_timeout)

  78.         # If we were kicked out of the worker loop, we should shutdown all our connections
  79.         for current_connection in worker_connections:
  80.             current_connection.close()

  81.         print('Gearman worker closed')
  82.         return None
  83.     except Exception as e:
  84.         print(e)


  85. def gearman_monitor(redis_host, redis_port):
  86.     """ Listen to dynamic update instructions """
  87.     global GEARMAN_CONTINUE_WORK
  88.     print('Start gearman monitor.')
  89.     while GEARMAN_CONTINUE_WORK:
  90.         """To prevent abnormal operation caused by the thread is not monitoring the redis response after hanging up,
  91.        exception handling on here, after an exception re-listen
  92.         """
  93.         try:
  94.             sub = redis.StrictRedis(redis_host, redis_port).pubsub()
  95.             sub.subscribe('hot')
  96.             for i in sub.listen():
  97.                 if isinstance(i.get('data'), str):
  98.                     if i.get('data') == 'exit':
  99.                         # worker???????????????????
  100.                         print('Gearman monitor receive restart signal.')
  101.                         GEARMAN_CONTINUE_WORK = False
  102.                         sub.unsubscribe('hot')
  103.                         break
  104.                 # ????????,?????????gearman worker????????

  105.         except Exception as e:
  106.             print(e)
  107.             try:
  108.                 sub.unsubscribe('hot')
  109.             except Exception:
  110.                 pass

  111.     print('Gearman monitor closed')


  112. if __name__ == '__main__':
  113.     def test_multi_gearman_worker(worker, job):
  114.         print('who ', worker)
  115.         print('do what ', job.data)
  116.         print('Game over! ', job.data)
  117.         return job.data




  118.     gearman_worker = MultiGearmanWorker(('10.10.13.5:4730', ), '10.10.13.8', 6379)
  119.     gearman_worker.register_task('test_multi_gearman_worker', test_multi_gearman_worker)
  120.     gearman_worker.work(POLL_TIMEOUT_IN_SECONDS, 5)
注:
1. 需要预先安装好redis;
2. 需要预先安装好gearman;
3. worker运行后,可通过 redis-cli 执行 PUBLISH hot exit,通知所有子进程中的GearmanWorker正常退出。

上一篇:在linux服务器上安装单机版redis
下一篇:GearmanWorker的多进程实现