Hello, Freakin world!

[카프카] 자바 프로듀서/컨슈머로 간단하게 카프카 클러스터 테스트 본문

메세징

[카프카] 자바 프로듀서/컨슈머로 간단하게 카프카 클러스터 테스트

johnna_endure 2021. 3. 23. 04:15

카프카, 주키퍼 클러스터를 구성하자.

 

docker-compose.yml

version: "3.0"
services:
  zk1:
    image: zookeeper
    restart: always
    hostname: zk1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk1/data:/data"

  zk2:
    image: zookeeper
    restart: always
    hostname: zk2
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk2/data:/data"

  zk3:
    image: zookeeper
    restart: always
    hostname: zk3
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    volumes:
      - "~/zk-cluster/zk3/data:/data"

  zoo-navi:
    image: elkozmon/zoonavigator
    ports:
      - 9000:9000
    environment:
      HTTP_PORT: 9000

  kafka1:
    image: confluentinc/cp-kafka
    depends_on:
    - zk1
    - zk2
    - zk3
    restart: on-failure
    ports:
    - 9091:9091
    environment:
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://호스트IP:9091
    - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9091
    - KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181
    - KAFKA_BROKER_ID=1
    - BOOTSTRAP_SERVERS=kafka1:9091,kafka2:9092,kafka3:9093

  kafka2:
    image: confluentinc/cp-kafka
    depends_on:
    - zk1
    - zk2
    - zk3
    restart: on-failure
    ports:
    - 9092:9092
    environment:
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://호스트IP:9092
    - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    - KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181
    - KAFKA_BROKER_ID=2
    - BOOTSTRAP_SERVERS=kafka1:9091,kafka2:9092,kafka3:9093

  kafka3:
    image: confluentinc/cp-kafka
    depends_on:
    - zk1
    - zk2
    - zk3
    restart: on-failure
    ports:
    - 9093:9093
    environment:
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://호스트IP:9093
    - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093
    - KAFKA_ZOOKEEPER_CONNECT=zk1:2181,zk2:2181,zk3:2181
    - KAFKA_BROKER_ID=3
    - BOOTSTRAP_SERVERS=kafka1:9091,kafka2:9092,kafka3:9093

 

주키퍼, 카프카 각각 3개의 서버로 구성한다.

 

도커에 추가한 zookeeper navigator를 이용해 카프카 브로커들이 제대로 주키퍼와 연동됐는지 확인해보자.

 

컨테이너들을 띄우고 브라우저에서 http://localhost:9000 를 입력한다.

 

도커에 정의했던 서비스 이름과 포트를 적어준다.


현재 위치("/")에서 진입가능한 znode들이 왼쪽에 있다.


여러 지노드들이 보인다. 그중에 brokers/ids를 따라가보면 현재 주키퍼에 등록된 브로커들의 id들을 볼 수 있다.

3개의 브로커가 모두 등록된 것을 확인.


이제 간단한 프로듀서, 컨슈머를 생성해 카프카가 제대로 동작하는지 확인해보자.

 

그래이들로 자바 프로젝트를 생성하고 아래와 같이 카프카 디펜던시를 추가.

 

build.gradle

plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

    // https://mvnrepository.com/artifact/org.apache.kafka/kafka
    implementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.7.0'

}

test {
    useJUnitPlatform()
}

간단하게 프로듀서에서 hello라는 메세지를 보내고, 컨슈머에서 메세지를 콘솔에 출력하는 예제를 만들어 보자.

 

프로듀서

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleKafkaProducer {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test-topic", "hello"));

        producer.close();
    }
}

test-topic 이라는 토픽에 "hello" 라는 메세지를 보낸다. 테스트의 편의성을 위해 토픽은 자동 생성되도록 했다.

 


컨슈머

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "test-consumer");
        props.put("enable.auto.commit", "true");

        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(List.of("test-topic"));
        while(true) {
//            System.out.println("루프 시작");
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record: records) {
                System.out.println(record.value());
            }
        }
    }
}

 

컨슈머를 무한 루프를 돌기 때문에 한번만 실행시키고 프로듀서를 여러번 실행시켜보면 hello가 반복적으로 찍히는걸 확인할 수 있다.

 

'메세징' 카테고리의 다른 글

카프카 매니저 설치 및 사용  (0) 2021.03.23
Comments