Nhà sản xuất tiêu dùng python threading

Trong bài viết mới này, chúng tôi sẽ mở rộng khái niệm luồng với một mô hình được sử dụng rộng rãi trong công nghệ phần mềm. mô hình Nhà sản xuất-Người tiêu dùng mà chúng tôi sẽ triển khai bằng hai luồng. Cụ thể, chúng tôi sẽ phát triển Đường ống để liên lạc nội bộ giữa hai luồng

  • Thread trong Python – Phân luồng [phần 1]
  • Chủ đề trong Python – Tham gia [phần 2]
  • Thread trong Python – Đa luồng [phần 3]
  • Thread trong Python – Khóa và Bế tắc [phần 4]

Luồng sản xuất-người tiêu dùng

Vấn đề Producer-Consumer là một vấn đề tiêu chuẩn và rất phổ biến trong thế giới lập trình. Sự cố này được đánh dấu khi bạn cần quản lý đồng bộ hóa các quy trình nói chung, bao gồm cả các luồng

Hãy tưởng tượng trường hợp một chương trình phải nhận [không đọc] dữ liệu từ một tài nguyên [chẳng hạn như mạng]. Chương trình sẽ không nhận dữ liệu tại thời điểm yêu cầu [thực tế nó không phải là đọc] mà sẽ chờ nhận trong khi nghe. Dữ liệu từ tài nguyên không được phát hành một cách thường xuyên. Chúng tôi xác định tài nguyên này hoặc nguồn dữ liệu là nhà sản xuất

Ở phía bên kia, có chương trình lắng nghe, chương trình này sau khi lấy được dữ liệu từ tài nguyên, phải gửi chúng [không ghi] đến tài nguyên khác, chẳng hạn như cơ sở dữ liệu. Truy cập vào cơ sở dữ liệu sẽ không ngay lập tức, nhưng chúng tôi tính đến việc nó đủ nhanh để nhận tất cả dữ liệu đến từ chương trình trong một khoảng thời gian cụ thể và hợp lý. Do đó, chương trình đôi khi sẽ phải đợi cơ sở dữ liệu sẵn sàng nhận dữ liệu. Phần này của chương trình sẽ xác định bạn là người tiêu dùng

Do đó, chúng tôi có hai thực thể, Nhà sản xuất và Người tiêu dùng, ba trong số đó sẽ tạo ra một Đường ống, có nhiệm vụ quản lý dữ liệu giữa chúng

Mô hình nhà sản xuất-người tiêu dùng sử dụng Khóa

Vì chúng ta đang nói về phân luồng, hãy thử giải quyết vấn đề này bằng cách sử dụng Khóa

Như chúng tôi đã nói trước đó, chúng tôi có một nhà sản xuất luồng nhận dữ liệu từ mạng và đặt nó vào một đường dẫn. Để mô phỏng mạng, chúng tôi sẽ tạo một mạng giả

Chúng tôi nhập mô-đun ngẫu nhiên và mô-đun thời gian để có các giá trị ngẫu nhiên và mô phỏng sự không xác định của việc nhận dữ liệu của mạng, kết hợp chúng lại, chúng tôi sẽ có các khoảng thời gian ngẫu nhiên. Sau đó đồng thời. mô-đun tương lai cho ThreadPoolExecutor mà chúng ta sẽ sử dụng làm trình quản lý bối cảnh để quản lý luồng [như chúng ta đã thấy trước đó]. Cuối cùng, chúng ta sẽ nhập lớp Đường ống từ một đường ống. py mà chúng tôi sẽ triển khai sau

 import random
 import concurrent.futures
 import time
 from pipeline import Pipeline 

Chúng tôi xác định một đối tượng END chung mà chúng tôi sẽ sử dụng để báo hiệu kết thúc nhận tin nhắn. Giá trị này sẽ được Nhà sản xuất tạo ra để cho biết phần cuối của các tin nhắn nhận được và được chèn vào Đường ống dẫn. Khi Consumer đọc giá trị END này từ Pipeline sẽ ngắt chương trình

 END = object[] 

Do đó, trước tiên, chúng tôi triển khai trình tạo luồng lấy một đường dẫn [pl] làm đối số của nó

 def producer[pl]:
   for index in range[10]:
     time.sleep[random.randint[1,11]]
     message = random.randint[1, 101] 
     print["Producer received data: ", message, " [", index+1, " of 10 ]"]
     pl.set_message[message, "Producer"]
  
   pl.set_message[END, "Producer"] 

Chuỗi sẽ phải mô phỏng việc nhận 10 dữ liệu [số ngẫu nhiên từ 1 đến 100] sẽ được thu thập trong thời gian không thường xuyên [sử dụng thời gian. ngủ với số thời gian ngẫu nhiên trong khoảng từ 1 đến 10 giây]. Theo cách này, chuỗi có thể nhận được một loạt tin nhắn theo thời gian thay đổi

Dữ liệu nhận được được nhập vào tin nhắn và sau đó được gửi đến đường ống bằng cách sử dụng. hàm set_message[]. Sau đó, từ đường ống, luồng tiêu dùng sẽ nhận được giá trị

Khi kết thúc 10 dữ liệu, luồng của nhà sản xuất sẽ chèn vào đường ống một giá trị cụ thể [END] sẽ báo hiệu cho luồng người tiêu dùng việc nhận tất cả các tin nhắn và do đó đóng chương trình

Ở phía bên kia của đường ống là luồng tiêu dùng, chúng tôi sẽ triển khai theo cách sau

 def consumer[pl]:
     message = 0
     while message is not END:
         message = pl.get_message["Consumer"]
         if message is not END:
             time.sleep[random.randint[1,11]]
             print["Consumer stored data: ", message] 

Chuỗi người tiêu dùng đọc một tin nhắn từ đường ống và để mô phỏng việc chèn dữ liệu vào cơ sở dữ liệu, thời gian ngẫu nhiên từ 1 đến 10 giây cũng được sử dụng ở đây. Điều này để thời gian xử lý dữ liệu từ mạng, đến đường ống để đến cơ sở dữ liệu là tương tự nhau. Do thời gian không đều và khác nhau giữa quá trình nhận và chèn vào cơ sở dữ liệu, do đó, chúng tôi sẽ mong đợi sự tích lũy trong đường dẫn của một số dữ liệu bổ sung [thông báo] trước khi luồng người tiêu dùng có thể xử lý tất cả chúng, chèn chúng vào cơ sở dữ liệu

Khi luồng tiêu dùng nhận được thông báo KẾT THÚC từ đường ống, nó sẽ hiểu rằng các thông báo nhận được đã kết thúc và sẽ thoát khỏi quá trình thực thi

Khi hai luồng [người tiêu dùng và sản phẩm] được xác định, chúng tôi xác định chương trình chính. Đầu tiên chúng ta sẽ phải kích hoạt đường ống [pl]. Sau đó, với một ThreadPoolExecutor, kích hoạt hai luồng, gán cho mỗi luồng một đường dẫn giống nhau

 pl = Pipeline[]
 with concurrent.futures.ThreadPoolExecutor[max_workers=2] as executor:
   executor.submit[producer, pl]
   executor.submit[consumer, pl] 

Điều cuối cùng chúng ta còn lại để xác định là Đường ống. Thực hiện nó với một lớp trong một đường ống bên ngoài. tập tin py

 import threading
  
 class Pipeline:
  
   def __init__[self]:
     self.message = 0
     self.producer_lock = threading.Lock[]
     self.consumer_lock = threading.Lock[]
     self.consumer_lock.acquire[]
  
   def get_message[self, name]:
     self.consumer_lock.acquire[]
     message = self.message
     self.producer_lock.release[]
     return message
  
   def set_message[self, message, name]:
     self.producer_lock.acquire[]
     self.message = message
     self.consumer_lock.release[] 

Lớp Pipeline có ba thành viên

  • thông báo lưu trữ giá trị được truyền [tin nhắn]
  • producer_lock là một luồng. Khóa hạn chế quyền truy cập vào tin nhắn chỉ đối với chuỗi nhà sản xuất
  • Consumer_lock là một luồng. Khóa hạn chế quyền truy cập vào tin nhắn chỉ đối với chuỗi người tiêu dùng

Hàm đầu tiên chúng ta thấy được định nghĩa trong lớp là __init__[]. Chức năng của nó là khởi tạo ba thành viên của lớp và sau đó gọi hàm thu thập [] trên Consumer_lock. Làm theo cách này, chúng tôi đặt trạng thái bắt đầu mà chúng tôi muốn. nhà sản xuất được phép thêm một tin nhắn mới vào đường ống, trong khi người tiêu dùng phải đợi

Trong lớp Đường ống, sau đó hai phương thức khác được định nghĩa. get_message[] và. set_messages[]

get_message[] gọi. phương thức thu được [] trên Consumer_lock. Theo cách này, người tiêu dùng sẽ đợi cho đến khi một thông báo sẵn sàng trong Đường ống. Sau đó, khi người tiêu dùng đã có được. Consumer_lock, nó sẽ sao chép giá trị trong tin nhắn. Tiếp theo nó sẽ gọi. phát hành[] trên. nhà sản xuất_lock. Sau đó, người tiêu dùng sẽ giải phóng khóa, cho phép nhà sản xuất chèn một thông báo khác vào đường ống

Chúng tôi đặc biệt chú ý đến phần cuối cùng. Bạn có thể muốn loại bỏ thông báo, đóng chức năng bằng return self. thông điệp. Nhưng bạn không nên làm điều đó

Đây là câu trả lời. Ngay khi người tiêu dùng gọi. nhà sản xuất_lock. release[], nó dừng lại để khởi động lại nhà sản xuất. Nhưng nhà sản xuất có thể khởi động lại trước khi. Hàm release[] trả về giá trị. Nếu điều này xảy ra, người tiêu dùng sẽ tự trả lại. tin nhắn thực sự là tin nhắn tiếp theo được tạo, mất tin nhắn đầu tiên mãi mãi. Đây là một ví dụ về điều kiện chủng tộc

Chuyển đến. set_message[], bạn có thể thấy mặt trái của giao dịch. Nhà sản xuất gọi chức năng này sẽ có được. nhà sản xuất_lock. Sau đó thiết lập. nhắn tin và cuối cùng là gọi. release[] trên Consumer_lock, cho phép người tiêu dùng đọc giá trị đó

Nếu chúng ta thực hiện chúng ta sẽ thu được kết quả tương tự như sau

kết luận

Trong phần thứ năm này về Chủ đề Python, chúng tôi đã giới thiệu mô hình Nhà sản xuất – Người tiêu dùng sử dụng các luồng và với Đường ống. Trong phần tiếp theo, chúng ta sẽ xem cách triển khai mô hình tương tự bằng cách thay thế Đường ống bằng Hàng đợi

Chủ Đề