4강. Broker, Replication, ISR(In Sync Replica) in Apache Kafka
Broker : 카프카가 설치되어 있는 서버 단위. 보통 3개 이상의 브로커를 구성할 것을 권장한다.
만약 토픽이 1개고 브로커가 3대라면 브로커 3대 중 1대에 해당 토픽의 정보가 저장된다.
Replication은 카프카 서버 운영에 있어서 핵심적인 요소다.
클러스터에서 서버 장애가 발생했을 때 카프카의 가용성을 보장하는 가장 좋은 방법이다.
Replication은 파티션의 복제를 의미한다. 만약 레플리케이션이 1이라면 파티션은 1개만 존재한다는 것이다.
레플리케이션이 2라면 원본 1개 + 복제본 1개, 총 2개의 파티션이 존재하게 된다.
3이라면 원본 1개와 복제본 2개, 총 3개의 파티션이 존재.
원본 파티션은 Leader partition, 복제본들은 Follower partition이라고 부른다.
리더 파티션과 팔로워 파티션을 합쳐 ISR(In Sync Replica)이라 부른다.
※ 단, [Broker 개수 ≥ Replication 개수] 다.
-> 브로커 1개에 레플리케이션 2개 이상이 존재할 수 없다.
왜 레플리케이션을 사용하는가? 당연히 가용성을 위해서다.
특정 브로커에 문제가 생겨 해당 브로커에 있는 리더 파티션을 사용할 수 없는 경우, 다른 브로커에 있는 복제본으로 복구.
이 경우, 복제본(팔로워 파티션)이 원본(리더 파티션)의 역할을 승계하여 새로운 리더 파티션이 된다.
그렇다면 리더 파티션과 팔로워 파티션의 역할은 무엇인가?
프로듀서가 토픽의 파티션에 데이터를 전달하는데, 이때 데이터를 전달받는 주체가 바로 리더 파티션이다.
프로듀서에는 ack라는 상세 옵션이 있는데, 이 옵션을 통해 고가용성을 유지할 수 있다.
ack 옵션은 파티션의 레플리케이션과 관련이 깊다.
ack는 { 0, 1, all } 3가지 선택지가 있다.
1) 0 일 경우 : 프로듀서가 리더 파티션에 데이터를 전달한 후 응답값을 받지 않는다.
-> 리더 파티션이 데이터를 제대로 받았는지 알 수 없다.
-> 팔로워 파티션들에 데이터가 정상적으로 복제되었는지 알 수 없다.
-> 속도는 빠르지만 데이터 유실 가능성이 있다.
2) 1 일 경우 : 프로듀서가 리더 파티션에 데이터를 전달한 후 데이터를 정상적으로 받았는지 응답값을 받는다.
-> 나머지 팔로워 파티션에 제대로 복제되었는지는 알 수 없다.
-> 리더 파티션이 데이터를 받고 응답값을 보낸 직후에 문제가 생길 경우 데이터 유실 가능성이 있다.
-> 팔로워 파티션에 데이터가 잘 전달되었는지는 보장할 수 없다.
3) all 일 경우 : 프로듀서가 팔로워 파티션들로부터도 응답값을 받는다.
-> 팔로워 파티션들이 리더 파티션으로부터 데이터를 전달받은 후 프로듀서로 응답값 전송
-> 데이터 유실 가능성 없음
-> 0, 1 옵션에 비해 속도가 현저히 느리다
고가용성을 위해 레플리케이션을 사용한다면, 레플리케이션이 많으면 무조건 좋은 것인가?
아니다.
레플리케이션 수가 많아지면 브로커의 리소스 사용량도 늘어난다.
카프카에 들어오는 데이터량과 데이터 저장 시간(retention date)을 고려해 레플리케이션 수를 정해야 한다.
보통 3개 이상의 브로커를 사용한다면 레플리케이션은 3개를 사용한다.
5강. Consumer in Apache Kafka
다른 메시징 시스템들은 보통 컨슈머가 데이터를 가져가면 메시징 큐 안에서 데이터가 사라진다.
하지만 카프카에서는 컨슈머가 데이터를 가져가도 데이터가 사라지지 않는다.
이 특징은 카프카가 데이터 파이프라인에서 핵심적인 역할을 하게 한다.
카프카로 전달된 데이터는 토픽 내부의 파티션에 저장된다.
그리고 컨슈머는 파티션에 저장된 데이터를 가져온다.
-> polling 방식
컨슈머의 역할은 크게 3가지.
1. 토픽의 파티션으로부터 데이터 가져오기(폴링) : 가져와서 특정 DB에 저장하거나, 다른 파이프라인으로 전달
2. 파티션 offset(데이터 번호) 위치 기록(commit)
3. 컨슈머 그룹을 통해 병렬 처리
컨슈머를 사용하려면 프로듀서 때와 동일하게 별도의 라이브러리가 필요하다.
// gradle
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'
// maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
기본적으로 자바 라이브러리를 지원하며, 프로듀서와 마찬가지로 브로커/클라이언트 간 버전 차이에 유의해야 한다.
그럼 간단한 컨슈머 코드를 살펴보자.
public class Consumer {
public static void main(String[] args) {
// Java Properties 객체를 통해 컨슈머의 설정 정의
Properties configs = new Properties();
// bootstrap 서버 설정을 로컬 호스트의 카프카를 바라보도록 설정
// 카프카 브로커의 주소 목록은 이중화 권장(2개 이상의 IP/port)
config.put("bootstrap.servers", "localhost:9092");
// 그룹 아이디(컨슈머 그룹) 지정
configs.put("group.id", "click_log_group");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 설정한 Properties로 카프카 컨슈머 인스턴스 생성
KafkaConsumer <String, String> consumer = new KafkaConsumer <String, String> (configs);
// subscribe() 메서드로 데이터를 가져올 대상 토픽 선언
consumer.subscribe(Array.asList("click_log"));
// 데이터를 실질적으로 가져오는 polling-loop 구문
// poll() 메서드가 포함된 무한루프 : 컨슈머 API의 핵심 로직
while (true) {
// 500ms 동안 데이터를 기다린 후 다음 코드 실행
// 받은 데이터를 records 변수에 담아 반환
ConsumerRecords <String, String> records = consumer.poll(500);
for (ConsumerRecord <String, String> record : records) {
System.out.println(record.value());
}
}
}
}
특정 토픽의 전체 파티션의 데이터가 아니라 일부 파티션의 데이터만 가져오길 원한다면 아래와 같이 assign() 메서드 사용
-> key가 존재하는 데이터라면 이 방식으로 데이터의 순서 보장하며 처리 가능
TopicPartition partition0 = new TopicPartition(topicName, 0);
TopicPartition partition1 = new TopicPartition(topicName, 1);
consumer.assign(Arrays.asList(partition0, partition1));
컨슈머 API의 핵심은 브로커로부터 연속적으로, 컨슈머가 허락하는 한 많은 데이터를 읽어오는 것이다.
while (true) {
ConsumerRecords <String, String> records = consumer.poll(500);
for (ConsumerRecord <String, String> record : records) {
System.out.println(record.value());
}
}
대기한 시간 동안 받은 데이터를 records(record의 묶음 list) 변수에 모아서 보내는데(데이터 배치)
데이터를 처리할 때에는 가장 작은 단위인 record 단위로 나눠서 처리한다 : records 변수를 for loop에서 반복 처리
records를 record로 나누고, record 변수를 value() 메서드로 반환된 값이 프로듀서가 전송한 데이터다.
위 코드는 단순히 println으로 출력하는 식이지만, 실제로는 저 부분에 데이터를 저장소에 저장하는 코드를 작성한다.
1개의 토픽에 2개의 파티션이 있다고 가정하고, 프로듀서에서 컨슈머로 메시지가 전달되는 과정을 보면...
1) 2개의 파티션에 key 값에 따라 데이터를 넣는다.
2) 파티션에 들어간 데이터는 파티션 내에서 고유한 번호(offset)를 가진다.
-> offset은 파티션/토픽별로 별개로 지정된다.
-> 컨슈머가 데이터를 어디까지 읽었는지 확인하는 용도
3) 컨슈머가 데이터를 읽은 후 offset을 commit한다.
4) 가져간 내용에 대한 정보는 카프카의 __consumer_offset 토픽에 저장된다.
-> 컨슈머가 어떤 토픽의 어떤 파티션의 어디가지 읽었는지 저장
-> 컨슈머가 중지되었다가 다시 실행되어도 중지된 시점을 알 수 있다.
-> 시작 위치 복구 가능 : 고가용성
컨슈머 그룹과 파티션 간 동작은 어떻게 굴러가는가?
파티션이 2개가 있고, 컨슈머 그룹에 컨슈머가 1개라면 컨슈머 1개가 파티션 2개에서 데이터를 가져간다.
컨슈머가 2개라면 각 컨슈머가 파티션 1개씩 맡아 데이터를 가져간다.
컨슈머가 3개라면 파티션이 각 컨슈머에 매칭된 후에도 컨슈머가 1개 남는데, 이 컨슈머는 데이터를 가져가지 못한다.
-> 병렬 처리를 위해서 컨슈머 개수는 파티션 개수보다 적거나 같아야 한다.
특정 컨슈머 그룹은 다른 컨슈머 그룹에 영향을 미치지 않는다.
컨슈머 그룹 A에서 데이터를 읽으며 offset을 commit한다 해도 컨슈머 그룹 B에서 commit되는 offset과는 상관 없다.
-> __consumer_offset 토픽에는 컨슈머 그룹별/토픽별로 offset을 나누어 저장한다.
∴ 하나의 토픽에 있는 데이터가 여러 방식(컨슈머 그룹)으로 처리될 수 있다.
'IT 지식' 카테고리의 다른 글
Kafka 입문 (4) (0) | 2022.10.06 |
---|---|
Kafka 입문 (3) (0) | 2022.09.20 |
Kafka 입문 (1) (0) | 2022.09.01 |
TypeORM Migration (0) | 2022.04.25 |
Isolation level (0) | 2022.03.25 |
댓글