Kafka debug tools

The native Kafka command-line tools give insight into clusters, topics, consumer groups, and more.

Setup

The tools require a Java client configuration (properties) file, which can be generated from an API key and secret:

# switch to proper env/cluster
confluent environment use env-NNNN
confluent kafka cluster use lkc-NNNN

# generate properties file (the config is printed to stdout; redirect it to a file to keep it)
confluent kafka client-config create java --api-key=ERRDXXXXXXXXXX --api-secret='TZXXXXXXXXXXXX'

The confluentinc/cp-kafka Docker image contains the tools. Mount the properties file into the container and pass the common option sets as environment variables:

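# path to the generated properties file (adjust to your own location)
DEV_PROPERTIES="/Users/pleskac/.confluent/dev-test_devops.properties"
# bootstrap server address taken from the properties file
BOOTSTRAP=$(grep bootstrap.servers "$DEV_PROPERTIES" | sed 's/bootstrap.servers=//g')
# option sets: OO for admin commands, CO for the console consumer, PO for the console producer
OO="--bootstrap-server $BOOTSTRAP --command-config /dev.properties"
CO="--bootstrap-server $BOOTSTRAP --consumer.config /dev.properties"
PO="--bootstrap-server $BOOTSTRAP --producer.config /dev.properties"
# start a shell in the container with the properties file mounted at /dev.properties
docker run -ti -v "$DEV_PROPERTIES":/dev.properties -e OO="$OO" -e CO="$CO" -e PO="$PO" confluentinc/cp-kafka sh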
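
The grep on bootstrap.servers matches any line that contains the key, including a commented-out one. A slightly stricter variant could look like the sketch below. The function name bootstrap_from_properties is made up for illustration, and the sketch assumes a plain key=value properties file with no line continuations.

```shell
# Hypothetical helper (not part of any Kafka or Confluent tool):
# print the value of bootstrap.servers from a Java properties file.
bootstrap_from_properties() {
  # Only lines that START with the key are matched, so "# bootstrap.servers=..."
  # comments are skipped. Spaces around "=" are tolerated.
  # head -n 1 keeps the first match if the key appears more than once.
  sed -n 's/^[[:space:]]*bootstrap\.servers[[:space:]]*=[[:space:]]*//p' "$1" | head -n 1
}
```

It would be used as BOOTSTRAP=$(bootstrap_from_properties "$DEV_PROPERTIES").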

Tools

# topics
sh-4.4$ kafka-topics $OO --list
_confluent-command
ctrader_ingest
test-group
test.connect-configs
test.connect-offsets
test.connect-status

# describe
sh-4.4$ kafka-topics $OO --describe --topic test-group
Topic: test-group   TopicId: epMCmGArQbKJRM8ZgcOCqg PartitionCount: 2   ReplicationFactor: 3    Configs: min.insync.replicas=2,cleanup.policy=delete,segment.bytes=104857600,retention.ms=604800000,message.format.version=3.0-IV1,max.compaction.lag.ms=9223372036854775807,max.message.bytes=2097164,min.compaction.lag.ms=0,message.timestamp.type=CreateTime,retention.bytes=-1,delete.retention.ms=86400000,segment.ms=604800000,message.timestamp.difference.max.ms=9223372036854775807
    Topic: test-group   Partition: 0    Leader: 7   Replicas: 7,0,5 Isr: 7,0,5
    Topic: test-group   Partition: 1    Leader: 5   Replicas: 5,1,6 Isr: 5,1,6

# create
sh-4.4$ kafka-topics $OO --create --topic test-partitions --partitions 4
Created topic test-partitions.

sh-4.4$ kafka-topics $OO --describe --topic test-partitions
Topic: test-partitions  TopicId: BULITgxFRFurtgJr3OIMtg PartitionCount: 4   ReplicationFactor: 3    Configs: min.insync.replicas=2,segment.bytes=104857600,message.format.version=3.0-IV1,max.message.bytes=2097164
    Topic: test-partitions  Partition: 0    Leader: 4   Replicas: 4,2,0 Isr: 4,2,0
    Topic: test-partitions  Partition: 1    Leader: 0   Replicas: 0,7,8 Isr: 0,7,8
    Topic: test-partitions  Partition: 2    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: test-partitions  Partition: 3    Leader: 1   Replicas: 1,5,6 Isr: 1,5,6

# alter
# WARNING: decreasing the number of partitions is NOT supported; the count can only be increased
sh-4.4$ kafka-topics $OO --alter --topic test-partitions --partitions 3
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 3.
[2023-09-07 12:38:50,695] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 3.
 (kafka.admin.TopicCommand$)

sh-4.4$ kafka-topics $OO --alter --topic test-partitions --partitions 5

sh-4.4$ kafka-topics $OO --describe --topic test-partitions
Topic: test-partitions  TopicId: BULITgxFRFurtgJr3OIMtg PartitionCount: 5   ReplicationFactor: 3    Configs: min.insync.replicas=2,segment.bytes=104857600,message.format.version=3.0-IV1,max.message.bytes=2097164
    Topic: test-partitions  Partition: 0    Leader: 4   Replicas: 4,2,0 Isr: 4,2,0
    Topic: test-partitions  Partition: 1    Leader: 0   Replicas: 0,7,8 Isr: 0,7,8
    Topic: test-partitions  Partition: 2    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: test-partitions  Partition: 3    Leader: 1   Replicas: 1,5,6 Isr: 1,5,6
    Topic: test-partitions  Partition: 4    Leader: 3   Replicas: 3,4,2 Isr: 3,4,2
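
Because a partition count can only grow, a script may want to check the current count before calling --alter. The sketch below parses the PartitionCount field from the --describe output shown above. Both function names are hypothetical, and the parsing assumes the output format stays as in these examples.

```shell
# Hypothetical helper: read `kafka-topics --describe` output on stdin
# and print the value of the PartitionCount field from the first summary line.
partition_count() {
  sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Hypothetical guard: succeeds only when the requested count ($2)
# is strictly larger than the current count ($1).
can_grow_partitions() {
  [ "$2" -gt "$1" ]
}
```

For example: current=$(kafka-topics $OO --describe --topic test-partitions | partition_count), then run the --alter command only if can_grow_partitions "$current" 5 succeeds. The guard also rejects an equal count, since there would be nothing to change.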

# config change
sh-4.4$ kafka-configs $OO --describe --entity-type topics --entity-name test-partitions
Dynamic configs for topic test-partitions are:

sh-4.4$ kafka-configs $OO --alter --entity-type topics --entity-name test-partitions --add-config delete.retention.ms=172800000
Completed updating config for topic test-partitions.

sh-4.4$ kafka-configs $OO --describe --entity-type topics --entity-name test-partitions
Dynamic configs for topic test-partitions are:
  delete.retention.ms=172800000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:delete.retention.ms=172800000, DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
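
Settings such as delete.retention.ms and retention.ms are given in milliseconds, which is easy to misread. A small conversion helper, sketched here with the made-up name days_to_ms and assuming a shell with 64-bit arithmetic, keeps the intent visible:

```shell
# Hypothetical helper: convert a whole number of days to milliseconds.
days_to_ms() {
  # days * 24 hours * 60 minutes * 60 seconds * 1000 milliseconds
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}
```

With it, the value 172800000 used above can be written as --add-config delete.retention.ms=$(days_to_ms 2), and the default 86400000 corresponds to one day.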

# delete
sh-4.4$ kafka-topics $OO --delete --topic test-partitions

# consumer groups
sh-4.4$ kafka-consumer-groups $OO --list
one

sh-4.4$ kafka-consumer-groups $OO --describe --group one

Consumer group 'one' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
one             test-group      0          3               3               0               -               -               -
one             test-group      1          35              35              0               -               -               -
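
The LAG column is reported per partition. To get one number per group, the --describe output can be summed with awk. This is only a sketch: the name total_lag is hypothetical, and it assumes the column header line starts with GROUP as in the output above.

```shell
# Hypothetical helper: read `kafka-consumer-groups --describe` output on stdin
# and print the sum of the LAG column.
total_lag() {
  awk '
    # Header line: remember which column holds LAG.
    $1 == "GROUP" { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    # Data lines: add the value only when it is a number ("-" is skipped).
    col && NF >= col && $col ~ /^[0-9]+$/ { sum += $col }
    # Print 0 when nothing was summed.
    END { print sum + 0 }
  '
}
```

It would be used as kafka-consumer-groups $OO --describe --group one | total_lag, which prints 0 for the output shown above.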

# producer
sh-4.4$ kafka-console-producer $PO --topic=test-group
>1
>2
>

# consumer
sh-4.4$ kafka-console-consumer $CO --topic=test-group --group=one
1
2

# read the topic from the beginning
sh-4.4$ kafka-console-consumer $CO --topic=test-group --from-beginning

# read a single partition, starting at its earliest offset
sh-4.4$ kafka-console-consumer $CO --topic=test-group --partition 0 --offset earliest

# print the key with each message, separated by ":"
sh-4.4$ kafka-console-consumer $CO --topic=test-group --property print.key=true --property key.separator=: --from-beginning
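
When the consumer runs with --property print.key=true --property key.separator=:, each output line has the form key:value, which makes quick per-key checks possible. The sketch below counts messages per key. The name count_by_key is made up for illustration, and it assumes keys do not themselves contain the separator.

```shell
# Hypothetical helper: read "key:value" lines on stdin
# and print "key count" pairs, sorted by key.
count_by_key() {
  # -F: splits each line on ":"; field 1 is the key.
  awk -F: '{ n[$1]++ } END { for (k in n) print k, n[k] }' | sort
}
```

Piping the console consumer output into count_by_key gives a rough view of how messages are distributed across keys.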