GVKun编程网logo

为什么在python map()和multiprocessing.Pool.map()中得到不同的答案?(为什么要用map)

26

本文将分享为什么在pythonmap的详细内容,并且还将对和multiprocessing.Pool.map进行详尽解释,此外,我们还将为大家带来关于Java8中的map()和flatMap()方法之

本文将分享为什么在python map的详细内容,并且还将对和multiprocessing.Pool.map进行详尽解释,此外,我们还将为大家带来关于Java 8中的map()和flatMap()方法之间有什么区别?、multiprocessing.Manager()dict()setdefault()是否损坏?、multiprocessing.pool.Pool.apply()的用例是什么?、multiprocessing.Pool()map内部的异常会暂停执行的相关知识,希望对你有所帮助。

本文目录一览:

为什么在python map()和multiprocessing.Pool.map()中得到不同的答案?(为什么要用map)

为什么在python map()和multiprocessing.Pool.map()中得到不同的答案?(为什么要用map)

我有一个奇怪的问题。我有一个格式的文件:

START12STOPlllllllllSTART356STOP

和我想读之间的线STARTSTOP块,而使用my_f来处理每个块。

def block_generator(file):with open(file) as lines:    for line in lines:        if line == ''START'':             block=itertools.takewhile(lambda x:x!=''STOP'',lines)             yield block

在我的主要职能中,我试图用来map()完成工作。有效。

blocks=block_generator(file)map(my_f,blocks)

会给我我想要的 但是当我尝试与进行相同的操作时
multiprocessing.Pool.map(),它给了我一个错误,说takewhile()想接受2个参数,给了0。

    blocks=block_generator(file)    p=multiprocessing.Pool(4)     p.map(my_f,blocks)

这是一个错误吗?

  1. 该文件有1000000多个块,每个块少于100行。
  2. 我接受未填写的答案表格。
  3. 但是也许我会简单地拆分文件并使用原始脚本的n个实例,而无需进行多处理来处理它们,然后将结果汇总在一起。这样,只要脚本适用于小文件,您就不会出错。

答案1

小编典典

怎么样:

import itertoolsdef grouper(n, iterable, fillvalue=None):    # Source: http://docs.python.org/library/itertools.html#recipes    "grouper(3, ''ABCDEFG'', ''x'') --> ABC DEF Gxx"    return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue)def block_generator(file):    with open(file) as lines:        for line in lines:            if line == ''START'':                 block=list(itertools.takewhile(lambda x:x!=''STOP'',lines))                yield blockblocks=block_generator(file)p=multiprocessing.Pool(4)for chunk in grouper(100,blocks,fillvalue=''''):    p.map(my_f,chunk)

使用grouper会限制占用的文件量p.map。因此,无需将整个文件立即读入内存(送入任务队列)。


我在上面声明,当您调用时p.map(func,iterator),整个迭代器会立即被消耗掉以填充任务队列。然后,池工作人员从队列中获取任务并同时处理作业。

如果查看pool.py并跟踪定义,您将看到_handle_tasks线程从中获取项目self._taskqueue,并立即枚举:

         for i, task in enumerate(taskseq):             ...             put(task)

结论是,传递给的迭代器立即p.map被消耗。从队列中获取下一个任务之前,无需等待一个任务结束。

作为进一步的佐证,如果运行此命令:

示范代码:

import multiprocessing as mpimport timeimport loggingdef foo(x):    time.sleep(1)    return x*xdef blocks():    for x in range(1000):        if x%100==0:            logger.info(''Got here'')        yield xlogger=mp.log_to_stderr(logging.DEBUG)logger.setLevel(logging.DEBUG) pool=mp.Pool() print pool.map(foo, blocks())

Gothere几乎会立即看到该消息打印了10次,然后由于time.sleep(1)打进来而暂停了很长时间foo。这明显表明迭代器在池进程完成任务之前很久就被完全消耗。

Java 8中的map()和flatMap()方法之间有什么区别?

Java 8中的map()和flatMap()方法之间有什么区别?

在Java 8中,Stream.map()Stream.flatMap()方法之间有什么区别?

答案1

小编典典

双方map并flatMap可以应用到Stream<T>他们都回报Stream<R>。不同之处在于,该map运算为每个输入值生成一个输出值,而该flatMap运算为每个输入值生成任意数量(零个或多个)的值。

这反映在每个操作的参数中。

该map操作采用一个Function,对输入流中的每个值调用,并产生一个结果值,该结果值发送到输出流。

flatMap操作采用的功能在概念上要消耗一个值并产生任意数量的值。但是,在Java中,方法返回任意数量的值很麻烦,因为方法只能返回零或一个值。可以想象一个API,其中的映射器函数flatMap需要一个值并返回一个数组或一个List值,然后将其发送到输出。鉴于这是流库,一种表示任意数量的返回值的特别合适的方法是使映射器函数本身返回流!映射器返回的流中的值将从流中排出,并传递到输出流。每次对映射器函数的调用返回的值的“聚集”在输出流中根本没有被区分,因此输出被认为是“扁平化的”。

典型的应用是的映射功能flatMap,以回报Stream.empty(),如果要发送零个值,或者类似的东西Stream.of(a, b, c),如果要返回几个值。但当然可以返回任何流。

multiprocessing.Manager()dict()setdefault()是否损坏?

multiprocessing.Manager()dict()setdefault()是否损坏?

其后期且可能是愚蠢的部门提出:

>>> import multiprocessing
>>> mgr = multiprocessing.Manager()
>>> d = mgr.dict()
>>> d.setdefault('foo',[]).append({'bar': 'baz'})
>>> print d.items()
[('foo',[])]         <-- Where did the dict go?

鉴于:

>>> e = mgr.dict()
>>> e['foo'] = [{'bar': 'baz'}]
>>> print e.items()
[('foo',[{'bar': 'baz'}])]

版:

>>> sys.version
'2.7.2+ (default,Jan 20 2012,23:05:38) \n[GCC 4.6.2]'

虫子还是臭虫?

编辑:更多相同,在python 3.2上:

>>> sys.version
'3.2.2rc1 (default,Aug 14 2011,21:09:07) \n[GCC 4.6.1]'

>>> e['foo'] = [{'bar': 'baz'}]
>>> print(e.items())
[('foo',[{'bar': 'baz'}])]

>>> id(type(e['foo']))
137341152
>>> id(type([]))
137341152

>>> e['foo'].append({'asdf': 'fdsa'})
>>> print(e.items())
[('foo',[{'bar': 'baz'}])]

字典代理中的列表如何不包含其他元素?

multiprocessing.pool.Pool.apply()的用例是什么?

multiprocessing.pool.Pool.apply()的用例是什么?

如何解决multiprocessing.pool.Pool.apply()的用例是什么??

在过去的几天里,我一直在尝试掌握Python中的多处理(而非多线程),并且不了解multiprocessing.pool.Pool.apply()的目的。

作为Pool类的第一个方法,我假设它是默认使用的方法,但在多处理上下文中似乎没有用。该文档令人困惑且非常简短,并说:

它会阻塞直到结果准备就绪。有了这个限制,apply_async()更适合并行执行工作。

  1. 它是否仅阻止为其创建的进程?如果是,那么对于大多数受cpu约束的进程而言,这将不是问题。

另外,func仅在池的工作程序之一中执行。

  1. 为什么这样的事情甚至在Pool类中也是如此?当我第一次阅读它时,我以为我误会了它,但实际上似乎只启动了一个过程。 如果有人使用专门用于 multiprocessing 的类,为什么有人会想要这种行为?

我知道这里有apply_async,而且似乎可以正常工作,但是我仍然感到必须丢失关于apply的某些东西,我想清除它。

我正在使用与此类似的代码:

with Pool(16) as pool:
    pool_results = []
    for city in cities:
        for street in streets:
            for house in houses:
                for room in rooms:
                    pool_results.append(pool.apply_async(func=simulate,args=(city,street,house,room)))
    [result.wait() for result in pool_results]

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

multiprocessing.Pool()map内部的异常会暂停执行

multiprocessing.Pool()map内部的异常会暂停执行

您可以在多处理池上查看python文档:

警告 multiprocessing.pool对象具有需要通过将池用作上下文管理器或手动调用close()和Terminate()来适当管理的内部资源(与任何其他资源一样)。如果不这样做,可能会导致该过程无法完成。 请注意,依靠垃圾回收器破坏池是不正确的,因为CPython不能确保将调用池的终结器(有关更多信息,请参见object。 del ())。

它清楚地表明,如果您不使用上下文管理器或close()或终止程序,则程序将卡在其中。

from os import getpid
import multiprocessing as mp


def f(x):
    print(x**2,f'Hi Im {getpid()}')
    if not x:
        raise Exception(f'bla_{x}')


with mp.Pool(5) as pool:
    res = pool.map(f,range(5))
print(res)

现在,如果您运行此代码,它将不会卡住并且将正确执行。

此外,如果您不想使用上下文管理器,则需要通过使用

手动关闭池
pool.close()

这样做之后,您将正确管理池资源。

,

注释中提到的Pool.imap解决方案是一种方法,但只能方便地使用单个函数实现目标函数。我相信以下内容会更灵活:

如果相反,您将ProcessPoolExecutor模块中的concurrent.futures类与submit方法一起使用,该方法返回一个Future实例,则您可以更好地控制每个提交的结果:

#!/usr/bin/env python3
from os import getpid
import concurrent.futures

def f(x):
    print(x**2,f'Hi Im {getpid()}')
    if not x:
        raise Exception(f'bla {x}')
    return x**2 # let's return a value just for fun


def main():
    with concurrent.futures.ProcessPoolExecutor(5) as pool:
        futures = [pool.submit(f,i) for i in range(5)]
        for future in futures: # wait for all "jobs" to complete ...
            try:
                result = future.result() # ... by retrieving the result,which could be an exception
                print('result of calling f:',result)
            except Exception as e:
                print('exception found in result:',e)

if __name__ == '__main__':
    main()

打印:

0 Hi Im 12996
1 Hi Im 5152
4 Hi Im 11160
9 Hi Im 12996
exception found in result: bla 0
16 Hi Im 5152
result of calling f: 1
result of calling f: 4
result of calling f: 9
result of calling f: 16

今天的关于为什么在python map和multiprocessing.Pool.map的分享已经结束,谢谢您的关注,如果想了解更多关于Java 8中的map()和flatMap()方法之间有什么区别?、multiprocessing.Manager()dict()setdefault()是否损坏?、multiprocessing.pool.Pool.apply()的用例是什么?、multiprocessing.Pool()map内部的异常会暂停执行的相关知识,请在本站进行查询。

本文标签: