Tìm thứ tự các tác vụ từ các phụ thuộc đã cho Python

Phụ thuộc là một tính năng Airflow mạnh mẽ và phổ biến. Trong Luồng không khí, các đường ống của bạn được xác định là Đồ thị theo chu kỳ có hướng [DAG]. Mỗi tác vụ là một nút trong biểu đồ và các phần phụ thuộc là các cạnh có hướng xác định cách di chuyển qua biểu đồ. Do đó, các yếu tố phụ thuộc là chìa khóa để tuân theo các phương pháp hay nhất về kỹ thuật dữ liệu vì chúng giúp bạn xác định các quy trình linh hoạt với các tác vụ nguyên tử

Xuyên suốt hướng dẫn này, các thuật ngữ sau đây được sử dụng để mô tả các nhiệm vụ phụ thuộc

  • nhiệm vụ ngược dòng. Tác vụ phải đạt đến trạng thái đã chỉ định trước khi tác vụ phụ thuộc có thể chạy
  • nhiệm vụ xuôi dòng. Tác vụ phụ thuộc không thể chạy cho đến khi tác vụ ngược dòng đạt đến trạng thái đã chỉ định

Trong hướng dẫn này, bạn sẽ tìm hiểu về nhiều cách bạn có thể triển khai các phụ thuộc trong Luồng không khí, bao gồm

  • Phụ thuộc nhiệm vụ cơ bản
  • Phụ thuộc động
  • Phụ thuộc với các nhóm nhiệm vụ
  • Phụ thuộc với API TaskFlow
  • Quy tắc kích hoạt

Để xem bản trình bày video về các khái niệm này, hãy xem Quản lý sự phụ thuộc giữa các triển khai luồng không khí, DAG và tác vụ

Trọng tâm của hướng dẫn này là sự phụ thuộc giữa các tác vụ trong cùng một DAG. Nếu bạn cần triển khai các phụ thuộc giữa các DAG, hãy xem phần phụ thuộc Cross-DAG

Kiến thức giả định

Để tận dụng tối đa hướng dẫn này, bạn nên có hiểu biết về

  • Các khái niệm luồng khí cơ bản. Xem Giới thiệu về Luồng khí Apache

Phụ thuộc cơ bản

Các phụ thuộc cơ bản giữa các tác vụ Luồng khí có thể được đặt theo các cách sau

  • Sử dụng toán tử dịch chuyển bit [______01 và
    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]
    2]
  • Sử dụng phương pháp
    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]
    3 và
    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]
    4

Ví dụ: nếu bạn có một DAG với bốn tác vụ tuần tự, thì các phần phụ thuộc có thể được đặt theo bốn cách

  • Sử dụng

    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]
    5

    t0.set_downstream[t1]
    t1.set_downstream[t2]
    t2.set_downstream[t3]

  • Sử dụng

    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]
    6

    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]

  • Sử dụng

    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]
    2

    t0 >> t1 >> t2 >> t3

  • Sử dụng

    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]
    1

    t3.set_upstream[t2]
    t2.set_upstream[t1]
    t1.set_upstream[t0]
    1

Tất cả các phương pháp này đều tương đương và dẫn đến DAG được hiển thị trong hình ảnh sau

Nhà thiên văn học khuyên bạn nên sử dụng một phương pháp duy nhất một cách nhất quán. Sử dụng cả hai toán tử bitshift và

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
3/
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
4 trong DAG của bạn có thể làm phức tạp quá mức mã của bạn

Để đặt một phụ thuộc trong đó hai tác vụ xuôi dòng phụ thuộc vào cùng một tác vụ ngược dòng, hãy sử dụng danh sách hoặc bộ dữ liệu. Ví dụ

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
4

Các câu lệnh này là tương đương và dẫn đến DAG được hiển thị trong hình ảnh sau

Luồng khí không thể phân tích cú pháp phụ thuộc giữa hai danh sách. Ví dụ:

t0 >> t1 >> t2 >> t3
1 trả về lỗi. Để đặt các phụ thuộc này, hãy sử dụng chức năng Airflow
t0 >> t1 >> t2 >> t3
2. Ví dụ

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
7

Hình ảnh này hiển thị kết quả DAG

Với hàm

t0 >> t1 >> t2 >> t3
2, bất kỳ danh sách hoặc bộ dữ liệu nào bạn đưa vào phải có cùng độ dài

Phụ thuộc động

Nếu bạn tạo các tác vụ động trong DAG của mình, bạn nên xác định các thành phần phụ thuộc trong ngữ cảnh của mã được sử dụng để tạo động các tác vụ

Trong ví dụ sau, một tập hợp các tác vụ động song song được tạo bằng cách lặp qua danh sách các điểm cuối. Mỗi nhiệm vụ của

t0 >> t1 >> t2 >> t3
4 ở phía dưới của
t0 >> t1 >> t2 >> t3
5 và ngược dòng của
t0 >> t1 >> t2 >> t3
6

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
2

Hình ảnh này hiển thị kết quả DAG

Phụ thuộc nhóm nhiệm vụ​

Nhóm nhiệm vụ là một khái niệm nhóm dựa trên giao diện người dùng có sẵn trong Airflow 2. 0 trở lên. Để biết thêm thông tin về các nhóm nhiệm vụ, bao gồm cách tạo chúng và thời điểm sử dụng chúng, hãy xem Sử dụng các nhóm nhiệm vụ trong luồng không khí

Khi làm việc với các nhóm nhiệm vụ, điều quan trọng cần lưu ý là các phụ thuộc có thể được đặt cả bên trong và bên ngoài nhóm. Ví dụ: trong mã DAG sau có một tác vụ bắt đầu, một nhóm tác vụ có hai tác vụ phụ thuộc và một tác vụ kết thúc cần diễn ra tuần tự. Sự phụ thuộc giữa hai nhiệm vụ trong nhóm nhiệm vụ được đặt trong ngữ cảnh của nhóm nhiệm vụ [

t0 >> t1 >> t2 >> t3
7]. Sự phụ thuộc giữa nhóm nhiệm vụ và nhiệm vụ bắt đầu và kết thúc được đặt trong ngữ cảnh của DAG [
t0 >> t1 >> t2 >> t3
8]

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
5

Hình ảnh này hiển thị kết quả DAG

Phụ thuộc API TaskFlow​

API TaskFlow, có sẵn trong Airflow 2. 0 trở lên, cho phép bạn biến các hàm Python thành các tác vụ Luồng không khí bằng trình trang trí

t0 >> t1 >> t2 >> t3
9

Nếu DAG của bạn chỉ có các hàm Python được xác định bằng trình trang trí, hãy gọi các hàm Python để đặt các phụ thuộc. Ví dụ, trong DAG sau đây có hai nhiệm vụ phụ thuộc,

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
10 và
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
11. Để đặt các phụ thuộc, bạn gọi hàm
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
12

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
0

Hình ảnh này hiển thị kết quả DAG

Nếu DAG của bạn có sự kết hợp của các tác vụ hàm Python được xác định bằng trình trang trí và tác vụ được xác định bằng toán tử truyền thống, bạn có thể đặt các phần phụ thuộc bằng cách gán lệnh gọi tác vụ được trang trí cho một biến rồi xác định các phần phụ thuộc một cách bình thường. Ví dụ: trong DAG bên dưới, tác vụ

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
13 được xác định bởi trình trang trí
t0 >> t1 >> t2 >> t3
9 và được gọi bằng
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
15. Biến
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
16 được sử dụng ở dòng cuối cùng để xác định các phụ thuộc

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
5

Quy tắc kích hoạt

Khi bạn đặt các quan hệ phụ thuộc giữa các tác vụ, hành vi Luồng không khí mặc định là chỉ chạy một tác vụ khi tất cả các tác vụ ngược dòng đã thành công. Bạn có thể sử dụng quy tắc kích hoạt để thay đổi hành vi mặc định này

Lựa chọn tiếp theo đã khả thi

  • all_success. [mặc định] Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng đã thành công
  • all_failed. Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng ở trạng thái không thành công hoặc ngược dòng không thành công
  • tất cả đã được làm xong. Tác vụ chạy sau khi tất cả các tác vụ ngược dòng được thực hiện xong
  • all_skip. Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng đã bị bỏ qua
  • one_failed. Tác vụ chạy khi ít nhất một tác vụ ngược dòng không thành công
  • one_success. Tác vụ chạy khi ít nhất một tác vụ ngược dòng đã thành công
  • one_done. Tác vụ chạy khi có ít nhất một tác vụ ngược dòng thành công hoặc không thành công
  • none_fail. Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng đã thành công hoặc bị bỏ qua
  • none_failed_min_one_success. Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng không bị lỗi hoặc ngược dòng_không thành công và ít nhất một tác vụ ngược dòng đã thành công
  • none_skip. Tác vụ chỉ chạy khi không có tác vụ ngược dòng nào ở trạng thái bị bỏ qua
  • luôn. Nhiệm vụ chạy bất cứ lúc nào

Quy tắc phân nhánh và kích hoạt

Một tình huống phổ biến mà bạn có thể cần triển khai các quy tắc kích hoạt là nếu DAG của bạn chứa logic có điều kiện, chẳng hạn như phân nhánh. Trong những trường hợp này,

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
17 có thể là một quy tắc phù hợp hơn so với
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
18

Trong ví dụ DAG sau đây, có một nhánh đơn giản với tác vụ xuôi dòng cần chạy nếu một trong hai nhánh được theo dõi. Với quy tắc

t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
18, tác vụ
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
40 không bao giờ chạy vì tất cả trừ một trong các tác vụ
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
41 luôn bị bỏ qua và do đó không có trạng thái thành công. Nếu bạn thay đổi quy tắc kích hoạt thành
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
17, thì tác vụ
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
40 có thể chạy miễn là một trong các nhánh hoàn tất thành công

Chủ Đề