Nifi with Jolt
Nifi를 잘 활용하는 방법! Jolt
Jolt
Jolt?
Jolt(JsOn Language for Transform) 는 JSON을 JSON으로 변환하는 데 사용할 수 있는 Java 라이브러리입니다. Jolt 변환 사양 자체도 JSON 파일이기 때문에, Apache NiFi 및 Apache Camel과 같은 제품에서 사용할 수 있습니다.
Jolt는 전체 JSON을 JSON으로 변환하기 위해 함께 연결할 수 있는 변환 셋들을 제공합니다. Jolt의 특징은 특정 값을 조작하는 것이 아니라 JSON 데이터의 구조를 변환하는 데 중점을 두고 있다는 것입니다.
Jolt의 공식 github에서는 다음과 같이 설명하고 있습니다.
JSON to JSON transformation library written in Java where the “specification” for the transform is itself a JSON document.
Jolt 는 다음과 같이 사용할 수 있습니다.
- Transforming JSON data from ElasticSearch, MongoDb, Cassandra, etc before sending it off to the world
- Extracting data from a large JSON documents for your own consumption
JSON 데이터를 변환하는 데 특화되었다는 것을 설명을 통해 알 수 있을 것입니다. Jolt가 갖고 있는 변환 셋들은 5가지가 있습니다.
- shift : copy data from the input tree and put it the output tree
- default : apply default values to the tree
- remove : remove data from the tree
- sort : sort the Map key values alphabetically ( for debugging and human readability )
- cardinality : “fix” the cardinality of input data. Eg, the “urls” element is usually a List, but if there is only one, then it is a String
이 글에서는 shift를 사용해서 nested array로 이루어진 Json을 Flatten하는 것을 예시로 살펴보겠습니다.
Jolt의 간단한 문법
그 전에 참고할 내용을 먼저 알려드리겠습니다. LHS (Left Hand Side)와 RHS (Right Hand Side)입니다. LHS는 콜론, 즉, : 를 기준으로 왼쪽을 뜻하고, RHS는 :를 기준으로 오른쪽을 뜻합니다.
예를 들자면 이렇습니다.
1 | [ |
이제 기본 구조에 대해서 알아보겠습니다. Jolt는 다음과 같은 기본적인 구조를 갖고 있습니다.
1 | [ |
- “operations”: 적용할 변환 유형을 정의합니다.
- “spec”: 어떤 변환을 넣을지 적는 필드입니다.
- “[]”: Jolt의 기본 구조도 JSON이므로 목록 내에서 여러 작업을 연결할 수 있습니다.
위에서 변환 셋들에 대해서 간단히 다루긴 했지만, 더 자세한 설명을 보고 싶다면 여기에서 확인할 수 있습니다.
Jolt를 활용한 Flatten 작업
Json 데이터가 nested로 이루어졌을때 Flatten이 필요한 경우가 있습니다. 이럴 때는 AWS라면 EMR, GCP라면 Dataproc에 Spark를 사용해서 처리를 하곤 합니다. 물론 대규모로 이루어진 데이터에 대해서 작업하려면 어쩔 수 없이 사용하겠지만, 미리 Flatten해서 저장을 하면 어떨까요? 그렇다면 비싼 EMR을 사용할 일도 없어질 듯 합니다.
먼저 이 작업을 위한 방식은 두 가지입니다. JoltTransformJSON
, JoltTransformRecord
입니다.
둘의 차이는 간단합니다. JoltTransformRecord
는 레코드 판독기와 레코드 작성기를 사용하는 반면 JoltTransformJSON
은 작업할 JSON 흐름 파일 콘텐츠를 찾을 것으로 예상합니다. 두 경우 모두 출력은 플로우 파일 내용으로 끝나고 변환 구성은 동일합니다.
이제 작업할 대상 데이터를 보겠습니다.
1 | { |
여기서 작업할 내용을 미리 정의하면
- data부분과 metadata에서 필요한 것만 가져올 것입니다.
- 필요한 것만 가져와서 합칠 것입니다.
- 그리고 array_deal_products에 싸여있는 데이터를 풀어줄 것입니다.
data, metadata 특정 키 값만 합치기
미리 만들어 둔 Nifi Task에서 properties로 들어갑니다. Jolt Transformation DSL을 Chain으로 설정하고 Jolt Specification을 작성해줍니다.
Jolt Specification
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15[
{
"operation": "shift",
"spec": {
"data": {
"_id": "&",
"array_deal_products": "&"
},
"metadata": {
"operation": "&",
"timestamp": "&"
}
}
}
]
이제 spec 부분에 어떤 작업을 원하는지를 넣어주면 됩니다.
- 위의 specification은 “data” 부분과 “metadata” 부분에 대해서 작업을 할 것이라는 뜻입니다.
- data에서는 id와 array_deal_products를 가져올 것이고, metedata에서는 operation과 timestamp를 가져올 것입니다.
이렇게 사용했을때 나오는 결과는 다음과 같습니다.
1 | { |
List로 묶인 값 풀어주기
- SplitJson을 사용합니다.
SplitJson 프로세서를 만들어주고 위에서 작성한 $.[*]
을 써주면, 원하는 값을 가져와서 리스트로 묶인 값을 풀어줄 수 있습니다.
파이프라인으로 확인한다면 이렇게 되겠네요
중간에 EvaluateJsonPath는 Json을 파싱하는 프로세서입니다. 이 설정에 Destination을 flowfile-content
로 설정해두시면 파싱 결과를 content에 저장하고 하나의 JSON path expression만 가질 수 있습니다. 반면 flowfile-attribute
라면 일단 attribute로 저장하겠죠? 그리고 각 JSON path가 명명된 속성 값으로 추출됩니다. 미리 지정한 ****attribute에 저장해둔 array_deal_products에 대해서 SplitJson을 해주는 것이라고 보면 되겠습니다.
.을 _로 변환하기
DB에서 데이터 연동을 하다보면 ~~~.~~~.~~
로 된 컬럼들이 있습니다. 해당 DB에서야 문제가 없겠는데, 다른 DB로 넣을 때면 문제가 발생하곤 합니다. 보통 _로 처리해서 넣으면 별 일 없기에 이렇게 변환해서 저장을 하거나 연동을 합니다. 하지만 이 간단한 작업을 하는데 Spark로 처리하기에는 너무 아까운 것 같습니다. 별다른 데이터 변환 없이 컬럼 명만 수정하면 되기 때문입니다. 그렇다고 손으로 일일이 하기엔 너무 힘이 들 것 같습니다. 이런 경우에 Jolt를 활용하면 쉽게 .이 들어간 컬럼(또는 키)을 _로 변환할 수 있습니다.
- 데이터 예시
1 | { |
이런 데이터가 있다고 하겠습니다. 이 Json데이터의 키를 잘 보면 중간 중간에 .이 들어간 것을 볼 수 있습니다. thumbnail.original.service_type
처럼 처음, 중간에 .이 들어간 키가 있는 반면에, purchase_policy.min
처럼 중간에 .이 있는 것처럼 보이는 키도 있습니다. 이런 경우에 .을 _로 변경하기 위한 Jolt Spec은 다음과 같습니다.
1 | [ |
이번에도 shift operation을 사용할 것입니다. 자세히 봐야할 부분은 *
가 있는 부분입니다. 인풋으로 들어오는 데이터의 키를 쭉 보다보니처음에 .이 붙는 경우와 처음과 그 다음 부분에 .이 붙는 경우가 있습니다. 그래서 spec에 "*.*": "&(0,1)_&(0,2)"
, "*.*.*": "&(0,1)_&(0,2)_&(0,3)"
이렇게 넣어줬습니다. 괄호에 있는 숫자는 depth의 level을 뜻합니다. level 0이라면 첫 번째 {}안에 있는 키들을 의미하고 1이라면 키의 첫번째 값을 의미합니다. 예를 들어 purchase_policy.min
자체는 level 0이겠고, purchase_policy는 level 1이 되겠습니다. 그렇다면 min은 level2가 되겠습니다.
따라서 thumbnail.original.service_type
를 thumbnail_original_service_type
로 바꾸겠다 하면 "*.*.*": "&(0,1)_&(0,2)_&(0,3)"
이렇게 Spec을 작성하면 되는 것입니다.
그리고 마지막으로 나머지 값들을 다 사용할 것이므로 "*": "&"
를 통해 가져오면 끝입니다.
이렇게 해서 Jolt에 대해서 간단하게 알아봤습니다. 생각보다 간단하게 데이터를 Transform할 수 있는데, 이를 통해서 대대적인 상품개편 시에 큰 도움을 받았었습니다. 이것을 일일이 내려서 EMR을 돌리고 하는 작업들을 했었으면 너무 파이프라인이 복잡해지고, 모니터링 할 것도 많아졌을 것 같네요. Nifi를 사용하시는 분들이나 Spark를 사용하시는 분들이라면 Jolt를 한 번쯤 살펴보면 좋을 것 같습니다.
Reference
Nifi with Jolt