Apache NiFi를 사용해 IoT 센서 데이터를 수집하고 Snowflake 데이터 웨어하우스에 저장하는 구체적인 예
목표
- MQTT 브로커에서 IoT 센서 데이터를 실시간으로 수집.
- JSON 데이터를 정제하고 SQL 형식으로 변환.
- Snowflake 데이터 웨어하우스에 데이터를 적재.
단계별 구현
1단계: NiFi 설치 및 설정
- Apache NiFi를 설치합니다.
- NiFi를 실행:
- 브라우저에서 NiFi UI에 접속: http://localhost:8080/nifi
2단계: MQTT 브로커에서 데이터 수집
- ConsumeMQTT 프로세서 추가
- 프로세서 패널에서 ConsumeMQTT를 검색하여 Canvas에 추가.
- MQTT 브로커 설정:
- Broker URI: tcp://broker.hivemq.com:1883
- Topic Name: iot/sensor_data
- Client ID: nifi_mqtt_consumer
- 프로세서 속성 설정 예:속성값
Broker URI |
tcp://broker.hivemq.com:1883 |
Topic Filter |
iot/sensor_data |
Client ID |
nifi_mqtt_consumer |
QoS Level |
1 |
Max Queue Size |
1000 |
- 데이터 포맷 확인
- MQTT 데이터 예:
json
{ "sensor_id": "12345", "temperature": 22.5, "humidity": 60.2, "timestamp": "2025-01-10T12:34:56Z" }
- ConsumeMQTT 프로세서를 실행하여 데이터를 수집.
3단계: 데이터 정제 및 변환
- UpdateAttribute 프로세서 추가
- ConsumeMQTT 프로세서의 출력 연결선을 UpdateAttribute 프로세서로 연결.
- UpdateAttribute 설정:
- filename: ${sensor_id}_${timestamp}.json
- EvaluateJsonPath 프로세서 추가
- JSON 데이터를 추출하여 속성으로 변환.
- 설정 예:속성 이름JSON Path
sensor_id |
$.sensor_id |
temperature |
$.temperature |
humidity |
$.humidity |
timestamp |
$.timestamp |
- ConvertJSONToSQL 프로세서 추가
- JSON 데이터를 SQL 삽입문으로 변환.
- 설정 예:
- Table Name: iot_data
- SQL Statement:
sql
INSERT INTO iot_data (sensor_id, temperature, humidity, timestamp) VALUES (${sensor_id}, ${temperature}, ${humidity}, '${timestamp}');
4단계: 데이터 웨어하우스에 적재
- PutDatabaseRecord 프로세서 추가
- SQL 데이터를 Snowflake로 전송.
- 설정 예:속성값
Database Connection |
jdbc:snowflake://your_account.snowflakecomputing.com |
Table Name |
iot_data |
Schema Name |
public |
- JDBC Controller Service 설정
- DBCPConnectionPool를 설정:
- Driver Class Name: com.snowflake.client.jdbc.SnowflakeDriver
- URL: jdbc:snowflake://your_account.snowflakecomputing.com
- Username: <username>
- Password: <password>
5단계: 데이터 흐름 연결
- ConsumeMQTT → UpdateAttribute → EvaluateJsonPath → ConvertJSONToSQL → PutDatabaseRecord
- 워크플로우를 연결한 모습:
css
[ConsumeMQTT] --> [UpdateAttribute] --> [EvaluateJsonPath] --> [ConvertJSONToSQL] --> [PutDatabaseRecord]
6단계: 테스트 및 실행
- 프로세서 실행
- 모든 프로세서를 Start 상태로 설정.
- MQTT 브로커에서 데이터를 수신하고 변환 과정을 거쳐 Snowflake로 적재되는지 확인.
- 결과 확인
- Snowflake 데이터베이스에서 테이블 iot_data의 데이터를 조회:
sql
SELECT * FROM iot_data;
결과
- NiFi를 통해 MQTT 브로커에서 IoT 데이터를 실시간으로 수집하고, 데이터 정제 및 SQL 변환을 거쳐 Snowflake에 저장할 수 있습니다.
장점
- GUI 기반 설정: 코딩 없이도 복잡한 데이터 워크플로우 설계 가능.
- 확장성: 다양한 데이터 소스와 유연하게 연결 가능.
- 실시간 처리: 스트리밍 데이터와 배치 데이터를 동시에 처리.