各位用户为了找寻关于Python自定义主从分布式架构实例分析的资料费劲了很多周折。这里教程网为您整理了关于Python自定义主从分布式架构实例分析的相关资料,仅供查阅,以下为您介绍关于Python自定义主从分布式架构实例分析的详细内容
本文实例讲述了Python自定义主从分布式架构。分享给大家供大家参考,具体如下:
环境:Win7 x64,Python 2.7,APScheduler 2.1.2。
原理图如下:
代码部分:
(1)、中心节点:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心节点(主要功能是分配任务)
import
SocketServer, socket, Queue
CenterIP
=
'127.0.0.1'
#中心节点IP
CenterListenPort
=
9999
#中心节点监听端口
CenterClient
=
socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#中心节点用于发送网络消息的socket
TaskQueue
=
Queue.Queue()
#任务队列
#获取任务队列
def
GetTaskQueue():
for
i
in
range
(
1
,
11
):
TaskQueue.put(
str
(i))
#CenterServer的回调函数,在接受到udp报文是触发
class
MyUDPHandler(SocketServer.BaseRequestHandler):
def
handle(
self
):
data
=
self
.request[
0
].strip()
socket
=
self
.request[
1
]
print
(data)
if
data.startswith(
'wait'
):
vec
=
data.split(
':'
)
if
len
(vec) !
=
3
:
print
(
'Error: len(vec) != 3'
)
else
:
nodeIP
=
vec[
1
]
nodeListenPort
=
vec[
2
]
nodeID
=
nodeIP
+
':'
+
nodeListenPort
if
not
TaskQueue.empty():
task
=
TaskQueue.get()
print
(
'send task '
+
task
+
' to '
+
nodeID)
CenterClient.sendto(
'task:'
+
task, (nodeIP,
int
(nodeListenPort)))
else
:
print
(
'TaskQueue is empty!'
)
GetTaskQueue()
#获取任务队列
CenterServer
=
SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print
(
'Listen port '
+
str
(CenterListenPort)
+
' ...'
)
CenterServer.serve_forever()
(2)、任务节点:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任务节点(请求/接收/执行任务)
import
time, socket, SocketServer
from
apscheduler.scheduler
import
Scheduler
CenterIP
=
'127.0.0.1'
#中心节点IP
CenterListenPort
=
9999
#中心节点监听端口
NodeIP
=
socket.gethostbyname(socket.gethostname())
#任务节点自身IP
NodeClient
=
socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#任务节点用于发送网络消息的socket
#任务:发送网络信息
def
jobSendNetMsg():
msg
=
''
if
NodeServer.TaskState
=
=
'wait'
:
msg
=
'wait:'
+
NodeIP
+
':'
+
str
(NodeListenPort)
elif
NodeServer.TaskState
=
=
'exec'
:
msg
=
'exec:'
+
NodeIP
+
':'
+
str
(NodeListenPort)
print
(msg)
NodeClient.sendto(msg, (CenterIP, CenterListenPort))
#添加并启动定时任务
def
InitTimer():
sched
=
Scheduler()
sched.add_interval_job(jobSendNetMsg, seconds
=
1
)
sched.start()
#执行任务
def
ExecTask(task):
print
(
'ExecTask '
+
task
+
' ...'
)
time.sleep(
2
)
print
(
'ExecTask '
+
task
+
' over'
)
#NodeServer的回调函数,在接受到udp报文是触发
class
MyUDPHandler(SocketServer.BaseRequestHandler):
def
handle(
self
):
data
=
self
.request[
0
].strip()
socket
=
self
.request[
1
]
print
(
'recv data: '
+
data)
if
data.startswith(
'task'
):
vec
=
data.split(
':'
)
if
len
(vec) !
=
2
:
print
(
'Error: len(vec) != 2'
)
else
:
task
=
vec[
1
]
self
.server.TaskState
=
'exec'
ExecTask(task)
self
.server.TaskState
=
'wait'
InitTimer()
NodeServer
=
SocketServer.UDPServer(('',
0
), MyUDPHandler)
NodeServer.TaskState
=
'wait'
#(exec/wait)
NodeListenPort
=
NodeServer.server_address[
1
]
print
(
'NodeListenPort:'
+
str
(NodeListenPort))
NodeServer.serve_forever()
希望本文所述对大家Python程序设计有所帮助。