Python的threadpool传的参不能是queuedeclare 参数吗

Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递
基本FIFO队列
class Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
举个栗子:
import Queue
q = Queue.Queue()
for i in range(5):
while not q.empty():
print q.get()
class Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上
再举个栗子:
import Queue
q = Queue.LifoQueue()
for i in range(5):
while not q.empty():
print q.get()
可以看到仅仅是将Queue.Quenu类替换为Queue.LifiQueue类
优先级队列
class Queue.PriorityQueue(maxsize=0)
构造一个优先队列。maxsize用法同上。
import Queue
import threading
class Job(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
print 'Job:',description
def __cmp__(self, other):
return cmp(self.priority, other.priority)
q = Queue.PriorityQueue()
q.put(Job(3, 'level 3 job'))
q.put(Job(10, 'level 10 job'))
q.put(Job(1, 'level 1 job'))
def process_job(q):
while True:
next_job = q.get()
print 'for:', next_job.description
q.task_done()
workers = [threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,))
for w in workers:
w.setDaemon(True)
Job: level 3 job
Job: level 10 job
Job: level 1 job
for: level 1 job
for: level 3 job
for: job: level 10 job
一些常用方法
task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
将item放入队列中。
如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常
其非阻塞版本为put_nowait等同于put(item, False)
从队列中移除并返回一个数据。block跟timeout参数同put方法
其非阻塞方法为`get_nowait()`相当与get(False)
如果队列为空,返回True,反之返回False
阅读(...) 评论()【python】Threadpool线程池任务终止简单示例 - 推酷
【python】Threadpool线程池任务终止简单示例
加入我们需要处理一串个位数(0~9),奇数时需要循环打印它;偶数则等待对应时长并完成所有任务;0则是错误,但不需要终止任务,可以自定义一些处理。
定义func函数处理需求
callback处理返回结果,只有偶数和0返回;奇数会一直执行;要控制线程池状态,则需要针对偶数和0时抛出异常,并捕获异常处理。
threadpool定义线程池并发
# -*- coding: utf-8 -*-
from threadpool import makeRequests, ThreadPool
import time
from multiprocessing import Process
异常定义和特殊值(0)定义
class Finish(SyntaxWarning):
class PauseInfo(SyntaxWarning):
pause_num = 0
func函数定义
0时返回False,其他偶数返回True
def func(para):
if para == pause_num:
print('start for %d and wait %ds' % (para, 4))
time.sleep(4)
print('error bcs ',para)
return False
if para % 2 == 0:
print('start for %d and wait %ds' % (para, para))
time.sleep(para)
print('stop for', para)
return True
while True:
print('continue for', para)
time.sleep(para)
callback定义
def callback(request, result):
if result:
raise Finish
raise PauseInfo
线程池处理
Finish标识任务完成,再次诱发异常退出线程池处理;
def main_thread(paras):
pool = ThreadPool(10)
requests = makeRequests(callable_=func, args_list=paras, callback=callback)
[pool.putRequest(req) for req in requests]
while True:
pool.wait()
except Finish as e:
raise SystemExit
except PauseInfo as e:
print('Pause bcs %d but will continue' % pause_num)
except Exception as e:
print('Unknown error so will quit')
raise SystemExit
主函数起一个测试进程
if __name__ == '__main__':
while True:
s = input('Input number list to test and any other word to quit\n')
paras = []
for para in s:
if para.isnumeric():
paras.append(int(para))
thread_test = Process(target=main_thread, args=(paras,))
thread_test.start()
thread_test.join(timeout=20)
except TimeoutError as e:
print('task timeout')
except Exception as e:
print('unknow error:',e)
处理108,看打印可以看到,1被循环处理,0处理的时候报错;8处理完毕则任务结束
Input number list to test and any other word to quit
continue for 1
start for 0 and wait 4s
start for 8 and wait 8s
continue for 1
continue for 1
continue for 1
error bcs& 0
continue for 1
Pause bcs 0 but will continue
continue for 1
continue for 1
continue for 1
continue for 1
stop for 8
Input number list to test and any other word to quit
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致#_*_encoding:utf-8_*_
import threading
import Queue
from DisposeXml import *
class ThreadPoolMgr():
def __init__(self,work_queue,thread_num=2):
self.threads=[]
self.work_queue=work_queue
self.init_threadpool(thread_num)
def init_threadpool(self,thread_num):
for i in range(thread_num):
self.threads.append(Mythread(self.work_queue));
def wait_allcomplete(self):
for item in self.threads:
if item.isAlive():
item.join()
class Mythread(threading.Thread):
def __init__(self,work_queue):
threading.Thread.__init__(self)
self.work_queue=work_queue
self.start()
def run(self):
# 从任务队列中取任务,一直等到任务队列为空
while not self.work_queue.empty():
dir = self.work_queue.get()
os.chdir(dir)
self.disposeMetaXml=DisposeTableMetaXml(TableMetadataFileName)
self.disposeMetaXml.Analysis_Metadata()
if -1==self.disposeMetaXml.connect2Db():
QMessageBox.about(self,u&失败&,u&数据库连接失败&)
self.disposeMetaXml.QueryMysql()
self.disposeMetaXml.remove_tmp_file()
print self.getName(),' work queue is empty'
if __name__=='__main__':
p=ThreadPoolMgr()
p.wait_allcomplete()
python的Queue模块: &import Queue
默认构造的queue大小为0,当然也可以指定大小 &myqueue = Queue.Queue(maxsize = 10)
入队 &myqueue.put(T) &队列中可以放任何格式对象或者组合。&
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
出对 myqueue.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
queue模块有三种队列:
1、python queue模块的FIFO队列先进先出。
2、LIFO类似于堆。即先进后出。
3、还有一种是优先级队列级别越低越先出来。&
针对这三种队列分别有三个构造函数:
1、class Queue.Queue(maxsize) FIFO&
2、class Queue.LifoQueue(maxsize) LIFO&
3、class Queue.PriorityQueue(maxsize) 优先级队列&
介绍一下此包中的常用方法:
Queue.qsize() 返回队列的大小&
Queue.empty() 如果队列为空,返回True,反之False&
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应&
Queue.get([block[, timeout]]) 获取队列,timeout等待时间&
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间&
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done() 函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
明天详细介绍些python的线程。
本文已收录于以下专栏:
相关文章推荐
最近碰到个问题,需要telnet登录上千台机器去取主机名;其中有用户名密码交互部分,有需要延迟的部分,大概一次登录一次到处理完要10s,1000台机器串行处理就需要s,差不多三个小时,这...
上一节介绍了线程池threadpool的安装和使用,本节将主要介绍线程池工作的主要流程:
(1)线程池的创建
(2)任务的创建
(3)任务的推送到线程池
(4)线程处理任务
(5)线程池的退...
threading 模块的Thread类有一个join()函数,允许主线程等待线程的结束
# -*- coding: UTF-8 -*-
import threading
import time
#_*_encoding:utf-8_*_
  import threading  import Queue  from DisposeXml import *  &#...
androidxianc
[size=1.8em]Handler+Runnable模式
我们先看一个并不是异步线程加载的例子,使用 Handler+Runnable模式。
这里为何不是新开线程的...
1、文件扫描任务线程
package com.godway.
import java.io.F
import java.util...
他的最新文章
讲师:汪剑
讲师:刘道宽
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)

我要回帖

更多关于 threadpool 多参数 的文章

 

随机推荐