Bài 52 - Parallel Computing on Python
30 Nov 2020 - phamdinhkhanh
1. Tại sao cần xử lý song song
Trong quá trình xây dựng các ứng dụng deep learning trên python, mình nhận ra rằng để tạo ra một ứng dụng thì không khó. Nhưng để tạo ra một ứng dụng đáp ứng được tốc độ xử lý, độ chính xác và mức độ sử dụng resource thì cần phải tối ưu rất nhiều thứ. Bạn sẽ phải quan tâm đến các khía cạnh như:
- Giảm nhẹ kích thước mô hình thông qua: Quantization và compress mô hình.
- Optimize lại code.
- Chuyển từ single-thread sang multi-thread.
- Allocate lại tài nguyên như CPU, Memory.
Đặc biệt là các ứng dụng trên python thì tối ưu tốc độ xử lý là một challenge bởi python bị ràng buộc bởi cơ chế GIL [Global Interpreter Lock]. Tức là nó chỉ cho phép một thread hoạt động truy suất và chỉnh sửa bộ nhớ tại một thời điểm. Do đó python không tận dụng được các tính toán đa luồng. Tuy nhiên ở python 3.2 trở đi thì python đã bắt đầu hỗ trợ đa luồng. Và thông qua bài viết này mình sẽ hướng dẫn các bạn có thể accelerate các ứng dụng của mình thông qua đa luồng.
Nhưng trước tiên chúng ta cần hiểu về thread/process là gì? Vì blog dành cho đa dạng bạn đọc ở trình độ khác nhau nên bạn nào đã biết thì có thể bỏ qua phần kiến thức rất sơ đẳng này.
1.1. Thread và Process
Thread và process là hai khái niệm cơ bản trong lập trình và cũng có nhiều định nghĩa từ các nguồn khác nhau cho chúng.
Process là gì?
Chúng ta hiểu một cách đơn giản thì process là tiến trình để chạy một phần mềm. Khi bạn start một program thì tức là bạn đang khởi tạo một process. Hệ điều hành khi đó sẽ cung cấp các tài nguyên về memory, cpu, disk, bandwidth cho process để cho chạy ứng dụng của bạn.
Hình 1: Khi bạn vào task management của window bạn có thể theo dõi các process đang chạy với mã PID của process. Mỗi một process sẽ phụ trách một instance của OS system và được cung cấp các thành phần như memory, cpu, disk, bandwith,….: Khi bạn vào task management của window bạn có thể theo dõi các process đang chạy với mã PID của process. Mỗi một process sẽ phụ trách một instance của OS system và được cung cấp các thành phần như memory, cpu, disk, bandwith,….
Lịch xử lý của các processes sẽ được OS sắp xếp dựa trên một số thuật toán lập lịch như round robin, first come first serve,… Mình sẽ không đi sâu vào phần này, các bạn có thể tham khảo thêm Operating System Scheduling algorithms.
Threads là gì?
Chắc hẳn bạn đã từng nghe đến thông số số cores của CPU. Các CPU càng hiện đại, số lượng cores sẽ càng nhiều. Các core sẽ hỗ trợ cho việc tính toán multi-task tốt hơn. Các threads sẽ được vận hành và tính toán trên các core của CPU.
Một process khi được khởi tạo sẽ sinh ra các threads để run application. Bạn sẽ thắc mắc vậy thì chỉ cần một thread cũng được ? Tại sao lại cần nhiều threads? Nhiều threads sẽ giúp cho việc tính toán multi-task tốt hơn. Tức là bạn có thể làm nhiều nhiệm vụ một lúc. Nếu coi mỗi thread là một công nhân, thì việc sản xuất sẽ nhanh hơn nếu có nhiều công nhân phối hợp cùng làm việc. Bạn có thể hình dung dễ hơn qua ví dụ:
Khi bạn làm việc với microsoft word, bạn gõ bàn phím thì có những công việc sau cần thực hiện:
- Đọc thông tin input từ keyboards.
- Hiển thị lên màn hình các thông tin đã nhập trong quá trình gõ.
- High light những chỗ bị sai chính tả.
- Suggest các từ để có thể gõ nhanh hơn.
Mỗi công việc được phụ trách bởi một thread và chúng phối hợp với nhau để giúp ứng dụng của bạn mượt hơn. Nếu chỉ có một thread làm tất cả mọi công việc thì nó sẽ bị quá tải và bạn có thể gặp phải giới hạn về tốc độ xử lý của CPU, thuật ngữ hay được gọi là CPU bound.
Một process có thể là single-thread hoặc multiple-threads tùy thuộc vào số lượng là một hoặc nhiều. Khi có nhiều threads thì đòi hỏi phải có sự phối hợp tính toán song song [parrallel computing] giữa các threads với nhau. Từ đó sinh ra các khái niệm về đồng bộ [
import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
4] và bất đồng bộ [
import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
5]. Chúng ta sẽ làm rõ hai khái niệm này ở các phần tiếp theo.
Bạn có thể thắc mắc multiple-threads thì có khác gì khác biệt so với việc sử dụng multiple-processes? Chúng ta vẫn có thể tính toán song song được trên cả hai? Vậy tại sao lại cần phải tách một process thành nhiều threads làm gì ? Thực tế là trong python thì process và thread cùng kế thừa chung một interface là một base thread. Chúng sẽ có những đặc tính chung, nhưng thread là một phiên bản nhẹ hơn so với process. Do đó việc khởi tạo thread sẽ nhanh hơn. Một điểm khác biệt nữa đó là thread được thiết kế để có thể hoạt động tương tác lẫn nhau. Các threads trong cùng một process sẽ chia sẻ được dữ liệu qua lại nên có lợi thế về I/O. Dữ liệu của process thì được thiết kế private nên một process không thể chia sẻ dữ liệu với các process khác. Đây là lý do chúng ta cần nhiều threads hoạt động trong một process.
Tiếp theo môi trường hoạt động của multiple-threads sẽ như thế nào ?
Khi các threads chạy song song trên cùng một process, chúng sẽ khởi tạo dữ liệu như thế nào? Dữ liệu sẽ được lưu vào đâu? Chúng chia sẻ chung một code như thế nào? Chúng ta cùng làm rõ qua sơ đồ bên dưới.
Hình 2: Cấu trúc của single thread và multiple threads. Cấu trúc của single thread và multiple threads.
Đầu tiên ứng dụng của bạn sẽ khi khởi chạy sẽ load code lên. Phần main của chương trình sẽ được compiler khởi chạy đầu tiên. Lần lượt các method sẽ được load vào môi trường stack theo trình tự chạy. Compiler chạy lần lượt các hàm trong stack. Các hàm được compiler biên dịch thành mã máy [
import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
6] và được thực thi để sinh ra dữ liệu. Dữ liệu sau đó được lưu trữ tại hai bộ nhớ là Heap và Stack [cái này cũng tùy thuộc vào virtual machine của từng ngôn ngữ]. Stack lưu trữ method và các local variable còn heap lưu trữ object, array từ chương trình của bạn [phần lưu trữ này cũng có thể thay đổi tùy vào cách sắp xếp bộ nhớ của các ngôn ngữ]. Nếu load trên stack thì không bị phân mảng dữ liệu và có thời gian load/access nhanh hơn. Còn heap sẽ allocate vùng nhớ ngẫu nhiên, các ô nhớ không liên tục nên do đó bị phân mảng.
Okie, mình nghĩ lý thuyết như vậy là đủ rồi. Tiếp theo chúng ta sẽ cùng thực hành khởi tạo các thread và process trong python.
2. Khởi tạo thread trong python
2.1. Khởi tạo từ hàm
Trên python3 để khởi tạo một thread thì chúng ta sử dụng module
import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
7, trên python2 là
import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
8. Để start một method trên thread thì chúng ta chỉ cần truyền vào
import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
9 tên method và các giá trị đối số của nó. Ví dụ bên dưới chúng ta sử dụng hàm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
0 để đếm lùi các số từ trên xuống dưới.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| import _thread
import time
def _counter[counter, thread_name]:
while [counter]:
time.sleep[0.01]
print["{}: {}".format[thread_name, counter]]
counter -= 1
counter = 5
# Khởi tạo 2 threads 1 và 2
try:
_thread.start_new_thread[_counter, [counter, "khanh thread"]] # pass counter and thread_name into method _counter
print["\n"]
_thread.start_new_thread[_counter, [counter, "ai thread"]]
except:
print["Error: unable to start thread"]
# Running counter
while [counter]:
counter -= 1
pass
| 1
2
3
4
5
6
7
8
9
10
| ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
| Ta thấy hai thread đã xen kẽ nhau cùng thực hiện tác vụ đếm ngược. Tuy nhiên về bản chất thì chúng vẫn là đơn luồng vì cơ chế GIL của python ép buộc một thời điểm chỉ có một thread được tương tác với dữ liệu. Có khá nhiều developer tỏ ra thất vọng về hạn chế này nhưng một số khác thì bảo vệ quan điểm này bởi nó giúp một dữ liệu không bị sử dụng và thay đổi cùng lúc bởi nhiều threads. Hiện tượng này dẫn tới 1
2
3
4
5
6
7
8
9
10
11
12
13
14
1, một trong những bug thường gặp ở các ngôn ngữ đa luồng như java hay C++.Trong ví dụ trên thì các method trên hai threads 1
2
3
4
5
6
7
8
9
10
11
12
13
14
2 và 1
2
3
4
5
6
7
8
9
10
11
12
13
14
3 khởi chạy độc lập nhau mà không ưu tiên một thread hoàn thành thì mới chạy thread tiếp theo. Cách chạy như vậy được gọi là bất đồng bộ import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
5, một khái niệm cơ bản của parallel application. Trái ngược lại thì là đồng bộ import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
4, các method sẽ chạy theo tuần tự, sau khi method trước đó đã hoàn thành.2.2. Khởi tạo kế thừaMột cách khác để khởi tạo một thread đó là kế thừa lại Threading module. Kiểu kế thừa này khá phổ biến trong lập trình, chắc các bạn còn nhớ khi khởi tạo model trên pytorch chúng ta cũng kế thừa lại nn.Module chứ ? Khi đó chúng ta chỉ cần override lại các method cần điều chỉnh từ class cha. 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
| 2.3. Cơ chế Thread LockNhư đã giới thiệu chương trước, trong ví dụ ở trên các threads là bất đồng bộ [ import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
5]. Hai threads chạy độc lập với nhau mà không theo thứ tự. Chúng ta có thể đồng bộ [synchronous] các thread. Tức là cho phép một thread chạy xong thì thread khác mới được phép chạy bằng cách sử dụng Thread Lock trong python.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| import threading
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
threadLock.acquire[]
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
threadLock.release[]
threadLock = threading.Lock[]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "linh thread", 5]
thread1.start[]
thread2.start[]
threads = [thread1, thread2]
for t in threads:
t.join[]
| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
| 2.3. Cơ chế Thread Lock Như đã giới thiệu chương trước, trong ví dụ ở trên các threads là bất đồng bộ [import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
5]. Hai threads chạy độc lập với nhau mà không theo thứ tự. Chúng ta có thể đồng bộ [synchronous] các thread. Tức là cho phép một thread chạy xong thì thread khác mới được phép chạy bằng cách sử dụng Thread Lock trong python.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
threadLock.acquire[]
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
threadLock.release[]
threadLock = threading.Lock[]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "linh thread", 5]
thread1.start[]
thread2.start[]
threads = [thread1, thread2]
for t in threads:
t.join[]
| import _thread
import time
def _counter[counter, thread_name]:
while [counter]:
time.sleep[0.01]
print["{}: {}".format[thread_name, counter]]
counter -= 1
counter = 5
# Khởi tạo 2 threads 1 và 2
try:
_thread.start_new_thread[_counter, [counter, "khanh thread"]] # pass counter and thread_name into method _counter
print["\n"]
_thread.start_new_thread[_counter, [counter, "ai thread"]]
except:
print["Error: unable to start thread"]
# Running counter
while [counter]:
counter -= 1
pass
1 | 1
2
3
4
5
6
7
8
9
10
| ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
| Ta thấy hai thread đã xen kẽ nhau cùng thực hiện tác vụ đếm ngược. Tuy nhiên về bản chất thì chúng vẫn là đơn luồng vì cơ chế GIL của python ép buộc một thời điểm chỉ có một thread được tương tác với dữ liệu. Có khá nhiều developer tỏ ra thất vọng về hạn chế này nhưng một số khác thì bảo vệ quan điểm này bởi nó giúp một dữ liệu không bị sử dụng và thay đổi cùng lúc bởi nhiều threads. Hiện tượng này dẫn tới 1
2
3
4
5
6
7
8
9
10
11
12
13
14
1, một trong những bug thường gặp ở các ngôn ngữ đa luồng như java hay C++.Trong ví dụ trên thì các method trên hai threads 1
2
3
4
5
6
7
8
9
10
11
12
13
14
2 và 1
2
3
4
5
6
7
8
9
10
11
12
13
14
3 khởi chạy độc lập nhau mà không ưu tiên một thread hoàn thành thì mới chạy thread tiếp theo. Cách chạy như vậy được gọi là bất đồng bộ import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
5, một khái niệm cơ bản của parallel application. Trái ngược lại thì là đồng bộ import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
4, các method sẽ chạy theo tuần tự, sau khi method trước đó đã hoàn thành.2.2. Khởi tạo kế thừa | Một cách khác để khởi tạo một thread đó là kế thừa lại Threading module. Kiểu kế thừa này khá phổ biến trong lập trình, chắc các bạn còn nhớ khi khởi tạo model trên pytorch chúng ta cũng kế thừa lại nn.Module chứ ? Khi đó chúng ta chỉ cần override lại các method cần điều chỉnh từ class cha. | 1
2
3
4
5
6
7
8
9
10
| ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
| Ta thấy hai thread đã xen kẽ nhau cùng thực hiện tác vụ đếm ngược. Tuy nhiên về bản chất thì chúng vẫn là đơn luồng vì cơ chế GIL của python ép buộc một thời điểm chỉ có một thread được tương tác với dữ liệu. Có khá nhiều developer tỏ ra thất vọng về hạn chế này nhưng một số khác thì bảo vệ quan điểm này bởi nó giúp một dữ liệu không bị sử dụng và thay đổi cùng lúc bởi nhiều threads. Hiện tượng này dẫn tới 1
2
3
4
5
6
7
8
9
10
11
12
13
14
1, một trong những bug thường gặp ở các ngôn ngữ đa luồng như java hay C++.Trong ví dụ trên thì các method trên hai threads 1
2
3
4
5
6
7
8
9
10
11
12
13
14
2 và 1
2
3
4
5
6
7
8
9
10
11
12
13
14
3 khởi chạy độc lập nhau mà không ưu tiên một thread hoàn thành thì mới chạy thread tiếp theo. Cách chạy như vậy được gọi là bất đồng bộ import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
5, một khái niệm cơ bản của parallel application. Trái ngược lại thì là đồng bộ import threading
import time
class FirstThread[threading.Thread]:
def __init__[self, thread_id, thread_name, counter]:
threading.Thread.__init__[self]
self.thread_id = thread_id
self.thread_name = thread_name
self.counter = counter
def run[self]:
print["Start thread {}!".format[self.thread_name]]
while [self.counter]:
time.sleep[0.01]
print["{} : {}".format[self.thread_name, self.counter]]
self.counter -= 1
print["End thread {}".format[self.thread_name]]
thread1 = FirstThread[1, "khanh thread", 5]
thread2 = FirstThread[2, "ai thread", 5]
thread1.start[]
thread2.start[]
4, các method sẽ chạy theo tuần tự, sau khi method trước đó đã hoàn thành.2.2. Khởi tạo kế thừa 1
2
3
4
5
6
7
8
9
10
0 | 1
2
3
4
5
6
7
8
9
10
1 | 1
2
3
4
5
6
7
8
9
10
2 | 1
2
3
4
5
6
7
8
9
10
3 | Ta nhận thấy dữ liệu là không được chia sẻ giữa 2 processes vì process thứ hai đổi lại dấu của process thứ nhất sẽ khiến các phần tử của 2 processes này đảo dấu. Tiếp theo nếu chúng ta sử dụng Array trong multiprocessing thì sao ? 1
2
3
4
5
6
7
8
9
10
0 | 1
2
3
4
5
6
7
8
9
10
5 | 1
2
3
4
5
6
7
8
9
10
2 | 1
2
3
4
5
6
7
8
9
10
3 | Ta nhận thấy dữ liệu là không được chia sẻ giữa 2 processes vì process thứ hai đổi lại dấu của process thứ nhất sẽ khiến các phần tử của 2 processes này đảo dấu. Tiếp theo nếu chúng ta sử dụng Array trong multiprocessing thì sao ? 1
2
3
4
5
6
7
8
9
10
5
1
2
3
4
5
6
7
8
9
10
7 Các bạn đã thấy gì chưa ? Dữ liệu đã được chia sẻ qua lại giữa hai processes. Vậy thì chúng ta sẽ thường sử dụng shared memory khi nào ? Giả định bạn đang có một pipeline biến đổi dữ liệu gồm nhiều step khác nhau, mỗi một process sẽ phụ trách một step trong pipeline. Khi đó dữ liệu cần được shared chung giữa các process. | Queue là một định dạng stack an toàn khi làm việc với multi thread và process. Chúng ta có thể tạo ra một queue và cho phép các thread, process truy cập dữ liệu mà không bị hiện tượng concurrency vì dữ liệu được truy suất và sử dụng một lần bởi một thread hoặc process. | Bên dưới chúng ta sẽ lấy ví dụ về việc sử dụng 2 process để đọc các dữ liệu trong một queue. Hai process này tới phiên của mình sẽ lấy ra các phần từ nằm trong queue theo kiểu FIFO [First Come First Out]. 1
2
3
4
5
6
7
8
9
10
8 | 1
2
3
4
5
6
7
8
9
10
9 1
2
3
4
5
6
7
8
9
10
ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
1 Như vậy không có bất kỳ một data nào được sử dụng chung giữa 2 processes nên tránh được concurrency. 3.3. Pool trong multiprocess 1
2
3
4
5
6
7
8
9
10
0 | Trong python chúng ta có thể sử dụng pool để tận dụng được các tính toán song song trên nhiều process một lúc. Cơ chế của pool đã loại bỏ hạn chế của GIL trong python, cho phép nhiều luồng hoạt động đồng thời và giúp đẩy nhanh quá trình tính toán. | Trong Pool chúng ta có thể khai báo nhiều workers cùng thực hiện chương trình. Các chương trình có thể thực hiện một cách bất đồng bộ thông qua hàm Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
5. Tức là cho phép thực hiện song song nhiều method trên các workers. Đồng thời Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
5 cũng cho phép đưa vào các hàm callback để xử lý giữa liệu sau cùng.Ví dụ bên dưới chúng ta sẽ sử dụng 5 workers để tính toán bất đồng bộ bình phương của các số trong phạm vi 20. Kết quả sau khi tính sẽ được lưu vào một list. | ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
3 ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
4 ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
5
Ta thấy thứ tự của list không theo tuần tự từ thấp tới cao do hàm được gọi bất đồng bộ.Bên cạnh cách khởi tạo Pool cho process như trên, chúng ta còn có thể khởi tạo từ Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
7 như phần tiếp theo mình giới thiệu.4. Process Pool vs Thread Pool 4.1. Process Pool Trong python thì bắt đầu từ version 3.2 chúng ta có thể sử dụng module Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
7 để xử lý bất đồng bộ các tasks. Đây là một abstract layer được kế thừa trên cả hai modules là threading và multiprocessing để tạo ra một interface cho phép khởi tạo các task sử dụng pool của các processes và threads. | Để khởi tạo một Process Pool, chúng ta sử dụng Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
9 trong Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
7 module.import _thread
import time
def _counter[counter, thread_name]:
while [counter]:
time.sleep[0.01]
print["{}: {}".format[thread_name, counter]]
counter -= 1
counter = 5
# Khởi tạo 2 threads 1 và 2
try:
_thread.start_new_thread[_counter, [counter, "khanh thread"]] # pass counter and thread_name into method _counter
print["\n"]
_thread.start_new_thread[_counter, [counter, "ai thread"]]
except:
print["Error: unable to start thread"]
# Running counter
while [counter]:
counter -= 1
pass
6 | ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
7 1
2
3
4
5
6
7
8
9
10
11
12
13
14
ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
9 Trong 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1 chúng ta cần truyền vào số lượng các worker để chạy process. Số lượng worker càng lớn thì càng nhiều threads được sinh ra để tính toán process.Hàm submit[] Hàm 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2 được sử dụng để load các task vào process pool. Tham số truyền vào là tên hàm và các đối số của hàm. Hàm 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
3 để kiểm tra trạng thái của task. Lúc đầu ngay sau khi submit thì task chưa hoàn thành nên 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
3 là 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
5. Hàm 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
6 thường được dùng để kiểm tra kết quả sau khi task cuối cùng trong process pool đã thực thi xong. Do đó trạng thái 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
3 sau khi result được trả về là 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
8.Hàm map[] | Nhắc đến hàm map trong python, nếu bạn đã có kinh nghiệm thì sẽ hiểu ngay nó sẽ map các đối số từ một list vào hàm. | Ví dụ: Để tính diện tích của các bounding box dựa trên tọa độ 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
9 thì chúng ta thực hiện hàm map trong process pool như sau:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
0 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1 | 4.2. Thread Pool | Để khởi tạo một Process Pool, chúng ta sử dụng Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
9 trong Start thread khanh thread!
Start thread ai thread!
khanh thread : 5
ai thread : 5
khanh thread : 4
ai thread : 4
khanh thread : 3
ai thread : 3
khanh thread : 2
ai thread : 2
khanh thread : 1
End thread khanh thread
ai thread : 1
End thread ai thread
7 module.import _thread
import time
def _counter[counter, thread_name]:
while [counter]:
time.sleep[0.01]
print["{}: {}".format[thread_name, counter]]
counter -= 1
counter = 5
# Khởi tạo 2 threads 1 và 2
try:
_thread.start_new_thread[_counter, [counter, "khanh thread"]] # pass counter and thread_name into method _counter
print["\n"]
_thread.start_new_thread[_counter, [counter, "ai thread"]]
except:
print["Error: unable to start thread"]
# Running counter
while [counter]:
counter -= 1
pass
6 | ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
7 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| ai thread: 5
khanh thread: 5
ai thread: 4
khanh thread: 4
ai thread: 3
khanh thread: 3
ai thread: 2
khanh thread: 2
ai thread: 1
khanh thread: 1
9 |
1
2
3
4
5
6
7
8
9
10
2 | Trong 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1 chúng ta cần truyền vào số lượng các worker để chạy process. Số lượng worker càng lớn thì càng nhiều threads được sinh ra để tính toán process. | Hàm submit[] Hàm 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2 được sử dụng để load các task vào process pool. Tham số truyền vào là tên hàm và các đối số của hàm. Hàm 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
3 để kiểm tra trạng thái của task. Lúc đầu ngay sau khi submit thì task chưa hoàn thành nên 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
3 là 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
5. Hàm
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|