#!/usr/bin/env python3
import sys
import subprocess
import logging
import json
from logging.handlers import RotatingFileHandler
LOGFILE = "/var/log/snmp/snmp-batt-test.log"
DATA_FILE="/var/lib/snmp/daly.json"
UPS_NAME = "mpp"
def flatten_dict(d, parent_key="", sep="_"):
flat = {}
for key, value in d.items():
new_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
if isinstance(value, dict):
flat.update(flatten_dict(value, new_key, sep=sep))
else:
flat[new_key] = value
return flat
def get_oid_data(key):
with open(DATA_FILE, 'r') as file:
data = json.load(file)
flat = flatten_dict(data)
logger.debug(f"[get_oid_data] data: {data} б flat: {flat}")
val = flat.get(key)
logger.debug(f"[get_oid_data] value: {val}")
return str(val)
def batt_status(nut, nut_key):
val = get_oid_data(nut, nut_key)
s = str(batt_status_code(val))
logger.debug("[batt_status] Status: {s}")
return s
def mosfet_status(key):
with open(DATA_FILE, 'r') as file:
data = json.load(file)
flat = flatten_dict(data)
logger.debug(f"mosfet_status] data: {data} flat: {flat}")
val = flat.get(key)
logger.debug(f"[mosfet_status] value: {val}")
if key == "mosfet_status_mode":
if val == "discharging":
return "1"
else:
return "0"
elif key == "mosfet_status_charging_mosfet":
if val:
return "1"
else:
return "0"
elif key == "mosfet_status_discharging_mosfet":
if val:
return "1"
else:
return "0"
else:
return str(val)
def status_status(key):
with open(DATA_FILE, 'r') as file:
data = json.load(file)
flat = flatten_dict(data)
logger.debug(f"status_status] data: {data} flat: {flat}")
val = flat.get(key)
logger.debug(f"[status_status] value: {val}")
if key == "status_charger_running":
if val:
return "1"
else:
return "0"
elif key == "status_load_running":
if val:
return "1"
else:
return "0"
elif key == "status_states_DI1":
if val:
return "1"
else:
return "0"
else:
return str(val)
OIDS = {
# SOC
"1.3.6.1.4.1.418.1.1": {
"name": "DALYBMS-MIB::soc_total_voltage",
"object_key": "soc_total_voltage",
"max_value": 60
},
"1.3.6.1.4.1.418.1.2": {
"name": "DALYBMS-MIB::soc_current",
"object_key": "soc_current",
"max_value": 100,
"min_value": -80,
},
"1.3.6.1.4.1.418.1.3": {
"name": "DALYBMS-MIB::soc_soc_percent",
"object_key": "soc_soc_percent",
},
# CELL VOLTAGE
"1.3.6.1.4.1.418.2.1": {
"name": "DALYBMS-MIB::cell_voltage_range_highest_voltage",
"object_key": "cell_voltage_range_highest_voltage",
},
"1.3.6.1.4.1.418.2.2": {
"name": "DALYBMS-MIB::cell_voltage_range_highest_cell",
"object_key": "cell_voltage_range_highest_cell",
},
"1.3.6.1.4.1.418.2.3": {
"name": "DALYBMS-MIB::cell_voltage_range_lowest_voltage",
"object_key": "cell_voltage_range_lowest_voltage",
},
"1.3.6.1.4.1.418.3.4": {
"name": "DALYBMS-MIB::cell_voltage_range_lowest_cell",
"object_key": "cell_voltage_range_lowest_cell",
},
# TEMPERATURE RANGE
"1.3.6.1.4.1.418.3.1": {
"name": "DALYBMS-MIB::temperature_range_highest_temperature",
"object_key": "temperature_range_highest_temperature",
"max_value": 35
},
"1.3.6.1.4.1.418.3.2": {
"name": "DALYBMS-MIB::temperature_range_highest_sensor",
"object_key": "temperature_range_highest_sensor",
},
"1.3.6.1.4.1.418.3.3": {
"name": "DALYBMS-MIB::temperature_range_lowest_temperature",
"object_key": "temperature_range_lowest_temperature",
"min_value":0
},
"1.3.6.1.4.1.418.3.4": {
"name": "DALYBMS-MIB::temperature_range_lowest_sensor",
"object_key": "temperature_range_lowest_sensor",
},
# MOSFET STATUS
"1.3.6.1.4.1.418.4.1": {
"name": "DALYBMS-MIB::mosfet_status_mode",
"object_key": "mosfet_status_mode",
"get_oid_method": mosfet_status,
},
"1.3.6.1.4.1.418.4.2": {
"name": "DALYBMS-MIB::mosfet_status_charging_mosfet",
"object_key": "mosfet_status_charging_mosfet",
"get_oid_method": mosfet_status,
},
"1.3.6.1.4.1.418.4.3": {
"name": "DALYBMS-MIB::mosfet_status_discharging_mosfet",
"object_key": "mosfet_status_discharging_mosfet",
"get_oid_method": mosfet_status,
},
"1.3.6.1.4.1.418.4.4": {
"name": "DALYBMS-MIB::mosfet_status_capacity_ah",
"object_key": "mosfet_status_capacity_ah",
"get_oid_method": mosfet_status,
"max_value": 400,
"min_value": 0,
},
#
#status_cycles
"1.3.6.1.4.1.418.5.1.0": {
"name": "DALYBMS-MIB::status_cells",
"object_key": "status_cells",
},
"1.3.6.1.4.1.418.5.2.0": {
"name": "DALYBMS-MIB::status_temperature_sensors",
"object_key": "status_temperature_sensors",
},
"1.3.6.1.4.1.418.5.3.0": {
"name": "DALYBMS-MIB::status_charger_running",
"object_key": "status_charger_running",
"get_oid_method": status_status,
},
"1.3.6.1.4.1.418.5.4.0": {
"name": "DALYBMS-MIB::status_load_running",
"object_key": "status_load_running",
"get_oid_method": status_status,
},
"1.3.6.1.4.1.418.5.5.0": {
"name": "DALYBMS-MIB::status_states_DI1",
"object_key": "status_states_DI1",
"get_oid_method": status_status,
},
"1.3.6.1.4.1.418.5.6.0": {
"name": "DALYBMS-MIB::status_cycles",
"object_key": "status_cycles",
},
# cell_voltages
"1.3.6.1.4.1.418.6.1": {
"name": "DALYBMS-MIB::cell_voltages_1",
"object_key": "cell_voltages_1",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.2": {
"name": "DALYBMS-MIB::cell_voltages_2",
"object_key": "cell_voltages_2",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.3": {
"name": "DALYBMS-MIB::cell_voltages_3",
"object_key": "cell_voltages_3",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.4": {
"name": "DALYBMS-MIB::cell_voltages_4",
"object_key": "cell_voltages_4",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.5": {
"name": "DALYBMS-MIB::cell_voltages_5",
"object_key": "cell_voltages_5",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.6": {
"name": "DALYBMS-MIB::cell_voltages_6",
"object_key": "cell_voltages_6",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.7": {
"name": "DALYBMS-MIB::cell_voltages_6",
"object_key": "cell_voltages_7",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.8": {
"name": "DALYBMS-MIB::cell_voltages_8",
"object_key": "cell_voltages_8",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.9": {
"name": "DALYBMS-MIB::cell_voltages_9",
"object_key": "cell_voltages_9",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.10": {
"name": "DALYBMS-MIB::cell_voltages_10",
"object_key": "cell_voltages_10",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.11": {
"name": "DALYBMS-MIB::cell_voltages_11",
"object_key": "cell_voltages_11",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.12": {
"name": "DALYBMS-MIB::cell_voltages_12",
"object_key": "cell_voltages_12",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.13": {
"name": "DALYBMS-MIB::cell_voltages_13",
"object_key": "cell_voltages_13",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.14": {
"name": "DALYBMS-MIB::cell_voltages_14",
"object_key": "cell_voltages_14",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.15": {
"name": "DALYBMS-MIB::cell_voltages_15",
"object_key": "cell_voltages_15",
"min_value": 2.7,
},
"1.3.6.1.4.1.418.6.16": {
"name": "DALYBMS-MIB::cell_voltages_16",
"object_key": "cell_voltages_16",
"min_value": 2.7,
},
# temperatures
"1.3.6.1.4.1.418.7.1": {
"name": "DALYBMS-MIB::temperatures_1",
"object_key": "temperatures_1",
},
"1.3.6.1.4.1.418.7.2": {
"name": "DALYBMS-MIB::temperatures_2",
"object_key": "temperatures_2",
},
}
def get_logger():
logger = logging.getLogger("snmp_test_oid")
if logger.handlers:
return logger
# logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
handler = RotatingFileHandler(
LOGFILE,
maxBytes=200_000_000,
backupCount=3,
)
fmt = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
"%Y-%m-%dT%H:%M:%S",
)
handler.setFormatter(fmt)
logger.addHandler(handler)
return logger
logger = get_logger()
# Это нужно что бы сортировать OID правильно
# строковая сортировка в лоб дает неправильнй результат
# ".1.3.6.1.2.1.33.1.10" < ".1.3.6.1.2.1.33.1.2" как строки
# а по SNMP-логике должно быть наоборот (10 > 2).
def oid_key(oid: str):
return tuple(int(x) for x in oid.strip(".").split(".") if x)
def batt_status_code(status_str: str) -> int:
"""
UPS-MIB upsBatteryStatus:
1=unknown, 2=normal, 3=low, 4=depleted
"""
if "LB" in status_str:
return 3
if "OB" in status_str or "OL" in status_str:
return 2
return 1
def value_for_oid(oid: str):
"""Вернуть (type, value) или (None, None), если OID не знаем."""
# По умолчанию
OID = OIDS.get(oid)
logger.debug(f"OID object: {OID}")
if OID:
logger.debug(f"[value_for_oid] OID = {OID.get('name')}: {oid}")
get_method = OID.get("get_oid_method", get_oid_data)
try:
oid_value = get_method(OID.get("object_key"))
except TypeError:
oid_value = get_method()
oid_type = OID.get("type", "string")
custom_multiplier = OID.get("custom_multiplier", 1)
logger.debug(f"[value_for_oid] custom_multiplier: {custom_multiplier}")
if custom_multiplier != 1:
try:
oid_value_digit = float(oid_value)
oid_value = oid_value_digit * custom_multiplier
except Exception as e:
logger.debug(f"[value_for_oid] Error: {e}, ignoring. Original value: {oid_value}")
# ингда приходят странные данные которые я хочу игнорировать
try:
oid_value_digit = float(oid_value)
logger.debug(f"[value_for_oid] Check min/max value for OID {oid}")
max_value = OID.get("max_value", oid_value_digit)
min_value = OID.get("min_value", oid_value_digit)
logger.debug(f"[value_for_oid] MAX value for OID {oid} is {max_value}")
logger.debug(f"[value_for_oid] MIN value for OID {oid} is {min_value}")
if oid_value_digit > max_value:
oid_value = max_value
if oid_value_digit < min_value:
oid_value = min_value
except Exception as e:
logger.debug(f"[value_for_oid] Error (not float data): {e}, ignoring. Original value: {oid_value}")
logger.debug(f"OID Data: {oid} {oid_type} {oid_value}")
return (oid_type, str(oid_value))
logger.debug(f"[value_for_oid] OID is not in the list of known OIDs")
return (None, None)
def handle_get(oid: str):
logger.debug(f"handle GET: oid: {oid}")
t, v = value_for_oid(oid)
if t is None:
logger.debug(f"handle GET: type: None")
# Ничего не печатаем -> NoSuchInstance
return
logger.debug(f"handle GET: oid: {oid} type: {t} value: {v}")
logger.debug("Data printed to stdout 1")
print(f".{oid}")
print(f"{t}")
print(f"{v}")
logger.debug("Data printed to stdout")
def handle_getnext(request_oid: str):
# Преобразовать строковый OID в tuple
req_key = oid_key(request_oid)
logger.debug(f"[handle_getnext] {request_oid} {req_key}")
# Сортировка по ключу tuple
SORTED_OIDS = sorted(OIDS.keys(), key=oid_key)
# сначала пробуем найти точное совпадение
try:
# так как ищем след от текущего то находим индекс текущего
# если он не последний -- if idx + 1 < len(SORTED_OIDS)
idx = SORTED_OIDS.index(request_oid)
logger.debug(f"[handle_getnext] {request_oid} has index {idx}")
next_oid = SORTED_OIDS[idx + 1] if idx + 1 < len(SORTED_OIDS) else None
logger.debug(f"[handle_getnext] Next OID {next_oid}")
except ValueError:
# если точного совпадения нет — ищем первый > request_oid
next_oid = None
for candidate in SORTED_OIDS:
#
if oid_key(candidate) > req_key:
next_oid = candidate
logger.debug(f"[handle_getnext] Next OID {next_oid}")
break
if not next_oid:
logger.debug(f"[handle_getnext] Next OID ont found")
return # NoSuchInstance (ничего не печатаем)
oid_type, oid_value = value_for_oid(next_oid)
logger.debug(f"handle GET: oid: {next_oid} type: {oid_type} value: {oid_value}")
logger.debug("Data printed to stdout")
print(f".{next_oid}")
print(f"{oid_type}")
print(f"{oid_value}")
logger.debug("Data printed to stdout")
def main():
logger.debug(f"sys.argv: {sys.argv}")
if len(sys.argv) < 3:
logger.debug(f"sys.argv should be 3 arguments, nothing to do, exiting")
return
cmd = sys.argv[1]
# Remove leading dot if any
oid = sys.argv[2]
oid = oid.lstrip('.')
logger.debug(f"cmd: {cmd}. OID: {oid}")
if cmd == "-g":
logger.debug("Handle get")
handle_get(oid)
elif cmd == "-n":
logger.debug("Handle get next")
handle_getnext(oid)
# SET не поддерживаем, ничего не печатаем
else:
logger.debug("Handle is not configured")
if __name__ == "__main__":
logger.info("Start")
main()