Là hàng đợi trong chuỗi Python

get()10_______7_______1 get()03get()13_______7_______05get()15get()05get()17get()05get()19get()05get()21get()09

Gần đây, tôi gặp sự cố với một quy trình web đang chạy trong thời gian dài mà tôi cần tăng tốc đáng kể do hết thời gian chờ. Sự chậm trễ phát sinh do hệ thống cần tìm nạp dữ liệu từ một số URL. Tổng số URL thay đổi từ người dùng này sang người dùng khác và thời gian phản hồi cho mỗi URL khá dài (khoảng 1. 5 giây)

Đã xảy ra sự cố với 10-15 yêu cầu URL mất hơn 20 giây và kết nối HTTP máy chủ của tôi đã hết thời gian chờ. Thay vì kéo dài thời gian chờ, tôi đã chuyển sang thư viện luồng của Python. Dễ học, triển khai nhanh và giải quyết vấn đề của tôi rất nhanh. Hệ thống đã được triển khai trong khung vi mô web Pythons Flask

Là hàng đợi trong chuỗi Python

Sử dụng Chủ đề cho số lượng tác vụ thấp

Luồng trong Python rất đơn giản. Nó cho phép bạn quản lý các luồng đồng thời làm việc cùng một lúc. Thư viện có tên là “luồng”, bạn tạo các đối tượng “Thread” và chúng chạy các hàm mục tiêu cho bạn. Bạn có thể bắt đầu hàng trăm luồng tiềm năng sẽ hoạt động song song. Giải pháp đầu tiên được lấy cảm hứng từ một số bài đăng trên StackOverflow và liên quan đến việc khởi chạy một chuỗi riêng lẻ cho từng yêu cầu URL. Điều này hóa ra không phải là giải pháp lý tưởng, nhưng cung cấp một nền tảng học tập tốt

Trước tiên, bạn cần xác định chức năng "làm việc" mà mỗi luồng sẽ thực thi riêng. Trong ví dụ này, hàm công việc là một phương thức "thu thập thông tin" để truy xuất dữ liệu từ một url. Không thể trả về các giá trị từ chuỗi và do đó, trong ví dụ này, chúng tôi chuyển vào một mảng "kết quả" có thể truy cập toàn cầu (cho tất cả các chuỗi) cùng với chỉ mục của mảng sẽ lưu trữ kết quả sau khi được tìm nạp. Hàm thu thập dữ liệu () sẽ giống như

...
import logging
from urllib2 import urlopen
from threading import Thread
from json import JSONDecoder
...

# Define a crawl function that retrieves data from a url and places the result in results[index]
# The 'results' list will hold our retrieved data
# The 'urls' list contains all of the urls that are to be checked for data
results = [{} for x in urls]
def crawl(url, result, index):
    # Keep everything in try/catch loop so we handle errors
    try:
        data = urlopen(url).read()
        logging.info("Requested..." + url)
        result[index] = data
    except:
        logging.error('Error with URL check!')
        result[index] = {}
    return True

 

Để thực sự bắt đầu Chủ đề trong python, chúng tôi sử dụng thư viện “” và tạo các đối tượng “Thead”. Chúng ta có thể chỉ định một hàm mục tiêu ('mục tiêu') và tập hợp các đối số ('args') cho từng chuỗi và sau khi bắt đầu, các quảng cáo sẽ thực thi song song tất cả chức năng được chỉ định. Trong trường hợp này, việc sử dụng chuỗi sẽ giúp giảm thời gian tra cứu URL của chúng tôi xuống còn 1. 5 giây (xấp xỉ) bất kể có bao nhiêu URL cần kiểm tra. Mã để bắt đầu quy trình theo luồng là

#create a list of threads
threads = []
# In this case 'urls' is a list of urls to be crawled.
for ii in range(len(urls)):
    # We start one thread per url present.
    process = Thread(target=crawl, args=[urls[ii], result, ii])
    process.start()
    threads.append(process)

# We now pause execution on the main thread by 'joining' all of our started threads.
# This ensures that each has finished processing the urls.
for process in threads:
    process.join()

# At this point, results for each URL are now neatly stored in order in 'results'

Điểm đặc biệt duy nhất ở đây là chức năng join()   . Về cơ bản, tham gia () tạm dừng chuỗi gọi (trong trường hợp này là chuỗi chính của chương trình) cho đến khi chuỗi được đề cập xử lý xong. Việc gọi tham gia ngăn chương trình của chúng tôi tiến triển cho đến khi tất cả các URL đã được tìm nạp.

Phương pháp bắt đầu một luồng cho mỗi nhiệm vụ này sẽ hoạt động tốt trừ khi bạn có một số lượng lớn (hàng trăm) nhiệm vụ cần hoàn thành

Sử dụng Hàng đợi cho nhiều tác vụ

Giải pháp được nêu ở trên hoạt động thành công đối với chúng tôi, với người dùng ứng dụng web của chúng tôi yêu cầu trung bình 9-11 luồng cho mỗi yêu cầu. Chuỗi đã bắt đầu, hoạt động và trả về kết quả thành công. Các vấn đề phát sinh sau này khi người dùng yêu cầu nhiều quy trình theo luồng hơn (>400). Với những yêu cầu như vậy, Python đã bắt đầu hàng trăm luồng đang nhận lỗi như

error: can't start new thread

File "https://c8j9w8r3.rocketcdn.me/usr/lib/python2.5/threading.py", line 440, in start
    _start_new_thread(self.__bootstrap, ())

Đối với những người dùng này, giải pháp ban đầu không khả thi. Có một giới hạn trong môi trường của bạn đối với số luồng tối đa mà Python có thể bắt đầu. Có thể sử dụng một thư viện tích hợp khác của Pythons để phân luồng, , để vượt qua chướng ngại vật. Về cơ bản, hàng đợi được sử dụng để lưu trữ một số “nhiệm vụ cần hoàn thành”. Các luồng có thể nhận các tác vụ từ hàng đợi khi chúng khả dụng, thực hiện công việc và sau đó quay lại để thực hiện thêm. Trong ví dụ này, chúng tôi cần đảm bảo tối đa 50 chuỗi cùng một lúc nhưng khả năng xử lý bất kỳ số lượng yêu cầu URL nào. Thiết lập hàng đợi trong Python rất đơn giản

# Setting up the Queue
...
from Queue import Queue
...
#set up the queue to hold all the urls
q = Queue(maxsize=0)
# Use many threads (50 max, or one for each url)
num_theads = min(50, len(urls))

Để trả về kết quả từ chuỗi, chúng tôi sẽ sử dụng cùng một kỹ thuật chuyển danh sách kết quả cùng với chỉ mục để lưu trữ cho từng chuỗi công nhân. Chỉ mục cần được đưa vào Hàng đợi khi thiết lập tác vụ vì chúng tôi sẽ không gọi rõ ràng từng chức năng "thu thập thông tin" bằng các đối số (chúng tôi cũng không đảm bảo thứ tự các tác vụ được thực thi)

#Populating Queue with tasks
results = [{} for x in urls];
#load up the queue with the urls to fetch and the index for each job (as a tuple):
for i in range(len(urls)):
    #need the index and the url in each queue item.
    q.put((i,urls[i]))

Chức năng “thu thập thông tin” theo luồng sẽ khác vì giờ đây nó dựa vào hàng đợi. Chuỗi được thiết lập để đóng và quay lại khi hàng đợi không có nhiệm vụ nào

# Threaded function for queue processing.
def crawl(q, result):
    while not q.empty():
        work = q.get()                      #fetch new work from the Queue
        try:
            data = urlopen(work[1]).read()
            logging.info("Requested..." + work[1])
            result[work[0]] = data          #Store data back at correct index
        except:
            logging.error('Error with URL check!')
            result[work[0]] = {}
        #signal to the queue that task has been processed
        q.task_done()
    return True

Bản thân đối tượng Hàng đợi mới được chuyển đến các luồng cùng với danh sách để lưu trữ kết quả. Vị trí cuối cùng cho mỗi kết quả được chứa trong các tác vụ xếp hàng – đảm bảo rằng danh sách "kết quả" cuối cùng theo cùng thứ tự với danh sách "url" ban đầu. Chúng tôi đưa vào hàng đợi thông tin công việc này

#Starting worker threads on queue processing
for i in range(num_theads):
    logging.debug('Starting thread ', i)
    worker = Thread(target=crawl, args=(q,results))
    worker.setDaemon(True)    #setting threads as "daemon" allows main program to 
                              #exit eventually even if these dont finish 
                              #correctly.
    worker.start()

#now we wait until the queue has been processed
q.join()

logging.info('All tasks completed.')

Các tác vụ của chúng tôi bây giờ sẽ không được xử lý song song hoàn toàn mà thay vào đó là 50 luồng hoạt động song song. Do đó, 100 url sẽ mất 2 x 1. khoảng 5 giây. Ở đây, độ trễ này có thể chấp nhận được vì số lượng người dùng yêu cầu hơn 50 luồng là tối thiểu. Tuy nhiên, ít nhất hệ thống cũng đủ linh hoạt để xử lý mọi tình huống

Thiết lập này rất phù hợp với ví dụ về công việc đầu vào/đầu ra không tích hợp tính toán (tìm nạp URL), vì phần lớn thời gian của luồng sẽ được dành để chờ dữ liệu. Trong công việc sử dụng nhiều dữ liệu hoặc khoa học dữ liệu, thư viện đa xử lý hoặc cần tây có thể phù hợp hơn vì chúng phân chia công việc trên nhiều lõi CPU. Hy vọng nội dung trên giúp bạn đi đúng hướng

Thông tin thêm về Python Threading

Có một số bài đọc bổ sung tuyệt vời về chuỗi và mô-đun chuỗi nếu bạn đang tìm kiếm thông tin chuyên sâu hơn

Kiểu dữ liệu nào là hàng đợi trong Python?

Hàng đợi trong Python là một cấu trúc dữ liệu tuyến tính có phần sau và phần trước, tương tự như ngăn xếp . Nó lưu trữ các mục tuần tự theo cách FIFO (First In First Out). Bạn có thể coi nó như một hàng đợi dịch vụ khách hàng hoạt động trên cơ sở ai đến trước được phục vụ trước.

Làm cách nào để sử dụng hàng đợi trong đa luồng Python?

Mô-đun Hàng đợi chủ yếu được sử dụng để quản lý xử lý lượng lớn dữ liệu trên nhiều luồng . Nó hỗ trợ việc tạo một đối tượng hàng đợi mới có thể lấy một số mục riêng biệt. Các phương thức get() và put() được sử dụng để thêm hoặc xóa các phần tử khỏi hàng đợi tương ứng.

Sự khác biệt giữa chủ đề và hàng đợi là gì?

Hàng đợi tin nhắn là cấu trúc dữ liệu để lưu giữ tin nhắn từ khi chúng được gửi cho đến khi người nhận truy xuất và xử lý chúng. Nói chung, hàng đợi được sử dụng như một cách để 'kết nối' nhà sản xuất (dữ liệu) và người tiêu dùng (dữ liệu). Nhóm luồng là một nhóm các luồng thực hiện một số loại xử lý .

Là luồng hàng đợi đa xử lý của Python

Điều này bao gồm hàng đợi trong đa xử lý. Hàng đợi là luồng và xử lý an toàn . Điều này có nghĩa là các quy trình có thể nhận () và đặt () các mục từ và vào hàng đợi đồng thời mà không sợ điều kiện chạy đua.