Airflow Basic

Airflow Basic

Airflow의 기본적인 컨셉에 대해서 이해해보자

Workflow 관리! Airflow 컨셉을 알아보자

요즘 왠만한 회사에서 Airflow를 안 쓰는 곳이 없습니다. 파이프라인 관련 세션을 들으면 심심치 않게 들을 수 있는 것이 Airflow를 이용한 워크플로우 관리일 것 입니다. 최근 저희 회사에서도 Airflow를 도입했습니다. 여러 세션에서 관련 내용을 기억해두고 정리해 두었다가, 잡 스케쥴을 관리할 필요성이 생겨서 Airflow 도입을 제안했습니다. 현재 Airflow를 이용해서 추천 쪽에 적용하고 있고, 대략 전처리-모델링-Prediction의 플로우를 돌리려고 합니다. test를 계속해서 진행 중이고 시행착오법 끝에 DAG들이 잘 돌아가는 것을 확인하고 있습니다.

오늘 글은 간단하게 Airflow의 컨셉에 대해서 알아보는 내용입니다. 위에서도 등장한, Airflow의 핵심인 DAG에 대해서 알아보고, DAG안에 들어가는 요소들을 살펴보겠습니다.

Workflow? Airflow?

Airflow는 AirBnB에서 만든 Workflow 관리 툴입니다. workflow라고 하면, 대략적으로 ‘아 작업의 흐름’이라고 할 수 있겠습니다만, workflow를 조금 더 자세히 설명하자면 워크플로우는 작업 절차를 통한 정보 또는 업무의 이동을 의미하며, 더 자세히 말하면, 워크플로는 작업 절차의 운영적 측면이라고 할 수 있습니다.(출처 위키피디아) 여기서 업무라는 것이 등장합니다. Airflow에서 업무는 Task라고 합니다. 이 Task들이 연결 된 것이 Workflow고 이것을 관리하는 것이 Airflow입니다. 쉽게 말하자면, Airflow는 Task들을 잘 연결시키고 관리하는 툴이라고 볼 수 있겠습니다. Airflow를 활용하는 예는 다양합니다. 데이터 분야에서는 ETL 파이프라인에 사용해 데이터들을 관리할 수 있고, 모델의 학습주기를 관리할 수도 있습니다. 학습된 모델을 이용해 prediction해 배치성으로 결과들을 주기적으로 저장해 놓을 수도 있겠네요. 마케팅 도메인에서 활용한다면, 마케팅 자동화에 적용해서, 이메일을 자동으로 보내주는 일 등에도 활용할 수 있겠습니다. 어떤 작업이 성공했을때 다음 작업을 하게 한다거나(의존성, branching) 등의 작업도 가능하기 때문에 굉장히 다양하게 활용할 수 있습니다.

DAG

방금 전까지 Airflow가 어떻게 흐르는지 알아봤습니다. 이 흐름은 어떻게 만들까요? Airflow에 대해서 찾아보면 다음과 같은 가지들을 볼 수 있습니다.
깔끔한 DAG

간단한 DAG라 보기 굉장히 편합니다. 하지만 DAG를 어떻게 짜느냐에 따라, 작업이 얼마나 복잡하냐에 따라 DAG는 복잡하게 변할 수 있습니다.
흉악한 DAG
그래도 이렇게 눈으로 보고 확인할 수 있으니 얼마나 편한지 모르겠습니다. 이걸 airflow없이 코드로 일일이 보고 작업 스케쥴 관리를 하려면 몸과 마음이 지쳐 월요일부터 글또 채널에 pass권을 사용할지도 모릅니다.

다행히도, 우리에겐 Airflow가 있고 Task들을 DAG를 통해서 이어주면 한 눈에 알아볼 수 있습니다. DAG는 Directed Acyclic Graph의 약자입니다. 번역하자면, ‘방향성 비순환 그래프’입니다. 노드와 노드가 단방향으로 연결되고 한번 노드로 향하면, 돌아오지 않는 특성을 가진다는 정도만 알면 될 것 같습니다.

DAG는 Python script로 작성되어 있습니다. 따라서 python에 익숙한 분들이라면 DAG작성은 별로 어렵지 않을 것입니다. DAG를 구성하는 요소들에 대한 개념만 알면 쉽게 쉽게 짤 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import json
from datetime import timedelta, datetime

from utils.slack_alert import SlackAlert

from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

alert = SlackAlert('#test')
# Config variables
dag_config = Variable.get("bigquery_github_trends_variables", deserialize_json=True)
BQ_CONN_ID = dag_config["bq_conn_id"]
BQ_PROJECT = dag_config["bq_project"]
BQ_DATASET = dag_config["bq_dataset"]

default_args = {
'owner': 'Jose Lee',
'depends_on_past': True,
'start_date': datetime(2018, 12, 1),
'end_date': datetime(2018, 12, 5),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
'on_failure_callback' : alert.slack_fail_alert
}

# Set Schedule: Run pipeline once a day.
# Use cron to define exact time. Eg. 8:15am would be "15 08 * * *"
schedule_interval = "00 21 * * *"

# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
'bigquery_github_trends',
default_args=default_args,
schedule_interval='@once'
# schedule_interval
)


# Config variables
BQ_CONN_ID = 'my_gcp_conn'
# BQ_PROJECT = 'airflow-268501'
# BQ_DATASET = 'github_trends'

## Task 1: check that the github archive data has a dated table created for that date
t1 = BigQueryCheckOperator(
task_id='bq_check_githubarchive_day',
sql='''
#standardSQL
SELECT
table_id
FROM
`githubarchive.day.__TABLES_SUMMARY__`
WHERE
table_id = "{{ yesterday_ds_nodash }}"
''',
use_legacy_sql=False,
bigquery_conn_id=BQ_CONN_ID,
dag=dag
)

제가 연습하느라 사용했던 DAG를 보면서 설명해보겠습니다. import를 뭘하는지 보면 방금까지 설명한 DAG, 그리고 Variable과 operator가 보입니다.

Variable

Variable은 Airflow UI를 보면서 설명을 해야 할 것 같습니다. UI에 Variable의 탭이 별도로 존재하기 때문입니다.
Airflow의 Variable탭

위의 화면이 Variable을 보여주고 있습니다. Key와 Value로 이루어진 걸 알 수 있습니다. Key Value로 이루어진 데이터를 이용하고 싶다면, Variable에 등록해두면 편하게 dict로 값을 받듯이 Variable.get를 이용해서 값을 얻을 수 있습니다. 위의 예시에서는 dag_config = Variable.get("bigquery_github_trends_variables", deserialize_json=True)
BQ_CONN_ID = dag_config["bq_conn_id"]이 부분이 되겠습니다. 저희 회사의 경우에는 각 고객사의 서비스키와 캠페인키를 Variable에 넣어두고 사용하고 있습니다. 신규 고객사가 발생하면 코드를 일일이 만들 필요없이 간단하게 Variable에 넣어두고 DAG를 태우면 끝입니다!

default_args

default_args는 dictionary로 이루어져 있고 DAG에 들어가게 됩니다. 여기에 작성한 내용들이 operator들에 적용될 것입니다. 일일이 operator에 넣을 필요없이 default_args에 넣어서 수정하면 되니까 굉장히 간편하게 여러 오퍼레이터에 들어갈 파라미터들을 관리 할 수 있습니다.

Operator

DAG가 워크 플로우를 어떻게 run할지를 설명한다면, Operator는 실제 Task가 수행하는 작업을 결정합니다. 실제 Task가 돌아가는 것을 Operator를 통해서 지정해준다고 보면 됩니다. operator의 종류는 굉장히 다양합니다. 대표적으로는

  • BashOperator : bash 커맨드를 실행시킨다
  • PythonOperator : python 함수를 호출한다
  • EmailOperator : 이메일을 보낸다
  • SimpleHttpOperator : HTTP 요청을 보낸다
    이런 것들이 있습니다. 위의 코드에서는 BigQueryCheckOperator를 사용했습니다. Bigquery에 접근해서 쿼리를 보내고 싶었기 때문입니다. Google Cloud Platform에 지원되는 다양한 Operator들이 있고, 저희 회사는 AWS를 주로 사용하기 때문에 AWS operator를 많이 사용하고 있습니다. 추가로 SlackAPIOperator 등도 있으니 아이디어만 있다면 왠만한 작업은 다 처리가 가능할 것 입니다.

슬랙 이야기가 나와서 덧붙이자면, default_args에 보면 'on_failure_callback' : alert.slack_fail_alert이 있습니다. 대략 유추가 가능하겠지만, 실패시에 슬랙에 알럿을 띄우려고 만들어 둔 파라미터 입니다. Airflow 작업을 하다가 실패가 나면 슬랙을 통해서 알럿을 보고 대응하기 위해서 작성해 두었습니다. 슬랙에 알람을 보내는 내용은 다음 글에서 다뤄보도록 하겠습니다.

이렇게 간단한 DAG가 만들어지고 airflow_home에 있는 dag폴더에 업로드가 되면 airflow UI에서 DAG가 떠있는 것을 확인할 수 있습니다.
(Airflow UIhttp://localhost:8080)
내가 만든 DAG
DAG로 들어가면 Task들의 상태를 확인할 수 있다

Task들의 상태는 다음과 같이 나눠집니다.
Task들의 상태들

이제 이 상태들을 확인해 보면서 워크플로우를 관리해 나가면 됩니다.


이번 글에서는 Airflow의 정말 기본적인 개념들에 대해서 다뤄봤습니다. 간단한 DAG를 직접 작성해보고 success를 한번 띄워보면 Airflow가 어떤 건지 대략적인 감을 잡을 수 있을 것 이라고 생각합니다. 다음 글에서는 슬랙을 통해서 메세지 알람을 보내는 걸 작성해 보려고 합니다. 저도 공부를 하는 입장이라 부족한 내용일 수 있지만, 보시면서 틀린 부분이나 수정할 부분 알려주시면 감사하겠습니다.

Author

SangHyub Lee, Jose

Posted on

2020-03-14

Updated on

2023-12-08

Licensed under

Comments