ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Apache NiFi를 사용해 IoT 센서 데이터를 수집하고 Snowflake 데이터 웨어하우스에 저장하는 구체적인 예
    카테고리 없음 2025. 1. 10. 23:27

    Apache NiFi를 사용해 IoT 센서 데이터를 수집하고 Snowflake 데이터 웨어하우스에 저장하는 구체적인 예

    목표

    1. MQTT 브로커에서 IoT 센서 데이터를 실시간으로 수집.
    2. JSON 데이터를 정제하고 SQL 형식으로 변환.
    3. Snowflake 데이터 웨어하우스에 데이터를 적재.

    단계별 구현

    1단계: NiFi 설치 및 설정

    1. Apache NiFi를 설치합니다.
    2. NiFi를 실행:
      bash
      코드 복사
      ./bin/nifi.sh start
    3. 브라우저에서 NiFi UI에 접속: http://localhost:8080/nifi

    2단계: MQTT 브로커에서 데이터 수집

    1. ConsumeMQTT 프로세서 추가
      • 프로세서 패널에서 ConsumeMQTT를 검색하여 Canvas에 추가.
      • MQTT 브로커 설정:
        • Broker URI: tcp://broker.hivemq.com:1883
        • Topic Name: iot/sensor_data
        • Client ID: nifi_mqtt_consumer
      • 프로세서 속성 설정 예:속성값
        Broker URI tcp://broker.hivemq.com:1883
        Topic Filter iot/sensor_data
        Client ID nifi_mqtt_consumer
        QoS Level 1
        Max Queue Size 1000
    2. 데이터 포맷 확인
      • MQTT 데이터 예:
        json
        코드 복사
        { "sensor_id": "12345", "temperature": 22.5, "humidity": 60.2, "timestamp": "2025-01-10T12:34:56Z" }
    3. ConsumeMQTT 프로세서를 실행하여 데이터를 수집.

    3단계: 데이터 정제 및 변환

    1. UpdateAttribute 프로세서 추가
      • ConsumeMQTT 프로세서의 출력 연결선을 UpdateAttribute 프로세서로 연결.
      • UpdateAttribute 설정:
        • filename: ${sensor_id}_${timestamp}.json
    2. EvaluateJsonPath 프로세서 추가
      • JSON 데이터를 추출하여 속성으로 변환.
      • 설정 예:속성 이름JSON Path
        sensor_id $.sensor_id
        temperature $.temperature
        humidity $.humidity
        timestamp $.timestamp
    3. ConvertJSONToSQL 프로세서 추가
      • JSON 데이터를 SQL 삽입문으로 변환.
      • 설정 예:
        • Table Name: iot_data
        • SQL Statement:
          sql
          코드 복사
          INSERT INTO iot_data (sensor_id, temperature, humidity, timestamp) VALUES (${sensor_id}, ${temperature}, ${humidity}, '${timestamp}');

    4단계: 데이터 웨어하우스에 적재

    1. PutDatabaseRecord 프로세서 추가
      • SQL 데이터를 Snowflake로 전송.
      • 설정 예:속성값
        Database Connection jdbc:snowflake://your_account.snowflakecomputing.com
        Table Name iot_data
        Schema Name public
    2. JDBC Controller Service 설정
      • DBCPConnectionPool를 설정:
        • Driver Class Name: com.snowflake.client.jdbc.SnowflakeDriver
        • URL: jdbc:snowflake://your_account.snowflakecomputing.com
        • Username: <username>
        • Password: <password>

    5단계: 데이터 흐름 연결

    • ConsumeMQTT → UpdateAttribute → EvaluateJsonPath → ConvertJSONToSQL → PutDatabaseRecord
    • 워크플로우를 연결한 모습:
    css
    코드 복사
    [ConsumeMQTT] --> [UpdateAttribute] --> [EvaluateJsonPath] --> [ConvertJSONToSQL] --> [PutDatabaseRecord]

    6단계: 테스트 및 실행

    1. 프로세서 실행
      • 모든 프로세서를 Start 상태로 설정.
      • MQTT 브로커에서 데이터를 수신하고 변환 과정을 거쳐 Snowflake로 적재되는지 확인.
    2. 결과 확인
      • Snowflake 데이터베이스에서 테이블 iot_data의 데이터를 조회:
        sql
        코드 복사
        SELECT * FROM iot_data;

    결과

    • NiFi를 통해 MQTT 브로커에서 IoT 데이터를 실시간으로 수집하고, 데이터 정제 및 SQL 변환을 거쳐 Snowflake에 저장할 수 있습니다.

    장점

    1. GUI 기반 설정: 코딩 없이도 복잡한 데이터 워크플로우 설계 가능.
    2. 확장성: 다양한 데이터 소스와 유연하게 연결 가능.
    3. 실시간 처리: 스트리밍 데이터와 배치 데이터를 동시에 처리.
Designed by Tistory.