Hello, Freakin world!

Spring Cloud Stream 3.0, 카프카로 이벤트 기반 캐시 기능 구현하기 본문

Spring Cloud/Stream

Spring Cloud Stream 3.0, 카프카로 이벤트 기반 캐시 기능 구현하기

johnna_endure 2021. 3. 25. 17:07

시나리오

- 멤버 서비스, 팀 서비스 2개의 서비스가 존재한다.

- 멤버는 하나의 팀에 소속될 수 있다.

- 클라이언트가 멤버를 조회할 때, 멤버가 팀에 속해있다면 팀 서비스에 요청에 팀 정보를 가져온다.

- 멤버 서비스에서 팀 서비스에 요청 데이터를 받으면 이를 멤버 서비스에 캐시한다.

- 팀 서비스 데이터가 수정되거나 삭제될 때, 이벤트를 발생시켜 멤버 서비스에 캐시된 내용을 수정하거나 삭제한다.


멤버 서비스

 

모든 클래스를 살펴보는건 너무 피곤한 일이니, 중요한 부분만 살펴보자.

멤버 서비스에서 중요한 부분은 3가지다.

 

1. 팀 서비스에 데이터를 요청하기

2. 팀 데이터를 캐시하기

3. 팀 서비스에서 보내는 이벤트를 받아 처리하기


1. 데이터 요청 부분

 

RestTemplate 빈을 생성해 동기식 요청을 하고 있다.

...

@RequiredArgsConstructor
@Component
public class TeamClient {

    private final RestTemplate restTemplate;
    private final TeamCache teamCache;

    private static Logger logger = LoggerFactory.getLogger(TeamClient.class);

    public TeamDto getTeam(Long teamId) {
        //캐시 구현
        if(teamCache.containsKey(teamId)) {
            logger.info("캐시 히트");
            return teamCache.get(teamId);
        }
        ResponseEntity<TeamDto> response =
                restTemplate.exchange("http://localhost:8086/teams/" + teamId, HttpMethod.GET, null, TeamDto.class);
        TeamDto result = response.getBody();
        logger.info("캐시에 저장");
        teamCache.put(teamId, result);
        return result;
    }
}

2. 캐시 저장소

 

간단한게 Map을 이용해 캐시 저장소를 구현했다.

import org.springframework.stereotype.Component;
import spring.cloud.memberservice.client.dto.TeamDto;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class TeamCache {
    private final ConcurrentHashMap<Long, TeamDto> teamCacheMap = new ConcurrentHashMap<>();

    public boolean containsKey(Long teamId) {
        return teamCacheMap.containsKey(teamId);
    }

    public void put(Long teamId, TeamDto team) {
        teamCacheMap.put(teamId, team);
    }

    public TeamDto get(Long teamId) {
        return teamCacheMap.get(teamId);
    }

    public void remove(Long teamId) {
        if(containsKey(teamId)) teamCacheMap.remove(teamId);
    }

    @Override
    public String toString() {
        return teamCacheMap.toString();
    }
}

3. 이벤트 핸들러

 

사실 이 부분이 가장 눈여겨볼 부분이다. 일단 아래 코드를 살펴보자.

...

import java.util.function.Consumer;

@RequiredArgsConstructor
@Configuration
public class TeamEventConsumers {

    private final TeamCache teamCache;
    private final TeamClient teamClient;
    private Jackson2JsonObjectMapper mapper = new Jackson2JsonObjectMapper();
    private static Logger logger = LoggerFactory.getLogger(TeamEventConsumers.class);

    @Bean
    public Consumer teamUpdated() {
        return (o) -> {
            logger.info("teamUpdated 이벤트 수신");
            try {
                TeamUpdatedEvent event = mapper.fromJson(o, TeamUpdatedEvent.class);
                updateCache(event.getTeamId());
            } catch (IOException e) {
                e.printStackTrace();
            }
        };
    }

    private void updateCache(Long teamId) {
        if(teamCache.containsKey(teamId)) {
            teamCache.remove(teamId);
            TeamDto teamDto = teamClient.getTeam(teamId);
            teamCache.put(teamId, teamDto);
            logger.info("[teamId : {}, data : {}] is updated.", teamId, teamDto);
        };
    }

    @Bean
    public Consumer teamDeleted() {
        return (o)->{
            logger.info("teamDeleted 이벤트 수신");
            try {
                TeamDeletedEvent event = mapper.fromJson(o, TeamDeletedEvent.class);
                deleteCache(event.getTeamId());
            } catch (IOException e) {
                e.printStackTrace();
            }
        };
    }

    private void deleteCache(Long teamId) {
        if(teamCache.containsKey(teamId)) {
            teamCache.remove(teamId);
            logger.info("teamId : {}  is deleted.", teamId);
        }

    }


}

Spring Cloud Stream 버전 3.0을 지나면서 코드 작성 방식이 많이 달라졌다.

 

2.0 버전에선 @EnableBinding, @StreamListener 등 사용했는데 3.0버전이 되면서 2.0에서 지원하던 애너테이션들이 deprecated 대상이 되면서,  2.0 버전의 채널 인터페이스 Source, Sink, Process 들은 각각 함수형 인터페이스인 Supplier, Consumer, Function으로 대체됐다.

 

위에서는 두 개의 Consumer 반환 타입의 빈 두개를 선언했는데, 이는 각기 다른 두 개의 카프카 컨슈머를 만든 것과 같다.

위의 컨슈머들을 카프카 토픽에 바인딩해줘야 하는데 이는 application.yml 과 같은 구성 정보에 기입한다.

 

application.yml

# DB 설정
spring:
  datasource:
    driver-class-name: org.h2.Driver
    url: jdbc:h2:mem:test_member
    username: sa
    password:
  jpa:
    hibernate:
      ddl-auto: create

# 카프카 세팅
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9091,localhost:9092,localhost:9093
          zkNodes: localhost:2181,localhost:2182,localhost:2183
      function:
        definition: teamUpdated;teamDeleted;
      bindings:
        teamUpdated-in-0:
          destination: team-updated-topic
        teamDeleted-in-0:
          destination: team-deleted-topic

server:
  port: 8085

프로퍼티 중 일부만 살펴보자.

 

위처럼 반환타입이 Consumer, Supplier, Function인 빈들을 스트림에 사용하려면 cloud.stream.function.definition에 등록된 빈 이름을 알려줘야 된다.

 

cloud.stream.bindings.teamUpdated-in-0 프로퍼티는 좀 기괴해보인다.

 

cloud.stream.bindings 뒤에 오는 teamUpdated-in-0는 그냥 bindingName이다. 이는 바인딩 정보를 참조하기 위한 이름이다. 이름은 아무렇게나 지어도 상관없지만 위와 같이 함수형 빈들을 바인딩할 때는 규칙이 있다.

 

<빈 이름> + in / out + index

 

메세지를 보낸다면 out, 메세지를 소비한다면 in을 사용한다. index는 다수의 토픽에 메세지를 보내거나, 다수의 토픽으로부터 메세지를 읽을 때 사용할 값이다. 그럴 용도가 아니라면 그냥 0을 써주면 된다.

 

destination 뒤에 오는건 토픽이름이다. destination 뿐만 아니라 다양한 구성 정보를 정의할 수 있다.

 

위의 설정으로 teamUpdated 빈을 team-updated-topic에 바인딩했고, teamDeleted 빈을 team-deleted-topic에 바인딩했다.


팀 서비스

 

팀 서비스에서는 데이터를 수정하거나 삭제한 뒤, 이벤트를 토픽에 보내야 한다.

이는 Spring Cloud Stream 3.0 버전에서 제공하는 StreamBridge를 이용하면 간단하게 수행 가능하다

 

...
import org.springframework.cloud.stream.function.StreamBridge;

@RequiredArgsConstructor
@Service
@Transactional
public class TeamService {
    private final TeamRepository teamRepository;
    private final StreamBridge streamBridge;

    private static Logger logger = LoggerFactory.getLogger(TeamService.class);

    public TeamQueryDto createTeam(TeamCreateDto teamCreateDto) {
        Team team = new Team(teamCreateDto.getName());
        team = teamRepository.save(team);
        return new TeamQueryDto(team.getId(), team.getName());
    }

    public TeamQueryDto findTeam(Long teamId) {
        return teamRepository.findById(teamId)
                .map(t -> new TeamQueryDto(t.getId(), t.getName()))
                .orElse(new TeamQueryDto(null, null));
    }

    public TeamQueryDto updateTeam(Long teamId, TeamUpdateDto updateDto) {
        Team team = teamRepository.findById(teamId)
                .orElseThrow(() -> new NoSuchElementException("id : " + teamId + " 회원을 찾을 수 없습니다."));
        team.update(updateDto.getName());

        TeamUpdatedEvent event = new TeamUpdatedEvent("updated",team.getId());
        streamBridge.send("teamUpdated", event);
        logger.info("team-updated 이벤트 발신 : {} ", event);
        return new TeamQueryDto(team.getId(), team.getName());
    }

    public void deleteTeam(Long teamId) {
        teamRepository.deleteById(teamId);
        TeamDeletedEvent event = new TeamDeletedEvent("deleted", teamId);
        streamBridge.send("teamDeleted", event);
        logger.info("team-deleted 이벤트 발신 : {}", event);
    }
}

 

스프링에서 제공하는 StreamBridge 빈을 주입받아 send() 를 이용해 메세지를 보낸다.

send 메서드는 bindingName을 인수로 받으며 이는 구성 정보에 지정한 cloud.stream.bindings.<bindingName>이다.

위처럼 객체 데이터를 보내면 JSON 컨버터가 동작해 JSON 형태로 바뀐다.

 

application.yml

# DB 세팅
spring:
  datasource:
    url: jdbc:h2:mem:test_team
    driver-class-name: org.h2.Driver
    username: sa
    password:
  jpa:
    hibernate:
      ddl-auto: create
  # 카프카 세팅
  cloud:
    stream:
      bindings:
        teamUpdated:
          destination: team-updated-topic
        teamDeleted:
          destination: team-deleted-topic
      kafka:
        binder:
          brokers: localhost:9091,localhost:9092,localhost:9093
          zkNodes: localhost:2181,localhost:2182,localhost:2183
server:
  port: 8086

전체 코드

 

johnna-endure/event-driven-cache-example

Spring Cloud Stream 3.0, Kafka. Contribute to johnna-endure/event-driven-cache-example development by creating an account on GitHub.

github.com

 

Comments