스프링 부트를 활용한 실시간 데이터 스트리밍 시스템

Java 예제

중급 난이도
예제 타입
11/05 등록일

스프링 부트를 활용한 실시간 데이터 스트리밍 시스템

중급
태그
예제 실습 프로젝트 kafka 스트리밍 데이터 스프링부트
## 프로젝트 개요
실시간 데이터 처리 및 전송이 필요한 엔터프라이즈 환경에서 발생하는 문제를 해결하기 위한 시스템입니다. 예를 들어, 센서 데이터 수집, 로그 분석, 실시간 통계 집계 등에 활용할 수 있습니다.

## 주요 기능
- Kafka를 사용한 메시지 브로커 통합
- 스트리밍 데이터 처리 및 저장
- 실시간 통계 시각화
- 외부 API 연동 및 데이터 전송

## 사용 방법
1. 프로젝트 생성 후 의존성 추가
2. Kafka 설정과 토픽 연결
3. 데이터 수신 및 처리 로직 구현
4. 메모리 기반의 실시간 통계 계산
5. REST API를 통해 결과 전송

## 확장 가능성
- Redis 또는 DB에 스태틱 데이터 저장
- 스트리밍 데이터 분석을 위한 Apache Flink 연동
- 웹소켓을 활용한 실시간 UI 통합
코드 예제
// Maven 의존성 추가
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

// Kafka 데이터 수신 및 처리 예제
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class DataStreamConsumer {
    private final Map<String, Long> statsMap = new ConcurrentHashMap<>();

    @KafkaListener(topics = "sensor-topic", groupId = "realtime-group")
    public void consume(String message) {
        try {
            // 메시지 파싱 (예: JSON)
            Map<String, Object> data = parseJson(message);
            String sensorId = (String) data.get("id");
            long value = ((Number) data.get("value")).longValue();

            // 실시간 통계 계산
            statsMap.put(sensorId, statsMap.getOrDefault(sensorId, 0L) + value);
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
        }
    }

    private Map<String, Object> parseJson(String json) {
        // 실제 로직은 JSON 파서(예: Jackson) 사용
        return new ConcurrentHashMap<>(); // 예시로 빈 맵 반환
    }
}

// REST API를 통한 결과 전송
import org.springframework.web.bind.annotation.*;
import java.util.Map;

@RestController
@RequestMapping("/api/v1/stats")
public class StatsController {
    private final Map<String, Long> statsMap = new ConcurrentHashMap<>();

    @GetMapping("/{sensorId}")
    public long getSensorStats(@PathVariable String sensorId) {
        return statsMap.getOrDefault(sensorId, 0L);
    }

    @PostMapping("/update")
    public void updateSensorData(@RequestBody Map<String, Object> data) {
        String sensorId = (String) data.get("id");
        long value = ((Number) data.get("value")).longValue();
        statsMap.put(sensor, statsMap.getOrDefault(sensorId, 0L) + value);
    }
}
등록일: 2025년 11월 05일 02:35
언어 정보
언어
Java
카테고리
Enterprise
인기도
#3
학습 팁
코드를 직접 실행해보세요
변수를 바꿔가며 실험해보세요
오류가 나도 포기하지 마세요
다른 예제도 찾아보세요