Попытка разобраться с многопоточностью
Многопоточность подразумевает, что несколько потоков будут исполняться параллельно. В идеале использование многопоточности ускоряет выполнение программы. Однако, в python есть такая штука как GIL, Global Interpreter Lock, которая обеспечивает то, что в каждый момент времени активен только один поток.
Также GIL защищает внутренности питона и память от необдуманных действий программиста, но не защищает написанный код, чтобы сделать наш код потокобезопасным, нужно синхронизировать потоки, то есть предотвратить одновременный доступ к общим областям памяти.
Попробуем написать код, который будет работать в два потока, просто добавляя числа к списку:
1 import threading 2 3 4 class MyThread(threading.Thread): 5 def __init__(self, name, alist): 6 threading.Thread.__init__(self) 7 self.alist = alist 8 9 def run(self): 10 print("Starting" + self.name) 11 for _ in range(2): 12 self.alist.append(self.alist[-1]+1) 13 print("Exiting" + self.name) 14 15 16 alist = [1, 2] 17 18 thread1 = MyThread("Thread-1", alist) 19 thread2 = MyThread("Thread-2", alist) 20 21 thread1.start() 22 thread2.start() 23 24 print("Exiting Main Thread") 25 print(alist)
После запуска получим странный результат:
StartingThread-1 ExitingThread-1 StartingThread-2 Exiting Main Thread [1, 2, 3, 4] ExitingThread-2
Мало того, что результат беспорядочный, при разных запусках результаты будут отличаться. Это происходит из-за того, что потоки не синхронизированы, а функция print непотокобезопасна. Чтобы это исправить попробуем использовать Lock, чтобы предотвратить одновременный доступ к списку:
1 import threading 2 3 4 class MyThread(threading.Thread): 5 def __init__(self, name, alist, lock): 6 threading.Thread.__init__(self) 7 self.alist = alist 8 self.lock = lock 9 10 def run(self): 11 with self.lock: 12 print("Starting" + self.name) 13 for _ in range(2): 14 self.alist.append(self.alist[-1]+1) 15 print("Exiting" + self.name) 16 17 18 alist = [1, 2] 19 20 lock = threading.Lock() 21 22 thread1 = MyThread("Thread-1", alist, lock) 23 thread2 = MyThread("Thread-2", alist, lock) 24 25 thread1.start() 26 thread2.start() 27 28 29 print("Exiting Main Thread") 30 print(alist)
Но это не помогает:
StartingThread-1 Exiting Main Thread ExitingThread-1 [1, 2, 3, 4] StartingThread-2 ExitingThread-2
Дело в том, что когда мы обращаемся к списку на 30 строке, то потоки еще могут не закончить свою работу, поэтому список может быть не равен [1, 2, 3, 4, 5, 6] на момент вызова. Поэтому также необходимо использовать функцию t.join(), которая заставляет ждать тот поток в котором она вызвана пока закончится поток t:
1 import threading 2 3 4 class MyThread(threading.Thread): 5 def __init__(self, name, alist, lock): 6 threading.Thread.__init__(self) 7 self.alist = alist 8 self.lock = lock 9 10 def run(self): 11 with self.lock: 12 print("Starting" + self.name) 13 for _ in range(2): 14 self.alist.append(self.alist[-1]+1) 15 print("Exiting" + self.name) 16 17 18 alist = [1, 2] 19 20 lock = threading.Lock() 21 22 thread1 = MyThread("Thread-1", alist, lock) 23 thread2 = MyThread("Thread-2", alist, lock) 24 25 thread1.start() 26 thread2.start() 27 28 thread1.join() 29 thread2.join() 30 31 print("Exiting Main Thread") 32 print(alist)
На строках 28 и 29 мы заставляем главный поток ждать пока закончатся потоки thread1 и thread2, и теперь вывод нормальный:
StartingThread-1 ExitingThread-1 StartingThread-2 ExitingThread-2 Exiting Main Thread [1, 2, 3, 4, 5, 6]
Интересно, что если рассматривать задачу добавления к списку нескольких чисел, то вариант с многопоточностью оказывается примерно в 50 раз медленнее обычного кода:
1 import threading 2 import time 3 4 5 alist = [1, 2] 6 t1 = time.time() 7 for _ in range(4): 8 alist.append(alist[-1]+1) 9 print(time.time() - t1) 10 11 12 class MyThread(threading.Thread): 13 def __init__(self, name, alist, lock): 14 threading.Thread.__init__(self) 15 self.alist = alist 16 self.lock = lock 17 18 def run(self): 19 with self.lock: 20 for _ in range(2): 21 self.alist.append(self.alist[-1]+1) 22 23 24 alist = [1, 2] 25 26 lock = threading.Lock() 27 28 thread1 = MyThread("Thread-1", alist, lock) 29 thread2 = MyThread("Thread-2", alist, lock) 30 31 t1 = time.time() 32 thread1.start() 33 thread2.start() 34 35 thread1.join() 36 thread2.join() 37 print(time.time() - t1)
5.00679016113e-06 0.00025200843811
Что как бы намекает нам, что использовать многопоточность для этой задачи просто глупо. Хотя, конечно, она демонстрирует важность использования Lock'а и join'ов. Но можно придумать задачу, где многопоточность действительно нужна, например, если нужно скачать несколько файлов. Попробуем закачать несколько тестовых файлов по 10 мегабайт без многопоточности и с ней:
1 import threading 2 import time 3 import urllib 4 5 url = 'http://ovh.net/files/10Mio.dat' 6 number_of_files = 10 7 url_opener = urllib.URLopener() 8 9 t1 = time.time() 10 for i in range(number_of_files): 11 url_opener.retrieve(url, "test_{0}".format(i)) 12 print(time.time()-t1) 13 14 15 class Download(threading.Thread): 16 def __init__(self, ind): 17 threading.Thread.__init__(self) 18 self.ind = ind 19 20 def run(self): 21 url_opener.retrieve( 22 url, "test_{0}".format(self.ind)) 23 24 25 threads = [Download(x) for x in range(number_of_files)] 26 27 t1 = time.time() 28 for thread in threads: 29 thread.start() 30 for thread in threads: 31 thread.join() 32 print(time.time() - t1)
a@vostro:~/projects/blog_materials$ python multithreading_test.py 25.8116919994 18.3509969711 a@vostro:~/projects/blog_materials$ python multithreading_test.py 33.8754458427 13.5044431686 a@vostro:~/projects/blog_materials$ python multithreading_test.py 16.7841210365 15.0352919102 a@vostro:~/projects/blog_materials$ python multithreading_test.py 28.8646409512 14.0407190323
Видно, что вариант с многопоточной закачкой в некоторых случаях до 2 раз быстрее.
Выводы
Многопоточность - вещь нужная, но ее нужно уметь правильно готовить, не забывать синхронизировать потоки и использовать с умом.