Hadoop eco

[Kafka] 성능 측정 지표

root_bridger 2023. 5. 23. 15:45

성능 지표


성능을 측정할 때 어디에 중점을 두고 테스트를 하느냐에 따라 적합과 부적합이 나뉠 수 있다고 생각한다. 아래는 카프카 클러스터의 성능을 체크할 때 알아볼 수 있는 지표를 추려본 내용이다.

각각의 목표는 trade-off가 발생한다.

  • Throughput vs Latency
  • Durability vs Availability

Throughput : 처리량


얼마나 많은 데이터를 처리할 수 있는 지에 대한 지표. 시간 당 얼마 만큼의 메세지를 처리할 수 있는지를 말한다.

 

고려할 만한 옵션들 - Producer

더보기
  • batch.size
    • 같은 파티션으로 보내는 다수의 레코드를 배치로 묶어 보내기 위해 bytes를 단위로 묶어 전송할 것인지 설정
  • linger.ms
    • 배치 형태의 메시지를 보내기 전에 추가적인 메시지들을 위해 기다리는 시간을 조정.
    • 프로듀서는 지정된 배치 사이즈에 도달하면 이 옵션과 관계없이 즉시 메시지를 전송하고, 배치 사이즈에 도달하지 못한 상황에서 linger.ms 제한 시간에 도달했을 때 메시지들을 전송.
  • compression.type
  • acks
    • 해당 옵션은 0, 1, all or -1 값을 가질 수 있는데, 이 값에 따라서 프로듀서가 다음 메시지를 전달하는 시간을 결정할 수 있음
  • buffer.memory
    • producer가 kafka 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 전체 메모리 바이트
    • Producer가 보내지 못한 메시지를 보관할 메모리의 크기로 만약 메모라가 full이 되면, 다른 메시지 전송을 blocking 하게 됩니다. 또한 memory 여유가 생기거나, max.block.ms를 초과하면 전송 가능하게 된다.
    • 파티션이 많지 않으면, 조정할 필요가 없지만 파티션이 많다면 메모리를 늘려 blocking 없이 더 많은 데이터가 전송되도록 설정할 필요가 있음.
  • delivery.timeout.ms
    • send() 호출 후 반환에 대한 성공 또는 실패를 보고하는 시간의 상한.
    • 브로커로부터 ack을 받기 위해 대기하는 시간이며 재 전송에 허용된 시간.
    • retries 옵션 대신 재시도 제한에 대한 설정을 하는 옵션
  • enable.idempotence
    • 메시지의 순서를 보장해주는 옵
    • enable.idempotence를 사용 시 Batch 0이 실패한다면 그 뒤에 따라오는 1,2,3,4 .. 같은 후속 Batch 들도 실패 처리. (OutOfOrderSequenceException)
  • max.in.flight.requests.per.connection
    • 한 번에 몇 개의 요청(Request)을 전송할 것인가를 결정
    • 1로 설정되어 있으면 프로듀서는 한 번에 하나의 요청을 전송하고 응답을 받은 이후 다음 요청을 전송
    • 2 이상으로 설정되어 있으면 설정된 만큼 요청을 전송하고 응답을 기다림

고려할 만한 옵션 - Consumer

더보기
  • fetch.min.bytes
    • Consumer가 한번에 가져올 수 있는 최소 데이터 사이즈 설정으로 만약 지정한 사이즈보다 작은 경우, 데이터가 누적될 때까지 기다리게 됨.
    • 해당 옵션을 증가 시키면 브로커로 요청하는 횟수가 감소하며, 브로커의 리소스 사용을 절감. (Producer에서 batch.size를 증가 하는 것과 동일한 효과).
  • fetch.max.wait.ms
    • 해당 옵션은 Consumer의 옵션으로 consumer에서 데이터를 가져오는 최소 시간으로 새로운 데이터가 입력되어도, 해당 시간 이전에는 가져가지 않습니다.
    • 이 옵션을 통해서 내부적으로 consumer가 fetch 요청을 해도, 브로커가 보내지 않습니다.
  • Consumer group 활용
    • 이번에는 Consumer Group 을 활용해서 Kafka 브로커 Queue에 있는 데이터를 바로 바로 처리하며, 여러 개의 Consumer가 처리할 수 있어서 처리량이 높아짐.

고려할 만한 옵션 - Broker

더보기

 

  • Topic 당 1개의 Partition 만 설정
    • 순서 보장
  • Partition 증가
    • 1개 이상의 독립된 스레드가 붙어 작업할 수 있기 때문에
  • 많은 Memory, 많은 file descriptors
    • 충분히 많은 용량의 메모리가 있어야 페이지 캐싱 될 메모리 공간도 많아짐 (Kafka는 page cache를 적극 활용하기 때문에)
    • file descriptors 개수를 충분히 증가 시켜 producer와 consumer의 커넥션이 많아질 것을 준비.
  • message.max.bytes
    • 한번에 요청 batch에서 전송 가능한 최대 bytes
    • 해당 용량을 넘어가는 요청이 들어오면 전송 실패2
  • socket.send.buffer.bytes
    • 소켓 서버가 사용하는 송수신 버퍼 (SO_SNDBUF) 사이즈
  • socket.receive.buffer.bytes
    • 소켓 서버가 사용하는 송수신 버퍼 (SO_RCVBUF) 사이

Latency : 지연 시간


얼마나 빠르게 하나의 메시지를 빠르게 전달 하는 지에 대한 지표. 얼마 만큼의 메세지를 처리하는데 몇 초가 걸리나를 측정.

 

고려할 만한 옵션들 -  Producer

더보기
  • linger.ms
    • Producer에서 broker로 데이터를 보내기 위해 기다리는 시간
    • 0으로 설정해서 데이터를 수집하는 순간 기다리지 않고 바로 broker로 전송하도록 함.
  • compression.type
    • CPU : 압축을 위해 자원 사용
    • NW : 압축된 경우 NW bandwidth 사용량 줄어듬
    • 압축 성능에 따라 cpu 사용, nw bandwidth 줄여서 최소화 가능
    • 압축률이 적더라도 CPU 사용량이 적은 압축법으로 최대한 지연시간 단축(lz4)
    • 압축을 사용하게 되면 발송에서 약간의 지연이 발생하긴 함.
  • acks
    • Broker로부터 메세지 전송결과를 받는 방법에 대한 설정
    • 0 : Producer는 자신이 보낸 메시지에 대한 ack를 기다리지 않음. 서버가 데이터를 받았는지 보장하지 않고, 클라이언트는 전송 실패에 대한 결과를 알지 못하기 때문에 재요청 설정도 적용되지 않음. 다만 응답을 기다리지 않고 바로 다음 메세지를 보낼 수 있기 때문에 빠르게 전송 가능
    • 1 : Leader Partition으로 부터 데이터 복제 없이 원본만 확인되면 결과를 리턴

고려할 만한 옵션들 -  Consumer

더보기
  • fetch.min.bytes (default 1)
    • Broker에서 데이터를 가져오는 최소 size
    • 1: 1byte만 있어도 요청 시 바로 전송 (지연 없음)

고려할 만한 옵션들 -  Broker

더보기
  • Swap memory 비활성화
    • Kafka 구동 시 Heap memory를 초과하게 되는 경우 데이터를 swap 공간으로 복사하게 되는데, Kafka의 할당 자원 부족에도 계속 메모리 사용량을 유지할 수 있어 가용성은 올라가지만 성능 저하를 유발함.
      (한번 swap 메모리로 빠지면 다시 돌아오게 할 수 없기 때문)
    • Kafka Heap memory 초과는 저장하는 데이터가 아닌 Kafka 내부 프로세스에서 사용하는 메모리를 의미함.
    • vm.swappiness =0으로 맞춰 메모리에서만 처리하도록 해야 함.
  • Partition 개수 제한
    • 너무 많은 Partition 수는 메세지 지연을 유발함. (partition 복사를 위해 시간을 더 쓰기 때문에 그만큼 지연)
  • Broker 수는 많게, Partition 수는 적게
    • 하나의 Broker에서 담당하는 복제 본을 줄여 복제에 소요되는 시간을 최소화
  • num.relica.fetchers
    • Follow broker의 I/O 병렬 수준을 정의(기본 1)
    • leader borker에서 데이터를 복제하는 thread의 개수

Durability : 내구성


얼마나 메시지의 유실을 최소화 하는 지에 대한 지표. 중복된 메세지가 없도록 하는 신뢰성도 같이 고려 해봐야 함.

고려할 만한 옵션들 -  Producer

더보기
  • acks = all
    • 모든 replica에 복제가 완료된 후 producer에 ack 리턴
    • ack=all 이면, min.insync.replicas = replication.factor 동일하게 설정
  • min.insync.replicas
    • ISR 상태를 가지는 replica의 최소 개수
    • acks = all 이라면, producer에 응답하기 위한 replica의 ISR 최소 개수 (복제된 수)
  • retries
    • Producer의 전송 실패 시 자동으로 재 전송 하는 횟수
    • 위에서 설명 한 것처럼, delivery.timeout.ms를 사용해 재 전송 설정을 하는 것 권장.
    • max.in.flight.requests.per.connection = 1로 설정 (한번에 1개 요청)
      • 1개 이상 시 순서가 보장이 안되기 때문에 위험
  • enable.idempotence
    • 메시지의 순서를 보장해주는 옵션
    • enable.idempotence를 사용 시 Batch 0이 실패한다면 그 뒤에 따라오는 1,2,3,4 .. 같은 후속 Batch들도 실패 처리. (OutOfOrderSequenceException)

고려할 만한 옵션들 -  Consumer

더보기
  • auto.offset.reset
    • Consumer가 비정상적인 종료 혹은 처음 연결 된 이유로 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않은 경우에 설정 값에 따라 리셋.
      • earliest : 가장 초기의 오프셋 값으로 설정
      • latest : 가장 마지막의 오프셋 값으로 설정
      • none : 이전 오프셋 값을 찾지 못하면 에러
    • 메세지의 중복 읽기를 최소화 하기 위해서는 ‘latest’로 설정해야 함.
  • enable.auto.commit
    • 컨슈머의 오프셋 커밋을 자동으로 할 것인지를 설정하는 매개변수.(default true)
  • auto.commit.interval.ms
    • enable.auto.commit 설정 값이 true인 경우 자동으로 오프셋을 커밋하는 시간 간격을 설정
    • 오프셋 자동 커밋을 자주 할 수록 컨슈머의 중단으로 인해 초래될 수 있는 중복 메세지의 수를 줄일 수 있지만 약간의 부하 발생

고려할 만한 옵션들 -  Broker

더보기
  • replication.factor
    • 토픽 파티션 별 복제본 개수
    • 적정 개수 : 3
  • default.replication.factor
    • auto.create.topics.enable 가 true인 경우, 자동으로 생성되는 topic의 복제 수 설정 (운영 상의 안정성을 위해 auto.create.topics.enable는 false가 권장)
  • acks
    • all (acks = -1 동일)
    • 모든 replica에 복제가 완료된 후, producer에 ack 리턴
  • unclean.leader.election.enable
    • Broker가 죽었을 때, OSR replica도 leader로 선택될 수 있도록 설정 (true)
    • OSR(out-of sync replica) : 죽은 broker의 최신 메세지를 복사하지 못한 replica → 데이터 유실 가능.
    • 유실을 최소화 하는 경우 → false
  • log.flush.interval.ms
    • 입력된 message를 memory 영역의 page cache에서 disk로 저장하는 수준
    • 값이 클수록 Disk I/O가 적게 발생 ⇒ 메모리 데이터 유실 가능성 높아짐
    • 값이 작으면 Disk I/O 많이 발생 ⇒ 메모리 데이터 유실 가능성 거의 없음
    • 이와 관련한 다른 옵션들도 존재
      • log.flush.interval.messages : 메시지가 디스크로 플러시되기 전에 로그 파티션에 누적된 메시지 수
      • log.flush.interval.ms : 토픽의 메시지가 디스크로 플러시 되기 전에 메모리에 보관되는 최대 시간(ms). 설정하지 않으면 log.flush.scheduler.interval.ms의 값 사용.
      • log.flush.scheduler.interval.ms : 로그 플러셔가 로그를 디스크로 플러시해야 하는지 여부를 확인하는 빈도(ms)

Availability : 가용성


얼마나 서버의 중지 시간을 최소화 하는 지에 대한 지표. 또, 얼마나 오류를 빨리 복구 하는 지에 대한 지표.

고려할 만한 옵션들 -  Broker

더보기
  • n 수 유지
    • Partition별 leader 선출에 많은 시간 소요 (복구 시간 증가)
  • unclean.leader.election.enable
    • ISR(in-sync replica)가 아닌 OSR(out-of sync replica)를 가지고 있는 broker를 leader로 선출 할 수 있도록 설정하는 옵션.
    • 가용성을 최대한 보장하기 위한 설정으로, 데이터의 유실이 발생하더라도 kafka서버가 중지되는 상황을 막아서 서비스 가용성을 보장.
  • min.insync.replicas
    • Producer에 응답을 해주기 위한 최소한의 복제 수
    • 값을 낮게 설정해, 최소한의 복제만 된다면 바로 Producer가 동작할 수 있도록 해서 가용성을 높일 수 있음.
  • num.recovery.threads.per.data.dir
    • 각 log dir들을 기준으로 log data file을 스캔하여 복구에 사용되는 스레드의 최대 수
    • Broker의 시작과 종료 시에만 사용되므로 병행 처리를 하도록 많은 수의 Thread를 지정하여 장애 브로커를 빠르게 재시동 하는 것이 좋음.
      • logs.dirs가 3개고 이 값이 5라면 15개의 쓰레드가 실행됨.

측정법


고려 사항

  • Benchmark test 결과에 따라 환경에 맞는 Partition 수, Cluster size, Producer/Consumer 수 결정
  • Kafka 기본 설정으로 테스트 시작
    • Kafka의 default 설정에 익숙해 지는 것이 좋음
    • 이후 하나씩 변수를 바꿔 보며 추이 관찰
  • Producer 성능에 영향을 주는 종속성 제거
  • 모든 변수를 고정 값으로 통제 후, 변수 하나씩 조정해가며 측정

JMX Metrics

 


  • Kafka의 모든 Metric은 JMX(Java Management Extensions) 인터페이스를 통해 사용
  • 외부 모니터링 시스템에서 Kafka Metrics를 사용할 때는, 해당 모니터링 시스템의 모니터링 에이전트를 Kafka Process에 연결하는 것이 가장 쉬운 방법
  • Kafka Broker의 JMX에 직접 연결하는 애플리케이션을 도와주기 위해 Zookeeper에 저장된 브로커 정보에는 JMX포트가 설정.
  • 저장된 위치는 Zookeeper의 /brokers/ids/ 이며, hostname, jmx_port 값이 json 데이터에 포함.
  • JMX의 Mbean 객체를 통해 지원
  • JMX Metrics가 많음. 고로 상황에 맞는 매트릭을 볼 필요가 있음

출처 : https://www.slideshare.net/JANGWONSEO4/apache-kafka-250444885
출처 : https://www.slideshare.net/JANGWONSEO4/apache-kafka-250444885

  • Kafka 테스트 툴을 제공하고 있는 DataDog에서 Kafka 상태 모니터링을 할 때 필요한 JMX MBean 객체 목록을 잘 정리 해놨으니 참고하면 좋다.
  • 이 밖에도 Kafka Metrics에 관해서는 Kafka를 기반으로 서비스 하는 다양한 곳에서 공식문서에 잘 정리 해두었다. 메트릭이 너무 많기도 하고, 한글로 잘 번역해 둔 사이트들 역시 많기 때문에 같이 첨부해 두었다.
    • 메트릭 참고는 jmx 참고란에서 찾으시면 됩니다. 

Kafka JMX 활성화 방법

https://docs.snowflake.com/ko/user-guide/kafka-connector-monitor

Kafka 활성화 시 몇 가지 설정을 해준다면 Metrics를 모두 활성화 해서 볼 수 있게 된다.

 

1. Kafka 프로세스를 잠깐 중지 후 해당 서버의 환경 변수에 아래의 내용을 추가 해 준다.

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true
    -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.ssl=false
    -Djava.rmi.server.hostname=<ip_address>
    -Dcom.sun.management.jmxremote.port=<jmx_port>"
export JMX_PORT=<port_number>

 

 

2. Kafka를 재 실행 해주면 해당 Kafka JMX를 활성화 해 Metric 값들을 볼 수 있게 된다.

JAVA가 아니면 JMX를 쓸 수 없을까?


```bash
pip install jmxquery
```

**사용 예제**

```python
jmxConnection = JMXConnection("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
jmxQuery = [JMXQuery("kafka.cluster:type=*,name=*,topic=*,partition=*",
                         metric_name="kafka_cluster_{type}_{name}",
                         metric_labels={"topic" : "{topic}", "partition" : "{partition}"})]
metrics = jmxConnection.query(jmxQuery)
for metric in metrics:
    print(f"{metric.metric_name}<{metric.metric_labels}> == {metric.value}")

### 출력 값
# kafka_cluster_Partition_UnderReplicated<{'partition': '0', 'topic': 'test'}> == 0
# kafka_cluster_Partition_UnderMinIsr<{'partition': '0', 'topic': 'test'}> == 0
# kafka_cluster_Partition_InSyncReplicasCount<{'partition': '0', 'topic': 'test'}> == 1
# kafka_cluster_Partition_ReplicasCount<{'partition': '0', 'topic': 'test'}> == 1
# kafka_cluster_Partition_LastStableOffsetLag<{'partition': '0', 'topic': 'test'}> == 0
```

```python
# 만약 Mbean 모든 값 목록을 가져오고 싶다면?

jmxConnection = JMXConnection("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
jmxQuery = [JMXQuery("*:*")]
metrics = jmxConnection.query(jmxQuery)
for metric in metrics:
    print(f"{metric.to_query_string()} ({metric.value_type}) = {metric.value}")
```

 

 

참고


https://learn.microsoft.com/ko-kr/azure/hdinsight/kafka/apache-kafka-performance-tuning

https://www.ibm.com/docs/ko/oala/1.3.5?topic=SSPFMY_1.3.5/com.ibm.scala.doc/config/iwa_cnf_scldc_kfk_prp_exmpl_c.html

https://www.popit.kr/kafka-운영자가-말하는-producer-acks/

https://velog.io/@busybean3/아파치-카프카Apache-Kafka의-성능-최적화-1

https://nobelbill.tistory.com/entry/kafka-consumer-성능-올리기

https://learn.microsoft.com/ko-kr/azure/hdinsight/kafka/apache-kafka-performance-tuning

 

벤치마크 참고


https://www.slideshare.net/freepsw/apache-kafka-performancethroughput-without-data-loss-and-guaranteeing-data-order

https://medium.com/naver-cloud-platform/이렇게-개발했습니다-simple-easy-notification-service-2-메시지-발송-처리를-위한-kafka-사용기-60beda9d5773
https://team-platform.tistory.com/6

https://www.slideshare.net/freepsw/apache-kafka-performancelatencybenchmarkv03

효율적인 분산 메시징 환경을 위한 Apache Kafka 벤치마크 테스트.pdf

https://blog.embian.com/19

https://m.blog.naver.com/PostView.naver?isHttpsRedirect=true&blogId=freepsw&logNo=221073557983

 

jmx 참고


https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/#kafka-emitted-metrics

https://www.slideshare.net/freepsw/understanding-of-apache-kafka-metrics-for-monitoring

https://www.slideshare.net/freepsw/apache-kafka-metrics-123663954

https://kong-dahye.tistory.com/18

https://docs.confluent.io/platform/current/kafka/monitoring.html#broker-metrics

https://www.ibm.com/docs/en/instana-observability/current?topic=technologies-monitoring-kafka#kafka-cluster-metrics-collection

https://kafka.apache.org/33/documentation.html#remote_jmx