Kafka 구현 중 발생했던 모든 문제 모음

6 분 소요

1. Kafka Docker 구현 중 Network 문제

문제

#!/bin/bash
port=2181
port_kafka_server_1=9092
name_kafka_server_1="kafka-server-1"
name="zookeeper"
zookeeper_server="${name}:${port}"
network="kafka"

# Must Be Configure network And Installed bitnami/kafka And zookeeper

echo " Config Zookeeper 	=> port : ${port} network : ${network} "
echo " Config kafka-server1     => port : ${port_kafka_server_1} network : ${name_kafka_server_1} , zookeeper : ${zookeeper_server}"
echo " Config Kakfa-UI(Kafdrop) => port : 9000"
## MAKE ZOOKEEPER CONTAINER
echo " => Make Zookeeper Container"
docker run -p $port:$port -d --network ${network} --name ${name} \
	--restart=always \
       	zookeeper

sleep 20

## MAKE KAFKA-BROKER-1 CONTAINER
echo " => Make kafka Broker Container"
docker run -p $port_kafka_server_1:$port_kafka_server_1 -d --network $network --name $name_kafka_server_1 \
	-e ALLOW_PLAINTEXT_LISTENER=yes \
	-e KAFKA_CFG_ZOOKEEPER_CONNECT=$zookeeper_server \
	-e KAFKA_CFG_BROKER_ID=1 \
	-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
	-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 \
	-u 1000 \
	bitnami/kafka

## MAKE KAFKA UI
echo " => Make Kafka UI (Kafdrop) "
docker run -d --rm -p 9000:9000 \
	--network $network \
	--name kafdrop\
	-e KAFKA_BROKERCONNECT=kafka-server-1:9092 \
	-e JVM_OPTS="-Xms32M -Xmx64M" \
	-e SERVER_SERVLET_CONTEXTPATH="/" \
	obsidiandynamics/kafdrop

원인

KAFKA_CFG_ADVERTISED_LISTENERS(advertised.listeners) 여기서 호스트가 문제

  1. UI APP이 컨테이너 인데 다른 컨테이너 간에는 Docker Network를 사용해서 연결했는데 이 경우 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server-1:9092 을 사용해야 한다.
  2. , 외부 클라이언트인 웹소켓 서버는 host ip로 접근해야 하는 상황
    • -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092

즉 한개의 리스너로는 UI Tool과 외부 클라이언트를 동시에 접속하도록 설정할 수 없음, 설치 버전으로는 가능할지 모르나 최소한 Docker로 구성하였을 경우엔 불가능함

해결

리스너를 분리해야 한다 : Internal과 external로, internal은 Docker Network로, external은 외부 클라이언트가 붙도록 해야한다.

KAFKA_CFG_ADVERTISED_LISTENERS,KAFKA_CFG_LISTENERS,KAFKA_CFG_INTER_BROKER_LISTENER_NAME,KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP까지 전부 설정해야 한다.

  1. KAFKA_CFG_LISTENERS에서 외부에서 사용하기 위한 Listener Name을 정의한다.
    • 여기서는 INTERNAL, EXTERNAL 두개로 사용
    • INTERNAL은 Docker 간 네트워킹용, Docker Network에 물려서 Docker Container간 사용한다.
    • EXTERNAL은 외부 클라이언트가 Kafka에 접근할 때 사용한다.
      • 127.0.0.1인 이유는 브로커 서버와 컨슈머 서버가 같으니까 상관없다.
      • ⭐ 만약 브로커 서버가 분리된다면 반드시 HOST IP를 적어야 외부에서 접근이 가능하다
  2. KAFKA_CFG_ADVERTISED_LISTENERS : 실제로 접근 가능한 ip와 포트를 노출
    • INTERNAL은 도커 컨테이너간 통신용이니까 docker network 규칙대로 사용
    • EXTERNAL는 외부 클라이언트 접근용이므로 host ip를 적음, 브로커 서버와 컨슈머 서버가 같으므로 localhost 처리
  3. KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT가 아닌경우 sequrity를 요구한다. PLAINTEXT는 sequrity 상관없이 작동이 되므로 모든 Listener를 PLAINTEXT로 연결한다.
    • ⭐ 운영에서는 SSL 처리해서 해야한다. PLAINTEXT는 절대 안됨
  4. KAFKA_CFG_INTER_BROKER_LISTENER_NAME : 클러스터링 시 브로커간 네트워크 설정, 기본값이 PLAINTEXT라서 Listener Name중 하나를 정해서 사용,
    • 내부 통신용이므로 그냥 INTERNAL 사용
#!/bin/bash
port=2181
port_kafka_server_1=9092
name_kafka_server_1="kafka-server-1"
name="zookeeper"
zookeeper_server="${name}:${port}"
network="kafka"

# Must Be Configure network And Installed bitnami/kafka And zookeeper

echo " Config Zookeeper 	=> port : ${port} network : ${network} "
echo " Config kafka-server1     => port : ${port_kafka_server_1} network : ${name_kafka_server_1} , zookeeper : ${zookeeper_server}"
echo " Config Kakfa-UI(Kafdrop) => port : 9000"
## MAKE ZOOKEEPER CONTAINER
echo " => Make Zookeeper Container"
docker run -p $port:$port -d --network ${network} --name ${name} \
	--restart=always \
       	zookeeper

sleep 20

## MAKE KAFKA-BROKER-1 CONTAINER
echo " => Make kafka Broker Container"
docker run -p $port_kafka_server_1:$port_kafka_server_1 -p 9093:9093 -d --network $network --name $name_kafka_server_1 \
	-e ALLOW_PLAINTEXT_LISTENER=yes \
	-e KAFKA_CFG_ZOOKEEPER_CONNECT=$zookeeper_server \
	-e KAFKA_CFG_BROKER_ID=1 \
	-e KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-server-1:9092,EXTERNAL://127.0.0.1:9093 \
	-e KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9093 \
	-e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL \
	-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT \
	-u 1000 \
	bitnami/kafka

## MAKE KAFKA UI
echo " => Make Kafka UI (Kafdrop) "
docker run -d --rm -p 9000:9000 \
	--network $network \
	--name kafdrop\
	-e KAFKA_BROKERCONNECT=kafka-server-1:9092 \
	-e JVM_OPTS="-Xms32M -Xmx64M" \
	-e SERVER_SERVLET_CONTEXTPATH="/" \
	obsidiandynamics/kafdrop

해결 후

해결 완료, Consumer를 볼 수 있다.

Spring APP의 경우 EXTERNAL Listener로 Kafka 접근, Kafdrop은 INTERAL로 Docker Network로 Kafka 접근


Kafka에서 console produce 했음에도 Spring에서 Consume 하지 못하였던 이슈

문제

Spring에서 kafka Consumer 설정을 완료하고 Kafka를 띄어서 콘솔로 메시지를 보냈는데 아무 반응이 없는 것을 확인하였다.

참고로 이건 Docker로 구현했으므로 설치한 버전과 다를 수 있다.

원인

기본 설정만 사용하다보니 외부 네트워크 설정이 안되어있었다. Docker Network를 사용해서 연결하였다 보니 Docker Container 간에는 네트워킹이 가능했는데 막상 외부 클라이언트인 Spring Application에서 네트워킹이 불가능했던 것

따라서 Kafka 설정시 내/외부 클라이언트끼리 접속을 하려면 네트워크 정보 수정이 필수이다.

해결

설치한 Kafka일 경우 ${kafkaHome}/config/server.properties 에서 advertised.listeners, listeners를 수정해야 한다.

Docker를 사용할 경우 대부분 bitnami/kafka를 사용하므로 네트워킹을 위한 ENV를 넘겨야 한다.

내/외부 네트워킹에 필요한 KEY

  • server.properties
    1. listeners
    2. advertised.listeners
    3. inter.broker.listener.name
    4. KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP ← 이건 봐야함
  • docker bitnami/kafka
    1. KAFKA_CFG_ADVERTISED_LISTENERS,
    2. KAFKA_CFG_LISTENERS,
    3. KAFKA_CFG_INTER_BROKER_LISTENER_NAME,
    4. KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP

Consume 되도록 변경 → listeners 켜야함

Kafka에서 listeners는 Kafka 브로커가 클라이언트 요청을 처리하기 위해 리스닝할 호스트 이름 또는 IP 주소와 포트 번호를 설정하는 속성이다. listeners 추가 해야 외부 클라이언트가 Kafka에 접근할 수 있다.

listeners 는 아래 4가지 프로토콜을 지닌다.

  • PLAINTEXT : 평문(Plain text) 통신을 사용하는 리스너입니다.
  • SSL : SSL(Secure Sockets Layer) 암호화 프로토콜을 사용하는 리스너입니다.
  • SASL_PLAINTEXT : SASL(Simple Authentication and Security Layer) 프로토콜과 평문 통신을 사용하는 리스너입니다.
  • SASL_SSL : SASL 프로토콜과 SSL 암호화 프로토콜을 모두 사용하는 리스너입니다.

외부 클라이언트가 —bootstrap-server 로 접근이 가능하도록 설정해야 한다. 즉 아래 설정을 변경해야한다. 기본적으로 주석처리 되어 있다.

  1. advertised.liseners,
  2. listeners

localhost일 경우 필요없긴한데 여러 이득때문에 이 설정을 켜는것이 좋다.

  • 이 설정은 설치하여 받는 kafka일 경우 ${kafka-home}/config/server.properties 에 있다.
  • bitnami/kafka의 경우 다음 ENV를 추가해야 한다.
    • KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
    • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092

근데 이걸 완료하고나니까 별도의 컨테이너로 올려둔 UI Tool이 작동이 안된다.

이 이슈가 왜 터졌는지, 어떻게 해결하였는지는 위에 정리하였다.

끝내고 소감

네트워크 공부를 해야겠다.


문제점 1 : 토픽이 2개이고 각각 다른 클래스로 역직렬화 해야함

문제점 : 토픽 2개, 각각 다른 클래스로 역직렬화되어야 함

public <K,V> ConsumerFactory<K,V> consumerFactory ( Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer ) {
                return new DefaultKafkaConsumerFactory<>( new StringDeserializer() , keyDeserializer , new JsonDeserializer<>( ChatInfo.class , new ObjectMapper() ));
}

해결 : KafkaListner를 Topic 별로 분리하여 Bean을 생성하고 각각 클래스로 생성하게 만든다.

/**
 * Chat Topic에 대한 KafkaListener 등록
 *
 * ConcurrentMessageListenerContainer라서 Thread-safe하지 않으나 일단 이거로 구현
 * */
@Bean
public ConcurrentKafkaListenerContainerFactory<String,ChatInfo> kafkaListenerTopicChat() {
        
        ConcurrentKafkaListenerContainerFactory<String, ChatInfo> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory( new StringDeserializer() , new JsonDeserializer<>( ChatInfo.class , new ObjectMapper() ) ) );
        
        return factory;
}

/**
 * Note Topic에 대한 KafkaListener 등록
 *
 * ConcurrentMessageListenerContainer라서 Thread-safe하지 않으나 일단 이거로 구현
 * */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NoteInfo> kafkaListenerTopicNote() {
        
        ConcurrentKafkaListenerContainerFactory<String,NoteInfo> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory( new StringDeserializer() , new JsonDeserializer<>( NoteInfo.class , new ObjectMapper() ) ) );
        
        return factory;
}

public <K,V> ConsumerFactory<K,V> consumerFactory ( Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer ) {
        return new DefaultKafkaConsumerFactory<>( consumerConfigs() , keyDeserializer , valueDeserializer );
}

실제 KafkaListener에서 containerFactory 를 지정한다. 이제 KafkaListener에서 객체를 사용할 수 있다.

@KafkaListener( topics = { "kafkaTest" }, containerFactory = "kafkaListenerTopicNote" )
public void kafkaTest ( NoteInfo data ) {
        
        System.out.println( "@@@@@@@@@@@@@@@@@@@" );
        System.out.println( data.getTestKey() );
        System.out.println( "@@@@@@@@@@@@@@@@@@@" );
        
}

문제점 2 : Message JSON의 정합성 보장 문제

문제점

  • JSONDeserialize로 설정된 Topic에 Producer가 String을 보내는 등 Deserialize에 실패하면 아래 오류가 발생함, 문제는 폴링이라서 이거 오류 무한 호출됨
    paused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sad': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
     at [Source: (byte[])" sad"; line: 1, column: 5]
      at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2418)
      at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:759)
      at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3693)
      at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2781)
      at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:907)
      at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:793)
      at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:357)
      at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2095)
      at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1583)
      at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:585)
      ... 15 common frames omitted
    

해결

  • ErrorHandlingDeserializer를 사용해야한다.
    public <K,V> ConsumerFactory<K,V> consumerFactory ( Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer ) {
              return new DefaultKafkaConsumerFactory<>( consumerConfigs() , keyDeserializer , new ErrorHandlingDeserializer<>( valueDeserializer ) );
      } 
    

[Kafka] 하나의 파티션은 하나의 컨슈머만 받을 수 있다.

문제점

  • 다른 개발자님 추가 후 내 Consumer가 작동이 안되는 것을 확인

    원인

  • 크게 2 가지임
    • Partition은 하나의 Consumer만 연결됨
      • 따라서 다른 개발자님이 프로젝트를 킨 순간 Consumer가 2개가 되어서 이슈가 발생하였던 것
      • Kafka에서 Partition ≥ Consumer이어야 한다.
        • BEST는 Partition = Consumer
        • 이게 성능을 챙기는 가장 좋은 방법임
      • Kafka에서 partiton 1개 늘림
        # partition 수 2개로 늘림
        sh kafka-topic.sh --bootstrap-server testserver:9092 --topic test -partition 2
      
      • GroupID 문제, 모든 Consumer Server가 동일한 Group → 로드밸런싱됨
        • websocket ( Consumer Group )
          • 다른 개발자님 Server Consumer
          • 내 Server Consumer
        • 따라서 개발 서버에서는 모든 Consumer가 같은 토픽에 대해 같은 Consume이 되어야 하므로 Group id를 다르게 처리
        • 운영에서 문제점
          1. 이중화가 될 경우 웹소켓과 별도로 simpleBroker 는 불가능, 구독 정보가 다르게 유지됨
        • 수정
          • 동일한 Message Key를 가진 Message는 항상 동일한 Partition을 타도록 해야함

            KafkaHeaders.KEY 헤더 추가

              Message<NoteInfo> message = MessageBuilder.withPayload( noteInfo )
              										  .setHeader( KafkaHeaders.TOPIC , noteInfo.getTopic() ) // Go Topic
              										  .setHeader( KafkaHeaders.KEY , noteInfo.getKey() ) // 같은 key일 경우 동일한 Portition에서 작업되는 것을 보장하기 위해
              										  .build();
            
          • 설정에서 groupId 동적으로 생성하도록 변경

              /**
              * Group ID 처리
              * 로컬 개발시 모든 개발자 동일한 Topic에 대한 동일한 Consume이 가능하도록 Group ID를 반환
              * - 로컬일때 -> 랜덤으로 GroupID 생성해서 같은 Topic에 대해 동일한 Consume이 가능하도록 설정
              * - 운영일때 -> 웹소켓이 아닐 경우 또는 웹소켓 서버라도 서버가 하나라면 group id 통일해도 상관없음, 단 웹소켓 서버가 2개라면 다르개 설정해야함
              * */
              private String getGroupId () {
              	return SystemConfig.IS_LOCAL ? random : ApplicationPropertiesUtils.getValue( "kafka.consumer.groupId" ,"websocket" );
              }
            

업데이트: