- How can I determine the number of messages in an Apache Kafka topic?
- Are the messages distributed equally over all partitions?
Kafka UI
- The most convenient way is to use a UI such as Kafka UI.
Command line
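- First start a consumer with a consumer group, then stop it again with Ctrl+C
- This records the offsets per partition for the given consumer group; without --from-beginning the consumer starts at the latest offsets, so it reads no messages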
$ kafka-console-consumer.sh --bootstrap-server $BROKER \
--group spark-perf-console-consumer-1 --topic spark-perf-input
^C
Processed a total of 0 messages
- Then you can inspect the consumer group:
$ kafka-consumer-groups.sh --bootstrap-server $BROKER \
--group spark-perf-console-consumer-1 --describe
Consumer group 'spark-perf-console-consumer-1' has no active members.
GROUP                         TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
spark-perf-console-consumer-1 spark-perf-input 0          439497          439497          0    -            -     -
spark-perf-console-consumer-1 spark-perf-input 1          438078          438078          0    -            -     -
spark-perf-console-consumer-1 spark-perf-input 2          440793          440793          0    -            -     -
spark-perf-console-consumer-1 spark-perf-input 3          442113          442113          0    -            -     -
spark-perf-console-consumer-1 spark-perf-input 4          441595          441595          0    -            -     -
spark-perf-console-consumer-1 spark-perf-input 5          440864          440864          0    -            -     -
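To answer both questions from this output, the LOG-END-OFFSET column can be summed and compared per partition. The following is a minimal Python sketch, not part of the Kafka tooling; the function names are made up for illustration. It assumes the offsets of every partition start at 0 (nothing deleted by retention or compaction), because only then does the end offset equal the number of messages.

```python
# Rows copied from the `kafka-consumer-groups.sh --describe` output above.
DESCRIBE_OUTPUT = """\
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
spark-perf-console-consumer-1 spark-perf-input 0 439497 439497 0 - - -
spark-perf-console-consumer-1 spark-perf-input 1 438078 438078 0 - - -
spark-perf-console-consumer-1 spark-perf-input 2 440793 440793 0 - - -
spark-perf-console-consumer-1 spark-perf-input 3 442113 442113 0 - - -
spark-perf-console-consumer-1 spark-perf-input 4 441595 441595 0 - - -
spark-perf-console-consumer-1 spark-perf-input 5 440864 440864 0 - - -
"""


def parse_log_end_offsets(output):
    """Map partition number to LOG-END-OFFSET for every data row."""
    offsets = {}
    for line in output.splitlines():
        fields = line.split()
        # Data rows have a numeric PARTITION (3rd column) and LOG-END-OFFSET
        # (5th column); header and status lines do not and are skipped.
        if len(fields) >= 6 and fields[2].isdigit() and fields[4].isdigit():
            offsets[int(fields[2])] = int(fields[4])
    return offsets


def partition_shares(offsets):
    """Return each partition's fraction of the summed offsets."""
    total = sum(offsets.values())
    return {p: (n / total if total else 0.0) for p, n in offsets.items()}


offsets = parse_log_end_offsets(DESCRIBE_OUTPUT)
total = sum(offsets.values())
print(f"total: {total}")
for partition, share in sorted(partition_shares(offsets).items()):
    print(f"partition {partition}: {offsets[partition]} ({share:.2%})")
```

For the output above this gives a total of 2642940 messages, with every partition holding between roughly 16.6% and 16.7%, so the messages are spread almost evenly.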
API
- As on the command line, start a consumer, then inspect the offsets of its consumer group through the client API
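One possible way to do this from code is sketched below. It assumes the third-party kafka-python package, which these notes do not otherwise use, and it swaps in a slightly different technique: instead of reading the committed offsets of a consumer group, it asks a consumer for the beginning and end offsets of each partition and subtracts them. The function names and the broker address in the usage comment are illustrative. On compacted or transactional topics the difference is only an upper bound on the message count.

```python
def messages_per_partition(beginning, end):
    """Approximate the message count per partition as end minus beginning offset."""
    return {p: end[p] - beginning[p] for p in sorted(end)}


def fetch_offsets(broker, topic):
    """Return ({partition: beginning_offset}, {partition: end_offset}) for a topic."""
    # Imported lazily so the pure helper above works without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=broker)
    try:
        partition_ids = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in partition_ids]
        if not tps:
            return {}, {}
        beginning = {tp.partition: off
                     for tp, off in consumer.beginning_offsets(tps).items()}
        end = {tp.partition: off
               for tp, off in consumer.end_offsets(tps).items()}
    finally:
        consumer.close()
    return beginning, end


def report(broker, topic):
    """Print the approximate message count per partition and in total."""
    beginning, end = fetch_offsets(broker, topic)
    counts = messages_per_partition(beginning, end)
    for partition, count in counts.items():
        print(f"partition {partition}: {count}")
    print(f"total: {sum(counts.values())}")


# Usage (broker address is an assumption, adjust to your cluster):
# report("localhost:9092", "spark-perf-input")
```

Subtracting the beginning offset avoids the assumption that every partition still starts at offset 0, which the consumer group output alone cannot confirm.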
Spark checkpoints
- After a Spark Streaming job has written a checkpoint, you can look at the Spark checkpoints directory:
- Spark stores the offsets of a completed micro-batch n in …/checkpoints/offsets/n/
$ cat /tmp/spark-perf/com.rhaag.spark.perf.testdata.fares.spark.SparkKafkaStreamJob/checkpoints/offsets/0
v1
{"batchWatermarkMs":0,"batchTimestampMs":1702550641451,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"spark-perf-input":{"2":440808,"5":440879,"4":441612,"1":438098,"3":442134,"0":439509}}
- The last line of the output (JSON pretty-printed) shows the offset for each partition:
{
  "spark-perf-input": {
    "2": 440808,
    "5": 440879,
    "4": 441612,
    "1": 438098,
    "3": 442134,
    "0": 439509
  }
}
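The per-partition offsets in a checkpoint file can also be extracted and summed with a few lines of Python. This is a small sketch with a made-up function name; it relies only on the layout seen above (a version line, a metadata line, then one JSON line with the offsets). The metadata line in the sample is abbreviated. As before, the sum equals the number of messages only if the offsets of every partition started at 0.

```python
import json

# Content of an offsets file as shown above (metadata line abbreviated).
CHECKPOINT_TEXT = """\
v1
{"batchWatermarkMs":0,"batchTimestampMs":1702550641451}
{"spark-perf-input":{"2":440808,"5":440879,"4":441612,"1":438098,"3":442134,"0":439509}}
"""


def read_checkpoint_offsets(text):
    """Return {topic: {partition: offset}} from the last non-empty line."""
    last_line = [line for line in text.splitlines() if line.strip()][-1]
    raw = json.loads(last_line)
    # JSON object keys are strings; convert partition numbers to int.
    return {
        topic: {int(p): offset for p, offset in partitions.items()}
        for topic, partitions in raw.items()
    }


checkpoint = read_checkpoint_offsets(CHECKPOINT_TEXT)
for topic, partitions in checkpoint.items():
    print(f"{topic}: total {sum(partitions.values())}")
    for partition in sorted(partitions):
        print(f"  partition {partition}: {partitions[partition]}")
```

For the file above the offsets add up to 2643040 for spark-perf-input. To use it on a real checkpoint, pass the file content, for example `read_checkpoint_offsets(open(path).read())`, where `path` points to a file under …/checkpoints/offsets/.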
Tags
apache-kafka
spark-streaming