Phụ thuộc là một tính năng Airflow mạnh mẽ và phổ biến. Trong Luồng không khí, các đường ống của bạn được xác định là Đồ thị theo chu kỳ có hướng [DAG]. Mỗi tác vụ là một nút trong biểu đồ và các phần phụ thuộc là các cạnh có hướng xác định cách di chuyển qua biểu đồ. Do đó, các yếu tố phụ thuộc là chìa khóa để tuân theo các phương pháp hay nhất về kỹ thuật dữ liệu vì chúng giúp bạn xác định các quy trình linh hoạt với các tác vụ nguyên tử
Xuyên suốt hướng dẫn này, các thuật ngữ sau đây được sử dụng để mô tả các nhiệm vụ phụ thuộc
- nhiệm vụ ngược dòng. Tác vụ phải đạt đến trạng thái đã chỉ định trước khi tác vụ phụ thuộc có thể chạy
- nhiệm vụ xuôi dòng. Tác vụ phụ thuộc không thể chạy cho đến khi tác vụ ngược dòng đạt đến trạng thái đã chỉ định
Trong hướng dẫn này, bạn sẽ tìm hiểu về nhiều cách bạn có thể triển khai các phụ thuộc trong Luồng không khí, bao gồm
- Phụ thuộc nhiệm vụ cơ bản
- Phụ thuộc động
- Phụ thuộc với các nhóm nhiệm vụ
- Phụ thuộc với API TaskFlow
- Quy tắc kích hoạt
Để xem bản trình bày video về các khái niệm này, hãy xem Quản lý sự phụ thuộc giữa các triển khai luồng không khí, DAG và tác vụ
Trọng tâm của hướng dẫn này là sự phụ thuộc giữa các tác vụ trong cùng một DAG. Nếu bạn cần triển khai các phụ thuộc giữa các DAG, hãy xem phần phụ thuộc Cross-DAG
Kiến thức giả định
Để tận dụng tối đa hướng dẫn này, bạn nên có hiểu biết về
- Các khái niệm luồng khí cơ bản. Xem Giới thiệu về Luồng khí Apache
Phụ thuộc cơ bản
Các phụ thuộc cơ bản giữa các tác vụ Luồng khí có thể được đặt theo các cách sau
- Sử dụng toán tử dịch chuyển bit [______01 và
2]t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0] - Sử dụng phương pháp
3 vàt3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
4t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
Ví dụ: nếu bạn có một DAG với bốn tác vụ tuần tự, thì các phần phụ thuộc có thể được đặt theo bốn cách
Sử dụng
5t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]t0.set_downstream[t1]
t1.set_downstream[t2]
t2.set_downstream[t3]Sử dụng
6t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]Sử dụng
2t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]t0 >> t1 >> t2 >> t3
Sử dụng
1t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
1t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
Tất cả các phương pháp này đều tương đương và dẫn đến DAG được hiển thị trong hình ảnh sau
Nhà thiên văn học khuyên bạn nên sử dụng một phương pháp duy nhất một cách nhất quán. Sử dụng cả hai toán tử bitshift và
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
3/t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
4 trong DAG của bạn có thể làm phức tạp quá mức mã của bạnĐể đặt một phụ thuộc trong đó hai tác vụ xuôi dòng phụ thuộc vào cùng một tác vụ ngược dòng, hãy sử dụng danh sách hoặc bộ dữ liệu. Ví dụ
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
4Các câu lệnh này là tương đương và dẫn đến DAG được hiển thị trong hình ảnh sau
Luồng khí không thể phân tích cú pháp phụ thuộc giữa hai danh sách. Ví dụ:
t0 >> t1 >> t2 >> t3
1 trả về lỗi. Để đặt các phụ thuộc này, hãy sử dụng chức năng Airflow t0 >> t1 >> t2 >> t3
2. Ví dụt3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
7Hình ảnh này hiển thị kết quả DAG
Với hàm
t0 >> t1 >> t2 >> t3
2, bất kỳ danh sách hoặc bộ dữ liệu nào bạn đưa vào phải có cùng độ dàiPhụ thuộc động
Nếu bạn tạo các tác vụ động trong DAG của mình, bạn nên xác định các thành phần phụ thuộc trong ngữ cảnh của mã được sử dụng để tạo động các tác vụ
Trong ví dụ sau, một tập hợp các tác vụ động song song được tạo bằng cách lặp qua danh sách các điểm cuối. Mỗi nhiệm vụ của
t0 >> t1 >> t2 >> t3
4 ở phía dưới của t0 >> t1 >> t2 >> t3
5 và ngược dòng của t0 >> t1 >> t2 >> t3
6t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
2Hình ảnh này hiển thị kết quả DAG
Phụ thuộc nhóm nhiệm vụ
Nhóm nhiệm vụ là một khái niệm nhóm dựa trên giao diện người dùng có sẵn trong Airflow 2. 0 trở lên. Để biết thêm thông tin về các nhóm nhiệm vụ, bao gồm cách tạo chúng và thời điểm sử dụng chúng, hãy xem Sử dụng các nhóm nhiệm vụ trong luồng không khí
Khi làm việc với các nhóm nhiệm vụ, điều quan trọng cần lưu ý là các phụ thuộc có thể được đặt cả bên trong và bên ngoài nhóm. Ví dụ: trong mã DAG sau có một tác vụ bắt đầu, một nhóm tác vụ có hai tác vụ phụ thuộc và một tác vụ kết thúc cần diễn ra tuần tự. Sự phụ thuộc giữa hai nhiệm vụ trong nhóm nhiệm vụ được đặt trong ngữ cảnh của nhóm nhiệm vụ [
t0 >> t1 >> t2 >> t3
7]. Sự phụ thuộc giữa nhóm nhiệm vụ và nhiệm vụ bắt đầu và kết thúc được đặt trong ngữ cảnh của DAG [t0 >> t1 >> t2 >> t3
8]t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
5Hình ảnh này hiển thị kết quả DAG
Phụ thuộc API TaskFlow
API TaskFlow, có sẵn trong Airflow 2. 0 trở lên, cho phép bạn biến các hàm Python thành các tác vụ Luồng không khí bằng trình trang trí
t0 >> t1 >> t2 >> t3
9Nếu DAG của bạn chỉ có các hàm Python được xác định bằng trình trang trí, hãy gọi các hàm Python để đặt các phụ thuộc. Ví dụ, trong DAG sau đây có hai nhiệm vụ phụ thuộc,
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
10 và t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
11. Để đặt các phụ thuộc, bạn gọi hàm t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
12t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
0Hình ảnh này hiển thị kết quả DAG
Nếu DAG của bạn có sự kết hợp của các tác vụ hàm Python được xác định bằng trình trang trí và tác vụ được xác định bằng toán tử truyền thống, bạn có thể đặt các phần phụ thuộc bằng cách gán lệnh gọi tác vụ được trang trí cho một biến rồi xác định các phần phụ thuộc một cách bình thường. Ví dụ: trong DAG bên dưới, tác vụ
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
13 được xác định bởi trình trang trí t0 >> t1 >> t2 >> t3
9 và được gọi bằng t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
15. Biến t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
16 được sử dụng ở dòng cuối cùng để xác định các phụ thuộct3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
5Quy tắc kích hoạt
Khi bạn đặt các quan hệ phụ thuộc giữa các tác vụ, hành vi Luồng không khí mặc định là chỉ chạy một tác vụ khi tất cả các tác vụ ngược dòng đã thành công. Bạn có thể sử dụng quy tắc kích hoạt để thay đổi hành vi mặc định này
Lựa chọn tiếp theo đã khả thi
- all_success. [mặc định] Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng đã thành công
- all_failed. Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng ở trạng thái không thành công hoặc ngược dòng không thành công
- tất cả đã được làm xong. Tác vụ chạy sau khi tất cả các tác vụ ngược dòng được thực hiện xong
- all_skip. Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng đã bị bỏ qua
- one_failed. Tác vụ chạy khi ít nhất một tác vụ ngược dòng không thành công
- one_success. Tác vụ chạy khi ít nhất một tác vụ ngược dòng đã thành công
- one_done. Tác vụ chạy khi có ít nhất một tác vụ ngược dòng thành công hoặc không thành công
- none_fail. Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng đã thành công hoặc bị bỏ qua
- none_failed_min_one_success. Tác vụ chỉ chạy khi tất cả các tác vụ ngược dòng không bị lỗi hoặc ngược dòng_không thành công và ít nhất một tác vụ ngược dòng đã thành công
- none_skip. Tác vụ chỉ chạy khi không có tác vụ ngược dòng nào ở trạng thái bị bỏ qua
- luôn. Nhiệm vụ chạy bất cứ lúc nào
Quy tắc phân nhánh và kích hoạt
Một tình huống phổ biến mà bạn có thể cần triển khai các quy tắc kích hoạt là nếu DAG của bạn chứa logic có điều kiện, chẳng hạn như phân nhánh. Trong những trường hợp này,
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
17 có thể là một quy tắc phù hợp hơn so với t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
18Trong ví dụ DAG sau đây, có một nhánh đơn giản với tác vụ xuôi dòng cần chạy nếu một trong hai nhánh được theo dõi. Với quy tắc
t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
18, tác vụ t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
40 không bao giờ chạy vì tất cả trừ một trong các tác vụ t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
41 luôn bị bỏ qua và do đó không có trạng thái thành công. Nếu bạn thay đổi quy tắc kích hoạt thành t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
17, thì tác vụ t3.set_upstream[t2]
t2.set_upstream[t1]
t1.set_upstream[t0]
40 có thể chạy miễn là một trong các nhánh hoàn tất thành công