Google Cloud Platform - Task

Google Cloud Platform - Task

Google Task를 통해 비동기처리에 대한 Toy Project를 진행해보자.

1. Google Cloud Task

먼저 Google Cloud Task가 무엇인지 살펴보자. Google Cloud Tasks는 대규모 분산형 태스크의 실행, 디스패치, 전송을 관리할 수 있는 완전 관리형 서비스이다. Cloud Tasks를 사용하게 되면 사용자 또는 서비스 간 요청 이외의 작업을 비동기적으로 수행할 수 있게 된다. 여기서 비동기적으로 작업을 수행한다는 말이 있다. 비동기적으로 작업을 수행한다는 것은 어떤 의미이고 어떤 상황에서 사용해야 할까?

2. 비동기 처리

먼저 동기식 처리가 뭔지 알아보자. 동기식 처리 모델(Synchronous processing model)은 직렬적으로 태스크(task)를 수행한다는 의미이다. 즉, 태스크는 순차적으로 실행되며 어떤 작업이 수행 중이면 다음 작업은 대기하게 된다. 반면, 비동기식 처리는 (Asynchronous processing model 또는 Non-Blocking processing model)은 병렬적으로 태스크를 수행한다. 즉, 태스크가 종료되지 않은 상태라 하더라도 대기하지 않고 다음 태스크를 실행한다. 예를 들어 서버에서 데이터를 가져와서 화면에 표시하는 태스크를 수행할 때, 서버에 데이터를 요청한 이후 서버로부터 데이터가 응답될 때까지 대기하지 않고(Non-Blocking) 즉시 다음 태스크를 수행한다. 이후 서버로부터 데이터가 응답되면 이벤트가 발생하고 이벤트 핸들러가 데이터를 가지고 수행할 태스크를 계속해 수행한다. (출처 : https://poiemaweb.com/js-async)

다른 상황을 가정해보자. 어떤 API가 두 개 있다. 여기서 정보를 가져올 건데, get_user_list라는 API에서는 이름만을 가져올 수 있는 API이고, get_user_record에는 특정 유저의 구체적인 정보가 담겨 있다. get_user_record는 제한 조건이 있어 한번에 많은 데이터를 가져올 수 없다. 예를 들어 get_user_list에서 모든 user list를 받는다고 하더라도 bulk로 get_user_record에 쿼리를 보내 데이터를 갖고 오는 것은 불가능하다는 것이다. 이런 조건에서 빠르게 데이터를 확보하려면 어떻게 해야할까? for loop을 돌려 user name 하나씩 정보를 얻어 온다면 많은 시간이 걸릴 것이다.

비동기 처리를 활용해 보자. 워커를 여러 대 만들고 워커에 수행할 작업(get_user_record에서 name별 정보를 갖고 오는 일)을 넣어주면 훨씬 빠르게 데이터를 모을 수 있을 것이다. 그렇다면 파이프 라인은 다음과 같다.

  1. get_user_record에 쿼리를 날리고 실시간성 DB에 데이터를 담아두기
  2. 실시간성 DB에 있는 데이터를 배치성 DB에 넣기
    여기서 1번 작업은 Google Cloud Function에 작업 내용을 넣어주고 Google Cloud Task를 이용해서 비동기 작업을 통해 빠르게 수행하도록 할 것이다.
1
2
3
4
5
6
7
https://us-central1-contxtsio-267105.cloudfunctions.net/get_user_list
param: {}

https://us-central1-contxtsio-267105.cloudfunctions.net/get_user_by_name
param: {
"name": "Braund, Mr. Owen Harris"
}

다음과 같이 API가 있다. 첫 번째 API는 user의 list를 받아올 수 있는 API이고, 두 번째는 user의 name별로 정보를 가져올 수 있는 API이다. 먼저 Local에서 정보를 제대로 받아오는지 테스트를 해보자.

Toy Project

Toy1. get_user_list, get_user_by_name 확인

API에서 정보를 받기 위해서 자주 사용하는 라이브러리인 requests를 이용한다. requests 쉽게 사용하기

1
2
3
4
5
6
7
import json
import requests

url = 'https://us-central1-contxtsio-267105.cloudfunctions.net/get_user_list'
response = requests.post(url)
user_list = response.json()
user_list = user_list['data']

user_list는 아주 잘 나온다. 이제 get_user_by_name을 확인해보자

1
2
3
4
url2 = 'https://us-central1-contxtsio-267105.cloudfunctions.net/get_user_by_name'
params = {'name' : user_list[0]}
response = requests.post(url2, json=params)
response.json()

post로 parameter를 보내면 역시 값이 잘 나온다.

Toy2. mongoDB 설정

작업을 수행하고 나온 데이터를 임시로 넣을 DB는 mongoDB로 해볼 것이다. 무료로 사용할 수 있는 mongoDB가 있는데 https://mlab.com/ 여기를 이용하면 된다. 아니면 mongoDB Atlas를 사용해도 된다. mongoDB 구축하는 건 간단하니 Pass한다.

Toy3. gcloud 설정

google cloud의 다양한 서비스를 이용하려면 gcloud를 설정해야 한다. google cloud Task를 사용하기 위한 퀵 가이드 문서는 잘 되어 있는 편이다. Cloud Task Quick Guide

1
pip install gcloud

gcloud를 먼저 설치하고 따라하면 된다. SDK를 만들고 난 후에 Cloud Tasks API, API 라이브러리 페이지에 갔을 때 사용가능 하다는 초록 표시가 나오면 퀵 가이드 문서의 샘플 설정 및 하단의 내용을 수행하면 된다.
사용 가능!

퀵 가이드 문서 내용을 다 따라하고 task페이지로 오게 되면 my-queue가 생성된 것을 확인할 수 있다. 이렇게 되면 성공이다.
task 작업 준비 완료

Toy4. Functions 설정

Cloud Functions로 이동해보자. Cloud Functions는 AWS Lambda와 비슷하다(서버리스 아키텍쳐) .
Functions를 세팅할 때 주의할 점이 있다. 위치나 메모리는 아무렇게나 설정해도 되지만 인증 부분을 꼭 체크하고 코드를 작성할 런타임을 python 3.7로 바꿔주도록 한다.
인증되지 않은 호출 허용 체크

조금 기다리면 funtion이 만들어진다. funtion에 들어가서 수정 버튼을 누르면 작업을 수행할 소스코드가 등장한다. main.py에 작업할 코드를 넣어주고 requirements.txt에 필요한 라이브러리들을 작성해 import할 수 있도록 한다.

1
2
3
4
#requirements.txt
pymongo
requests
dnspython

requirements에는 mongoDB 연결과 저장을 위한 pymongo, API를 위한 requests 그리고 기타 dnspython을 넣어준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#MAIN.py
from pymongo import MongoClient
import ast
import pymongo
import requests

mongo_uri = "mongodb://****:****@ds263018.mlab.com:63018/****?retryWrites=false"
client = pymongo.MongoClient(mongo_uri)

db = client.****

def hello_world(request):
"""Responds to any HTTP request.
Args:
request (flask.Request): HTTP request object.
Returns:
The response text or any set of values that can be turned into a
Response object using
`make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
"""
payload = request.get_data(as_text=True)
request_json = ast.literal_eval(payload)

r = requests.post(url = "https://us-central1-contxtsio-267105.cloudfunctions.net/get_user_by_name", json=request_json)

db.temp.insert_one(r.json())

return "Success"

main 소스에는 local에서 테스트한 내용을 넣어준다. mongo_url에서 retryWrite=false는 mongoDB에서 MongoError: This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.이런 에러가 나온다면 사용해보자.

소스를 넣은 후에 잘 세팅 되었는지를 확인해보기 위해 테스트를 이용해 볼 수 있다. 테스트를 누르면 트리거 이벤트라고 나오고 {}이렇게 나와 있다. 여기에 테스트 할 파라미터 값을 넣어보면 된다. 우리의 파라미터는 name이므로

1
{"name": "Braund, Mr. Owen Harris"}

이렇게 파라미터를 적어주고 테스트한다. success가 나오면 성공이다.

Toy5. Task

이제 로컬로 돌아와서 task와 function을 붙여볼 차례다. 먼저 코드부터 소개한다.

1
2
3
4
5
6
7
8
9
10
11
from google.cloud import storage
from google.cloud import tasks_v2
task_client = tasks_v2.CloudTasksClient() # Credential

def dispatch_task(name):
#json 같은 string
payload = str({
"name": name,
})

resp = create_task(project='affable-**********', queue='my-queue', location='asia-northeast3', payload=payload)

dispatch_task를 통해서 수행할 작업을 어떤 프로젝트에 연결하고 어떤 큐에 보낼 것인지 선택할 수 있다. 이렇게 프로젝트와 Task에 연결하고 난 뒤에 create_task를 수행하게 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def create_task(project, queue, location, payload=None, in_seconds=None):
parent = task_client.queue_path(project, location, queue)

# Construct the request body.
task = {
'http_request': { # Specify the type of request.
'http_method': 'POST',
'url': 'https://us-central1-affable-audio-277311.cloudfunctions.net/function-2' #function url
}
}
if payload is not None:
# The API expects a payload of type bytes.
converted_payload = payload.encode() #여기서 인코딩

# Add the payload to the request.
task['http_request']['body'] = converted_payload

if in_seconds is not None:
# Convert "seconds from now" into an rfc3339 datetime string.
d = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)

# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d)

# Add the timestamp to the tasks.
task['schedule_time'] = timestamp

# Use the client to build and send the task.
response = task_client.create_task(parent, task)
print(response)
return response

create_task에서 본격적으로 funtions와 연결이 되어 작업들이 돌아가게 된다. task에 보면 url 파라미터가 보인다. 이 url이 작업을 수행할 funtions의 위치를 나타내는 것으로, functions의 트리거 부분에 있는 url을 넣어주면 된다.

이렇게 넣어주고 나면 payload를 받아서 인코딩을 해주고 task에서 request를 날려주게 된다.

함수를 다 지정하고 나서 user_list있는 name들을 dispatch_task에 넣어주게 되면 task가 돌아간다!

1
2
3
4
5
# dispatch_task('Braund, Mr. Owen Harris') #하나짜리로 먼저 테스트


for user in user_list:
dispatch_task(user)

Trouble Shooting

Trouble, Credential

바로 성공이 되는 경우가 있는 반면, 에러가 터져나와 제대로 돌아가지 않는 경우가 발생하곤 한다. 보통은 credential 문제가 대부분이다. dispatch_task의 윗부분에 #credential이라고 작성한 부분에서 보통 에러가 나는데

DefaultCredentialsError: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started

이러한 에러가 발생한다면 Credential관련한 에러이며, 이런 에러가 나오지 않고 아래 for loop을 돌 때,

AttributeError: ‘str’ object has no attribute ‘create_task’

이런 에러가 나와도 task_client에서 credential이 제대로 되지 않았다는 뜻이다.

Solution.1

1
task_client = tasks_v2.CloudTasksClient(<json>)

다음과 같이 task_client에 cloud 프로젝트에 있는 json key를 받아서 절대경로를 에 넣어준다.
json key는 다음과 같이 얻을 수 있다. 내 프로젝트로 들어가서 작업을 누르고 키를 받는다.
json 키

Solution.2

이렇게 해결이 되는 경우도 있지만 사용하는 컴퓨터의 종류나 gcloud설정에 따라 제대로 되지 않을 수 있다. 두 번째 방법은 구글 문서에 있는 방법이다.

1
2
3
4
5
6
7
8
9
def explicit():
# Explicitly use service account credentials by specifying the private key
# file.
storage_client = storage.Client.from_service_account_json(
<json>)

# Make an authenticated API request
buckets = list(storage_client.list_buckets())
print(buckets)

이 역시 아까 받은 json 키를 사용하는 방법이다. 이렇게 함수를 만든 뒤 explicit함수를 실행시키다. 그리고 다시 for loop을 돌려보자.

Solution.3

마지막 방법이다. os 라이브러리를 이용해 google credential에 사용하는 json 키 값을 직접 지정해 주는 방식이다. 본인 생각으로는 가장 확실한 방법이라고 생각한다.

1
2
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '<json>'
Author

SangHyub Lee, Jose

Posted on

2020-05-19

Updated on

2023-12-08

Licensed under

Comments