点击(此处)折叠或打开
-
#! /usr/bin/python
-
# -*- coding: UTF-8 -*-
-
-
import sys,time,select,threading
-
import struct,Queue,copy
-
-
class TalkServer(object):
    """Epoll-based TCP server that reassembles framed messages into a queue.

    Wire format handled by the parser: an 8-byte header packed with the
    native ``struct`` format ``'Ii'`` (packet mark, payload length) followed
    by ``payload length`` bytes of payload.  Complete payloads are pushed on
    ``dataQueue`` as ``(payload, socket_fileno)`` tuples.

    ``self.sock`` is never created by the code in this class; it must be set
    to a listening socket before ``__start__`` is called.
    """

    # Bounds used to validate the payload length announced in a header.
    # (Spelling kept as in the original so existing references still work.)
    MinMassageLen = 400
    MaxMassageLen = 1200

    # Packet header: unsigned int mark + signed int payload length.
    _HEADER = struct.Struct('Ii')
    # Value every valid header must carry in its mark field.
    _PACK_MARK = 1346

    def __init__(self):
        """Create an idle server; the runtime objects are built by __start__."""
        self.sock = None
        self.bindEopll = None
        self.clientEopll = None
        # fileno -> [connection, bytes left over from an incomplete packet]
        self.connectionPoll = {}
        self.clientLock = None
        self.cond = None
        self.dataQueue = None

    def __start__(self):
        """Create the epoll objects, lock, condition and queue.

        Registers ``self.sock`` for read events, so ``self.sock`` must
        already be a socket when this is called.
        """
        self.bindEopll = select.epoll()
        self.clientEopll = select.epoll()
        self.bindEopll.register(self.sock.fileno(), select.EPOLLIN)
        self.clientLock = threading.Lock()
        self.cond = threading.Condition()
        self.dataQueue = Queue.Queue()

    def __bindDeamon(self):
        """Accept new clients until an event for a foreign descriptor arrives.

        An event whose descriptor is not the listening socket closes every
        connection and both epoll objects, then ends the loop (the original
        kept polling the closed epoll object afterwards).
        """
        while True:
            events = self.bindEopll.poll(1)
            for fileno, event in events:
                if fileno != self.sock.fileno():
                    self.__closeAll()
                    return
                # ``with`` releases the lock even if accept()/register() raise.
                with self.clientLock:
                    newConnection, address = self.sock.accept()
                    self.clientEopll.register(newConnection.fileno(),
                                              select.EPOLLIN)
                    # Keep the connection together with an empty buffer that
                    # later holds the bytes of a not-yet-complete packet.
                    self.connectionPoll[newConnection.fileno()] = [
                        newConnection, b'']

    def __closeAll(self):
        """Close all client connections, both epoll objects and the socket."""
        with self.clientLock:
            for clientFileno in list(self.connectionPoll.keys()):
                self.clientEopll.unregister(clientFileno)
                self.connectionPoll[clientFileno][0].close()
            self.connectionPoll.clear()
            self.clientEopll.close()
            self.bindEopll.unregister(self.sock.fileno())
            self.bindEopll.close()
            self.sock.close()

    def __clientDeamon(self):
        """Read from ready client sockets and feed the bytes to the parser.

        An empty read (or a socket error) is treated as a disconnect: the
        client is unregistered, closed and forgotten.
        """
        while True:
            events = self.clientEopll.poll(0.01)
            for fileno, event in events:
                if not event & select.EPOLLIN:
                    continue
                # Each entry is [connection, leftover]; the socket is item 0
                # (the original called recv() on the list itself).
                connection = self.connectionPoll[fileno][0]
                try:
                    data = connection.recv(1024)
                except EnvironmentError:
                    # e.g. connection reset: handle like an orderly close.
                    data = b''
                if data:
                    with self.cond:
                        self.__unpackDataToQueue(data, fileno)
                        # Wake the consumer blocked in __talkDeamon; without
                        # this its cond.wait() never returns.
                        self.cond.notify()
                else:
                    with self.clientLock:
                        self.clientEopll.unregister(fileno)
                        self.connectionPoll[fileno][0].close()
                        del self.connectionPoll[fileno]

    def _payloadLength(self, buf):
        """Return the payload length announced by the header at buf[:8].

        Returns None when the mark is wrong or the length lies outside
        [MinMassageLen + 4, MaxMassageLen - 4].  ``buf`` must hold at least
        one full header.
        """
        packMark, dataLen = self._HEADER.unpack(buf[:self._HEADER.size])
        if packMark != self._PACK_MARK:
            return None
        if dataLen > self.MaxMassageLen - 4 or dataLen < self.MinMassageLen + 4:
            return None
        return dataLen

    def __unpackDataToQueue(self, data, sockFileno):
        """Split received bytes into packets and queue every complete payload.

        data       -- bytes just read from the socket
        sockFileno -- descriptor of that socket; key into connectionPoll

        Bytes belonging to an incomplete packet are stored in
        ``connectionPoll[sockFileno][1]`` and prepended to the next read.
        Data that starts with an invalid header is discarded.
        """
        entry = self.connectionPoll[sockFileno]
        headerSize = self._HEADER.size

        leftover = entry[1]
        entry[1] = b''
        if leftover:
            merged = leftover + data
            if len(merged) < headerSize:
                # Still shorter than a header: keep accumulating.
                entry[1] = merged
                return
            if self._payloadLength(merged) is not None:
                data = merged
            # Otherwise the merged header is invalid: drop the stale
            # fragment and parse the new data on its own.

        while True:
            if len(data) < headerSize:
                # Not even a full header yet; keep it for the next read.
                entry[1] = data
                return
            dataLen = self._payloadLength(data)
            if dataLen is None:
                # Invalid header: discard the rest of this chunk.
                return
            end = headerSize + dataLen
            if len(data) < end:
                # Payload incomplete; keep header and partial payload.
                entry[1] = data
                return
            self.dataQueue.put((data[headerSize:end], sockFileno), False)
            # Anything after this packet is the start of the next one.
            data = data[end:]

    def __talkDeamon(self):
        """Consumer loop: wait for queued packages and drain the queue."""
        time.sleep(5)
        while True:
            with self.cond:
                # Loop, not ``if``: wait() may return with nothing queued.
                while self.dataQueue.empty():
                    self.cond.wait()
                while not self.dataQueue.empty():
                    # NOTE(review): the visible source does nothing with the
                    # dequeued package; processing is presumably meant to be
                    # added here.
                    self.dataQueue.get()
1