Nifi with Jolt

Nifi with Jolt

Nifi를 잘 활용하는 방법! Jolt

Jolt

  • Jolt?

    Jolt(JsOn Language for Transform) 는 JSON을 JSON으로 변환하는 데 사용할 수 있는 Java 라이브러리입니다. Jolt 변환 사양 자체도 JSON 파일이기 때문에, Apache NiFi 및 Apache Camel과 같은 제품에서 사용할 수 있습니다.

    Jolt는 전체 JSON을 JSON으로 변환하기 위해 함께 연결할 수 있는 변환 셋들을 제공합니다. Jolt의 특징은 특정 값을 조작하는 것이 아니라 JSON 데이터의 구조를 변환하는 데 중점을 두고 있다는 것입니다.

    Jolt의 공식 github에서는 다음과 같이 설명하고 있습니다.

    JSON to JSON transformation library written in Java where the “specification” for the transform is itself a JSON document.



Jolt 는 다음과 같이 사용할 수 있습니다.

  1. Transforming JSON data from ElasticSearch, MongoDb, Cassandra, etc before sending it off to the world
  2. Extracting data from a large JSON documents for your own consumption



JSON 데이터를 변환하는 데 특화되었다는 것을 설명을 통해 알 수 있을 것입니다. Jolt가 갖고 있는 변환 셋들은 5가지가 있습니다.

  • shift : copy data from the input tree and put it the output tree
  • default : apply default values to the tree
  • remove : remove data from the tree
  • sort : sort the Map key values alphabetically ( for debugging and human readability )
  • cardinality : “fix” the cardinality of input data. Eg, the “urls” element is usually a List, but if there is only one, then it is a String

이 글에서는 shift를 사용해서 nested array로 이루어진 Json을 Flatten하는 것을 예시로 살펴보겠습니다.



Jolt의 간단한 문법

그 전에 참고할 내용을 먼저 알려드리겠습니다. LHS (Left Hand Side)RHS (Right Hand Side)입니다. LHS는 콜론, 즉, : 를 기준으로 왼쪽을 뜻하고, RHS는 :를 기준으로 오른쪽을 뜻합니다.

예를 들자면 이렇습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
[
{
"operation": "shift",
"spec": {
LHS -> "customer": {
LHS -> "name": "client.fullName", <- RHS
LHS -> "birhtDate": "client.dateOfBirth", <- RHS
LHS -> "address": "client.address.street", <- RHS
LHS -> "country": "client.address.country" <- RHS
}
}
}
]

이제 기본 구조에 대해서 알아보겠습니다. Jolt는 다음과 같은 기본적인 구조를 갖고 있습니다.

1
2
3
4
5
6
7
8
[
{
"operation": "",
"spec": {

}
}
]
  • “operations”: 적용할 변환 유형을 정의합니다.
  • “spec”: 어떤 변환을 넣을지 적는 필드입니다.
  • “[]”: Jolt의 기본 구조도 JSON이므로 목록 내에서 여러 작업을 연결할 수 있습니다.

위에서 변환 셋들에 대해서 간단히 다루긴 했지만, 더 자세한 설명을 보고 싶다면 여기에서 확인할 수 있습니다.



Jolt를 활용한 Flatten 작업

Json 데이터가 nested로 이루어졌을때 Flatten이 필요한 경우가 있습니다. 이럴 때는 AWS라면 EMR, GCP라면 Dataproc에 Spark를 사용해서 처리를 하곤 합니다. 물론 대규모로 이루어진 데이터에 대해서 작업하려면 어쩔 수 없이 사용하겠지만, 미리 Flatten해서 저장을 하면 어떨까요? 그렇다면 비싼 EMR을 사용할 일도 없어질 듯 합니다.

먼저 이 작업을 위한 방식은 두 가지입니다. JoltTransformJSON, JoltTransformRecord 입니다.

둘의 차이는 간단합니다. JoltTransformRecord는 레코드 판독기와 레코드 작성기를 사용하는 반면 JoltTransformJSON은 작업할 JSON 흐름 파일 콘텐츠를 찾을 것으로 예상합니다. 두 경우 모두 출력은 플로우 파일 내용으로 끝나고 변환 구성은 동일합니다.

이제 작업할 대상 데이터를 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"data": {
"_id": 1000009195,
"code": "*******",
"name": "************",
"array_deal_products": "[ { \"deal_product_no\" : ***, \"code\" : \"***\", \"name\" : \"***\", \"seq\" : 1, \"master_product_code\" : \"***\", ]",
"short_description": "*******",
"array_site_attributes": "[ \"MARKET\" ]",
"array_product_tags": "[ 2 ]",
},
"metadata": {
"timestamp": "2022-**-**T01:12:05.790692Z",
"record-type": "data",
"operation": "load",
"partition-key-type": "attribute-name",
"schema-name": "commerce_product",
"table-name": "contents_products"
}
}

여기서 작업할 내용을 미리 정의하면

  1. data부분과 metadata에서 필요한 것만 가져올 것입니다.
  2. 필요한 것만 가져와서 합칠 것입니다.
  3. 그리고 array_deal_products에 싸여있는 데이터를 풀어줄 것입니다.



data, metadata 특정 키 값만 합치기

미리 만들어 둔 Nifi Task에서 properties로 들어갑니다. Jolt Transformation DSL을 Chain으로 설정하고 Jolt Specification을 작성해줍니다.

  • Jolt Specification

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    [
    {
    "operation": "shift",
    "spec": {
    "data": {
    "_id": "&",
    "array_deal_products": "&"
    },
    "metadata": {
    "operation": "&",
    "timestamp": "&"
    }
    }
    }
    ]

이제 spec 부분에 어떤 작업을 원하는지를 넣어주면 됩니다.

  • 위의 specification은 “data” 부분과 “metadata” 부분에 대해서 작업을 할 것이라는 뜻입니다.
    • data에서는 id와 array_deal_products를 가져올 것이고, metedata에서는 operation과 timestamp를 가져올 것입니다.

이렇게 사용했을때 나오는 결과는 다음과 같습니다.

1
2
3
4
5
6
{
"_id" : 1000009195,
"array_deal_products" : "[ { \"deal_product_no\" : ***, \"code\" : \"***\", \"name\" : \"***\", \"seq\" : 1, \"master_product_code\" : \"***\", ]",
"operation" : "load",
"timestamp" : "2022-**-**T01:12:05.790692Z"
}



List로 묶인 값 풀어주기

SplitJson 프로세서를 만들어주고 위에서 작성한 $.[*] 을 써주면, 원하는 값을 가져와서 리스트로 묶인 값을 풀어줄 수 있습니다.

파이프라인으로 확인한다면 이렇게 되겠네요

EvaluateJsonPath

중간에 EvaluateJsonPath는 Json을 파싱하는 프로세서입니다. 이 설정에 Destination을 flowfile-content로 설정해두시면 파싱 결과를 content에 저장하고 하나의 JSON path expression만 가질 수 있습니다. 반면 flowfile-attribute 라면 일단 attribute로 저장하겠죠? 그리고 각 JSON path가 명명된 속성 값으로 추출됩니다. 미리 지정한 **attribute에 저장해둔 array_deal_products에 대해서 SplitJson을 해주는 것이라고 보면 되겠습니다.



.을 _로 변환하기

DB에서 데이터 연동을 하다보면 ~~~.~~~.~~로 된 컬럼들이 있습니다. 해당 DB에서야 문제가 없겠는데, 다른 DB로 넣을 때면 문제가 발생하곤 합니다. 보통 _로 처리해서 넣으면 별 일 없기에 이렇게 변환해서 저장을 하거나 연동을 합니다. 하지만 이 간단한 작업을 하는데 Spark로 처리하기에는 너무 아까운 것 같습니다. 별다른 데이터 변환 없이 컬럼 명만 수정하면 되기 때문입니다. 그렇다고 손으로 일일이 하기엔 너무 힘이 들 것 같습니다. 이런 경우에 Jolt를 활용하면 쉽게 .이 들어간 컬럼(또는 키)을 _로 변환할 수 있습니다.

  • 데이터 예시
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"_id": 1234,
"purchase_policy.min": 1,
"product_notice.type": "***",
"product_notice.template_id": "****",
"product_notice.is_free_template": true,
"normal_order_type_policy": "DEFAULT",
"is_search_enabled": true,
"is_expose_product_list": true,
"version": 8,
"thumbnail.original.service_type": "***",
"thumbnail.share.service_type": "***",
"operation": "update",
"timestamp": "***************"
}

이런 데이터가 있다고 하겠습니다. 이 Json데이터의 키를 잘 보면 중간 중간에 .이 들어간 것을 볼 수 있습니다. thumbnail.original.service_type 처럼 처음, 중간에 .이 들어간 키가 있는 반면에, purchase_policy.min 처럼 중간에 .이 있는 것처럼 보이는 키도 있습니다. 이런 경우에 .을 _로 변경하기 위한 Jolt Spec은 다음과 같습니다.

1
2
3
4
5
6
7
8
9
10
11
[

{
"operation": "shift",
"spec": {
"*.*": "&(0,1)_&(0,2)",
"*.*.*": "&(0,1)_&(0,2)_&(0,3)",
"*": "&"
}
}
]

이번에도 shift operation을 사용할 것입니다. 자세히 봐야할 부분은 * 가 있는 부분입니다. 인풋으로 들어오는 데이터의 키를 쭉 보다보니처음에 .이 붙는 경우와 처음과 그 다음 부분에 .이 붙는 경우가 있습니다. 그래서 spec에 "*.*": "&(0,1)_&(0,2)", "*.*.*": "&(0,1)_&(0,2)_&(0,3)" 이렇게 넣어줬습니다. 괄호에 있는 숫자는 depth의 level을 뜻합니다. level 0이라면 첫 번째 {}안에 있는 키들을 의미하고 1이라면 키의 첫번째 값을 의미합니다. 예를 들어 purchase_policy.min 자체는 level 0이겠고, purchase_policy는 level 1이 되겠습니다. 그렇다면 min은 level2가 되겠습니다.

따라서 thumbnail.original.service_typethumbnail_original_service_type 로 바꾸겠다 하면 "*.*.*": "&(0,1)_&(0,2)_&(0,3)" 이렇게 Spec을 작성하면 되는 것입니다.

그리고 마지막으로 나머지 값들을 다 사용할 것이므로 "*": "&" 를 통해 가져오면 끝입니다.




이렇게 해서 Jolt에 대해서 간단하게 알아봤습니다. 생각보다 간단하게 데이터를 Transform할 수 있는데, 이를 통해서 대대적인 상품개편 시에 큰 도움을 받았었습니다. 이것을 일일이 내려서 EMR을 돌리고 하는 작업들을 했었으면 너무 파이프라인이 복잡해지고, 모니터링 할 것도 많아졌을 것 같네요. Nifi를 사용하시는 분들이나 Spark를 사용하시는 분들이라면 Jolt를 한 번쯤 살펴보면 좋을 것 같습니다.

Reference

Author

SangHyub Lee, Jose

Posted on

2022-08-06

Updated on

2023-12-08

Licensed under

Comments