Kafka debug tools
The native Kafka command-line tools give insight into clusters, topics, consumer groups, and so on.
Setup
The tools require a Java client config file (a properties file), which can be generated from an API key and secret:
# switch to proper env/cluster
confluent environment use env-NNNN
confluent kafka cluster use lkc-NNNN
# generate properties file
confluent kafka client-config create java --api-key=ERRDXXXXXXXXXX --api-secret='TZXXXXXXXXXXXX'
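The confluentinc/cp-kafka Docker image contains the tools. The commands below mount the properties file into the container and pass the common options in as environment variables: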
DEV_PROPERTIES="/Users/pleskac/.confluent/dev-test_devops.properties"
BOOTSTRAP=$(grep '^bootstrap.servers' "$DEV_PROPERTIES" | sed 's/^bootstrap.servers=//')
OO="--bootstrap-server $BOOTSTRAP --command-config /dev.properties"
CO="--bootstrap-server $BOOTSTRAP --consumer.config /dev.properties"
PO="--bootstrap-server $BOOTSTRAP --producer.config /dev.properties"
docker run -ti -v "$DEV_PROPERTIES":/dev.properties -e OO="$OO" -e CO="$CO" -e PO="$PO" confluentinc/cp-kafka sh
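If the properties file might be missing or might not contain the key, the bootstrap lookup could be wrapped in a small helper that fails loudly instead of producing an empty `--bootstrap-server` value. This is only a sketch: the function name `bootstrap_from_properties` is hypothetical and not part of any Kafka or Confluent tooling.

```shell
# Hypothetical helper (not part of Kafka/Confluent tooling):
# print the bootstrap.servers value from a Java properties file.
bootstrap_from_properties() {
  props_file="$1"
  if [ ! -r "$props_file" ]; then
    echo "cannot read properties file: $props_file" >&2
    return 1
  fi
  # Match only an uncommented key at line start; keep what follows the '='.
  value=$(sed -n 's/^bootstrap\.servers[[:space:]]*=[[:space:]]*//p' "$props_file" | head -n 1)
  if [ -z "$value" ]; then
    echo "bootstrap.servers not found in $props_file" >&2
    return 1
  fi
  printf '%s\n' "$value"
}

# Usage (on the host, before docker run):
#   BOOTSTRAP=$(bootstrap_from_properties "$DEV_PROPERTIES") || exit 1
```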
Tools
# topics
sh-4.4$ kafka-topics $OO --list
_confluent-command
ctrader_ingest
test-group
test.connect-configs
test.connect-offsets
test.connect-status
# describe
sh-4.4$ kafka-topics $OO --describe --topic test-group
Topic: test-group TopicId: epMCmGArQbKJRM8ZgcOCqg PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,cleanup.policy=delete,segment.bytes=104857600,retention.ms=604800000,message.format.version=3.0-IV1,max.compaction.lag.ms=9223372036854775807,max.message.bytes=2097164,min.compaction.lag.ms=0,message.timestamp.type=CreateTime,retention.bytes=-1,delete.retention.ms=86400000,segment.ms=604800000,message.timestamp.difference.max.ms=9223372036854775807
Topic: test-group Partition: 0 Leader: 7 Replicas: 7,0,5 Isr: 7,0,5
Topic: test-group Partition: 1 Leader: 5 Replicas: 5,1,6 Isr: 5,1,6
# create
sh-4.4$ kafka-topics $OO --create --topic test-partitions --partitions 4
Created topic test-partitions.
sh-4.4$ kafka-topics $OO --describe --topic test-partitions
Topic: test-partitions TopicId: BULITgxFRFurtgJr3OIMtg PartitionCount: 4 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=104857600,message.format.version=3.0-IV1,max.message.bytes=2097164
Topic: test-partitions Partition: 0 Leader: 4 Replicas: 4,2,0 Isr: 4,2,0
Topic: test-partitions Partition: 1 Leader: 0 Replicas: 0,7,8 Isr: 0,7,8
Topic: test-partitions Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test-partitions Partition: 3 Leader: 1 Replicas: 1,5,6 Isr: 1,5,6
# alter
# WARNING: decreasing the number of partitions is NOT supported, the count can only grow
sh-4.4$ kafka-topics $OO --alter --topic test-partitions --partitions 3
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 3.
[2023-09-07 12:38:50,695] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 3.
(kafka.admin.TopicCommand$)
sh-4.4$ kafka-topics $OO --alter --topic test-partitions --partitions 5
sh-4.4$ kafka-topics $OO --describe --topic test-partitions
Topic: test-partitions TopicId: BULITgxFRFurtgJr3OIMtg PartitionCount: 5 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=104857600,message.format.version=3.0-IV1,max.message.bytes=2097164
Topic: test-partitions Partition: 0 Leader: 4 Replicas: 4,2,0 Isr: 4,2,0
Topic: test-partitions Partition: 1 Leader: 0 Replicas: 0,7,8 Isr: 0,7,8
Topic: test-partitions Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test-partitions Partition: 3 Leader: 1 Replicas: 1,5,6 Isr: 1,5,6
Topic: test-partitions Partition: 4 Leader: 3 Replicas: 3,4,2 Isr: 3,4,2
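Because the partition count can only grow, a script could check the current count before calling `--alter`. The sketch below assumes the `--describe` output format shown above; the helper names `partition_count` and `can_grow_to` are hypothetical.

```shell
# Hypothetical helper: read `kafka-topics --describe` output on stdin
# and print the value that follows "PartitionCount:".
partition_count() {
  awk '{ for (i = 1; i < NF; i++) if ($i == "PartitionCount:") { print $(i + 1); exit } }'
}

# Hypothetical guard: succeed only if the requested count is strictly
# larger than the current one.
can_grow_to() {
  current="$1"
  requested="$2"
  [ "$requested" -gt "$current" ]
}

# Usage (inside the container):
#   current=$(kafka-topics $OO --describe --topic test-partitions | partition_count)
#   can_grow_to "$current" 5 && kafka-topics $OO --alter --topic test-partitions --partitions 5
```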
# config change
sh-4.4$ kafka-configs $OO --describe --entity-type topics --entity-name test-partitions
Dynamic configs for topic test-partitions are:
sh-4.4$ kafka-configs $OO --alter --entity-type topics --entity-name test-partitions --add-config delete.retention.ms=172800000
Completed updating config for topic test-partitions.
sh-4.4$ kafka-configs $OO --describe --entity-type topics --entity-name test-partitions
Dynamic configs for topic test-partitions are:
delete.retention.ms=172800000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:delete.retention.ms=172800000, DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
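The `*.ms` configs take milliseconds, so 172800000 above is 2 days and the default 86400000 is 1 day. A tiny conversion helper can make such values easier to read in scripts; the name `days_to_ms` is hypothetical.

```shell
# Hypothetical helper: convert whole days to milliseconds for *.ms topic configs.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Usage (inside the container):
#   kafka-configs $OO --alter --entity-type topics --entity-name test-partitions \
#     --add-config delete.retention.ms=$(days_to_ms 2)
```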
# delete
sh-4.4$ kafka-topics $OO --delete --topic test-partitions
# consumer groups
sh-4.4$ kafka-consumer-groups $OO --list
one
sh-4.4$ kafka-consumer-groups $OO --describe --group one
Consumer group 'one' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
one test-group 0 3 3 0 - - -
one test-group 1 35 35 0 - - -
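To get a single number out of the per-partition table, the LAG column can be summed with awk. This is a sketch that assumes the column layout shown above (a header row starting with GROUP); the function name `total_lag` is hypothetical.

```shell
# Hypothetical helper: sum the LAG column of
# `kafka-consumer-groups --describe` output read from stdin.
total_lag() {
  awk '
    # Header row: remember which column holds LAG.
    $1 == "GROUP" { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    # Data rows: add numeric lag values, skip "-" placeholders.
    col && $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Usage (inside the container):
#   kafka-consumer-groups $OO --describe --group one | total_lag
```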
# producer
sh-4.4$ kafka-console-producer $PO --topic=test-group
>1
>2
>
# consumer
sh-4.4$ kafka-console-consumer $CO --topic=test-group --group=one
1
2
sh-4.4$ kafka-console-consumer $CO --topic=test-group --from-beginning
sh-4.4$ kafka-console-consumer $CO --topic=test-group --partition 0 --offset earliest
sh-4.4$ kafka-console-consumer $CO --topic=test-group --property print.key=true --property key.separator=: --from-beginning