Đa xử lý python có sử dụng bộ nhớ dùng chung không?

Tôi đang cố gắng viết lại mã đa xử lý Python bằng Ray vì nó dường như có thể trừu tượng hóa các vấn đề quản lý bộ nhớ dùng chung và thực hiện tính toán song song nhanh hơn đa xử lý thẳng (dựa trên bài viết này). Mục tiêu của tôi là xử lý song song tất cả các chuỗi thời gian cho lưới lat/lon (với cả mảng đầu vào và đầu ra có hình dạng [lat, lon, time]) mà không cần sao chép các mảng đầu vào/đầu ra không cần thiết. Ý tưởng là có cả mảng đầu vào và đầu ra trong bộ nhớ dùng chung và nhiều quy trình sẽ đọc và ghi vào các mảng bộ nhớ dùng chung để mỗi quy trình không cần sao chép/tuần tự hóa để truy cập vào các mảng đang được xử lý

Trường hợp sử dụng của tôi là tôi có một chức năng nặng về CPU mà tôi muốn áp dụng trên tất cả các mảng con 1-D của mảng 3-D. Tôi đã quản lý để làm cho việc này thực thi nhanh hơn nhiều bằng cách sử dụng phương pháp cuộn tại nhà cho các đối tượng bộ nhớ dùng chung với đa xử lý, nhưng mã phức tạp/phức tạp hơn nhiều so với mức tôi cảm thấy thoải mái và tôi hy vọng sẽ đơn giản hóa nó bằng cách sử dụng ray. Tuy nhiên, tôi vẫn chưa tìm ra cách ghi vào bộ nhớ dùng chung bằng tia và nếu không có điều đó, tôi không biết làm cách nào để thực hiện việc này. Hy vọng rằng ai đó đọc điều này có thể đề xuất một giải pháp

Tôi có một sổ ghi chép Jupyter với một ví dụ đơn giản về những gì tôi đã cố gắng làm cho nó hoạt động bằng ray

Đây là ý chính

Tôi khởi tạo môi trường của mình cho ray và tạo một hàm thực hiện thao tác đơn giản trên lát cắt 1-D của mảng 3-D và ghi kết quả vào một mảng đầu ra, với mong muốn rằng hàm này có thể chạy song song và đọc/ghi

import psutil
import numpy as np
import ray

num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus, ignore_reinit_error=True)

@ray.remote
def add_average_ray(
        in_ary: np.ndarray,
        out_ary: np.ndarray,
        lat_index: int,
        lon_index: int,
):
    ary = in_ary[lat_index, lon_index]
    out_ary[lat_index, lon_index] = ary + np.mean(ary)

Tiếp theo, tôi tạo một hàm sẽ lặp qua lưới 3-D các giá trị và áp dụng hàm trên cho từng giá trị song song bằng ray

def compute_with_ray(
        input_array: np.ndarray,
) -> np.ndarray:
    # create an output array that computed values will be written
    output_array = np.full(shape=input_array.shape, fill_value=np.NaN)

    # put the input and output arrays into ray's object store
    in_array_id = ray.put(input_array)
    out_array_id = ray.put(output_array)

    # make a list of futures, one per lat/lon (assuming shape (lat, lon, time))
    futures = []
    for lat_index in range(input_array.shape[0]):
        for lon_index in range(input_array.shape[1]):
            futures.append(add_average_ray.remote(in_array_id, out_array_id, lat_index, lon_index))

    # launch the remote tasks in parallel
    ray.get(futures)

    return output_array

Tiếp theo, tôi tạo một mảng đầu vào và thực hiện mã

# create an array that can be used to represent a 2x2 cell lat/lon map with 3 times
tst_ary = np.array([[[1, 6, 5], [3, 2, 7]], [[8, 4., 6.], [9, 4, 2]]])

# exercise the ray remote function in parallel
average_added_ray = compute_with_ray(tst_ary)

Rõ ràng điều này là không thể vì các mảng đã được thêm vào kho lưu trữ đối tượng của ray là chỉ đọc và nó dẫn đến lỗi

RayTaskError(ValueError): ray_worker (pid=5260, host=skypilot)
  File "", line 9, in add_average_ray
ValueError: assignment destination is read-only

Có cách nào tốt hơn để tiếp cận/hoàn thành quá trình xử lý song song này trên các mảng có nhiều mảng bằng ray không?

torch.multiprocessing là sự thay thế cho mô-đun multiprocessing của Python. Nó hỗ trợ chính xác các hoạt động tương tự, nhưng mở rộng nó, để tất cả các tenxơ được gửi qua multiprocessing.Queue, sẽ chuyển dữ liệu của chúng vào bộ nhớ dùng chung và sẽ chỉ gửi một chốt điều khiển cho một quy trình khác

Ghi chú

Khi một Tensor được gửi đến một quy trình khác, dữ liệu Tensor được chia sẻ. Nếu torch.Tensor.grad không phải là

def compute_with_ray(
        input_array: np.ndarray,
) -> np.ndarray:
    # create an output array that computed values will be written
    output_array = np.full(shape=input_array.shape, fill_value=np.NaN)

    # put the input and output arrays into ray's object store
    in_array_id = ray.put(input_array)
    out_array_id = ray.put(output_array)

    # make a list of futures, one per lat/lon (assuming shape (lat, lon, time))
    futures = []
    for lat_index in range(input_array.shape[0]):
        for lon_index in range(input_array.shape[1]):
            futures.append(add_average_ray.remote(in_array_id, out_array_id, lat_index, lon_index))

    # launch the remote tasks in parallel
    ray.get(futures)

    return output_array
0, nó cũng được chia sẻ. Sau khi một trường Tensor không có trường torch.Tensor.grad được gửi đến quy trình khác, nó sẽ tạo ra một torch.multiprocessing0 Tensor dành riêng cho quy trình tiêu chuẩn không được chia sẻ tự động trên tất cả các quy trình, không giống như cách dữ liệu của Tensor đã được chia sẻ

Điều này cho phép triển khai các phương pháp đào tạo khác nhau, như Hogwild, A3C hoặc bất kỳ phương pháp nào khác yêu cầu hoạt động không đồng bộ

CUDA trong đa xử lý¶

Thời gian chạy CUDA không hỗ trợ phương pháp khởi động torch.multiprocessing3;

Ghi chú

Phương thức bắt đầu có thể được đặt thông qua việc tạo ngữ cảnh bằng torch.multiprocessing6 hoặc trực tiếp bằng cách sử dụng torch.multiprocessing7

Không giống như tenxơ CPU, quá trình gửi bắt buộc phải giữ tenxơ ban đầu miễn là quy trình nhận giữ lại một bản sao của tenxơ. Nó được triển khai ngầm nhưng yêu cầu người dùng tuân theo các phương pháp hay nhất để chương trình chạy chính xác. Ví dụ: quy trình gửi phải tồn tại miễn là quy trình tiêu dùng có tham chiếu đến tensor và việc đếm lại không thể cứu bạn nếu quy trình tiêu dùng thoát bất thường thông qua tín hiệu nghiêm trọng. Xem phần này .

Xem thêm. Sử dụng nn. song song. DistributedDataParallel thay vì đa xử lý hoặc nn. Dữ liệu song song

Các mẹo và phương pháp hay nhất¶

Tránh và chống bế tắc¶

Có rất nhiều thứ có thể sai khi một quy trình mới được tạo ra, với nguyên nhân phổ biến nhất gây ra bế tắc là các luồng nền. Nếu có bất kỳ luồng nào giữ khóa hoặc nhập mô-đun và torch.multiprocessing3 được gọi, thì rất có khả năng quy trình con sẽ ở trạng thái bị hỏng và sẽ bế tắc hoặc lỗi theo một cách khác. Lưu ý rằng ngay cả khi bạn không làm như vậy, thì các thư viện tích hợp sẵn của Python cũng có - không cần tìm đâu xa ngoài multiprocessing. multiprocessing.Queue thực sự là một lớp rất phức tạp, sinh ra nhiều luồng được sử dụng để tuần tự hóa, gửi và nhận các đối tượng và chúng cũng có thể gây ra các sự cố đã nói ở trên. Nếu bạn thấy mình trong tình huống như vậy, hãy thử sử dụng multiprocessing1, không sử dụng bất kỳ chủ đề bổ sung nào

Chúng tôi đang cố gắng hết sức để giúp bạn dễ dàng và đảm bảo những bế tắc này không xảy ra nhưng một số điều nằm ngoài tầm kiểm soát của chúng tôi. Nếu bạn gặp bất kỳ sự cố nào mà bạn không thể giải quyết trong một thời gian, hãy thử liên hệ trên các diễn đàn và chúng tôi sẽ xem liệu đó có phải là sự cố mà chúng tôi có thể khắc phục hay không

Sử dụng lại bộ đệm được chuyển qua Hàng đợi¶

Hãy nhớ rằng mỗi khi bạn đặt một Tensor vào một multiprocessing.Queue, nó phải được chuyển vào bộ nhớ dùng chung. Nếu nó đã được chia sẻ, nó không hoạt động, nếu không, nó sẽ phát sinh một bản sao bộ nhớ bổ sung có thể làm chậm toàn bộ quá trình. Ngay cả khi bạn có một nhóm quy trình gửi dữ liệu đến một quy trình duy nhất, hãy yêu cầu nó gửi lại bộ đệm - điều này gần như miễn phí và sẽ cho phép bạn tránh sao chép khi gửi đợt tiếp theo

Đào tạo đa xử lý không đồng bộ (e. g. Lợn rừng)¶

Sử dụng torch.multiprocessing, có thể đào tạo một mô hình không đồng bộ, với các tham số được chia sẻ mọi lúc hoặc được đồng bộ hóa định kỳ. Trong trường hợp đầu tiên, chúng tôi khuyên bạn nên gửi toàn bộ đối tượng mô hình, trong trường hợp sau, chúng tôi khuyên bạn chỉ nên gửi multiprocessing5

Chúng tôi khuyên bạn nên sử dụng multiprocessing.Queue để chuyển tất cả các loại đối tượng PyTorch giữa các quy trình. Có thể đ. g. kế thừa các tenxơ và bộ lưu trữ đã có trong bộ nhớ dùng chung, khi sử dụng phương pháp khởi động torch.multiprocessing3, tuy nhiên, nó rất dễ bị lỗi và nên được sử dụng cẩn thận và chỉ bởi người dùng nâng cao. Hàng đợi, mặc dù đôi khi chúng là một giải pháp kém tinh tế hơn, sẽ hoạt động bình thường trong mọi trường hợp

Cảnh báo

Bạn nên cẩn thận về việc có các câu lệnh toàn cầu, không được bảo vệ bằng một multiprocessing8. Nếu sử dụng một phương thức bắt đầu khác với phương thức bắt đầu của torch.multiprocessing3, thì chúng sẽ được thực thi trong tất cả các quy trình con

Lợn rừng¶

Có thể tìm thấy cách triển khai Hogwild cụ thể trong kho ví dụ, nhưng để giới thiệu cấu trúc tổng thể của mã, cũng có một ví dụ tối thiểu bên dưới

Python có phải là bộ nhớ dùng chung đa xử lý không?

Trăn 3. 8 đã giới thiệu một mô-đun đa xử lý mới. shared_memory cung cấp bộ nhớ dùng chung để truy cập trực tiếp giữa các quy trình .

Hàng đợi đa xử lý có sử dụng bộ nhớ dùng chung không?

đa xử lý là sự thay thế giảm dần cho mô-đun đa xử lý của Python. Nó hỗ trợ chính xác các hoạt động tương tự, nhưng mở rộng nó, sao cho tất cả các tenxơ được gửi qua một bộ đa xử lý. Hàng đợi sẽ chuyển dữ liệu của họ vào bộ nhớ dùng chung và sẽ chỉ gửi một điều khiển cho một quy trình khác.

Nhiều tiến trình có thể chia sẻ bộ nhớ không?

Các quy trình không chia sẻ bộ nhớ với các quy trình khác . Chủ đề chia sẻ bộ nhớ với các chủ đề khác của cùng một quá trình.

Đa xử lý Python có sử dụng nhiều lõi không?

Các quy trình Python thường sử dụng một luồng đơn vì GIL. Mặc dù có GIL, các thư viện thực hiện các tác vụ tính toán nặng như numpy, scipy và pytorch sử dụng triển khai dựa trên C hoàn toàn, cho phép sử dụng nhiều lõi .