Python多线程多进程中的几个坑

Introduction

本文系转载,原文:今天遇到的Python多线程、多进程中的几个坑

 

今天在写oj的判题端的时候犯了一个低级错误,就是为了加快判题速度,我就采用了多线程多组用例同时运行的方法,但是后来不经意的发现,明明跑的很快的程序到了我这实际运行时间就变成了好几倍,而cpu时间并没有太大的变化。

我开始怀疑是runner的问题,因为以前使用ptrace的runner的时候,ptrace会在进程用户态和内核态之间反复的检查,导致程序运行缓慢。但是我手动的使用命令行启动runner运行的时候,发现并没有问题,cpu时间和实际运行时间几乎一样的。我就开始怀疑是我的Python代码的问题,后来恍然大悟,为了能让cpu时间更长一些,方便测试,我写了一个时间复杂度很高的c程序,这是在运行一个cpu密集型的应用啊,而由于GIL的存在,Python多线程并不适合干这个,这个场景应该使用多进程。更详细的解释参考 http://www.oschina.net/translate/pythons-hardest-problem

下面是我的测试数据,

//多线程
//两组用例同时运行
{'cpu_time': 3543.0, 'real_time': 13384.0, 'test_case_id': 2}
{'cpu_time': 3592.0, 'real_time': 13688.0, 'test_case_id': 1}

//只有一组测试用例
{'cpu_time': 3612.0, 'real_time': 6856.0, 'test_case_id': 1}

很明显的结果,下面是采用了多进程之后的测试数据

//多进程
//两组用例同时运行
{'cpu_time': 4110.0, 'real_time': 4250.0, 'test_case_id': 2}
{'cpu_time': 4121.0, 'real_time': 4298.0, 'test_case_id': 1}

//一组用例
{'cpu_time': 3861.0, 'real_time': 4040.0, 'test_case_id': 1}

好了,其实我不是专门想说这个的了,因为这是一个愚蠢的问题。我要记录一下今天遇到的三个多进程中的问题:

第一个问题

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

poc如下

from multiprocessing import Pool


class Runner(object):
    def func(self, i):
        print i
        return i


runner = Runner()
pool = Pool(processes=5)
for i in range(5):
    pool.apply_async(runner.func, (i, ))
pool.close()
pool.join()

这个问题只出现在Python2上,Python3没有问题。这是因为多进程之间要使用pickle来序列化并传递一些数据,但是实例方法并不能被pickle,参见Python文档,可以被pickle的类型列表,还有在Python3中实例方法可以被pickle了,见Python bug list

最简单的解决办法就是写一个可以被pickle的函数代理一下

from multiprocessing import Pool


def run(cls_instance, i):
    return cls_instance.func(i)


class Runner(object):
    def func(self, i):
        print i
        return i


runner = Runner()
pool = Pool(processes=5)
for i in range(5):
    pool.apply_async(run, (runner, i))
pool.close()
pool.join()

还有一个方法已经被指出可能存在缺陷了,就是这个人第一个例子,但是我不知道为什么一个类可以被析构多次呢?是不是这个类实例化一次以后就被复制到了各个进程上,然后再单独进行的析构呢。这个人第二个例子是反驳的__call__方法的,我没法运行,总是提示Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed,估计是一样的原因。

第二个问题

pool作为实例变量的时候出错 pool objects cannot be passed between processes or pickled

把上面的例子稍微的改造了一下,

from multiprocessing import Pool


def run(cls_instance, i):
    return cls_instance.func(i)


class Runner(object):
    def __init__(self):
        pool = Pool(processes=5)
        for i in range(5):
            pool.apply_async(run, (self, i))
        pool.close()
        pool.join()

    def func(self, i):
        print i
        return i


runner = Runner()

把pool放在实例内部了,使用外部函数代理,运行正常。但是如果把里面的pool都换成self.pool的话,就会出现上面的错误。原因是在pickle传递给pool的对象的时候,这个对象就包含pool这个实例变量,它不能被pickle,造成错误。而不使用self的话,那就是__init__方法的一个局部变量,不受影响。解决方法就是自己实现__getstate__方法,它是决定什么需要pickle的函数,我们删除掉pool,不让它pickle就好了。__setstate__作用是相反的,是用来增加实例变量的。

看例子

from multiprocessing import Pool


def run(cls_instance, i):
    return cls_instance.func(i)


class Runner(object):
    def __init__(self):
        self.pool = Pool(processes=2)
        self.var = 10
        for i in range(2):
            self.pool.apply_async(run, (self, i))
        self.pool.close()
        self.pool.join()

    def func(self, i):
        print i
        return i

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        print self.__dict__
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        print state
        self.__dict__.update(state)


runner = Runner()

输出是

{'var': 10, 'pool': <multiprocessing.pool.Pool object at 0x102e99790>}
{'var': 10, 'pool': <multiprocessing.pool.Pool object at 0x102e99790>}
{'var': 10}
0
{'var': 10}
1

也就是实例在unpickle的时候丢了pool这个变量,但是我们也不需要了,所以可以这样解决问题。

第三个问题

子进程引发的异常怎么消失了?

from multiprocessing import Pool


def run(cls_instance, i):
    return cls_instance.func(i)


class Runner(object):
    def __init__(self):
        self.pool = Pool(processes=2)
        self.var = 10
        for i in range(2):
            self.pool.apply_async(run, (self, i))
        self.pool.close()
        self.pool.join()

    def func(self, i):
        if i == 1:
            raise ValueError("xxx")
        print i
        return i

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)


runner = Runner()

只能打印出一个0来,当i为1的时候有一个异常啊,怎么没显示出来。在文档中这么说的

get([timeout]) Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

apply_async返回的是AsyncResult,其中出现的异常只有在调用AsyncResult.get()的时候才会被重新引发。

from multiprocessing import Pool


def run(cls_instance, i):
    return cls_instance.func(i)


class Runner(object):
    def __init__(self):
        self.pool = Pool(processes=2)
        results = []
        for i in range(2):
            results.append(self.pool.apply_async(run, (self, i)))
        self.pool.close()
        self.pool.join()
        for i in range(2):
            print results[i].get()

    def func(self, i):
        if i == 1:
            raise ValueError("xxx")
        return i

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)


runner = Runner()

这样就能看到异常了。

 

Reference

python中的Queue与多进程(multiprocessing)

python进程池:multiprocessing.pool

python进程池专题总结

python doc 16.6. multiprocessing — Process-based “threading” interface

 

文章版权归 FindHao 所有丨本站默认采用CC-BY-NC-SA 4.0协议进行授权|
转载必须包含本声明,并以超链接形式注明作者 FindHao 和本文原始地址:
https://www.findhao.net/easycoding/2410

你可能喜欢:

Find

新浪微博(FindHaoX86)QQ群:不安分的Coder(375670127)不安分的Coder 微信公众号(findhao-net)

1 条回复

  1. antior说道:

    好玩,过一阵子试试多线程。

发表评论

电子邮件地址不会被公开。 必填项已用*标注