• 隐藏侧边栏
  • 展开分类目录
  • 关注微信公众号
  • 我的GitHub
  • QQ:1753970025
Chen Jiehua

Python 多进程/线程 

数据量太大,单个进程处理不完,咋办?试试python的多进程/多线程并发……

multiprocessing

Process

multiprocessing模块提供了一个Process类来代表一个进程对象:

import multiprocessing

procs_num = 10

def run_in_multiprocess():
    process = []
    for i in range(procs_num):
        p = multiprocessing.Process(target=worker, args=(params,))
        p.start()
        process.append(p)
   
    for p in process:
        p.join()

def worker(params):
    # do something 
    pass

Queue

import multiprocessing

result_queue = multiprocessing.Queue()

def run_in_multiprocess():
    process = []
    for i in range(procs_num):
        p = multiprocessing.Process(target=worker, args=(params,))
        p.start()
        process.append(p)
   
    for p in process:
        p.join()

    result = []
    for p in process:
        result.append(result_queue.get())

def worker(): 
    result = ... 
    result_queue.put(result)

Gevent

Gevent是python的一个并发框架,以微线程greenlet为核心。

当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

gevent.monkey

由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成。

import requests
import gevent
import gevent.monkey

gevent.monkey.patch_all()

def process_with_gevent(urls):
    """ 采用gevent进行处理 """
    jobs = [gevent.spawn(worker, url) for url in urls]
    gevent.joinall(jobs)
    result = []
    for job in jobs:
        result.append(job.value)

def worker(url):
    r = requests.get(url)
    return r.text

gevent.Greenlet

启动一个greenlet线程,我们可以通过 gevent.Greenlet

from gevent import Greenlet

g = Greenlet(myfunction, 'arg1', 'arg2', kwarg1=1)
g.start()

# 或者也可以通过类方法 spawn() 来启动

g = Greenlet.spawn(myfunction, 'arg1', 'arg2', kwarg1=1)

gevent.pool

如果要限制 greenlet 的数量,我们可以通过 gevent.pool 来实现:

import gevent.pool

pool_size = 100 

def proecss_with_gevent_pool(urls):
    pool = gevent.pool.Pool(pool_size)
    for url in urls:
        pool.spawn(worker, url)
    pool.join()

gevent.queue

对于每个greeenlet执行的结果,我们可以通过 queue 来保存,统一处理:

import gevent.queue

gevent_queue = gevent.queue.Queue(1000)

def process_with_gevent(urls):
    jobs = [gevent.spawn(worker, url) for url in urls]
    gevent.joinall(jobs)
    gevent_queue.put(StopIteration)

def worker(url):
    r = requests.get(url)
    gevent_queue.put(r.text)

def process_result(): 
    for item in gevent_queue:
        print item 

 

码字很辛苦,转载请注明来自ChenJiehua《Python 多进程/线程》

评论