GVKun编程网logo

将超时参数添加到python的Queue.join()

27

对于将超时参数添加到python的Queue.join感兴趣的读者,本文将提供您所需要的所有信息,我们将详细讲解,并且为您提供关于Day034--Python--锁,信号量,事件,队列,生产者消费者模

对于将超时参数添加到python的Queue.join感兴趣的读者,本文将提供您所需要的所有信息,我们将详细讲解,并且为您提供关于Day034--Python--锁, 信号量, 事件, 队列, 生产者消费者模型, joinableQueue、jQuery中的queue和dequeue之间的用法区别、python JoinableQueue在生产者消费者项目中的简单应用、Python multiprocessing.Queue与multiprocessing.manager()Queue()的宝贵知识。

本文目录一览:

将超时参数添加到python的Queue.join()

将超时参数添加到python的Queue.join()

我希望能够加入()Queue类,但如果调用尚未返回,则在一段时间后超时。最好的方法是什么?是否可以通过使用metaclass子类化子类来做到这一点?

Day034--Python--锁, 信号量, 事件, 队列, 生产者消费者模型, joinableQueue

Day034--Python--锁, 信号量, 事件, 队列, 生产者消费者模型, joinableQueue

进程同步:

  1. 锁 (重点)

      锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁

           Lock  先异步, 到共同区域的时候同步, 一次只能有一个进程执行加锁的程序, 避免错乱.  由并发变成串行, 牺牲效率, 保证了数据的安全.

import json
from multiprocessing import Process

def get_ticket(i):
    with open(''ticket'', ''r'') as f:    # 此时的读取信息时异步的, 可能出现进程1打开的时候进程2已经把票数修改了
        last_ticket_info = json.load(f)
    last_ticket = last_ticket_info[''count'']
    if last_ticket > 0:
        last_ticket = last_ticket - 1
        last_ticket_info[''count''] = last_ticket
        with open(''ticket'', ''w'') as f:    # 涉及到文件修改的地方现在还是异步的
            json.dump(last_ticket_info, f)
        print(''恭喜%s号用户,抢票成功!'' % i)
    else:
        print(''%s号用户,票已经抢光啦~'' % i)

if __name__ == ''__main__'':
    for i in range(1, 11):
        p = Process(target=get_ticket, args=(i,))
        p.start()

# 这样可能会出现全都抢到票的情况, 因为票数0还没来得及写入文件
# test文件

{"count": 0}   # 一会儿要用json, 只能用双引号

json中只能用双引号" "

# 通过加锁解决问题, 只有一个客户能抢到票
import time
import random
import json
from multiprocessing import Process, Lock


def get_ticket(i, ticket_lock):
    # 加锁, 确保每次只有一个进程执行锁里面的程序, 这一段程序对于所有写上这个锁的进程来说, 大家都变成了串行
    ticket_lock.acquire()    # 被锁住的程序运行变成了同步运行, 只能是串行, 降低了效率
    with open(''ticket'', ''r'') as f:
        time.sleep(random.random())   # 模拟网络延迟, 0-1内随机小数
        ticket_info = json.load(f)
        ticket_num = ticket_info[''count'']
        if ticket_num > 0:
            ticket_info[''count''] = ticket_num - 1
            with open(''ticket'', ''w'') as f:
                time.sleep(random.random())  # 模拟网络延迟, 0-1内随机小数
                json.dump(ticket_info, f)
                print(''恭喜%s号用户,抢票成功!'' % i)
        else:
            print(''很遗憾, %s号用户, 票抢光了~~'' % i)

    ticket_lock.release()   # 解锁, 解锁之后其他进程才能继续执行自己的程序


if __name__ == ''__main__'':
    ticket_lock = Lock()    # 创建一把锁
    lst = []
    for i in range(1, 11):
        p = Process(target=get_ticket, args=(i, ticket_lock))
        p.start()
        lst.append(p)

   # 抢完一波赶紧把票补上, 继续抢下一波~
    for p in lst:
        p.join()

    with open(''ticket'', ''r'') as f:
        ticket_info = json.load(f)
        ticket_info["count"] = 3   # 一次补货3张票

    with open(''ticket'', ''w'') as f:
        json.dump(ticket_info, f)
作业:
  

       使用同步锁写一个简单的抢票程序,提供并发查票和并发买票的功能

import json
import time
import random
from multiprocessing import Process, Lock


def check(i):
    time.sleep(random.random())
    with open(''ticket'', ''r'') as f:
        ticket_info = json.load(f)
        ticket_num = ticket_info[''count'']
        print(''用户%s查看剩余票数: %s张'' % (i, ticket_num))
        return ticket_info


def get_ticket(lock, i):
    lock.acquire()
    time.sleep(random.random())   #  模拟网络延迟
    with open(''ticket'', ''r'') as f:
        ticket_info = json.load(f)
    ticket_num = ticket_info[''count'']
    if ticket_num > 0:
        with open(''ticket'', ''w'') as f:
            ticket_info[''count''] = ticket_num - 1
            json.dump(ticket_info, f)
        print(''用户%s抢票成功!'' % i)
    else:
        print(''很遗憾,用户%s没有抢到票, 明年再来吧.'' % i)

    lock.release()


if __name__ == ''__main__'':
    lock = Lock()
    clst = []
    glst = []
    for i in range(1, 11):  # 创建用户
        c_p = Process(target=check, args=(i,))
        c_p.start()
        clst.append(c_p)
        g_p = Process(target=get_ticket, args=(lock, i))
        g_p.start()
        glst.append(g_p)

    for c in clst:
        c.join()
    for g in glst:
        g.join()

    with open(''ticket'', ''r'') as f:
        ticket_info = json.load(f)
        ticket_info[''count''] = 1
    with open(''ticket'', ''w'') as f:
        json.dump(ticket_info, f)
View Code作业

 

 

 

 

  2. 信号量

      Semaphore

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
View Code 信号量

 

''''''
10个客户去按摩店,按摩店有4个服务房间,每个客人独占1间房,其余人排队等候. 每个人按摩的时间不一样,
只有当一个客户按摩完成走出房间,给出空房信号,另一位等候的客户才能进入房间.
''''''

import time
import random
from multiprocessing import Process, Semaphore

def message(i,s):
    s.acquire()    # lock开始计数, 每个进程占用1把锁, 相当于每个客人独占一个房间
    t = random.randrange(1, 5)   # 每个个人按摩时间不一样, 随机前闭后开1-4秒
    print(''%s号客人正在按摩中...还需等待%s秒'' % (i, t))
    time.sleep(t)   # 按摩中...
    print(''%s号顾客离开了房间'' % i)
    s.release()    # 锁释放了, lock计数-1, 有空房间了, 可以服务下一个顾客

if __name__ == ''__main__'':
    s = Semaphore(4)    # 一共4把锁, 相当于4个房间
    lst = []
    for i in range(1, 11):   # 客人编号1-10
        p = Process(target=message, args=(i, s))
        p.start()
        lst.append(p)

    for p in lst:
        p.join()

    print(''客人全部离开, 打烊中...'')

 

 

  3. 事件 

   Event   

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为False
set:将“Flag”设置为True
View Code事件

 

from multiprocessing import Process, Event

e = Event()  # 创建一个事件
print(e.is_set())  # 查看事件状态, bool, 初始状态 False

e.set()  # 将e事件的状态改为True
print(''在这里等待'')
e.clear()   # 将e事件的状态改为False
print(''等待中...'')
e.wait()     # 只有e时间状态为True时, 才能继续往下执行
print(''你好啊,我叫赛利亚'')
# 利用事件模拟红绿灯
import time
import random
from multiprocessing import Process, Event

def traffic_light(e):
    while 1:
        print(''红灯啦'')
        time.sleep(5)    # 此时事件状态初始为False
        e.set()      # 绿灯亮, 事件状态更改为True
        print(''绿灯亮了, 可以走了~'')
        time.sleep(3)
        e.clear()    # 将事件状态初始化成False

def car(i, e):
    if not e.is_set():    # 如果事件是False执行此条件下代码
        print(''汽车%s号正在等待红灯...'' % i)
        e.wait()          # 等待事件状态变成True后继续往下执行
        print(''汽车%s号开走了'' % i)
    else:                 # 当e.is_set()变成True时执行此条件下代码
        print(''汽车%s号畅通无阻地开了过去'' % i)    # 此时是绿灯, 无需等待, 直接畅行

if __name__ == ''__main__'':
    e = Event()    # 创建一个事件对象
    tra_p = Process(target=traffic_light, args=(e,))   # 创建红绿灯进程
    tra_p.start()
    
    # 为了不断测试红绿灯效果, 10辆车不够用, 就循环创建序号从1-10的车辆
    while 1:
        for i in range(1, 11):   # 车辆编号1-10
            time.sleep(random.randrange(1, 5))   # 每隔随机1-4秒
            car_p = Process(target=car, args=(i, e))  # 创建一辆车
            car_p.start()   # 开启车辆进程

 

 

 进程间通信 IPC:

  1. 队列 (重点)

    Queue

Queue([maxsize]) 创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁实现。

 

   q.empty()  判断队列是否是空

    如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

   q.full() 判断队列是否已满

    由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

  q.qsize()   获取队列的大小. 结果也不可靠.

 

queue的其他方法(了解)

q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() 
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() 
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
View Code

 

 

from multiprocessing import Process, Queue
# 专门设置线程间进行通信的
# 先进先出

q = Queue(3)  # 队列长度3, 可以放3条 数据

q.put(1)
q.put(2)
q.put(3)
# q.put(4)  # 超出队列长度,阻塞住啦  此处可以使用 q.put_nowait(4),此种情况下, 队列满了不会阻塞, 但是会报错, 可以抛异常和异常处理.
print(''队列是否满了>>>'', q.full())  # 判断队列是否满了

print(q.get())  # 取出1
print(q.get())  # 取出2
print(q.get())  # 取出3
print(''队列是否为空:'', q.empty())
# print(q.get())  # 此时队列为空, 阻塞住

q.put(4) ####################################
print(''又放了个数据进来'')
print(''队列是空吗?'', q.empty())

# while 1:  # 加了循环就可以等着队列有新增的时候拿东西
try:
    print(q.get(False))  # get的默认参数是True,如果队列空了取不到就阻塞住,改为False就不会阻塞,但是会报错
    # q.get_nowait()  # 和get(False)一个效果
except:     # 异常处理
    print(''队列目前是空的'')

 

 

# 队列在多线程中的应用

import time
from multiprocessing import Process, Queue

def girl(q):
    print(''来自男孩的消息'', q.get())
    print(''学校广播'', q.get())

def boy(q):
    q.put(''你好啊,女孩'')

if __name__ == ''__main__'':
    q = Queue(5)
    b = Process(target=boy, args=(q,))
    g = Process(target=girl, args=(q,))
    b.start()
    g.start()
    time.sleep(0.1)
    q.put(''好好学习'')

 

  2. 生产者消费者模型

      为什么要使用生产者和消费者模式

      在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继      续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

      什么是生产者消费者模式

      生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个阻塞队列就是用来给生产者和消费者解耦的. 并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务,就可以开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。

 

    生产者消费者中间有个柜台(缓冲区), 彼此都不用等待对方(阻塞), 等待期间可以处理自己的事情. 解耦缓冲, 解决双方效率差异问题.

# 版本1  有bug
import
time from multiprocessing import Process, Queue def producer(q): for i in range(1, 11): time.sleep(0.5) print(''生产了包子%s号'' % i) q.put(i) def consumer(q): while 1: time.sleep(0.8) try: s = q.get(False) # 不加False的话等队列中的包子都吃完了会阻塞住 print(''顾客吃了包子%s号'' % s) except: print(''包子店关门了'') break # 注意,如果此时顾客吃的比店铺生产的快, 就会先触发break, 所以此版本有缺陷 if __name__ == ''__main__'': # 通过队列来摸底缓冲区, 设置缓冲区大小20 q = Queue(20) pro_p = Process(target=producer, args=(q,)) pro_p.start() con_p = Process(target=consumer, args=(q,)) con_p.start()
# 版本2子进程发送结束信号  消费者吃得快
import time
from multiprocessing import Process, Queue

def producer(q):
    for i in range(1, 11):
        time.sleep(1)
        print(''生产了包子%s号'' % i)
        q.put(i)
    q.put(None)  # 给消费者发送一个结束信号

def consumer(q):
    while 1:
        s = q.get()
        if s == None:
            print(''包子店关门了'')
            break
        else:
            time.sleep(0.6)
            print(''顾客吃了包子%s号'' % s)

# 注意,如果此时顾客吃的比店铺生产的快, 就会先触发break, 所以此版本有缺陷

if __name__ == ''__main__'':
    # 通过队列来摸底缓冲区, 设置缓冲区大小20
    q = Queue(20)
    pro_p = Process(target=producer, args=(q,))
    pro_p.start()
    con_p = Process(target=consumer, args=(q,))
    con_p.start()
# 版本3  主进程发送结束信号  消费者吃得快
import time
from multiprocessing import Process, Queue

def producer(q):
    for i in range(1, 11):
        time.sleep(1)
        print(''生产了包子%s号'' % i)
        q.put(i)

def consumer(q):
    while 1:
        s = q.get()
        if s == None:
            print(''包子店关门了'')
            break
        else:
            time.sleep(0.6)
            print(''顾客吃了包子%s号'' % s)

# 注意,如果此时顾客吃的比店铺生产的快, 就会先触发break, 所以此版本有缺陷

if __name__ == ''__main__'':
    # 通过队列来摸底缓冲区, 设置缓冲区大小20
    q = Queue(20)
    pro_p = Process(target=producer, args=(q,))
    pro_p.start()
    con_p = Process(target=consumer, args=(q,))
    con_p.start()
    pro_p.join()
    q.put(None)
View Code版本3 主进程发送结束信号

 

 

  3. JoinableQueue 生产者消费者模型

# 通过joinableQueue结束消费者子进程

import time
from multiprocessing import Process, Queue, JoinableQueue

def prodecer(q):
    for i in range(1, 11):
        time.sleep(1)
        print(''生产了包子%s号'' % i)
        q.put(i)
    #  记录了往队列中放了多少个和取出了多少个(收到了多少个task_done))
    q.join()
    # 等待接收全部task_done信号(共10个), 接收后才继续
    print(''客人都走了,关门中...'')

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        print(''消费者吃了包子%s号'' % s)
        q.task_done()  # 每从队列中获取一个包子就发送一个task_done信号给生产者

if __name__ == ''__main__'':
    q = JoinableQueue(20)
    pro_p = Process(target=prodecer, args=(q,))
    pro_p.start()
    con_p = Process(target=consumer, args=(q,))
    con_p.daemon = True     # 把消费者子进程设置为守护进程,当主进程结束, 消费者子进程也结束   如果不设置守护进程, 消费者进程就会在while循环中的q.get()阻塞住,队列中已经没有包子了 
    con_p.start()

    pro_p.join()   # 等待生产者子进程结束, 主进程才结束
    print(''主进程结束'')

 

#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

   #参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。    
  #方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。

 

import time
from multiprocessing import Process, JoinableQueue

def producer(food, q):
    for i in range(1, 21):
        food_info = ''%s%s号'' % (food, i)
        time.sleep(8)
        q.put(food_info)
        print(''生产了%s%s号'' % (food, i))

    q.join()
    print(''%s都卖完了, %s店关门了'' % (food, food))

    q.join()
def consumer(i, q):
    while 1:
        time.sleep(5)
        print(''客户%s吃了%s'' % (i, q.get()))
        q.task_done()

if __name__ == ''__main__'':
    q = JoinableQueue()
    pro_p1 = Process(target=producer, args=(''包子'', q))
    pro_p1.start()
    pro_p2 = Process(target=producer, args=(''炸鱼薯条'', q))
    pro_p2.start()
    pro_p3 = Process(target=producer, args=(''大碗茶'', q))
    pro_p3.start()

    for i in range(1, 31):
        con_p = Process(target=consumer, args=(i, q))
        con_p.daemon = True
        con_p.start()

    pro_p1.join()
    pro_p2.join()
    pro_p3.join()
    print(''店铺都关门了, 今天吃不到夜宵了'')
View Code 多个生产者消费者模型一

 

import time
import random
from multiprocessing import Process, JoinableQueue


def producer(num, q):
    for i in range(1, 11):
        time.sleep(random.randrange(1, 3))
        q.put(i)
        print(''生产者%s号生产了包子%s'' % (num, i))

    q.join()
    print(''包子店%s打烊了...明天再来吧'' % num)


def consumer(num, q):
    while 1:
        time.sleep(random.randrange(2, 6))
        s = q.get()
        print(''消费者\033[31m%s\033[0m吃了包子%s'' % (num, s))
        q.task_done()


if __name__ == ''__main__'':
    q = JoinableQueue()
    c_lst = []
    p_lst = []
    for num in range(1, 4):
        pro_p = Process(target=producer, args=(num, q))
        pro_p.start()
        p_lst.append(pro_p)

    for num in range(1, 11):
        con_p = Process(target=consumer, args=(num, q))
        con_p.daemon = True
        con_p.start()
        c_lst.append(con_p)

    for pro_p in p_lst:
        pro_p.join()

    print(''包子店全都关门了, 买不到了'')
View Code 多个消费者和生产者模型二

 

 

jQuery中的queue和dequeue之间的用法区别

jQuery中的queue和dequeue之间的用法区别

jquery中的queue和dequeue是一组很有用的方法,他们对于一系列需要按次序运行的函数特别有用。特别animate动画,ajax,以及timeout等需要一定时间的函数

queue和dequeue的过程主要是:
1,用queue把函数加入队列(通常是函数数组)
2,用dequeue将函数数组中的第一个函数取出,并执行(用shift()方法取出并执行)

也就意味着当再次执行dequeue的时候,得到的是另一个函数了
同时也意味着,如果不执行dequeue,那么队列中的下一个函数永远不会执行

对于一个元素上执行animate方法加动画,jQuery内部也会将其加入名为 fx 的函数队列
而对于多个元素要依次执行动画,则必须我们手动设置队列进行了。

一个例子,要两个div依次向左移动,点击这里查看

 div {
 background:#aaa;
 width:18px;
 height:18px;
 position:absolute;
 top:10px;
 } 
登录后复制

如果只是轮流移动次数较少,可以使用animate的回调函数来做,一个动画放在另一个动画的回调里边
比如

 $(“#block1″).animate({left:”+=100″},function() {
 $(“#block2″).animate({left:”+=100″},function() {
 $(“#block1″).animate({left:”+=100″},function() {
 $(“#block2″).animate({left:”+=100″},function() {
 $(“#block1″).animate({left:”+=100″},function(){
 alert(“动画结束”);
 });
 });
 });
 });
 });
登录后复制

但这种方法当动画较多的时候简直是残忍。

此时利用queue和dequeue则显得简单很多:

var FUNC=[
 function() {$("#block1").animate({left:"+=100"},aniCB);},
 function() {$("#block2").animate({left:"+=100"},aniCB);},
 function() {$("#block1").animate({left:"+=100"},aniCB);},
 function() {$("#block2").animate({left:"+=100"},aniCB);},
 function() {$("#block1").animate({left:"+=100"},aniCB);},
 function(){alert("动画结束")}
 ];
 var aniCB=function() {
 $(document).dequeue(“myAnimation”);
 }
 $(document).queue(“myAnimation”,FUNC);
 aniCB();  
登录后复制


1,我首先建议建立了一个函数数组,里边是一些列需要依次执行的动画
2,然后我定义了一个回调函数,用dequeue方法用来执行队列中的下一个函数
3,接着把这个函数数组放到document上的myAnimation的队列中(可以选择任何元素,我只是为了方便而把这个队列放在document上)
4,最后我开始执行队列中的第一个函数

这样做的好处在于函数数组是线性展开,增减起来非常方便。
而且,当不要要继续进行接下来动画的时候(比如用户点了某个按钮),只需要清空那个队列即可。而要增加更多则只需要加入队列即可

 //清空队列
 $(document).queue(“myAnimation”,[]);
 //加一个新的函数放在最后
 $(document).queue(“myAnimation”,function(){alert(“动画真的结束了!”)});
登录后复制

以前发过一个wait插件,用于让动画之间可以暂停一段时间

可以看一下,他也就是利用了这个原理,默认在fx中插入一个timeout,放到队列中,直到timeout结束后才执行dequeue继续执行队列中的下一个函数。

这当然也可以用于ajax之类的方法,如果需要一系列ajax交互,每个ajax都希望在前一个结束之后开始,之前最原始的方法就是用回调函数,但这样太麻烦了。同样利用queue添加队列,每次ajax之后的回调中执行一次dequeue即可。

如果没有使用jQuey库,也可以自己写段简易代码来解决这个问题。

以上就是jQuery中的queue和dequeue之间的用法区别的详细内容,更多请关注php中文网其它相关文章!

python JoinableQueue在生产者消费者项目中的简单应用

python JoinableQueue在生产者消费者项目中的简单应用

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done ()

Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call totask_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join ()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

 

这是官网对JoinableQueue的概述,我们通过这个方法就可以实现我们自己的生产者消费者模型,具体的实现思路请看我的分析<<项目开发中使用并发模型常见问题的整理与思考>>

code如下:

import multiprocessing

def printAll(queue, out_queue):
    while 1:
        t = queue.get()
        print(t)
        s = "生产{0}".format(t)
        queue.task_done()
        out_queue.put(s)

if __name__ == "__main__":
    queue = multiprocessing.JoinableQueue()
    num_consumer = multiprocessing.cpu_count() * 2
    out_queue = multiprocessing.Queue()

    for i in range(250):
        queue.put(i)
        

    for _ in range(num_consumer):
        p = multiprocessing.Process(target=printAll, args=(queue, out_queue))
        p.start()

    queue.join()  # 阻塞队列直到队列为空。
    result = []

    print("数量是: {}".format(out_queue.qsize()))

    while out_queue.qsize() != 0:
        result.append(out_queue.get())
        
    for i in result:
        print(i)

 

简单地实现了我要的结果,具体可以再项目中应用上。

Python multiprocessing.Queue与multiprocessing.manager()Queue()

Python multiprocessing.Queue与multiprocessing.manager()Queue()

我有一个简单的任务,例如:

def worker(queue):
    while True:
        try:
            _ = queue.get_nowait()
        except Queue.Empty:
            break

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # queue = multiprocessing.Queue()
    queue = manager.Queue()

    for i in range(5):
        queue.put(i)

    processes = []

    for i in range(2):
        proc = multiprocessing.Process(target=worker,args=(queue,))
        processes.append(proc)
        proc.start()

    for proc in processes:
        proc.join()

看来multiprocessing.Queue可以完成我需要的所有工作,但另一方面,我看到了很多manager()。Queue()的示例,并且无法理解我的真正需求。看起来Manager()。Queue()使用某种代理对象,但我不明白这些目的,因为multiprocessing.Queue()在没有任何代理对象的情况下执行相同的工作。

因此,我的问题是:

1)multiprocessing.Queue和multiprocessing.manager()。Queue()返回的对象之间的真正区别是什么?

2)我需要使用什么?

关于将超时参数添加到python的Queue.join的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于Day034--Python--锁, 信号量, 事件, 队列, 生产者消费者模型, joinableQueue、jQuery中的queue和dequeue之间的用法区别、python JoinableQueue在生产者消费者项目中的简单应用、Python multiprocessing.Queue与multiprocessing.manager()Queue()等相关知识的信息别忘了在本站进行查找喔。

本文标签: