6강. Lag in Apache Kafka
프로듀서는 파티션으로 데이터를 넣고, 컨슈머는 파티션으로 들어온 데이터를 읽는다.
위 2개는 오프셋으로도 지정이 된다 : 프로듀서가 마지막으로 넣은 오프셋/컨슈머가 마지막으로 읽은 오프셋
만일 프로듀서가 데이터를 넣는 속도가 컨슈머가 데이터를 읽어가는 속도보다 빠르다면 2개 오프셋 간에 차이가 발생하게 되는데, 이 차이를 Consumer Lag이라고 한다.
토픽에 여러 파티션이 존재할 경우 lag도 여러 개가 존재할 수 있다. 이 여러 개의 lag 중 가장 높은 lag을 records-lag-max라고 한다.
lag의 숫자를 통해 해당 토픽에 연결되어 있는 프로듀서와 컨슈머의 상태를 추측할 수 있다.
주로 컨슈머의 상태를 볼 때 사용한다.
-> 컨슈머가 성능이 나오지 않거나 비정상적으로 동작할 때 lag이 발생하기 때문
7강. Lag을 모니터링하기 위한 오픈소스 Burrow
컨슈머 lag을 실시간으로 모니터링하고 싶다면, lag 데이터를 저장소(Elasticsearch/InfluxDB)에 에 넣은 뒤, Grafana 대시보드를 통해 확인할 수 있다.
하지만 위와 같은 방식은 또 다른 컨슈머로서 lag을 모니터링하는 것인데, 이런 구성은 그다지 권장되지 않는다.
왜나하면 컨슈머 로직으로 lag을 수집하게 되면 컨슈머의 상태에 의존성이 걸리기 때문이다.
-> 컨슈머가 비정상적으로 종료되면 컨슈머는 더 이상 lag 정보를 보낼 수 없어 lag을 측정할 수 없게 된다.
-> 컨슈머가 추가될 때마다 해당 컨슈머의 lag 정보를 특정 저장소로 던져주는 로직을 넣어야 한다.
-> lag을 수집할 수 없는 컨슈머라면 운영이 까다로워진다.
이러한 상황에 lag 모니터링을 위해 링크드인에서 만든 것이 Burrow.
Golang으로 작성되어 있다.
Burrow는 컨슈머 lag 모니터링을 도와주는 독립적인 애플리케이션이다.
Burrow의 특징
1. 멀티 카프카 클러스터 지원
카프카 클러스터가 여러 개가 있어도 Burrow application 1개만 연동하면 해당 클러스터들에 붙은 컨슈머의 lag을
모두 모니터링할 수 있다.
2. Sliding window를 통한 Consumer의 status 확인
Burrow는 sliding window를 통해 컨슈머의 status를 ERROR/WARNING/OK로 표현한다.
데이터량이 일시적으로 많아져 컨슈머 오프셋이 증가되고 있으면 WARNING으로 정의
데이터량이 많아지는데 컨슈머가 데이터를 가져가지 않으면 ERROR로 정의
3. HTTP API 제공
8강. Partitioner in Producer
프로듀서가 토픽 안의 파티션으로 데이터를 보내면 무조건 파티셔너를 통해 데이터를 전달하게 된다.
파티셔너는 데이터를 토픽의 어떤 파티션에 넣을지 결정하는 역할을 한다.
그럼 무엇을 기준으로 결정하는가?
레코드에 포함된 메시지 키/메시지 값에 따라서 파티션의 위치를 결정하게 된다.
파티셔너의 기본값은 UniformStickyPartitioner로 설정된다.
이 설정값은 메시지 키가 있는 경우/없는 경우로 나뉜다.
1. 메시지 키가 없을 경우
데이터는 라운드 로빈 방식으로 각 파티션에 들어간다. 하지만 기존 RR 방식과는 살짝 다른데, 각각의 데이터를 분배하는 것이 아니라 배치 단위로 데이터를 모아서 보낸다. 이 배치 단위의 모아진 데이터가 RR 방식으로 각 파티션에 분배되는 것.
2. 메시지 키가 있는 경우
각 레코드에 있는 메시지 키는 파티셔너에 의해 특정 해쉬값을 생성한다. 그리고 이 해쉬값을 기준으로 어떤 파티션에 들어갈지 정해진다. 메시지 키가 같다면 동일한 해쉬값을 만들어내기에 같은 메시지 키를 가진 레코드들은 같은 파티션에 모이게 된다. 같은 파티션에 모인 이상 하나의 큐 안에서 동작하므로 메시지의 순차성 또한 보장된다.
※ 메시지 키로 넣는 값은 string도 상관 없다.
위에서 언급한 파티셔너는 기본값으로 제공되는 파티셔너인데, 카프카에서는 Partitioner 인터페이스를 통해 사용자가 직접 설정하는 custom partitioner도 사용 가능하다.
파티셔너 인터페이스를 사용해 커스텀 파티셔너 클래스를 만들고, 메시지 키/메시지 값/토픽 이름에 따라서 특정 레코드를 특정 파티션으로 보낼 수 있다.
특정 레코드를 특정 파티션으로 보내서 얻는 이득은 무엇인가?
-> 해당 레코드를 더 빠르게 처리할 수 있다 : 영상에서는 VIP 고객을 위한 처리를 예시로 들었다.
AMQP 기반 메시징 시스템에서 우선 순위 큐를 만드는 것과 유사하다고 볼 수 있다.
※ AMQP(Advanced Message Queue Protocol)
클라이언트 미들웨어 브로커 간 데이터 교환을 위한 MQ 기반 메시지 교환 프로토콜.
특징으로는 이기종 간 메시지 교환/속도 및 응답성이 있다.
AMQP를 활용한 사례로는 RabbitMQ(OpenStack 기반), ActiveMQ(JVM 기반), ZeroMQ(임베디드 기반) 등이 있다.
※ MQ(Message Queue)
메시지 기반 미들웨어. 메시지를 사용해 여러 서비스/시스템/애플리케이션을 연결해 주는 솔루션이다.
9강. Apache Kafka Streams
실시간으로 발생하는 데이터를 처리하기 위해 사용하는 라이브러리.
카프카 스트림즈는 카프카에서 공식적으로 제공하는 자바 라이브러리다.
토픽에 있는 데이터를 낮은 지연과 함께 빠른 속도로 처리할 수 있다.
라이브러리로 제공되는 만큼 JVM 기반 언어(자바, 스칼라, 코틀린)를 사용해 개발할 수 있다.
카프카 스트림즈의 장점
1. 카프카와의 완벽 호환
일반적으로 카프카를 사용할 경우 카프카는 이벤트 저장소로만 사용하고 저장된 데이터를 Spark, LogStash 등의 외부 툴과 연동해 사용할 것이다. 하지만 외부 툴과 연동할 경우 오픈 소스 특성상 빠르게 변화하는 카프카의 버전을 따라가지 못할 수 있다. 그러나 카프카 스트림즈는 공식적으로 제공되는 라이브러리이기에 카프카가 릴리즈될 때마다 카프카 클러스터와 계속해서 호환된다. 또한 카프카에 보안 기능이나 ACL 등이 붙어 있어도 호환에 문제가 없다. 마지막으로, 데이터 유실/중복 전달의 가능성 없이 한 번만의 처리를 보장하는 기능을 가지고 있다. 이러한 기능은 카프카와 연동 가능한 이벤트 프로세싱 도구 중에서도 제공하는 도구가 거의 없다고 한다.
2. 스케줄링 도구가 필요 없다
카프카와 연동하는 스트림 프로세싱 도구로 가장 많이 사용되는 것은 스파크 스트림이다. 이 경우 카프카와 연동해 마이크로 배치 처리를 하는 이벤트 데이터 애플리케이션을 만들기 위해서는 스파크 스트리밍/스파크 구조적 스트림을 사용한다. 하지만 스파크를 운영하기 위해서는 yarn/mesos와 같은 클러스터 관리자/리소스 매니저 등이 필요하다. 추가로 클러스터 운영을 위한 대규모 장비도 필요해진다.
그러나 카프카 스트림즈를 사용하면 별도의 스케줄링 도구가 필요 없어진다. 스트림즈 애플리케이션을 원하는 만큼 배포하면 된다. 데이터량이 적을 때에는 대충 한두개 띄워서 사용하다가 데이터량이 늘어날 경우 스케일 아웃해서 확장하면 된다.
3. 스트림즈 DSL(Domain Specific Language)과 프로세서 API 제공
스트림즈를 구성할 때 대부분의 경우는 스트림즈 DSL을 사용해 구현할 수 있다. 스트림즈 DSL에는 이벤트 기반 데이터 처리를 할 때 필요한 다양한 기능들(map, join, window 등의 메서드)이 포함되어 있다. 만약 필요한 기능을 스트림즈 DSL에서 제공하지 않는다면 프로세서 API를 사용해 구현하면 된다.
스트림즈 DSL에서는 KStream, KTable, GlobalKTable이라는 스트림 처리 방식을 제공하는데, 이 방식들을 사용하면 카프카를 스트림 데이터 처리 뿐만 아니라 대규모 Key-Value 저장소로도 활용할 수 있다.
4. 자체적으로 로컬 상태 저장소를 사용한다.
실시간으로 들어오는 데이터를 처리하는 방식은 상태 기반(Stateful)/비 상태 기반(Stateless) 처리 방식이 있다. 비 상태기반 처리는 필터링이나 데이터를 변환하는 처리 방식이다. 이러한 방식은 데이터가 들어오는대로 바로 처리하고 프로듀스하기에 유실이나 중복이 발생할 염려가 적으며, 개발하기도 쉽다. 하지만 상태 기반 처리는 구현할 때 난이도가 급격하게 높아진다. window, join, aggregation과 같은 처리는 이전에 받은 데이터를 프로세스가 메모리에 저장하며 다음에 들어올 데이터를 참조해서 처리해야 하기 때문이다.
카프카 스트림즈는 이러한 어려운 부분을 해결하기 위해 로컬에 rocksdb를 사용해 상태를 저장하며, 상태에 대한 변환 정보를 카프카의 변경 로그(changelog) 토픽에 저장한다. 이렇게 되면 프로세스에 장애가 발생하더라도 상태들이 안전하게 저장되기에 장애 복구가 가능하다.
// payment 토픽에 들어온 데이터 중 메시지 키가 unknown인 데이터를 필터링해서
// unknown-payment 토픽으로 보내는 코드
KStream<String, String> paymentStream = builder.stream("payment");
KStream<String, String> filterStream = paymentStream
.filter((key, value) -> key.equals("unknown"));
filterStream.to("unknown-payment");
카프카 스트림즈를 사용하면 컨슈머로 폴링하거나 프로듀서를 어렵게 구현할 필요가 없다.
'IT 지식' 카테고리의 다른 글
Lag Compensation (0) | 2022.11.28 |
---|---|
Kafka 입문 (4) (0) | 2022.10.06 |
Kafka 입문 (2) (0) | 2022.09.05 |
Kafka 입문 (1) (0) | 2022.09.01 |
TypeORM Migration (0) | 2022.04.25 |
댓글