Python实现简单多线程任务队列
发布时间:2025-05-13 19:14:57 发布人:远客网络
一、Python实现简单多线程任务队列
1、最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):
2、defgradient_descent():# the gradient descent code plotly.write(X, Y)
3、一般来说,当网络请求 plot.ly绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。
4、一种解决办法是每调用一次 plotly.write函数就开启一个新的线程,但是这种方法感觉不是很好。我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis来持久化数据。
5、那用什么办法解决呢?我在 python中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。
6、fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue):
7、首先我们继承 Queue.Queue类。从 Queue.Queue类可以继承 get和 put方法,以及队列的行为。
8、def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
9、初始化的时候,我们可以不用考虑工作线程的数量。
10、defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
11、我们把 task, args, kwargs以元组的形式存储在队列中。*args可以传递数量不等的参数,**kwargs可以传递命名参数。
12、defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
13、我们为每个 worker创建一个线程,然后在后台删除。
14、defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
15、worker函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:
16、defblokkah(*args,**kwargs): time.sleep(5) print“Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print“Alldone!”
17、Blokkah是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。
18、defgradient_descent():# the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
19、修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()
二、python多线程的问题如何处理
1、在python里线程出问题,可能会导致主进程崩溃。虽然python里的线程是操作系统的真实线程。
2、那么怎么解决呢?通过我们用进程方式。子进程崩溃后,会完全的释放所有的内存和错误状态。所以进程更安全。另外通过进程,python可以很好的绕过GIL,这个全局锁问题。
3、但是进程也是有局限的。不要建立超过CPU总核数的进程,否则效率也不高。
4、当我们想实现多任务处理时,首先要想到使用multiprocessing,但是如果觉着进程太笨重,那么就要考虑使用线程。如果多任务处理中需要处理的太多了,可以考虑多进程,每个进程再采用多线程。如果还处理不要,就要使用轮询模式,比如使用poll event, twisted等方式。如果是GUI方式,则要通过事件机制,或者是消息机制处理,GUI使用单线程。
5、所以在python里线程不要盲目用,也不要滥用。但是线程不安全是事实。如果仅仅是做几个后台任务,则可以考虑使用守护线程做。如果需要做一些危险操作,可能会崩溃的,就用子进程去做。如果需要高度稳定性,同时并发数又不高的服务。则强烈建议用多进程的multiprocessing模块实现。
6、在linux或者是unix里,进程的使用代价没有windows高。还是可以接受的。
三、python多线程的几种方法
1、Python进阶(二十六)-多线程实现同步的四种方式
2、临界资源即那些一次只能被一个线程访问的资源,典型例子就是打印机,它一次只能被一个程序用来执行打印功能,因为不能多个线程同时操作,而访问这部分资源的代码通常称之为临界区。
3、threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁
4、import threadingimport timeclass Num:
5、self.lock= threading.Lock() def add(self):
6、self.lock.acquire()#加锁,锁住相应的资源
7、self.lock.release()#解锁,离开该资源
8、n= Num()class jdThread(threading.Thread):
9、threading.Thread.__init__(self)
10、self.item= item def run(self):
11、value= n.add()#将num加1,并输出原来的数据和+1之后的数据
12、print(self.item,value)for item in range(5):
13、t.join()#使线程一个一个执行12345678910111213141516171819202122232425262728
14、当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”(参见多线程的基本概念)。
15、直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
16、信号量也提供acquire方法和release方法,每当调用acquire方法的时候,如果内部计数器大于0,则将其减1,如果内部计数器等于0,则会阻塞该线程,知道有线程调用了release方法将内部计数器更新到大于1位置。
17、import threadingimport timeclass Num:
18、self.sem= threading.Semaphore(value= 3)#允许最多三个线程同时访问资源
19、self.sem.acquire()#内部计数器减1
20、self.sem.release()#内部计数器加1
21、n= Num()class jdThread(threading.Thread):
22、threading.Thread.__init__(self)
23、self.item= item def run(self):
24、print(self.item,value)for item in range(100):