How do I use threading in Python?

I am trying to understand threading in Python. I've looked at the documentation and examples, but quite frankly, many of the examples are overly sophisticated and I'm having trouble understanding them.

How do you clearly show tasks being divided up for multithreading?

Since this question was asked in 2010, there has been real simplification in how to do simple multithreading with Python, using map and pool.

The code below comes from an article/blog post that you should definitely check out (no affiliation) – Parallelism in one line: A Better Model for Day to Day Threading Tasks. I'll summarize below – it ends up being just a few lines of code:

    from multiprocessing.dummy import Pool as ThreadPool
    pool = ThreadPool(4)
    results = pool.map(my_function, my_array)

Which is the multithreaded version of:

    results = []
    for item in my_array:
        results.append(my_function(item))

Description

Map is a cool little function, and the key to easily injecting parallelism into your Python code. For those unfamiliar, map is something lifted from functional languages like Lisp. It is a function which maps another function over a sequence.

Map handles the iteration over our sequence for us, applies the function to each item, and stores all of the results in a handy list at the end.
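As a quick illustration of that behavior, here is a minimal sketch using the built-in map (in Python 3, map returns a lazy iterator, so list() is used to collect the results; the square function is just an example):

```python
def square(x):
    return x * x

numbers = [1, 2, 3, 4]

# map applies square to each item; list() collects the results in order
results = list(map(square, numbers))
print(results)  # [1, 4, 9, 16]
```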



Implementation

Parallel versions of the map function are provided by two libraries: multiprocessing, and also its little known, but equally fantastic, step child: multiprocessing.dummy.

    import urllib2
    from multiprocessing.dummy import Pool as ThreadPool

    urls = [
        'http://www.python.org',
        'http://www.python.org/about/',
        'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
        'http://www.python.org/doc/',
        'http://www.python.org/download/',
        'http://www.python.org/getit/',
        'http://www.python.org/community/',
        'https://wiki.python.org/moin/',
    ]

    # make the Pool of workers
    pool = ThreadPool(4)

    # open the urls in their own threads
    # and return the results
    results = pool.map(urllib2.urlopen, urls)

    # close the pool and wait for the work to finish
    pool.close()
    pool.join()

And the timing results:

    Single thread: 14.4 seconds
           4 Pool:  3.1 seconds
           8 Pool:  1.4 seconds
          13 Pool:  1.3 seconds

Passing multiple arguments (works like this only in Python 3.3 and later) (source):

To pass multiple arrays:

    results = pool.starmap(function, zip(list_a, list_b))

Or to pass a constant and an array:

    results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

If you are using an earlier version of Python, you can pass multiple arguments via this workaround.

(Thanks to user136036 for the helpful comment.)
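A self-contained sketch of both starmap patterns described above; the add function and the input lists are illustrative (any two-argument function works the same way):

```python
import itertools
from multiprocessing.dummy import Pool as ThreadPool

def add(a, b):
    return a + b

list_a = [1, 2, 3]
list_b = [10, 20, 30]
constant = 100

pool = ThreadPool(2)

# One argument drawn from each list per call
pairs = pool.starmap(add, zip(list_a, list_b))  # [11, 22, 33]

# The same constant paired with each element of list_a;
# zip() stops when list_a is exhausted, so the infinite repeat is safe
offsets = pool.starmap(add, zip(itertools.repeat(constant), list_a))  # [101, 102, 103]

pool.close()
pool.join()
print(pairs, offsets)
```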

Here's a simple example: you need to try a few alternative URLs and return the contents of the first one to respond.

    import Queue
    import threading
    import urllib2

    # called by each thread
    def get_url(q, url):
        q.put(urllib2.urlopen(url).read())

    theurls = ["http://google.com", "http://yahoo.com"]

    q = Queue.Queue()

    for u in theurls:
        t = threading.Thread(target=get_url, args=(q, u))
        t.daemon = True
        t.start()

    s = q.get()
    print s

This is a case where threading is used as a simple optimization: each subthread is waiting for a URL to resolve and respond, in order to put its contents on the queue; each thread is a daemon (it won't keep the process up if the main thread ends – that's more common than not); the main thread starts all subthreads, does a get on the queue to wait until one of them has done a put, then emits the results and terminates (which takes down any subthreads that might still be running, since they're daemon threads).

Proper use of threads in Python is invariably connected to I/O operations (since CPython doesn't use multiple cores to run CPU-bound tasks anyway, the only reason for threading is not blocking the process while there's a wait for some I/O). Queues, by the way, are almost invariably the best way to farm work out to threads and/or collect the work's results, and they're intrinsically thread-safe, so they save you from worrying about locks, conditions, events, semaphores, and other inter-thread coordination/communication concepts.
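A minimal sketch of that queue pattern: worker threads put their results on a queue and the main thread collects them, with no explicit locks anywhere (the worker function here is illustrative):

```python
import threading
import queue

def worker(q, n):
    # Queue.put is thread-safe, so no lock is needed
    q.put(n * n)

q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q, n)) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Drain the queue; arrival order depends on thread scheduling, so sort
results = sorted(q.get() for _ in range(5))
print(results)  # [0, 1, 4, 9, 16]
```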

NOTE: For actual parallelization in Python, you should use the multiprocessing module to fork multiple processes that execute in parallel (due to the global interpreter lock, Python threads provide interleaving, but they are in fact executed serially, not in parallel, and are only useful when interleaving I/O operations).

However, if you are merely looking for interleaving (or are doing I/O operations that can be parallelized despite the global interpreter lock), then the threading module is the place to start. As a really simple example, let's consider the problem of summing a big range by summing subranges in parallel:

    import threading

    class SummingThread(threading.Thread):
        def __init__(self, low, high):
            super(SummingThread, self).__init__()
            self.low = low
            self.high = high
            self.total = 0

        def run(self):
            for i in range(self.low, self.high):
                self.total += i

    thread1 = SummingThread(0, 500000)
    thread2 = SummingThread(500000, 1000000)
    thread1.start()  # This actually causes the thread to run
    thread2.start()
    thread1.join()   # This waits until the thread has completed
    thread2.join()
    # At this point, both threads have completed
    result = thread1.total + thread2.total
    print result

Note that the above is a very stupid example, as it does absolutely no I/O and will be executed serially, albeit interleaved (with the added overhead of context switching), in CPython due to the global interpreter lock.

Like others mentioned, CPython can use threads only for I/O waits, due to the GIL. If you want to benefit from multiple cores for CPU-bound tasks, use multiprocessing:

    from multiprocessing import Process

    def f(name):
        print 'hello', name

    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

Just a note: a queue is not required for threading.

This is the simplest example I could imagine that shows multiple threads running concurrently.

    import threading
    from random import randint
    from time import sleep

    def print_number(number):
        # Sleeps a random 1 to 10 seconds
        rand_int_var = randint(1, 10)
        sleep(rand_int_var)
        print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

    thread_list = []

    for i in range(1, 10):
        # Instantiates the thread
        # (i) does not make a sequence, so (i,)
        t = threading.Thread(target=print_number, args=(i,))
        # Sticks the thread in a list so that it remains accessible
        thread_list.append(t)

    # Starts threads
    for thread in thread_list:
        thread.start()

    # This blocks the calling thread until the thread whose join() method is called is terminated.
    # From http://docs.python.org/2/library/threading.html#thread-objects
    for thread in thread_list:
        thread.join()

    # Demonstrates that the main process waited for threads to complete
    print "Done"

Alex Martelli's answer helped me. However, here is a modified version that I thought was more useful (at least to me).

    import Queue
    import threading
    import urllib2

    worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

    # load up a queue with your data, this will handle locking
    q = Queue.Queue()
    for url in worker_data:
        q.put(url)

    # define a worker function
    def worker(queue):
        queue_full = True
        while queue_full:
            try:
                # get your data off the queue, and do some work
                url = queue.get(False)
                data = urllib2.urlopen(url).read()
                print len(data)
            except Queue.Empty:
                queue_full = False

    # create as many threads as you want
    thread_count = 5
    for i in range(thread_count):
        t = threading.Thread(target=worker, args=(q,))
        t.start()

I found this very useful: create as many threads as there are cores and let them execute a (large) number of tasks (in this case, calling a shell program):

    import Queue
    import threading
    import multiprocessing
    import subprocess

    q = Queue.Queue()
    for i in range(30):  # put 30 tasks in the queue
        q.put(i)

    def worker():
        while True:
            item = q.get()
            # execute a task: call a shell program and wait until it completes
            subprocess.call("echo " + str(item), shell=True)
            q.task_done()

    cpus = multiprocessing.cpu_count()  # detect number of cores
    print("Creating %d threads" % cpus)
    for i in range(cpus):
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()

    q.join()  # block until all tasks are done

For me, the perfect example for threading is monitoring asynchronous events. Look at this code.

    # thread_test.py
    import threading
    import time

    class Monitor(threading.Thread):
        def __init__(self, mon):
            threading.Thread.__init__(self)
            self.mon = mon

        def run(self):
            while True:
                if self.mon[0] == 2:
                    print "Mon = 2"
                    self.mon[0] = 3

You can play with this code by opening an IPython session and doing something like:

    >>> from thread_test import Monitor
    >>> a = [0]
    >>> mon = Monitor(a)
    >>> mon.start()
    >>> a[0] = 2
    Mon = 2
    >>> a[0] = 2
    Mon = 2

Wait a few minutes

    >>> a[0] = 2
    Mon = 2

In addition to all the great answers and examples above, which personally helped me understand multiprocessing and threading, the site below (I have provided topic-specific links because of the simple examples it contains, as was requested here) is what really made the documentation, and my understanding of Python so far, click for me. Hopefully it helps people here as it did me.

  • No affiliation

Python线程

Python多处理

Although I realize there are already many answers, I felt it was worth the reference if I could help another person better understand something they are interested in.

With the blazing new concurrent.futures module

    def sqr(val):
        import time
        time.sleep(0.1)
        return val * val

    def process_result(result):
        print(result)

    def process_these_asap(tasks):
        import concurrent.futures

        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = []
            for task in tasks:
                futures.append(executor.submit(sqr, task))

            for future in concurrent.futures.as_completed(futures):
                process_result(future.result())
            # Or instead of all this just do:
            # results = executor.map(sqr, tasks)
            # list(map(process_result, results))

    def main():
        tasks = list(range(10))
        print('Processing {} tasks'.format(len(tasks)))
        process_these_asap(tasks)
        print('Done')
        return 0

    if __name__ == '__main__':
        import sys
        sys.exit(main())

The executor approach might seem familiar to all those who have gotten their hands dirty with Java before.

Also, on a side note: to keep the universe sane, don't forget to close your pools/executors if you don't use the with context (which is awesome enough that it does it for you).
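A sketch of that manual variant, for cases where the with block can't be used; shutdown(wait=True) is the explicit equivalent of leaving the context manager (sqr is the same illustrative function as above):

```python
from concurrent.futures import ThreadPoolExecutor

def sqr(val):
    return val * val

executor = ThreadPoolExecutor(max_workers=4)
try:
    # map preserves input order, so the results come back sorted
    results = list(executor.map(sqr, range(5)))
finally:
    # Without this, the worker threads may keep the process alive
    executor.shutdown(wait=True)

print(results)  # [0, 1, 4, 9, 16]
```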

Given a function f, thread it like this:

    import threading
    threading.Thread(target=f).start()

To pass arguments to f:

    threading.Thread(target=f, args=(a, b, c)).start()

Multithreading with a simple example, which will be helpful. You can run it and easily understand how multithreading works in Python. I used a lock to prevent access by other threads until the previous threads finished their work. By the use of

    tLock = threading.BoundedSemaphore(value=4)

this line of code, you can allow a number of threads to proceed at a time, and the rest of the threads are kept on hold, to run later or after the previous ones have finished.

    import threading
    import time

    # tLock = threading.Lock()
    tLock = threading.BoundedSemaphore(value=4)

    def timer(name, delay, repeat):
        print "\r\nTimer: ", name, " Started"
        tLock.acquire()
        print "\r\n", name, " has acquired the lock"
        while repeat > 0:
            time.sleep(delay)
            print "\r\n", name, ": ", str(time.ctime(time.time()))
            repeat -= 1
        print "\r\n", name, " is releasing the lock"
        tLock.release()
        print "\r\nTimer: ", name, " Completed"

    def Main():
        t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
        t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
        t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
        t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
        t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

        t1.start()
        t2.start()
        t3.start()
        t4.start()
        t5.start()

        print "\r\nMain Complete"

    if __name__ == "__main__":
        Main()

Python 3 has the facility of launching parallel tasks. This makes our work easier.

It works with thread pools and process pools.

The following gives an insight:

ThreadPoolExecutor示例

    import concurrent.futures
    import urllib.request

    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']

    # Retrieve a single page and report the URL and contents
    def load_url(url, timeout):
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()

    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

    import concurrent.futures
    import math

    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]

    def is_prime(n):
        if n % 2 == 0:
            return False

        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True

    def main():
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                print('%d is prime: %s' % (number, prime))

    if __name__ == '__main__':
        main()

Most documentation and tutorials use Python's Threading and Queue modules, and they can seem overwhelming for beginners.

Perhaps consider the concurrent.futures.ThreadPoolExecutor module of Python 3 instead. Combined with the with clause and a list comprehension, it could be a real charm.

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def get_url(url):
        # Your actual program here. Using threading.Lock() if necessary
        return ""

    # List of urls to fetch
    urls = ["url1", "url2"]

    with ThreadPoolExecutor(max_workers=5) as executor:
        # Create threads
        futures = {executor.submit(get_url, url) for url in urls}
        # as_completed() gives you the threads once finished
        for f in as_completed(futures):
            # Get the results
            rs = f.result()

Here, args is a tuple of arguments; use an empty tuple to call the function without passing any arguments. kwargs is an optional dictionary of keyword arguments.

    #!/usr/bin/python

    import thread
    import time

    # Define a function for the thread
    def print_time(threadName, delay):
        count = 0
        while count < 5:
            time.sleep(delay)
            count += 1
            print "%s: %s" % (threadName, time.ctime(time.time()))

    # Create two threads as follows
    try:
        thread.start_new_thread(print_time, ("Thread-1", 2,))
        thread.start_new_thread(print_time, ("Thread-2", 4,))
    except:
        print "Error: unable to start thread"

    while 1:
        pass

When the above code is executed, it produces the following result:

    Thread-1: Thu Jan 22 15:42:17 2009
    Thread-1: Thu Jan 22 15:42:19 2009
    Thread-2: Thu Jan 22 15:42:19 2009
    Thread-1: Thu Jan 22 15:42:21 2009
    Thread-2: Thu Jan 22 15:42:23 2009
    Thread-1: Thu Jan 22 15:42:23 2009
    Thread-1: Thu Jan 22 15:42:25 2009
    Thread-2: Thu Jan 22 15:42:27 2009
    Thread-2: Thu Jan 22 15:42:31 2009
    Thread-2: Thu Jan 22 15:42:35 2009
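The args/kwargs convention described above also applies to the higher-level threading module; a small sketch showing both a positional args tuple and an optional kwargs dictionary (the greet function and its parameter names are illustrative):

```python
import threading

collected = []

def greet(name, punctuation="!"):
    collected.append("Hello, " + name + punctuation)

# args is a tuple of positional arguments; kwargs is an optional dict
t = threading.Thread(target=greet, args=("world",), kwargs={"punctuation": "?"})
t.start()
t.join()  # wait so collected is populated before we read it
print(collected)  # ['Hello, world?']
```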

None of the solutions above actually used multiple cores on my GNU/Linux server (where I don't have administrator rights). They just ran on a single core. I used the lower-level os.fork interface to spawn multiple processes. This is the code that worked for me:

    from os import fork

    values = ['different', 'values', 'for', 'threads']

    for i in range(len(values)):
        p = fork()
        if p == 0:  # child process: do the work, then stop forking
            my_function(values[i])
            break