Попытка разобраться с многопоточностью
Многопоточность подразумевает, что несколько потоков будут исполняться параллельно. В идеале использование многопоточности ускоряет выполнение программы. Однако, в python есть такая штука как GIL, Global Interpreter Lock, которая обеспечивает то, что в каждый момент времени активен только один поток.
Также GIL защищает внутренности питона и память от необдуманных действий программиста, но не защищает написанный код, чтобы сделать наш код потокобезопасным, нужно синхронизировать потоки, то есть предотвратить одновременный доступ к общим областям памяти.
Попробуем написать код, который будет работать в два потока, просто добавляя числа к списку:
1import threading 2 3 4class MyThread(threading.Thread): 5 def __init__(self, name, alist): 6 threading.Thread.__init__(self) 7 self.alist = alist 8 9 def run(self): 10 print("Starting" + self.name) 11 for _ in range(2): 12 self.alist.append(self.alist[-1]+1) 13 print("Exiting" + self.name) 14 15 16alist = [1, 2] 17 18thread1 = MyThread("Thread-1", alist) 19thread2 = MyThread("Thread-2", alist) 20 21thread1.start() 22thread2.start() 23 24print("Exiting Main Thread") 25print(alist)
После запуска получим странный результат:
StartingThread-1
ExitingThread-1
StartingThread-2
Exiting Main Thread
[1, 2, 3, 4]
ExitingThread-2
Мало того, что результат беспорядочный, при разных запусках результаты будут отличаться. Это происходит из-за того, что потоки не синхронизированы, а функция print непотокобезопасна. Чтобы это исправить попробуем использовать Lock, чтобы предотвратить одновременный доступ к списку:
1import threading 2 3 4class MyThread(threading.Thread): 5 def __init__(self, name, alist, lock): 6 threading.Thread.__init__(self) 7 self.alist = alist 8 self.lock = lock 9 10 def run(self): 11 with self.lock: 12 print("Starting" + self.name) 13 for _ in range(2): 14 self.alist.append(self.alist[-1]+1) 15 print("Exiting" + self.name) 16 17 18alist = [1, 2] 19 20lock = threading.Lock() 21 22thread1 = MyThread("Thread-1", alist, lock) 23thread2 = MyThread("Thread-2", alist, lock) 24 25thread1.start() 26thread2.start() 27 28 29print("Exiting Main Thread") 30print(alist)
Но это не помогает:
StartingThread-1
Exiting Main Thread
ExitingThread-1
[1, 2, 3, 4]
StartingThread-2
ExitingThread-2
Дело в том, что когда мы обращаемся к списку на 30 строке, то потоки еще могут не закончить свою работу, поэтому список может быть не равен [1, 2, 3, 4, 5, 6] на момент вызова. Поэтому также необходимо использовать функцию t.join(), которая заставляет ждать тот поток в котором она вызвана пока закончится поток t:
1import threading 2 3 4class MyThread(threading.Thread): 5 def __init__(self, name, alist, lock): 6 threading.Thread.__init__(self) 7 self.alist = alist 8 self.lock = lock 9 10 def run(self): 11 with self.lock: 12 print("Starting" + self.name) 13 for _ in range(2): 14 self.alist.append(self.alist[-1]+1) 15 print("Exiting" + self.name) 16 17 18alist = [1, 2] 19 20lock = threading.Lock() 21 22thread1 = MyThread("Thread-1", alist, lock) 23thread2 = MyThread("Thread-2", alist, lock) 24 25thread1.start() 26thread2.start() 27 28thread1.join() 29thread2.join() 30 31print("Exiting Main Thread") 32print(alist)
На строках 28 и 29 мы заставляем главный поток ждать пока закончатся потоки thread1 и thread2, и теперь вывод нормальный:
StartingThread-1
ExitingThread-1
StartingThread-2
ExitingThread-2
Exiting Main Thread
[1, 2, 3, 4, 5, 6]
Интересно, что если рассматривать задачу добавления к списку нескольких чисел, то вариант с многопоточностью оказывается примерно в 50 раз медленнее обычного кода:
1import threading 2import time 3 4 5alist = [1, 2] 6t1 = time.time() 7for _ in range(4): 8 alist.append(alist[-1]+1) 9print(time.time() - t1) 10 11 12class MyThread(threading.Thread): 13 def __init__(self, name, alist, lock): 14 threading.Thread.__init__(self) 15 self.alist = alist 16 self.lock = lock 17 18 def run(self): 19 with self.lock: 20 for _ in range(2): 21 self.alist.append(self.alist[-1]+1) 22 23 24alist = [1, 2] 25 26lock = threading.Lock() 27 28thread1 = MyThread("Thread-1", alist, lock) 29thread2 = MyThread("Thread-2", alist, lock) 30 31t1 = time.time() 32thread1.start() 33thread2.start() 34 35thread1.join() 36thread2.join() 37print(time.time() - t1)
5.00679016113e-06
0.00025200843811
Что как бы намекает нам, что использовать многопоточность для этой задачи просто глупо. Хотя, конечно, она демонстрирует важность использования Lock'а и join'ов. Но можно придумать задачу, где многопоточность действительно нужна, например, если нужно скачать несколько файлов. Попробуем закачать несколько тестовых файлов по 10 мегабайт без многопоточности и с ней:
1import threading 2import time 3import urllib 4 5url = 'http://ovh.net/files/10Mio.dat' 6number_of_files = 10 7url_opener = urllib.URLopener() 8 9t1 = time.time() 10for i in range(number_of_files): 11 url_opener.retrieve(url, "test_{0}".format(i)) 12print(time.time()-t1) 13 14 15class Download(threading.Thread): 16 def __init__(self, ind): 17 threading.Thread.__init__(self) 18 self.ind = ind 19 20 def run(self): 21 url_opener.retrieve( 22 url, "test_{0}".format(self.ind)) 23 24 25threads = [Download(x) for x in range(number_of_files)] 26 27t1 = time.time() 28for thread in threads: 29 thread.start() 30for thread in threads: 31 thread.join() 32print(time.time() - t1)
a@vostro:~/projects/blog_materials$ python multithreading_test.py
25.8116919994
18.3509969711
a@vostro:~/projects/blog_materials$ python multithreading_test.py
33.8754458427
13.5044431686
a@vostro:~/projects/blog_materials$ python multithreading_test.py
16.7841210365
15.0352919102
a@vostro:~/projects/blog_materials$ python multithreading_test.py
28.8646409512
14.0407190323
Видно, что вариант с многопоточной закачкой в некоторых случаях до 2 раз быстрее.
Выводы
Многопоточность - вещь нужная, но ее нужно уметь правильно готовить, не забывать синхронизировать потоки и использовать с умом.