Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 백트래킹
- dp
- 유레카
- 비트마스킹
- 게이트웨이
- 구간 트리
- BFS
- 주울
- 완전 탐색
- Zuul
- 이분 매칭
- 다익스트라
- 트리
- spring boot
- 도커
- 스프링 시큐리티
- Java
- 서비스 디스커버리
- Spring Cloud Config
- Logback
- 달팽이
- 플로이드 와샬
- docker-compose
- spring cloud
- 구현
- ZuulFilter
- 이분 탐색
- 메모이제이션
- 스택
- Gradle
Archives
- Today
- Total
Hello, Freakin world!
[카프카] 자바 프로듀서/컨슈머로 간단하게 카프카 클러스터 테스트 본문
카프카, 주키퍼 클러스터를 구성하자.
docker-compose.yml
version: "3.0"
services:
zk1:
image: zookeeper
restart: always
hostname: zk1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
volumes:
- "~/zk-cluster/zk1/data:/data"
zk2:
image: zookeeper
restart: always
hostname: zk2
ports:
- "2182:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
volumes:
- "~/zk-cluster/zk2/data:/data"
zk3:
image: zookeeper
restart: always
hostname: zk3
ports:
- "2183:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
volumes:
- "~/zk-cluster/zk3/data:/data"
zoo-navi:
image: elkozmon/zoonavigator
ports:
- 9000:9000
environment:
HTTP_PORT: 9000
kafka1:
image: confluentinc/cp-kafka
depends_on:
- zk1
- zk2
- zk3
restart: on-failure
ports:
- 9091:9091
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://호스트IP:9091
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9091
- KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181
- KAFKA_BROKER_ID=1
- BOOTSTRAP_SERVERS=kafka1:9091,kafka2:9092,kafka3:9093
kafka2:
image: confluentinc/cp-kafka
depends_on:
- zk1
- zk2
- zk3
restart: on-failure
ports:
- 9092:9092
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://호스트IP:9092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
- KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181
- KAFKA_BROKER_ID=2
- BOOTSTRAP_SERVERS=kafka1:9091,kafka2:9092,kafka3:9093
kafka3:
image: confluentinc/cp-kafka
depends_on:
- zk1
- zk2
- zk3
restart: on-failure
ports:
- 9093:9093
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://호스트IP:9093
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093
- KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181
- KAFKA_BROKER_ID=3
- BOOTSTRAP_SERVERS=kafka1:9091,kafka2:9092,kafka3:9093
주키퍼, 카프카 각각 3개의 서버로 구성한다.
도커에 추가한 zookeeper navigator를 이용해 카프카 브로커들이 제대로 주키퍼와 연동됐는지 확인해보자.
컨테이너들을 띄우고 브라우저에서 http://localhost:9000 를 입력한다.
여러 지노드들이 보인다. 그중에 brokers/ids를 따라가보면 현재 주키퍼에 등록된 브로커들의 id들을 볼 수 있다.
이제 간단한 프로듀서, 컨슈머를 생성해 카프카가 제대로 동작하는지 확인해보자.
그래이들로 자바 프로젝트를 생성하고 아래와 같이 카프카 디펜던시를 추가.
build.gradle
plugins {
id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
implementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.7.0'
}
test {
useJUnitPlatform()
}
간단하게 프로듀서에서 hello라는 메세지를 보내고, 컨슈머에서 메세지를 콘솔에 출력하는 예제를 만들어 보자.
프로듀서
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SimpleKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test-topic", "hello"));
producer.close();
}
}
test-topic 이라는 토픽에 "hello" 라는 메세지를 보낸다. 테스트의 편의성을 위해 토픽은 자동 생성되도록 했다.
컨슈머
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
public class SimpleKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test-consumer");
props.put("enable.auto.commit", "true");
KafkaConsumer<String ,String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(List.of("test-topic"));
while(true) {
// System.out.println("루프 시작");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
System.out.println(record.value());
}
}
}
}
컨슈머를 무한 루프를 돌기 때문에 한번만 실행시키고 프로듀서를 여러번 실행시켜보면 hello가 반복적으로 찍히는걸 확인할 수 있다.
'메세징' 카테고리의 다른 글
카프카 매니저 설치 및 사용 (0) | 2021.03.23 |
---|
Comments