Airflow Workers on ECS Fargate
Airflow 워커를 ECS에 띄워서 연결해보자, MWAA흉내내기
기존 구조
기존에 사용하던 구조는 굉장히 구식이었다고 볼 수 있었습니다. 전임자가 만들어 놓고 나간 구조인데, Airflow의 main서버와 metadb를 AWS에 각각 인스턴스로 올리고, Auto Scaling 그룹으로 워커들을 띄워놓고 연결한 구조였습니다. 버전도 1.10.9 버전이라 잔 버그 같은 이슈도 있어보였고, 워커들을 좀 더 심플하게 관리하고 싶었습니다. 아무래도 인스턴스 비용이 나가다보니 부담스러운 점도 있었구요.
마침 ECS관련해서 작업을 하고 마무리 짓던 중이었고, 워커들을 ECS Fargate로 변경해볼까란 생각이 들었습니다. 워커들은 이미 도커 이미지로 띄워져서 실행되고 있는 상태였기 때문에, 이미지만 Task Definition으로 넣어서 서비스를 만들어주면 간단하게 끝나지 않을까?? 생각했습니다. 잔 이슈 때문에 한 1-2주정도 걸리게 되었습니다.
ECS란
AWS에서 제공하는 ECS는 쿠버네티스와 유사한 부분이 많습니다. 쿠버네티스를 조금이라고 공부해 보신 분이라면, 간단한 버전의 ECS라고도 느끼실 것 같습니다.
먼저 ECS는 Elastic Container Service의 약자로 AWS에서 제공 하고 있는 컨테이너 오케스트레이션 서비스입니다. 컨테이너 오케스트레이션 서비스에는 대표적으로 쿠버네티스, Docker Swarm 등이 있습니다. 물론 AWS에도 쿠버네티스 제품, EKS가 나와 있지만 ECS는 좀 더 간소화되어 있고, 심플한 버전의 컨테이너 오케스트레이션 서비스라고 생각되네요.
ECS를 사용하게 되면 클러스터를 관리하기 위한 별도의 인스턴스를 구성, 관리하지 않아도 되고, 클러스터 관리에 대한 비용도 없습니다. 그래서 저희 다른 팀원 분이 인스턴스에 올린 API서버를 ECS로 변경했을때, 비용 감소 효과가 발생한 것 같습니다. 서버관리를 해주니까, 관리포인트도 적어지고 오토스케일링까지 지원이 됩니다. 또한 쿠버네티스에 비해 학습 시간이 매우 적기 때문에 금방 서치해서 원하는 서비스를 빠르게 올릴 수 있는 장점이 있습니다.
ECS의 기본 개념
ECS의 기본 개념으로는 클러스터, 작업정의(Task Definition), Task, Service가 있습니다. 하나씩 자세히 알아가도록 하겠습니다.
클러스터(Cluster)
클러스터는 도커 컨테이너를 실행할 수 있는 논리적인 공간입니다. 사용하려는 도커 이미지는 컨테이너에서 실행되는데, 이 컨테이너가 실행되는 인스턴스들을 묶어놓은 것이 클러스터입니다. 그냥 논리적인 공간이기 때문에 빈 클러스터도 생성이 가능합니다. airflow를 ECS에 올릴 때도 클러스터는 비어있는 상태로 올렸습니다. 만약에 빈 클러스터가 아니라 몇 몇개의 인스턴스를 만들어서 클러스터를 생성한 경우, ECS Agent에 의해서 클러스터와 인스턴스는 연결됩니다.
Task Definition
태스크 데피니션, 작업 정의는 실제 컨테이너를 구성하는 것이라고 볼 수 있습니다. ECS는 기본적으로 ECR 레포지토리와 연동이 되어 있는데, 이 레포지토리에서 어떤 이미지를 사용할 것인지, 포트는 몇번을 열 것인지, 환경 변수는 어떤 것을 줄 것인지 등등을 설정할 수 있습니다.
docker run에서 사용했던 명령어들이 있다면 여기서 설정하면 됩니다. 처음에 이걸 잘 몰라서 고생을 꽤나 했는데, 작업 정의를 통해 설정하면 끝입니다. 작업 정의는 버전 관리가 가능하다는 장점이 있습니다. 만약에 새로 작업정의를 만들어서 배포를 했는데 원하는 결과가 제대로 나오지 않는다면, 이전 버전으로 롤백하면 됩니다. 이 구성이 정말 잘 되어 있어서, Airflow worker들을 ASG로 만들었을 때보다 훨씬 안정적으로 서비스를 운영할 수 있는 것 같네요.
Task
태스크는 작업 정의에 의해 만들어진 컨테이너의 셋들이며, ECS에서 컨테이너를 실행하는 최소 단위입니다. 한 태스크에는 1개 이상의 컨테이너를 구성할 수 있고, 해당 Task 내의 컨테이너는 모두 같은 ECS 클러스터 인스턴스 또는 Fargate 내에 실행되도록 보장 받습니다. 하지만 저는 한 태스크에 한 컨테이너만 실행되도록 했습니다. 여러 컨테이너를 docker-compose같이 사용하는 것도 가능하다고 합니다.
- 태스크에선 익숙한 EC2와 함께 Fargate가 등장합니다.
💡Fargate로 설정하면! ECS 클러스터내에 인스턴스가 없어도, Task에 정의한 CPU, 메모리 설정에 따라 관리하는 EC2 인스턴스 없이 Serverless 하게 서비스를 실행할 수 있습니다.
Bespin Global 설명 : AWS Fargate 는 AWS에 컨테이너를 배포하는 쉬운 방법입니다. 간단히 말하면 Fargate는 EC2와 비슷하지만 가상 시스템을 제공하는 대신 컨테이너를 얻습니다. 기본 인스턴스를 관리 할 필요없이 컨테이너를 기본 계산 프리미티브로 사용할 수있게 해주는 기술입니다. 컨테이너 이미지 작성, CPU 및 메모리 요구 사항 지정, 네트워킹 및 IAM 정책 정의 및 실행 만하면됩니다. Fargate를 사용하면 응용 프로그램 요구 사항과 밀접하게 일치하는 유연한 구성 옵션을 사용할 수 있으며 초 단위로 세분화됩니다.
Service
서비스는 태스크의 라이프 사이클을 관리해주는 역할을 합니다. 태스크의 상태를 지속적으로 감시, 관리해주는 것이라고 볼 수 있습니다. 또한 태스크를 클러스터에 몇 개 배포할 것인지를 결정하고, 서비스를 만들때 어떤 작업정의로 태스크를 생성할지 정할 수 있습니다.
만약 태스크가 어떤 문제로 중지되거나 다운되면 이것을 감지해서 새로운 Task를 클러스터에 배포하게 하는 역할도 수행합니다. 실제로 서비스에 동작하고 있는 Task를 강제로 중지시킬 경우에 Task는 잠시동안 중단(종료)되었다가 다시 프로비저닝 받아서 생성되는 것을 확인할 수 있습니다. 따라서 테스트를 위해서 ECS를 켜두고 만들어 놓은게 아까워서 중지시켜버리면 다음날에 큰 혼란이 생길 수 있습니다. 저는 어차피 작업정의를 사용해서 서비스를 만들 수 있으니, 깔끔하게 종료하고 다시 만들었습니다. 한 번 크게 데일뻔한 경험이 있었습니다.
Airflow Workers On ECS
이제 본격적으로 에어플로우의 워커들을 ECS에 올려보도록 하겠습니다. 먼저 클러스터를 만들어주겠습니다.
클러스터 생성
저는 빈 깡통의 클러스터가 필요했습니다. ASG로 되어 있는 인스턴스를 Fargate로 변경하고 인스턴스 비용을 감소하려는 목적이 있기 때문에 빈 클러스터인 네트워킹 전용 클러스터를 만들어줍니다.
그 다음은 빈 깡통답게 이름만 적어주고 VPC를 만들어줄 것인지, 태그를 넣을 것인지만 설정하면 됩니다. 전 이름만 설정하고 다른 건 건드리지 않고 그냥 생성했습니다.
Task Definition 생성
이제 작업정의를 만들어볼 시간입니다. 작업정의 생성을 누르면 시작 유형 호환성 선택
이 나옵니다. 저는 Fargate로 만들어보겠습니다.
그 다음 단계를 누르면 본격적으로 작업 정의를 구성할 수 있습니다. 작업 역할에는 ecsTaskExecutionRole,
ecsS3FullAccessTaskRole
, ecsTaskInstanceRole
이 있습니다. 원하는 목적에 맞게 설정하시면 됩니다. 저는 ecsTaskExecutionRole를 선택하겠습니다. 작업 실행 IAM도 같이 세팅해주시고 다음으로 넘어갑니다.
이제 작업 크기가 어떻게 되는지, 작업 CPU가 얼마나 필요한지 설정해줘야 합니다. Fargate를 사용하기 때문에 사용량을 결정해야 하는 것입니다. 만약에 인스턴스를 선택한다면, 필요한 인스턴스의 크기를 골라주면 되겠습니다. 저는 기존에 사용하던 인스턴스 크기를 알기 때문에 해당 인스턴스의 스펙을 넣어주었습니다. 이제 가장 중요한 컨테이너 정의를 해보겠습니다.
컨테이너 추가를 누르면 다음과 같은 화면이 등장합니다. 컨테이너 이름에 원하는 이름을 넣어주시고, 이미지에는 ECR에 올려둔 이미지의 주소와 이미지명, 태그를 예시와 같이 넣어주시면 됩니다. 메모리 제한은 사용하는 메모리의 제한 정도를 뜻하는데 저는 위에서 넣어준 작업 메모리의 80%정도를 넣어놨습니다. 그리고 포트 매핑에는 열어줄 포트를 기입하면 됩니다. 만약에 docker run -p 8793:8793
이런 식으로 docker run을 한다면 8793을 넣어주면 됩니다. 굉장히 편리하죠??
그 밑에 고급 컨테이너 구성으로 넘어가면 상태검사 할 부분을 넣어주면 되는데 워커에는 딱히 상태검사할 게 없기 때문에 패스하겠습니다.
환경 부분으로 넘어가면 사용할 CPU단위와 GPU가 나옵니다. GPU는 사용하지 않기 때문에 넘어갔고, CPU는 위에서 작성한 CPU코어 정도를 넣어놨습니다. 그리고 제가 가장 아쉬워 하는 부분인 진입점과 명령, 작업 디렉토리 부분입니다. 쌩 번역체를 사용한 것으로 보이는 이 부분은 바로 도커의 ENTRYPOINT와 CMD, WORKDIR를 나타내는 말이였습니다. 처음에 이 부분을 몰라서 헤맸는데, 헤매지 마시고 사용하는 명령어가 따로 있으시다면 넣어주시면 됩니다. 그리고 환경변수에는 key와 value로 나와있는데 docker run을 할때 -e
로 넣어주는 파라미터가 있다면 여기에 넣어주시면 됩니다. 저는 AIRFLOW_HOME
, queue
정도를 넣어놨습니다.
이제 맨 하단에 도커 레이블을 작성해주면 끝입니다.
다시 작업 정의쪽으로 나와서 맨 밑으로 내려가면 Tag가 있는데 저는 이름으로 구분해야 될 게 있었기 때문에 태그에 Name을 넣어주고 원하는 이름을 넣어줬습니다. 그리고 생성하면 작업 정의:1 버전이 나옵니다!
Service 생성
시작 유형에서 우리가 원하는 Fargate를 선택하고 작업정의에 아까 만든 것을 선택해줍니다. 개정은 revision으로 수정된 버전을 말합니다. 기본값은 latest로 최근에 만든 것을 반영합니다. 클러스터도 만든 것을 넣어주고 서비스 이름을 넣어줍니다.
Task 개수는 원하는 만큼 넣어주면 됩니다. 최소 정상 상태 백분율은, 기본적으로 띄워져 있는 Task의 상태를 말하는데, 문서에는 배포 과정에서 실행 중인 작업의 개수에 하한선을 제공하여 추가 클러스터 용량을 사용하지 않고도 배포할 수 있게 해줍니다.
라고 나와있습니다. 그러니까 기본 값인 100이라고 설정하게 되면 위에서 설명한대로, 태스크를 중지해도 계속 올라오게 되는 것입니다.
다음으로 넘어가면 보안그룹과 VPC를 설정할 수 있습니다. 사용하는 VPC를 넣어주고 보안그룹에는 Airflow worker를 위한 보안 그룹을 넣어주면 됩니다. Airflow worker의 보안그룹은 메인서버와 메타디비를 연결해주는 부분만 열어주면 되겠습니다. Airflow는 기본적으로 flask를 통해 8793포트로 열리고 열린 8793포트를 통해서 작업을 할당받으며, 작업로그를 메타디비(mysql)에 3306포트를 통해 남기므로 메인서버와 8793을 매핑해주고 메타디비와 3306(mysql)로 매핑해주겠습니다.
로드 밸런싱은 필요하다면 미리 LB를 만들어놓고 넣어주면 되지만, Airflow에는 LB가 따로 필요하지 않습니다. 메시지 브로커인 Redis를 통해서 작업을 워커에서 받아가기 때문입니다. 그냥 airlfow.cfg에 메인서버 아이피와 metaDB 아이피를 적어주면 알아서 가져가는 것을 보실 수 있습니다.
오토스케일링을 패스하고 만들어주면? 서비스가 만들어졌습니다.
이제 클러스터화면에서 작업 탭을 누르면 Task와 상태를 확인하실 수 있습니다.
여기서 원하는 작업을 이름으로 선택하고 log로 들어가면, 컨테이너에서 발생하고 있는 이력들을 확인하실 수 있습니다. 이 곳에서 airflow가 잘 설치가 됐는지 정상적으로 동작이 되고있는지, 포트는 제대로 열려서 작업을 받고있는지 등등을 체크할 수 있습니다. 이 로그는 CloudWatch에 로그그룹에서 더 자세하게 확인할 수 있습니다.
Trouble Shooting
이제 제가 겪은 자잘한 에러들에 대한 해결에 대해서 설명드리겠습니다. 알고보면 간단한데 해결을 못해서 쩔쩔매느라 아까운 시간을 다 보냈습니다… 저 같이 시간을 버리시지 말기를 바라며…
Airflow initdb시 SqlAlchemy ModelSchema Not found error
작업 로그를 보던 중에 airflow initdb를 할 때마다 ModelSchema 관련 에러가 발생하는 것을 확인했습니다. SqlAlchemy부분에서 에러가 나길래 열심히 구글링을 해보니, 제가 사용하고 있는 airflow 1.10.9
버전에서 사용하는 sql_alchemy, marshmallow 등의 버전이 맞지 않아 발생하는 문제였습니다. 버전이 바뀐 이후부터 ModelSchema를 불러오는 메서드의 디렉토리 구조가 변경되었는데, 이 때문에 메서드를 제대로 불러오지 못해 발생한 것이었습니다.
requirements.txt에 다음과 같이 설정하면 해결이 가능합니다.
1 | #requirements.txt |
hostname_resolver.py 문제
1 | requests.exceptions.ConnectionError: HTTPConnectionPool(host='169.254.169.254', port=80): Max retries exceeded with url: /latest/meta-data/local-ipv4 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f3e4d9966d8>: Failed to establish a new connection: [Errno 22] Invalid argument',)) |
작업 로그에서 또 다시 에러가 발생해서 확인해보니 위의 에러가 발생하고 있었습니다. 169.254.169.254는 AWS의 meta데이터가 담겨있는 곳인데 이 곳에 접근하지 못해서 발생하는 에러였습니다. 기존 ASG워커에서는 metadata를 통해 EC2의 privateIP를 갖고와야 해당 IP에 작업내용을 전달할 수 있었습니다. 그렇지 않으면 docker 컨테이너의 호스트명만 갖고오게 되어 작업이 제대로 실행되지 않습니다. 하지만 ECS에서 Fargate는 docker의 컨테이너 호스트명을 부여하는 대신 privateIP를 부여합니다. AWS 문서 기본적으로 모든Amazon ECS작업Fargate에는 기본 프라이빗 IP 주소가 포함된 elastic network interface (ENI) 가 제공됩니다.
물론 ECS에서 metadata를 활성화 하는 것도 한 방법이겠으나, Private IP가 제공되니, 이 IP를 갖고오면 작업을 바로 할당받을 수 있으므로, 굳이 metadata를 이용할 필요가 없었습니다. 따라서 hostname_resolver.py의 파일안의 resolve함수의 내용을
1 | import socket |
이런식으로 사용해서 privateIP를 전달했습니다.
airflow에서 emr에 작업 실행시 권한 문제
이제 정말 다 되었다고 생각하고 airflow에 EMR에 실행하는 작업을 내렸습니다. 하지만 또 다시
ERROR - An error occurred (AccessDeniedException) when calling the RunJobFlow operation: User: arn:aws:sts::xxxxx:assumed-role/ecsTaskExecutionRole/xxxxxxxx is not authorized to perform: elasticmapreduce:RunJobFlow on resource: arn:aws:elasticmapreduce:ap-northeast-2:xxxxxx:cluster
권한이 없다는 얘기가 나왔습니다. 다행히 로그에 권한 문제라고 써있어서, 쉽게 해결할 수 있었습니다. IAM에서 ecsTaskExecutionRole
를 찾았고, 여기에 기존에 사용하던 권한을 넣어주니, EMR에서 작업이 실행되고 EMR 클러스터도 컨트롤 하는 것을 확인할 수 있었습니다.
이외에도 자잘한 이슈들이 많았지만, 너무 자잘해서 글로 옮기는게 민망해 올리지는 않았습니다. 이렇게 1-2주간의 삽질이 글 한편으로 정리가 되었습니다. 어려움도 많았지만 이를 통해서 ECS를 자세하게 배워보고 활용할 수 있었던 기회였던 것 같아 보람있었네요. ECS로 바꾸게 되면서, 여러 기능적 문제 때문에 점점 Kubernetes로 바꾸어야 겠다는 생각이 많이 들기 시작했습니다. 한 달 이내에는 Airflow를 Kubernetes에 올리는 글을 쓰게 되지 않을까 싶습니다. 읽어주셔서 감사합니다. 부족한 부분은 댓글로 남겨주세요!
Reference
Airflow Workers on ECS Fargate