Problem with a multithreaded Python application and socket connections

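I'm investigating a problem with a Python application running on an Ubuntu machine with 4 GB of RAM. The tool will be used to audit servers (we prefer to roll our own tools). It uses threads to connect to lots of servers, and many of the TCP connections fail. However, if I add a delay of 1 second between starting each thread, most of the connections succeed. I have used this simple script to investigate what may be happening: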

```python
#!/usr/bin/python
import sys
import socket
import threading
import time

class Scanner(threading.Thread):
    def __init__(self, host, port):
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.status = ""

    def run(self):
        self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sk.settimeout(20)
        try:
            self.sk.connect((self.host, self.port))
        except Exception, err:
            self.status = str(err)
        else:
            self.status = "connected"
        finally:
            self.sk.close()

def get_hostnames_list(filename):
    return open(filename).read().splitlines()

if (__name__ == "__main__"):
    hostnames_file = sys.argv[1]
    hosts_list = get_hostnames_list(hostnames_file)
    threads = []
    for host in hosts_list:
        #time.sleep(1)
        thread = Scanner(host, 443)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()
        print "Host: ", thread.host, " : ", thread.status
```

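If I run this against 300 hosts with `time.sleep(1)` commented out, a lot of the connections fail with a timeout error, whereas with the 1-second delay in place they don't time out. I did try the application on another Linux distro running on a more powerful machine, and there weren't as many connection errors. Is this due to a kernel limitation? Is there anything I can do to get the connections to work without the delay?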
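One way to check whether a per-process or kernel limit is in play is to look at the open-file limit (every socket, whether connecting or connected, uses a file descriptor) and at the range of ephemeral ports the kernel hands out for outgoing connections. The sketch below assumes Linux, where that range is exposed in `/proc/sys/net/ipv4/ip_local_port_range`; `describe_limits` is just an illustrative helper name, not something from the question.

```python
# Illustrative sketch; describe_limits is a hypothetical helper name.
import resource


def describe_limits(port_range_path="/proc/sys/net/ipv4/ip_local_port_range"):
    """Return the per-process open-file limits and, if readable, the
    kernel's ephemeral (local) port range for outgoing connections.
    Assumes a Unix-like OS; the /proc path is Linux-specific.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    limits = {"nofile_soft": soft, "nofile_hard": hard, "port_range": None}
    try:
        with open(port_range_path) as f:
            low, high = (int(x) for x in f.read().split())
        limits["port_range"] = (low, high)
    except (OSError, ValueError):
        # Not Linux, /proc not mounted, or unexpected file contents
        pass
    return limits


if __name__ == "__main__":
    limits = describe_limits()
    # Every open socket (connecting or connected) uses one file descriptor
    print("open files (soft/hard):", limits["nofile_soft"], limits["nofile_hard"])
    if limits["port_range"]:
        low, high = limits["port_range"]
        print("ephemeral ports: %d-%d (%d ports)" % (low, high, high - low + 1))
```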

UPDATE

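I have also tried a program that limits the number of threads available in a pool. By reducing this to 20 I can get all of the connections to work, but it only checks about 1 host per second. So whatever I try (putting in a `sleep(1)` or limiting the number of concurrent threads), I can't seem to check more than 1 host per second.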

UPDATE

I just found this question, which looks similar to what I am seeing.

UPDATE

I wonder if writing this using Twisted might help? Could anyone show what my example would look like written using Twisted?

You could try gevent:

```python
from gevent.pool import Pool
from gevent import monkey; monkey.patch_all() # patches stdlib

import sys
import logging
from httplib import HTTPSConnection
from timeit import default_timer as timer

info = logging.getLogger().info

def connect(hostname):
    info("connecting %s", hostname)
    h = HTTPSConnection(hostname, timeout=2)
    try:
        h.connect()
    except IOError, e:
        info("error %s reason: %s", hostname, e)
    else:
        info("done %s", hostname)
    finally:
        h.close()

def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

    info("getting hostname list")
    hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
    hosts_list = open(hosts_file).read().splitlines()

    info("spawning jobs")
    pool = Pool(20) # limit number of concurrent connections
    start = timer()
    for _ in pool.imap(connect, hosts_list):
        pass
    info("%d hosts took us %.2g seconds", len(hosts_list), timer() - start)

if __name__=="__main__":
    main()
```

It can process more than one host per second.

Output

```
2011-01-31 11:08:29,052 getting hostname list
2011-01-31 11:08:29,052 spawning jobs
2011-01-31 11:08:29,053 connecting www.yahoo.com
2011-01-31 11:08:29,053 connecting www.abc.com
2011-01-31 11:08:29,053 connecting www.google.com
2011-01-31 11:08:29,053 connecting stackoverflow.com
2011-01-31 11:08:29,053 connecting facebook.com
2011-01-31 11:08:29,054 connecting youtube.com
2011-01-31 11:08:29,054 connecting live.com
2011-01-31 11:08:29,054 connecting baidu.com
2011-01-31 11:08:29,054 connecting wikipedia.org
2011-01-31 11:08:29,054 connecting blogspot.com
2011-01-31 11:08:29,054 connecting qq.com
2011-01-31 11:08:29,055 connecting twitter.com
2011-01-31 11:08:29,055 connecting msn.com
2011-01-31 11:08:29,055 connecting yahoo.co.jp
2011-01-31 11:08:29,055 connecting taobao.com
2011-01-31 11:08:29,055 connecting google.co.in
2011-01-31 11:08:29,056 connecting sina.com.cn
2011-01-31 11:08:29,056 connecting amazon.com
2011-01-31 11:08:29,056 connecting google.de
2011-01-31 11:08:29,056 connecting google.com.hk
2011-01-31 11:08:29,188 done www.google.com
2011-01-31 11:08:29,189 done google.com.hk
2011-01-31 11:08:29,224 error wikipedia.org reason: [Errno 111] Connection refused
2011-01-31 11:08:29,225 done google.co.in
2011-01-31 11:08:29,227 error msn.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,228 error live.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,250 done google.de
2011-01-31 11:08:29,262 done blogspot.com
2011-01-31 11:08:29,271 error www.abc.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,465 done amazon.com
2011-01-31 11:08:29,467 error sina.com.cn reason: [Errno 111] Connection refused
2011-01-31 11:08:29,496 done www.yahoo.com
2011-01-31 11:08:29,521 done stackoverflow.com
2011-01-31 11:08:29,606 done youtube.com
2011-01-31 11:08:29,939 done twitter.com
2011-01-31 11:08:33,056 error qq.com reason: timed out
2011-01-31 11:08:33,057 error taobao.com reason: timed out
2011-01-31 11:08:33,057 error yahoo.co.jp reason: timed out
2011-01-31 11:08:34,466 done facebook.com
2011-01-31 11:08:35,056 error baidu.com reason: timed out
2011-01-31 11:08:35,057 20 hosts took us 6 seconds
```

> I wonder if writing this using Twisted might help? Could anyone show what my example would look like written using Twisted?

This variant is faster than the code that uses gevent:

```python
#!/usr/bin/env python
import sys
from timeit import default_timer as timer

from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python import log

info = log.msg

class NoopProtocol(protocol.Protocol):
    def makeConnection(self, transport):
        transport.loseConnection()

def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
    info("connecting %s" % host)
    cc = protocol.ClientCreator(reactor, NoopProtocol)
    d = cc.connectSSL(host, port, contextFactory, timeout)
    d.addCallbacks(lambda _: info("done %s" % host),
                   lambda f: info("error %s reason: %s" % (host, f.value)))
    return d

def n_at_a_time(it, n):
    """Iterate over `it` concurrently `n` items at a time.

    `it` - an iterator creating Deferreds
    `n`  - number of concurrent iterations
    return a deferred that fires on completion
    """
    return defer.DeferredList([task.coiterate(it) for _ in xrange(n)])

def main():
    try:
        log.startLogging(sys.stderr, setStdout=False)

        info("getting hostname list")
        hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
        hosts_list = open(hosts_file).read().splitlines()

        info("spawning jobs")
        start = timer()
        jobs = (connect(host, 443, timeout=2) for host in hosts_list)
        d = n_at_a_time(jobs, n=20) # limit number of simultaneous connections
        d.addCallback(lambda _: info("%d hosts took us %.2g seconds" % (
            len(hosts_list), timer() - start)))
        d.addBoth(lambda _: (info("the end"), reactor.stop()))
    except:
        log.err()
        reactor.stop()

if __name__=="__main__":
    reactor.callWhenRunning(main)
    reactor.run()
```

Here's a variant that uses `inlineCallbacks`. It requires Python 2.5 or newer. It lets you write the asynchronous code in a synchronous (blocking) style:

```python
#!/usr/bin/env python
import sys
from timeit import default_timer as timer

from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python import log

info = log.msg

class NoopProtocol(protocol.Protocol):
    def makeConnection(self, transport):
        transport.loseConnection()

@defer.inlineCallbacks
def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
    info("connecting %s" % host)
    cc = protocol.ClientCreator(reactor, NoopProtocol)
    try:
        yield cc.connectSSL(host, port, contextFactory, timeout)
    except Exception, e:
        info("error %s reason: %s" % (host, e))
    else:
        info("done %s" % host)

def n_at_a_time(it, n):
    """Iterate over `it` concurrently `n` items at a time.

    `it` - an iterator creating Deferreds
    `n`  - number of concurrent iterations
    return a deferred that fires on completion
    """
    return defer.DeferredList([task.coiterate(it) for _ in xrange(n)])

@defer.inlineCallbacks
def main():
    try:
        log.startLogging(sys.stderr, setStdout=False)

        info("getting hostname list")
        hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
        hosts_list = open(hosts_file).read().splitlines()

        info("spawning jobs")
        start = timer()
        jobs = (connect(host, 443, timeout=2) for host in hosts_list)
        yield n_at_a_time(jobs, n=20) # limit number of simultaneous connections
        info("%d hosts took us %.2g seconds" % (len(hosts_list), timer()-start))
        info("the end")
    except:
        log.err()
    finally:
        reactor.stop()

if __name__=="__main__":
    reactor.callWhenRunning(main)
    reactor.run()
```

How about a real thread pool?

```python
#!/usr/bin/env python3
# http://code.activestate.com/recipes/577187-python-thread-pool/

from queue import Queue
from threading import Thread

class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as exception:
                print(exception)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()
```

Example:

```python
import threadpool  # the code above, saved as threadpool.py

pool = threadpool.ThreadPool(20)  # 20 threads
pool.add_task(print, "test")
pool.wait_completion()
```

This is Python 3, but it shouldn't be too hard to convert to 2.x. I wouldn't be surprised if this fixes your problem.
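Since Python 3.2 the standard library also has a thread pool, `concurrent.futures.ThreadPoolExecutor`, which covers the same ground as the recipe above. A minimal sketch of the question's scanner built on it, assuming a plain TCP connect is all that needs checking (`check` and `scan` are illustrative names, and the 20-worker default simply mirrors the pool size mentioned in the question):

```python
# Illustrative sketch; check() and scan() are hypothetical names.
import socket
import sys
from concurrent.futures import ThreadPoolExecutor


def check(host, port=443, timeout=20):
    """Plain TCP connect test, like Scanner.run(); returns (host, status)."""
    try:
        # create_connection() resolves the name and connects; it raises
        # OSError (refused, timed out, DNS failure, ...) if it cannot connect
        with socket.create_connection((host, port), timeout=timeout):
            return host, "connected"
    except OSError as err:
        return host, str(err)


def scan(hosts, port=443, max_workers=20, timeout=20):
    """Check every host, with at most `max_workers` connections in flight."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, not completion order
        return list(pool.map(lambda h: check(h, port, timeout), hosts))


if __name__ == "__main__" and len(sys.argv) > 1:
    with open(sys.argv[1]) as f:
        hosts = f.read().splitlines()
    for host, status in scan(hosts):
        print("Host:", host, ":", status)
```

`max_workers` bounds how many connects are outstanding at once, much like `Pool(20)` in the gevent answer; the 20-second default timeout is taken from the question's `settimeout(20)`.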

Python 3.4 introduces a new provisional API for asynchronous IO: the `asyncio` module.

This approach is similar to the Twisted-based answer:

```python
#!/usr/bin/env python3.4
import asyncio
import logging
from contextlib import closing

class NoopProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        transport.close()

info = logging.getLogger().info

@asyncio.coroutine
def connect(loop, semaphor, host, port=443, ssl=True, timeout=15):
    try:
        with (yield from semaphor):
            info("connecting %s" % host)
            done, pending = yield from asyncio.wait(
                [loop.create_connection(NoopProtocol, host, port, ssl=ssl)],
                loop=loop, timeout=timeout)
            if done:
                next(iter(done)).result()
    except Exception as e:
        info("error %s reason: %s" % (host, e))
    else:
        if pending:
            info("error %s reason: timeout" % (host,))
            for ft in pending:
                ft.cancel()
        else:
            info("done %s" % host)

@asyncio.coroutine
def main(loop):
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    # parse_cmdline() is not shown in this answer; it is assumed to return
    # (limit, timeout, hosts) parsed from the command line
    limit, timeout, hosts = parse_cmdline()

    # connect `limit` concurrent connections
    sem = asyncio.BoundedSemaphore(limit)
    coros = [connect(loop, sem, host, timeout=timeout) for host in hosts]
    if coros:
        yield from asyncio.wait(coros, loop=loop)

if __name__=="__main__":
    with closing(asyncio.get_event_loop()) as loop:
        loop.run_until_complete(main(loop))
```

As in the Twisted variant, it uses a `NoopProtocol` that does nothing except disconnect immediately once the connection succeeds.

A semaphore is used to limit the number of concurrent connections.

The code is coroutine-based.
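The code above targets Python 3.4: `@asyncio.coroutine` and the `loop=` argument to `asyncio.wait` have since been removed (in Python 3.11 and 3.10 respectively), so it won't run on current interpreters as written. A rough sketch of the same idea in `async`/`await` syntax (Python 3.7+), using `asyncio.open_connection` instead of a custom protocol and a plain `asyncio.Semaphore` for the concurrency limit; `probe`, `main` and the argument defaults here are illustrative, not taken from the answer:

```python
# Illustrative sketch; probe() and main() are hypothetical names.
import asyncio
import sys


async def probe(sem, host, port=443, use_ssl=True, timeout=15):
    """Connect, then disconnect at once (like NoopProtocol); return (host, status)."""
    async with sem:  # at most `limit` probes get past this point at a time
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port, ssl=use_ssl), timeout)
        except asyncio.TimeoutError:
            return host, "error: timeout"
        except Exception as e:  # refused, DNS failure, TLS/certificate errors
            return host, "error: %s" % e
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass  # the connect itself succeeded; ignore errors while closing
        return host, "done"


async def main(hosts, limit=20, timeout=15, port=443, use_ssl=True):
    # Create the semaphore inside the running event loop
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(
        *(probe(sem, h, port, use_ssl, timeout) for h in hosts))


if __name__ == "__main__" and len(sys.argv) > 1:
    with open(sys.argv[1]) as f:
        hosts = f.read().split()
    for host, status in asyncio.run(main(hosts)):
        print(host, status)
```

`asyncio.gather` returns the results in the same order as `hosts`, so `asyncio.run(main(hosts, limit=20, timeout=60))` gives one `(host, status)` pair per input host.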

To find out how many SSL connections succeed, we can take the first 1000 hosts from the Alexa top-million list:

```
$ curl -O http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
$ unzip *.zip
$ /usr/bin/time perl -nE'say $1 if /\d+,([^\s,]+)$/' top-1m.csv | head -1000 |\
    python3.4 asyncio_ssl.py - --timeout 60 |& tee asyncio.log
```
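If Perl isn't available, the extraction step of that pipeline (the part before `python3.4 asyncio_ssl.py`) can be reproduced in Python with the same regular expression. A sketch, assuming `top-1m.csv` holds one `rank,domain` pair per line as the Perl pattern implies; `top_hosts` is an illustrative name:

```python
# Illustrative sketch; top_hosts() is a hypothetical name.
import re
import sys
from itertools import islice

# Same pattern as the Perl one-liner: "<rank>,<domain>" at the end of a line
LINE_RE = re.compile(r"\d+,([^\s,]+)$")


def top_hosts(lines, n=1000):
    """Return the domain field of the first `n` lines that match LINE_RE
    (the equivalent of `perl -nE'say $1 if ...' | head -1000`)."""
    matches = (LINE_RE.search(line.rstrip("\r\n")) for line in lines)
    # filter(None, ...) drops lines that did not match
    return [m.group(1) for m in islice(filter(None, matches), n)]


if __name__ == "__main__" and len(sys.argv) > 1:
    with open(sys.argv[1]) as f:
        print("\n".join(top_hosts(f)))
```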

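The result is that fewer than half of the connections succeed. On average it checks about 20 hosts per second. Many sites timed out after a minute. A connection also fails if the host doesn't match the hostname in the server's certificate; this includes cases like `example.com` vs. `www.example.com`.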
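To tell a certificate-name mismatch apart from refused connections and timeouts, one option (Python 3.7+) is to catch `ssl.SSLCertVerificationError` on its own. The sketch below uses the blocking `ssl` module rather than asyncio, to keep it short; `tls_status` is an illustrative name:

```python
# Illustrative sketch; tls_status() is a hypothetical name.
import socket
import ssl


def tls_status(host, port=443, timeout=10):
    """Open a TLS connection with certificate and hostname checks enabled
    and classify the outcome as "done", a certificate error, or another error."""
    ctx = ssl.create_default_context()  # CERT_REQUIRED and check_hostname=True
    try:
        with socket.create_connection((host, port), timeout=timeout) as raw:
            # server_hostname is sent via SNI and matched against the certificate
            with ctx.wrap_socket(raw, server_hostname=host):
                return "done"
    except ssl.SSLCertVerificationError as e:
        # e.g. the certificate only names www.example.com but we asked for example.com
        return "certificate error: %s" % e.verify_message
    except OSError as e:  # refused, timed out, DNS failure, other TLS errors
        return "error: %s" % e
```

Calling `tls_status("example.com")` and `tls_status("www.example.com")` side by side shows whether a failure is about the certificate's name or about reaching the host at all.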

First, try using non-blocking sockets. Another reason is that you are using up all the ephemeral ports. Try removing the limit.
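A minimal sketch of the non-blocking approach with the standard `selectors` module (Python 3.4+): every `connect()` is started up front from a single thread, and the selector reports when each one completes. This is roughly what the gevent, Twisted and asyncio answers do under the hood. It assumes the hostnames have already been resolved to IPv4 addresses, since `getaddrinfo()` blocks; `scan_nonblocking` is an illustrative name:

```python
# Illustrative sketch; scan_nonblocking() is a hypothetical name.
import errno
import os
import selectors
import socket
import time


def scan_nonblocking(addrs, timeout=20):
    """Start a non-blocking connect() to every distinct (ipv4, port) pair
    from a single thread and wait for them together with a selector.
    Returns {addr: status}. Addresses must already be numeric IPs,
    because DNS resolution with getaddrinfo() would block.
    """
    sel = selectors.DefaultSelector()
    results = {}
    for addr in addrs:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setblocking(False)
        err = s.connect_ex(addr)  # returns at once, usually EINPROGRESS
        if err in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
            # the socket becomes writable when the connect attempt finishes
            sel.register(s, selectors.EVENT_WRITE, addr)
        else:
            results[addr] = os.strerror(err)
            s.close()

    deadline = time.monotonic() + timeout
    while sel.get_map():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        for key, _ in sel.select(remaining):
            # SO_ERROR holds the outcome of the asynchronous connect
            err = key.fileobj.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            results[key.data] = "connected" if err == 0 else os.strerror(err)
            sel.unregister(key.fileobj)
            key.fileobj.close()

    # Anything still registered did not finish before the deadline
    for key in list(sel.get_map().values()):
        results[key.data] = "timed out"
        sel.unregister(key.fileobj)
        key.fileobj.close()
    sel.close()
    return results
```

Because no thread is created per host, each pending connection costs only a file descriptor and an ephemeral port, which is where the limits mentioned above come back into play.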