일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 구현
- Java
- 달팽이
- 이분 탐색
- 유레카
- docker-compose
- 다익스트라
- Gradle
- Spring Cloud Config
- 스프링 시큐리티
- 이분 매칭
- 게이트웨이
- 플로이드 와샬
- spring boot
- 트리
- dp
- 메모이제이션
- 백트래킹
- 도커
- 구간 트리
- 서비스 디스커버리
- BFS
- Zuul
- 주울
- ZuulFilter
- 스택
- 비트마스킹
- Logback
- 완전 탐색
- spring cloud
- Today
- Total
Hello, Freakin world!
Spring Cloud Stream 3.0, 카프카로 이벤트 기반 캐시 기능 구현하기 본문
Spring Cloud Stream 3.0, 카프카로 이벤트 기반 캐시 기능 구현하기
johnna_endure 2021. 3. 25. 17:07시나리오
- 멤버 서비스, 팀 서비스 2개의 서비스가 존재한다.
- 멤버는 하나의 팀에 소속될 수 있다.
- 클라이언트가 멤버를 조회할 때, 멤버가 팀에 속해있다면 팀 서비스에 요청에 팀 정보를 가져온다.
- 멤버 서비스에서 팀 서비스에 요청 데이터를 받으면 이를 멤버 서비스에 캐시한다.
- 팀 서비스 데이터가 수정되거나 삭제될 때, 이벤트를 발생시켜 멤버 서비스에 캐시된 내용을 수정하거나 삭제한다.
멤버 서비스
모든 클래스를 살펴보는건 너무 피곤한 일이니, 중요한 부분만 살펴보자.
멤버 서비스에서 중요한 부분은 3가지다.
1. 팀 서비스에 데이터를 요청하기
2. 팀 데이터를 캐시하기
3. 팀 서비스에서 보내는 이벤트를 받아 처리하기
1. 데이터 요청 부분
RestTemplate 빈을 생성해 동기식 요청을 하고 있다.
...
@RequiredArgsConstructor
@Component
public class TeamClient {
private final RestTemplate restTemplate;
private final TeamCache teamCache;
private static Logger logger = LoggerFactory.getLogger(TeamClient.class);
public TeamDto getTeam(Long teamId) {
//캐시 구현
if(teamCache.containsKey(teamId)) {
logger.info("캐시 히트");
return teamCache.get(teamId);
}
ResponseEntity<TeamDto> response =
restTemplate.exchange("http://localhost:8086/teams/" + teamId, HttpMethod.GET, null, TeamDto.class);
TeamDto result = response.getBody();
logger.info("캐시에 저장");
teamCache.put(teamId, result);
return result;
}
}
2. 캐시 저장소
간단한게 Map을 이용해 캐시 저장소를 구현했다.
import org.springframework.stereotype.Component;
import spring.cloud.memberservice.client.dto.TeamDto;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class TeamCache {
private final ConcurrentHashMap<Long, TeamDto> teamCacheMap = new ConcurrentHashMap<>();
public boolean containsKey(Long teamId) {
return teamCacheMap.containsKey(teamId);
}
public void put(Long teamId, TeamDto team) {
teamCacheMap.put(teamId, team);
}
public TeamDto get(Long teamId) {
return teamCacheMap.get(teamId);
}
public void remove(Long teamId) {
if(containsKey(teamId)) teamCacheMap.remove(teamId);
}
@Override
public String toString() {
return teamCacheMap.toString();
}
}
3. 이벤트 핸들러
사실 이 부분이 가장 눈여겨볼 부분이다. 일단 아래 코드를 살펴보자.
...
import java.util.function.Consumer;
@RequiredArgsConstructor
@Configuration
public class TeamEventConsumers {
private final TeamCache teamCache;
private final TeamClient teamClient;
private Jackson2JsonObjectMapper mapper = new Jackson2JsonObjectMapper();
private static Logger logger = LoggerFactory.getLogger(TeamEventConsumers.class);
@Bean
public Consumer teamUpdated() {
return (o) -> {
logger.info("teamUpdated 이벤트 수신");
try {
TeamUpdatedEvent event = mapper.fromJson(o, TeamUpdatedEvent.class);
updateCache(event.getTeamId());
} catch (IOException e) {
e.printStackTrace();
}
};
}
private void updateCache(Long teamId) {
if(teamCache.containsKey(teamId)) {
teamCache.remove(teamId);
TeamDto teamDto = teamClient.getTeam(teamId);
teamCache.put(teamId, teamDto);
logger.info("[teamId : {}, data : {}] is updated.", teamId, teamDto);
};
}
@Bean
public Consumer teamDeleted() {
return (o)->{
logger.info("teamDeleted 이벤트 수신");
try {
TeamDeletedEvent event = mapper.fromJson(o, TeamDeletedEvent.class);
deleteCache(event.getTeamId());
} catch (IOException e) {
e.printStackTrace();
}
};
}
private void deleteCache(Long teamId) {
if(teamCache.containsKey(teamId)) {
teamCache.remove(teamId);
logger.info("teamId : {} is deleted.", teamId);
}
}
}
Spring Cloud Stream 버전 3.0을 지나면서 코드 작성 방식이 많이 달라졌다.
2.0 버전에선 @EnableBinding, @StreamListener 등 사용했는데 3.0버전이 되면서 2.0에서 지원하던 애너테이션들이 deprecated 대상이 되면서, 2.0 버전의 채널 인터페이스 Source, Sink, Process 들은 각각 함수형 인터페이스인 Supplier, Consumer, Function으로 대체됐다.
위에서는 두 개의 Consumer 반환 타입의 빈 두개를 선언했는데, 이는 각기 다른 두 개의 카프카 컨슈머를 만든 것과 같다.
위의 컨슈머들을 카프카 토픽에 바인딩해줘야 하는데 이는 application.yml 과 같은 구성 정보에 기입한다.
application.yml
# DB 설정
spring:
datasource:
driver-class-name: org.h2.Driver
url: jdbc:h2:mem:test_member
username: sa
password:
jpa:
hibernate:
ddl-auto: create
# 카프카 세팅
cloud:
stream:
kafka:
binder:
brokers: localhost:9091,localhost:9092,localhost:9093
zkNodes: localhost:2181,localhost:2182,localhost:2183
function:
definition: teamUpdated;teamDeleted;
bindings:
teamUpdated-in-0:
destination: team-updated-topic
teamDeleted-in-0:
destination: team-deleted-topic
server:
port: 8085
프로퍼티 중 일부만 살펴보자.
위처럼 반환타입이 Consumer, Supplier, Function인 빈들을 스트림에 사용하려면 cloud.stream.function.definition에 등록된 빈 이름을 알려줘야 된다.
cloud.stream.bindings.teamUpdated-in-0 프로퍼티는 좀 기괴해보인다.
cloud.stream.bindings 뒤에 오는 teamUpdated-in-0는 그냥 bindingName이다. 이는 바인딩 정보를 참조하기 위한 이름이다. 이름은 아무렇게나 지어도 상관없지만 위와 같이 함수형 빈들을 바인딩할 때는 규칙이 있다.
<빈 이름> + in / out + index
메세지를 보낸다면 out, 메세지를 소비한다면 in을 사용한다. index는 다수의 토픽에 메세지를 보내거나, 다수의 토픽으로부터 메세지를 읽을 때 사용할 값이다. 그럴 용도가 아니라면 그냥 0을 써주면 된다.
destination 뒤에 오는건 토픽이름이다. destination 뿐만 아니라 다양한 구성 정보를 정의할 수 있다.
위의 설정으로 teamUpdated 빈을 team-updated-topic에 바인딩했고, teamDeleted 빈을 team-deleted-topic에 바인딩했다.
팀 서비스
팀 서비스에서는 데이터를 수정하거나 삭제한 뒤, 이벤트를 토픽에 보내야 한다.
이는 Spring Cloud Stream 3.0 버전에서 제공하는 StreamBridge를 이용하면 간단하게 수행 가능하다
...
import org.springframework.cloud.stream.function.StreamBridge;
@RequiredArgsConstructor
@Service
@Transactional
public class TeamService {
private final TeamRepository teamRepository;
private final StreamBridge streamBridge;
private static Logger logger = LoggerFactory.getLogger(TeamService.class);
public TeamQueryDto createTeam(TeamCreateDto teamCreateDto) {
Team team = new Team(teamCreateDto.getName());
team = teamRepository.save(team);
return new TeamQueryDto(team.getId(), team.getName());
}
public TeamQueryDto findTeam(Long teamId) {
return teamRepository.findById(teamId)
.map(t -> new TeamQueryDto(t.getId(), t.getName()))
.orElse(new TeamQueryDto(null, null));
}
public TeamQueryDto updateTeam(Long teamId, TeamUpdateDto updateDto) {
Team team = teamRepository.findById(teamId)
.orElseThrow(() -> new NoSuchElementException("id : " + teamId + " 회원을 찾을 수 없습니다."));
team.update(updateDto.getName());
TeamUpdatedEvent event = new TeamUpdatedEvent("updated",team.getId());
streamBridge.send("teamUpdated", event);
logger.info("team-updated 이벤트 발신 : {} ", event);
return new TeamQueryDto(team.getId(), team.getName());
}
public void deleteTeam(Long teamId) {
teamRepository.deleteById(teamId);
TeamDeletedEvent event = new TeamDeletedEvent("deleted", teamId);
streamBridge.send("teamDeleted", event);
logger.info("team-deleted 이벤트 발신 : {}", event);
}
}
스프링에서 제공하는 StreamBridge 빈을 주입받아 send() 를 이용해 메세지를 보낸다.
send 메서드는 bindingName을 인수로 받으며 이는 구성 정보에 지정한 cloud.stream.bindings.<bindingName>이다.
위처럼 객체 데이터를 보내면 JSON 컨버터가 동작해 JSON 형태로 바뀐다.
application.yml
# DB 세팅
spring:
datasource:
url: jdbc:h2:mem:test_team
driver-class-name: org.h2.Driver
username: sa
password:
jpa:
hibernate:
ddl-auto: create
# 카프카 세팅
cloud:
stream:
bindings:
teamUpdated:
destination: team-updated-topic
teamDeleted:
destination: team-deleted-topic
kafka:
binder:
brokers: localhost:9091,localhost:9092,localhost:9093
zkNodes: localhost:2181,localhost:2182,localhost:2183
server:
port: 8086
전체 코드