1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import subprocess
# Directory that holds the Kafka command-line scripts. The trailing slash is
# required because the script name is appended to this value directly.
kafka_path = "/opt/services/kafka/bin/"

# Comma-separated broker list handed to the tool's --bootstrap-server option.
kafka_servers = "kf1:9095,kf2:9095,kf3:9095"

# Consumer group whose lag is summed and printed by this script.
consumer_group = "notification"
def parse_lag(output):
    """Return the total lag found in ``kafka-consumer-groups.sh --describe`` output.

    The position of the lag value is taken from the header row (the row that
    has a ``LAG`` column), so both the layout that starts with ``TOPIC`` and
    the layout that starts with ``GROUP`` are summed correctly. If no header
    row has been seen yet, the lag is read from the fifth column.

    Args:
        output: Text printed by the describe command.

    Returns:
        The sum of every numeric lag value. Rows whose lag is not a plain
        non-negative integer (for example ``-`` for a partition without a
        committed offset), blank lines, and lines too short to contain a lag
        column contribute nothing.
    """
    # Fallback position of LAG when no header row precedes the data rows.
    lag_column = 4
    total = 0
    for line in output.splitlines():
        fields = line.split()
        if not fields:
            continue
        # Match the header on a whole column name so that data rows whose
        # topic or client id merely contains "LAG" are still counted.
        if "LAG" in fields:
            lag_column = fields.index("LAG")
            continue
        # Informational or truncated lines may not reach the lag column.
        if len(fields) <= lag_column:
            continue
        lag_str = fields[lag_column]
        if lag_str.isdecimal():
            total += int(lag_str)
    return total
def kafka_monitor():
    """Run ``kafka-consumer-groups.sh --describe`` and return the summed lag.

    The script is looked up under ``kafka_path`` and queried for
    ``consumer_group`` on the brokers listed in ``kafka_servers``. Its standard
    output is decoded as text and handed to ``parse_lag``.

    Returns:
        Whatever ``parse_lag`` computes from the command output.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
        OSError: If the script cannot be executed at all (for example it does
            not exist at the configured path).
    """
    # Pass the arguments as a list so no shell is involved: configuration
    # values containing spaces or shell metacharacters reach the tool intact.
    command = [
        f'{kafka_path}kafka-consumer-groups.sh',
        '--bootstrap-server', kafka_servers,
        '--describe',
        '--group', consumer_group,
    ]
    output = subprocess.check_output(command, universal_newlines=True)
    return parse_lag(output)
# Script entry point: query Kafka once and write the total lag to stdout.
# NOTE(review): this runs at import time as well, since there is no
# ``if __name__ == "__main__"`` guard — confirm nothing imports this module.
print(kafka_monitor())
|