It has been nearly two years since I first started working with Python. At work I have always handled asynchronous tasks with the Celery framework, so I never had a direct, hands-on feel for Python's multithreading. Recently I ran into a small task, and in the process of hitting a problem, searching for a solution, and finishing the job, I learned about Python multithreading and experienced its strengths and typical use cases first-hand. I found it interesting enough to write down and share here.
Task requirements

An Excel file contains over a thousand URLs, one per row in the first column of the worksheet. The task is to check whether each URL is accessible: write the HTTP response code of the request into the second column and save the page content into the third column.
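For illustration, the worksheet after processing might look like this (the URLs and values below are invented):

http://example.com            200     <!DOCTYPE html><html>...
http://example.com/missing    404
http://no-such-host.example   Error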
First attempt

The requirements look simple and clear. Python has xlrd and xlwt for reading and writing Excel files and requests for HTTP requests, so there are no technical hurdles. Let's write some code.
import xlrd
import xlwt
import requests


def main():
    output_file = xlwt.Workbook(encoding='utf-8')
    output_table = output_file.add_sheet('sheet1')

    url_file = xlrd.open_workbook('url.xlsx')
    url_table = url_file.sheets()[0]
    nrows = url_table.nrows

    for row_num in xrange(nrows):
        row = url_table.row_values(row_num)
        url = row[0]
        output_table.write(row_num, 0, url)
        print 'get url: %s' % url
        try:
            resp = requests.get(url)
            status_code = resp.status_code
            output_table.write(row_num, 1, status_code)
            if status_code == requests.codes.ok:
                try:
                    content = resp.content[:32000]
                    output_table.write(row_num, 2, content)
                except Exception as e:
                    print url, e.message
        except Exception as e:
            output_table.write(row_num, 1, 'Error')

    output_file.save('./output.xls')


if __name__ == '__main__':
    main()
Test run: the code worked correctly, but it was far too slow. Checking a thousand-odd URLs took over an hour. Not good enough!
Revision

The code above implements the business logic, but blocking while waiting for each network response wastes most of the running time. If requests could be sent to multiple URLs concurrently, without waiting on each one in turn, things would be much faster. A multithreaded solution suggests itself.
# -*- coding:utf8 -*-
import xlrd
import xlwt
import requests
import threading

# number of concurrent threads
THREAD_NUM = 30


def testUrl(url, index, table):
    print 'get url: %s' % url
    table.write(index, 0, url)
    try:
        resp = requests.get(url)
        status_code = resp.status_code
        table.write(index, 1, status_code)
        if status_code == requests.codes.ok:
            try:
                content = resp.content[:32000]
                table.write(index, 2, content)
            except Exception as e:
                print url, e.message
    except:
        table.write(index, 1, 'Error')


def batchTestUrls(rows, table):
    for row in rows:
        testUrl(row['url'], row['row_num'], table)


def main():
    output_file = xlwt.Workbook(encoding='utf-8')
    output_table = output_file.add_sheet('sheet1')

    rows = []
    url_file = xlrd.open_workbook('url.xlsx')
    url_table = url_file.sheets()[0]
    nrows = url_table.nrows
    for row_num in xrange(nrows):
        row = url_table.row_values(row_num)
        url = row[0]
        rows.append({'row_num': row_num, 'url': url})

    # slice the rows evenly among the threads; the last thread takes
    # whatever remains
    threads = []
    size = len(rows) / THREAD_NUM
    for i in range(THREAD_NUM):
        if i == THREAD_NUM - 1:
            rows_slice = rows[i * size:]
        else:
            rows_slice = rows[i * size: i * size + size]
        t = threading.Thread(target=batchTestUrls, args=(rows_slice, output_table))
        t.start()
        threads.append(t)

    # wait for all worker threads to finish before saving; joining each
    # thread right after starting it would serialize the work again
    for t in threads:
        t.join()

    output_file.save('./output.xls')


if __name__ == '__main__':
    main()
In the code above I set up 30 threads and sliced the list of URLs among them so that the network requests run in parallel. Test run: much better, a clear efficiency gain over the first version. But a new problem soon appeared: because the URLs are divided among the threads up front, after the program has run for a while some threads have finished their share and gone idle while others are still grinding away alone. That is an obvious waste of resources. Can we do better still?
Refinement

The biggest problem with the second version is that the threads are not fully utilized: each one is handed a fixed amount of work at the start, so the fast ones finish early and sit idle. It would be better if every thread could claim a new piece of work as soon as it finishes the current one. With that insight the design becomes clear: put all the pending work into a task queue and let the threads take tasks from it, one after another, until the queue is empty.
import xlrd
import xlwt
import requests
import threading
from time import time
from Queue import Queue, Empty

THREADS_NUM = 30


def testUrl(url, index, table):
    print 'get url: %s' % url
    table.write(index, 0, url)
    try:
        resp = requests.get(url)
        status_code = resp.status_code
        table.write(index, 1, status_code)
        if status_code == requests.codes.ok:
            try:
                content = resp.content[:32000]
                table.write(index, 2, content)
            except Exception as e:
                print url, e.message
    except:
        table.write(index, 1, 'Error')


class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue

    def run(self):
        url_file = xlrd.open_workbook('url.xlsx')
        url_table = url_file.sheets()[0]
        nrows = url_table.nrows
        for row_num in xrange(nrows):
            row = url_table.row_values(row_num)
            url = row[0]
            self.queue.put((row_num, url))


class Consumer(threading.Thread):
    def __init__(self, name, queue, table):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
        self.table = table

    def run(self):
        while True:
            try:
                # non-blocking get: checking empty() and then calling a
                # blocking get() could hang forever if another consumer
                # grabbed the last item in between
                index, url = self.queue.get_nowait()
            except Empty:
                break
            testUrl(url, index, self.table)


def main():
    file = xlwt.Workbook(encoding='utf-8')
    table = file.add_sheet('sheet1')

    queue = Queue()
    producer = Producer('Producer', queue)
    consumers = []
    for i in range(THREADS_NUM):
        consumer = Consumer('Consumer_%s' % str(i), queue, table)
        consumers.append(consumer)

    # the queue is fully populated before any consumer starts
    producer.start()
    producer.join()

    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()

    file.save('./output.xls')
    print 'All threads finished!'


if __name__ == '__main__':
    start = time()
    main()
    end = time()
    print 'time used %s seconds' % str(end - start)
The code above uses Python's Queue as the task queue, with a Producer class that puts tasks into the queue and a Consumer class that takes tasks out and completes them. Instantiate a batch of Consumers and they will work tirelessly until the queue is drained.
Summary
When to use Python multithreading: for I/O-bound tasks, multithreading can significantly improve a program's efficiency. This task spends most of its time waiting for network responses while the CPU sits idle, a textbook scenario for multithreading.
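As a rough illustration (not part of the original task), here is a minimal sketch comparing sequential and threaded execution of a simulated I/O wait; the 0.5-second sleep stands in for a network round trip:

import threading
from time import time, sleep

def fake_io_task(n):
    # the sleep stands in for a blocking network request
    sleep(0.5)

start = time()
for i in range(10):
    fake_io_task(i)
print 'sequential: %.2f seconds' % (time() - start)  # roughly 5 seconds

threads = [threading.Thread(target=fake_io_task, args=(i,)) for i in range(10)]
start = time()
for t in threads:
    t.start()
for t in threads:
    t.join()
print 'threaded:   %.2f seconds' % (time() - start)  # roughly 0.5 seconds

Because the waits overlap, ten threads finish in about the time of a single request.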
How to create threads in Python: Thread is the thread class in Python's threading module, and there are two ways to use it.

Method one: pass the target function to the Thread constructor:
import threading


def action(arg):
    # do something
    print 'action %s' % arg


for i in range(5):
    t = threading.Thread(target=action, args=(i,))
    t.start()
Method two: subclass Thread and override the run() method:
import threading


class TaskThread(threading.Thread):
    def __init__(self, arg):
        super(TaskThread, self).__init__()
        self.arg = arg

    def run(self):
        print 'arg is: %s' % self.arg


for i in range(5):
    t = TaskThread(i)
    t.start()
The join() method
join([timeout])
Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call isAlive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

In short: join() blocks the calling thread until the thread whose join() method was called terminates, or until the optional timeout expires.
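A small sketch of join() with a timeout (slow_task and the durations are made up for illustration):

import threading
from time import sleep

def slow_task():
    sleep(5)  # simulates long-running work

t = threading.Thread(target=slow_task)
t.start()

t.join(2)  # block for at most 2 seconds
if t.isAlive():
    # join() returned because of the timeout, not because the thread finished
    print 'join() timed out, thread is still running'

t.join()  # a thread can be joined many times; this waits until it terminates
print 'thread terminated'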
Using Queue: the Queue class in Python's Queue module implements a thread-safe queue. It is very handy in multithreaded programs and saves you from managing locks yourself.
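A minimal sketch of the pattern used above, with several worker threads draining one shared Queue (the names are illustrative):

import threading
from Queue import Queue, Empty

queue = Queue()
for i in range(10):
    queue.put(i)  # put() and get() are internally synchronized

def worker():
    while True:
        try:
            item = queue.get_nowait()  # non-blocking; raises Empty when drained
        except Empty:
            break
        print 'processing %s' % item

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()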