python中分布式进程的详细介绍(附示例)


本文摘自php中文网,作者不言,侵删。

本篇文章给大家带来的内容是关于PHP中的SAPI是什么?如何实现?(图文),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

Python的 multiprocessing 模块不但支持多进程, 其中 managers 子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。

BaseManager: 提供了不同机器进程之间共享数据的一种方法;

1

(重要的点: ip:port)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

# task_master.py

 

import random

from multiprocessing import freeze_support

from queue import Queue

from multiprocessing.managers import  BaseManager

# 1. 创建需要的队列

# task_queue:发送任务的队列

# coding=utf-8

 

import random,time

from queue import Queue

from multiprocessing.managers import BaseManager

from multiprocessing import freeze_support

 

task_queue =  Queue()  # 发送任务的队列:

result_queue = Queue() # 接收结果的队列:

class QueueManager(BaseManager):  # 从BaseManager继承的QueueManager:

    pass

# windows下运行

def return_task_queue():

    global task_queue

    return task_queue  # 返回发送任务队列

def return_result_queue ():

    global result_queue

    return result_queue # 返回接收结果队列

 

def test():

    # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象

    #QueueManager.register('get_task_queue', callable=lambda: task_queue)

    #QueueManager.register('get_result_queue', callable=lambda: result_queue)

    QueueManager.register('get_task_queue', callable=return_task_queue)

    QueueManager.register('get_result_queue', callable=return_result_queue)

    # 绑定端口4000, 设置验证码'sheenstar':

    #manager = QueueManager(address=('', 4000), authkey=b'sheenstar')

    # windows需要写ip地址

    manager = QueueManager(address=('192.168.1.160', 4000), authkey=b'sheenstar')

    manager.start()  # 启动Queue:

    # 获得通过网络访问的Queue对象:

    task = manager.get_task_queue()

    result = manager.get_result_queue()

    for i in range(13):   # 放几个任务进去:

        n = random.randint(0, 10000)

        print('Put task %d...' % n)

        task.put(n)

    # 从result队列读取结果:

    print('Try get results...')

    for i in range(13):

        r = result.get(timeout=10)

        print('Result: %s' % r)

 

    # 关闭:

    manager.shutdown()

    print('master exit.')

if __name__=='__main__':

    freeze_support()

    print('start!')

    test()

运行程序,会等待执行结果10s,如果没有worker端获取任务,返回结果,程序将报错。

1669045229-5ba335502bbd1_articlex.png

当我们在一台机器上写多进程程序时,创建的 Queue 可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的 task_queue 进行操作,那样就绕过了QueueManager 的封装,必须通过manager.get_task_queue()获得的 Queue 接口添加。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

# coding=utf-8

import time, sys

from queue import Queue

from multiprocessing.managers import BaseManager

 

# 创建类似的QueueManager:

class QueueManager(BaseManager):

    pass

 

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:

QueueManager.register('get_task_queue')

QueueManager.register('get_result_queue')

 

# 连接到服务器,也就是运行task_master.py的机器:

server_addr = '192.168.1.160'

print('Connect to server %s...' % server_addr)

# 端口和验证码注意保持与task_master.py设置的完全一致:

m = QueueManager(address=(server_addr, 4000), authkey=b'sheenstar')

# 从网络连接:

try:

    m.connect()

except:

    print('请先启动task_master.py!')

    #sys.exit("sorry, goodbye!");

# 获取Queue的对象:

task = m.get_task_queue()

result = m.get_result_queue()

# 从task队列取任务,并把结果写入result队列:

for i in range(13):

    try:

        n = task.get()

        print('run task %d * %d...' % (n, n))

        r = '%d * %d = %d' % (n, n, n*n)

        time.sleep(1)

        result.put(r)

    except ConnectionResetError as e:

        print("任务执行结束,自动断开连接")

# 处理结束:

print('worker exit.')

使用命令行运行程序,结果更直观

479084071-5ba3351d8ee62_articlex.png

以上就是python中分布式进程的详细介绍(附示例)的详细内容,更多文章请关注木庄网络博客!!

相关阅读 >>

Python学习】pycharm超实用使用教程

Python中找出numpy array数组的最值及其索引方法

黑马云课堂8天深入理解Python视频资料

Python中排列组合计算操作的实现示例

Python实现简单配置发送邮件的功能

列表、元组、字符串是Python的什么序列?

如何在Python中使用运算符?(代码实例)

Python变量类型 -元组的实际运用与意义

Python中的析构函数详解

Python最长回文串算法

更多相关阅读请进入《Python》频道 >>




打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...