본문 바로가기
IT 지식

Kafka 입문 (1)

by ballena 2022. 9. 1.

여기저기 둘러봐도 카프카를 안쓰는 곳이 없다. 데이터 분석 관련 수요가 아니더라도 생각보다 훨씬 널리 사용되고 있었다.

그럼에도 불구하고 얼핏 보기에는 난이도가 높아 입문에 대한 심리적 장벽이 높았다.

시간끌다 뭣도 안되니 일단 박자.

 

유튜브님 도와주세요!


1강. 아파치 카프카 개요 및 설명

https://youtu.be/waw0XXNX-uQ

'데브원영'님의 카프카 입문 강의

앱을 만들 때, 데이터를 주고받는다. 이때 소스 앱과 타겟 앱이 있는데, 서비스가 확장되며 소스/타겟의 숫자가 늘어난다.

그래서 소스와 타겟 1:1의 연결로는 결국 구성이 그물처럼 복잡해진다.

이렇게 되면 데이터 관리, 유지/보수 측면에서 최악이다.

 

아파치 카프카는 이런 문제를 해결하기 위해 개발되었다.

여러 데이터 소스들은 카프카로 데이터를 보내고, 다수의 타겟들은 카프카에서 데이터를 가져온다.

(데이터의 형식은 딱히 제한 없다)

이 결과로 소스 앱과 타겟 앱 간의 연결이 느슨해진다(decoupling).

느슨한 연결은 굳이 MSA/EDA 구조에서만 사용하는 것이 아니라 서비스를 안정적으로 운영하기 위한 중요한 요소다.

여러 처리 단계 중 한 단계에서 문제가 생겼다고 전체 단계가 멈춰버리면 서비스 품질에 영향을 크게 미치기 때문이다.

 

ex)

소스 앱 : 클릭/결제 로그

타겟 앱 : 로그 적재 및 처리

단순히 위 앱들 2가지만 존재하는 서비스라면 몰라도 로그를 적재/처리하는 것 말고도 다른 용도로 사용하기 위해 다른 타겟 앱을 새로 붙여야 할 수 있다.

카프카를 가운데 끼워 넣으면 새로운 타겟 앱을 뭘 개발해도 데이터를 카프카에서 가져오면 된다. 구성이 쉬워진다는 것.

 

카프카에는 Topic이라는 개념이 있다. 간단히 생각하면 소스에서 보내온 데이터를 담는 큐(Queue)라고 생각하면 된다.

Kafka Producer(소스 앱)에서 정해진 토픽으로 데이터를 보내고, Kafka Consumer(타겟 앱)은 정해진 토픽에서 데이터를 가져온다.

 

그럼 카프카는 그냥 메시지 큐 서비스일 뿐인가? 

카프카는 데이터 흐름에 있어 고가용성을 제공한다. 서버에 이슈가 생겨도 손실 없이 복구할 수 있다.

지연도 적고, 처리량도 높다.


2강. Topic in Apache Kafka

https://youtu.be/7QfEpRTRdIQ

위에서 말했듯 카프카에는 다양한 데이터가 들어갈 수 있는데, 데이터가 들어가는 공간을 Topic이라고 한다.

카프카 안에서는 토픽을 여러 개 생성할 수 있다. 토픽의 이름을 목적을 알 수 있게 지으면 유지 보수에 편하다.

 

하나의 토픽 안에는 여러 개의 Partition을 만들 수 있다.

데이터가 파티션으로 들어오면 끝(큐의 출구 부분)에서부터 데이터가 쌓인다.

-> N번...3번 2번 1번 0번 (출구)

그리고 컨슈머는 오래된 데이터(0번)부터 파티션에서 가져간다.

단, 컨슈머가 데이터(record)를 가져가도 파티션에서 데이터가 삭제되지는 않는다.

-> 카프카를 사용하는 가장 큰 이유 중 하나다 : 동일 데이터를 여러 번/다양한 방법으로 처리 가능하다는 것.

 

※ 새로운 타겟(컨슈머)이 파티션에서 데이터를 가져갈 때, 다시 0번에서부터 데이터를 가져간다.

    대신 컨슈머 그룹이 달라야 하며, auto.offset.reset = earliest로 세팅되어 있어야 한다.

 

그렇다면 파티션이 하나 더 늘어난다면?

※ 파티션을 늘릴 수는 있지만 줄일 수는 없다. 늘릴 때 신중하게 늘려야 한다.

프로듀서는 하나의 토픽으로 데이터를 전송할텐데, 하나의 토픽 안에 있는 여러 파티션에 어떻게 데이터를 분산시키는가?

-> 프로듀서가 데이터를 보낼 때 키를 지정할 수 있다.

   1) 키가 null값이고, 기본 파티셔너를 사용한다면

      -> Round-Robin(RR) 방식으로 할당된다.

   2) 키가 있고, 기본 파티셔너를 사용한다면

      -> 키의 hash 값을 구하고, 특정 파티션에 할당한다.

 

그럼 왜 파티션을 늘리는가?

-> 파티션의 개수를 늘리고 컨슈머의 개수를 늘려 데이터 처리를 분산시킬 수 있다.

(그냥 파티션 하나에 다 꼴아박고 컨슈머만 늘려도 분산 가능하지 않나? 파티션 당 용량 제한 같은 것이 있나?)

-> 지금까지 찾아본 바로는 Kafka에서는 파티션 용량 제한 같은 것은 없는 것 같다. 하지만 MSK나 Confluent처럼 제품 레벨로 가면 제한이 있을 수 있다.

-> 데이터가 한 가지 종류만 있으면 모르겠지만, 나중에 설명될 key 값 등으로 용도별 데이터를 나누기 시작하면 분기는 필수. 용도 관계없이 다 하나에 때려박을거면 뭐하러 카프카를 쓰나?

 

그렇다면 파티션에 쌓인 데이터는 언제 삭제되는가?

-> 설정을 통해 레코드가 저장되는 시간과 데이터의 크기 지정 가능

log.retention.ms : 레코드 최대 보존 시간

log.retention.byte : 레코드 최대 보존 크기(byte)


3강. Producer in Apache Kafka

https://youtu.be/aAu0FE3nvbk

Producer는 데이터를 생성하고 카프카 토픽으로 보내는 역할을 한다.

1. 토픽에 해당하는 메시지 생성

2. 특정 토픽으로 데이터 Publish

3. Kafka broker로 데이터를 전송할 때, 전송 성공 여부 인식 및 처리 실패 시 재시도

 

카프카 클라이언트인 컨슈머/프로듀서를 사용하기 위해서는 아파치 카프카 라이브러리 설치 필요.

// gradle
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'

// maven
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
</dependency>

위처럼 Gradle이나 Maven으로 쉽게 가져올 수 있는데,  dependency를 잡을 때 버전에 주의해야 한다. 

-> 브로커 버전과 클라이언트 버전의 하위호환성이 완벽하게 지원되지 않는다.

-> 일부 카프카 브로커 버전은 특정 카프카 클라이언트 버전을 지원하지 않을 수 있다.

사용하는 버전의 하위호환성 확인 필요.

https://blog.voidmainvoid.net/193

 

Kafka broker와 java client의 버젼 하위호환성 정리

하위 호환성은 기술 및 컴퓨터 분야에서 새 제품이 이전 제품을 염두에 두고 만들어진 제품에서 별도의 수정 없이 그대로 쓰일 수 있는 것을 뜻한다. Kafka는 1.XX version으로 올라가기 전까지는 "one

blog.voidmainvoid.net

 

다음은 간단한 프로듀서 코드다.

public class Producer {
	public static void main(String[] args) throws IOException {
    
    	// L7 ~ L15 : Producer를 위한 설정 부분.
        
        // Java Properties 객체를 통해 프로듀서의 설정 정의      
    	Properties configs = new Properties();
        
        // bootstrap 서버 설정을 로컬 호스트의 카프카를 바라보도록 설정
        // 카프카 브로커의 주소 목록은 이중화 권장(2개 이상의 IP/port)
        config.put("bootstrap.servers", "localhost:9092");
        
        // key와 value에 대해 StringSerializer로 직렬화 설정
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // 설정한 Properties로 카프카 프로듀서 인스턴스 생성
        KafkaProducer <String, String> producer = new KafkaProducer <String, String> (configs);
        
        // 전송할 객체 생성 : 카프카 클라이언트에서 ProducerRecord 클래스 제공
        // ProducerRecord <String, String> (topic_name, key, value)
        // "click_log"는 토픽 이름, "login"은 value. 아래 코드에서는 key 생략됨(null)
        ProducerRecord record = new ProducerRecord <String, String> ("click_log", "login");
        
        // send() 메서드의 파라미터로 ProducerRecord 삽입
        producer.send(record);
        
        // close() 메서드로 프로듀서 종료
        producer.close();
 	}
 }

※ Serializer는 key/value를 직렬화하기 위해 사용 : Byte array, String, Integer Serializer 사용 가능

※ 직렬화 : 자바 시스템 내부의 객체/데이터를 외부의 자바 시스템에서도 사용할 수 있도록 byte 형태로 데이터를 변환하는 기술

 

key는 2강에서 말했듯 토픽의 파티션이 지정될 때 쓰인다.

 

객체 전송을 위한 인스턴스 생성 시 전송할 토픽/key/value를 지정할 수 있다.

 

ProducerRecord 인스턴스 생성 시 파라미터 개수에 따라 자동으로 오버로딩되어 인스턴스가 생성된다.

위 코드에서 key 값을 넣은 인스턴스를 생성하려면

ProducerRecord <String, String> ("click_log", "1", "login")

이렇게 될 것이다.

 

파티션이 2개 이상일 때 key가 생략된(null) 레코드를 보낸다면 RR 방식으로 각 파티션에 쌓일 것이다.

그렇다면 다음 경우를 보자.

producer.send(new ProducerData<String, String>("click_log", "1", "buy"));
producer.send(new ProducerData<String, String>("click_log", "2", "review"));

key 값이 null이 아닐 경우 해당 key 값을 특정 hash 값으로 변경시켜 파티션과 1:1 매칭을 시킨다.

위 코드를 실행시켰을 때 partition #0과 partition #1이 있다면 각 파티션에는 동일한 key 값을 가진 value만 쌓일 것이다.

보기 좋게 partition #0에는 buy 레코드만, partition #1에는 review 레코드만 쌓일 것이다.

 

그런데 여기서 파티션을 하나 더 추가한다면?

보기 이쁘게 매칭되던 key/value - 파티션 구성이 깨진다.

-> 키와 파티션의 일관성(연결)이 보장되지 않는다.

 

즉, key를 사용할 경우 키-파티션 간 일관성이 깨지는 상황에 대비해 파티션 개수를 생성해야 한다.

첫 설정 이후 파티션을 추가하는 것은 권장하지 않는다.

 

단순히 카프카에 데이터를 보내는 것은 위에서 본 코드와 같이 간단하다.

하지만 데이터 유실/브로커 이슈에 대응하기 위해서는 추가적인 옵션과 코드가 필요하다.

'IT 지식' 카테고리의 다른 글

Kafka 입문 (3)  (0) 2022.09.20
Kafka 입문 (2)  (0) 2022.09.05
TypeORM Migration  (0) 2022.04.25
Isolation level  (0) 2022.03.25
대칭키/비대칭키 암호화 방식  (0) 2022.03.21

댓글