自制Kafka消费Lag监控

安装 JDK 并下载、解压 Kafka：
# Install the OpenJDK 11 JDK package via apt.
sudo apt install -y openjdk-11-jdk

# Download the kafka_2.12-2.1.0 release tarball from the Apache archive.
wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz

# Unpack the tarball into the current directory.
tar xf kafka_2.12-2.1.0.tgz
使用 kafka-consumer-groups.sh 查看消费组和消费 Lag：
# List the consumer groups.
/opt/services/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kf1:9095,kf2:9095,kf3:9095 \
--list

# Show the consumer lag of the "notification" group.
/opt/services/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kf1:9095,kf2:9095,kf3:9095 \
--describe --group notification
用 Python 脚本调用上述命令并汇总消费 Lag：
import subprocess

# Directory holding the Kafka command-line scripts. The trailing slash is
# required because the script name is appended to it directly.
kafka_path = '/opt/services/kafka/bin/'
# Broker list handed to --bootstrap-server.
kafka_servers = 'kf1:9095,kf2:9095,kf3:9095'
# Consumer group whose lag is reported (handed to --group).
consumer_group = 'notification'


def parse_lag(output):
    """Sum the LAG column of ``kafka-consumer-groups.sh --describe`` output.

    The position of the LAG column is read from the header row, i.e. the row
    that has ``LAG`` as one of its whitespace-separated fields. Layouts that
    place additional columns before LAG are therefore handled as well. Until
    a header row has been seen, the fifth field is assumed to hold the lag.

    Args:
        output: Text printed by the tool; may be empty.

    Returns:
        The total lag as an int. Rows that are too short, whose lag field
        contains ``-``, or whose lag field is not an integer add nothing.
    """
    lag = 0
    lag_column = 4  # Fallback position used until a header row is seen.
    for line in output.splitlines():
        fields = line.split()
        # Match the header on a whole field, not a substring, so a topic
        # such as "LAGGING" is still counted as data.
        if "LAG" in fields:
            lag_column = fields.index("LAG")
            continue
        # Blank lines and short notices have no lag field at all.
        if len(fields) <= lag_column:
            continue
        lag_str = fields[lag_column]
        if "-" in lag_str:
            continue
        try:
            lag += int(lag_str)
        except ValueError:
            # Informational or error text mixed into the output, not a row.
            continue
    return lag


def kafka_monitor():
    """Return the total consumer lag of ``consumer_group``.

    Runs ``kafka-consumer-groups.sh --describe`` from ``kafka_path`` against
    ``kafka_servers`` and passes its standard output to ``parse_lag``.

    Returns:
        The value ``parse_lag`` computes from the tool's output.

    Raises:
        subprocess.CalledProcessError: If the tool exits with a non-zero
            status.
        OSError: If the script cannot be started (for example, it does not
            exist at ``kafka_path``).
    """
    # An argument list without a shell hands each configured value to the
    # tool verbatim instead of letting /bin/sh re-interpret it.
    command = [
        f'{kafka_path}kafka-consumer-groups.sh',
        '--bootstrap-server', kafka_servers,
        '--describe',
        '--group', consumer_group,
    ]
    output = subprocess.check_output(command, universal_newlines=True)
    return parse_lag(output)


# Run the check once and print the total lag to standard output.
print(kafka_monitor())