点击(此处)折叠或打开
-
# -*- coding: UTF-8 -*-
-
import re
-
import socket
-
import select
-
import logging
-
import struct
-
import time
-
-
# 里面存放自定义常量
-
import _static
-
-
__author__ = 'loliz_000'
-
-
-
def decode_socket_data(data_buffer):
-
"""
-
命令包解析函数,所有包都以WPROJ_PACKAGER_PACK_HEAD + 包命令类型为开头 包内存在string,则第三部分为string的长度
-
@param data_buffer:
-
@return 返回none,解析失败
-
@return 返回结构 (剩余半包, [(包1命令码,包1结果码,包1内容), (包2命令码,包2结果码,包2内容)])
-
"""
-
data_list = []
-
buffer_size = len(data_buffer)
-
if buffer_size < 12:
-
return data_buffer, data_list
-
try:
-
# 这里是我自定义包结构
-
wproj_head_code, package_command_type, code, package_len = struct.unpack('>HHII', data_buffer[0:12])
-
# 解包,判断包头码
-
if wproj_head_code != _static.WPROJ_PACKAGER_PACK_HEAD:
-
return None
-
except struct.error:
-
return None
-
# 包长度过长
-
if package_len >= _static.WPROJ_PACKAGER_MAX_PACK_LEN:
-
logging.warning('decode_socket_data find data over max size')
-
return None
-
# 没有包体
-
if buffer_size == 12:
-
if package_len > 0:
-
return None
-
data_list.append((package_command_type, code, None))
-
return None, data_list
-
# 所需包长度 > 实际包长, 这是一个半包
-
if package_len > len(data_buffer[12:]):
-
return data_buffer, data_list
-
# 包长度正好
-
elif package_len == len(data_buffer[12:]):
-
try:
-
data = struct.unpack('>%ds' % package_len, data_buffer[12:])
-
except struct.error:
-
return None
-
data_list.append((package_command_type, code, data))
-
return None, data_list
-
# 所需包长度 < 实际包长 有粘包,递归处理
-
else:
-
try:
-
data = struct.unpack('>%ds' % package_len, data_buffer[12:12+package_len])
-
except struct.error:
-
return None
-
data_list.append((package_command_type, code, data))
-
decode_socket_res = decode_socket_data(data_buffer[12+package_len:])
-
if not decode_socket_res:
-
return None
-
return decode_socket_res[0], data_list.extend(decode_socket_res[1])
-
-
-
-
class AgentServer:
-
"""
-
打包端守护进程
-
"""
-
LOG_TYPE_RE = re.compile('^((DEBUG)|(INFO)|(WARNING)|(ERROR)|(CRITICAL))+?$', re.IGNORECASE)
-
-
def __init__(self):
-
self.bind_port = 2000
-
self.bind_sock = None
-
self.epoll = None
-
self.bind_lock = None
-
self.queue = None
-
self.socket_pool_dict = {}
-
-
def bind_sock_start(self):
-
logging.info('PackageAgentServer bind socket start')
-
bind_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
#bind_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
try:
-
socket.setdefaulttimeout(10)
-
bind_sock.bind(('0.0.0.0', self.bind_port))
-
bind_sock.listen(100)
-
self.bind_sock = bind_sock
-
logging.warning('PackageAgentServer bind socket success')
-
except socket.error:
-
logging.warning('PackageAgentServer bind socket get socket error')
-
return False
-
except OSError:
-
logging.warning('PackageAgentServer bind socket get os error')
-
return False
-
except IOError:
-
logging.warning('PackageAgentServer bind socket get io error')
-
return False
-
logging.info('PackageAgentServer bind socket end')
-
self.epoll = select.epoll()
-
self.epoll.register(self.bind_sock.fileno(), select.EPOLLIN)
-
return True
-
-
-
def add_to_socket_pool(self, sock):
-
self.bind_lock.acquire()
-
# socket 连接池 添加
-
self.epoll.register(sock.fileno(), select.EPOLLIN | select.EPOLLET)
-
self.socket_pool_dict[sock.fileno] = (sock, None)
-
self.bind_lock.release()
-
-
def del_from_socket_pool(self, sock):
-
if isinstance(sock, socket.socket):
-
fd = sock.fileno()
-
else:
-
fd = sock
-
self.bind_lock.acquire()
-
try:
-
# socket 连接池移除
-
self.socket_pool_dict.pop(fd)
-
self.epoll.unregister(fd, select.EPOLLIN | select.EPOLLET)
-
except KeyError:
-
logging.warning('del_from_socket_pool pop sock more then once')
-
self.bind_lock.release()
-
-
def connection_loop(self):
-
"""
-
获取新tcp-ip连接,另启一个线程运行
-
"""
-
error_count = 0
-
logging.info('scoket connection_loop start')
-
while 1: # 这个循不在主线程里运行,fork后不需要处理
-
# 退出标记为1结束循环
-
if self.single_out:
-
break
-
if error_count > 500:
-
logging.warning('connection_loop get too much error')
-
self.single_out = 1
-
continue
-
for fd, events in self.epoll.poll():
-
if fd == self.bind_sock.fileno():
-
client_sock, client_ipaddr = self.bind_sock.accept()
-
self.add_to_socket_pool(client_sock)
-
elif select.EPOLLIN & events:
-
cur_sock, half_package = self.socket_pool_dict[fd]
-
data = cur_sock.recv(10000)
-
# 有半包数据
-
if half_package:
-
data = half_package + data
-
# 调用解析数据包
-
decode_data = decode_socket_data(data)
-
# 包解析出错,返回None
-
if not decode_data:
-
# 自己写个关闭函数
-
close_ret = close_sock(cur_sock, _static.DECODE_PACKAGE_FAILURE, 'package decode failure')
-
if close_ret['ret'] > 0:
-
logging.error('close socket error at DECODE_PACKAGE, error %s' % close_ret['msg'])
-
self.del_from_socket_pool(fd)
-
continue
-
# 包解析正确, 半包传入socket_pool_dict,供下次数据处理用
-
self.socket_pool_dict[fd][1], data_list = decode_data
-
# 已经处理好的数据列表塞入队列, 这里可以做点优化, 直接在解析包的函数中就塞入队列
-
for _data in data_list:
-
self.queue.put((cur_sock, _data))
-
elif select.EPOLLHUP & events:
-
self.del_from_socket_pool(fd)
-
else:
-
logging.warning('unkonw event')
-
-
-
def work_loop(self):
-
while 1:
-
# 退出标记为1,结束循环
-
if self.single_out:
-
break
-
if self.queue.empty():
-
time.sleep(0.01)
-
else:
-
queue_data = self.queue.get()
-
cur_sock = queue_data[0]
-
package_command_type, code, data = queue_data[1]
-
# do what you want do
-
-
def start(self):
-
# 绑定端口
-
if not self.bind_sock_start():
-
raise
-
import threading
-
# 启动数据接收线程
-
data_thread = threading.Thread(target=self.connection_loop)
-
data_thread.setDaemon(True)
-
data_thread.start()
-
# 启动数据处理线程
-
data_thread = threading.Thread(target=self.work_loop)
-
data_thread.setDaemon(True)
- data_thread.start()