Inverter Voltronic: различия между версиями
Sirmax (обсуждение | вклад) |
Sirmax (обсуждение | вклад) |
||
| Строка 3: | Строка 3: | ||
[[Категория:UPS]] |
[[Категория:UPS]] |
||
=Вольтроник= |
=Вольтроник= |
||
| + | =Внешний вид= |
||
| ⚫ | |||
| ⚫ | |||
[[Файл:Voltrronic 2E VP.JPG|200px]] |
[[Файл:Voltrronic 2E VP.JPG|200px]] |
||
| ⚫ | |||
| ⚫ | |||
<BR> |
<BR> |
||
[[Файл:Axpert V PF1 manual.pdf]] |
[[Файл:Axpert V PF1 manual.pdf]] |
||
Текущая версия на 10:41, 22 января 2026
Содержание
Вольтроник
Внешний вид
Инструкция
Снимать данные с линукса
mpp-solar -p /dev/hidraw0 -c QPIGS -I -P PI30
но работает лучше как минимум у меня через ком-порт и /dev/ttyUSB0
Но для этого нужен переходник USB -> COM
Если переходников несколько то лучше указывать by-id - /dev/serial/by-id/usb-1a86_USB2.0-Ser_-if00-port0
NUT
так как я хотел что бы инвертор прикинулся нормальным UPS а не вот это вот все то набросал простой скрипт для снятия данных (драйвер встроенный в NUT не заработал, написать свой на основе скрипта я не осилил)
Весь код написан на скорую руку, с кучей хардкода, так как надо было прям сейчас, а передывать пока нет времени
Логика работы такая:
- скрипт
mpp_nut_bridge.py(через systemd unit) работает в вечном цикле и складывает результат в файл/var/lib/nut/mpp.state - NUT умеет читать данные из внешнего файла через
driver = dummy-ups
[mpp]
driver = dummy-ups
port = /var/lib/nut/mpp.state
desc = "MPP via mppsolar"
- для того что бы отдавать данные на zabbix используется бридж в SNMP
mpp_nut_bridge.py
#!/usr/bin/env python3
import subprocess
import time
import tempfile
import os
import json
import logging
class CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
FORMATS = {
logging.DEBUG: grey + format + reset,
logging.INFO: grey + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red + format + reset
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
STATE_FILE = "/var/lib/nut/mpp.state"
LOGLEVEL = os.environ.get('LOGLEVEL', 'INFO').upper()
logger = logging.getLogger("mpp_nut_bridge")
logger.setLevel(LOGLEVEL)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
# Команда mppsolar
# /etc/nut/mpp-solar -p /dev/ttyUSB1 -c QPIGS -I -P PI30 -o json | jq '.'
DEV="/dev/serial/by-id/usb-1a86_USB2.0-Ser_-if00-port0"
#DEV="/dev/hidraw1"
MPPSOLAR_CMD = [
"/usr/local/virtualenvs/mppsolar/bin/python3",
"/etc/nut/mpp-solar",
"-p", DEV,
"-P", "PI30",
"-c", "QPIGS",
"-o", "json",
]
def write_state(vars_dict):
"""Атомарная запись файла состояния для dummy-ups"""
fd, tmp = tempfile.mkstemp()
with os.fdopen(fd, "w") as f:
for k, v in vars_dict.items():
f.write(f"{k}: {v}\n")
logger.info(f"Replacing {STATE_FILE}")
os.replace(tmp, STATE_FILE)
def build_ups_status(data):
flags = []
# UPS выключен
if not data.get("is_switched_on", 1):
return "OFF"
ac_in = data.get("ac_input_voltage", 0.0)
batt_cap = data.get("battery_capacity", 0)
charging = bool(data.get("is_charging_on", 0))
load_on = bool(data.get("is_load_on", 0))
if ac_in > 100:
flags.append("OL") # On Line
else:
flags.append("OB") # On Battery
if batt_cap < 10:
flags.append("LB") # Low Battery
if charging:
flags.append("CHRG")
elif load_on:
flags.append("DISCHRG")
if not flags:
return "UNKNOWN"
return " ".join(flags)
def poll_once():
attempts=3
while attempts>0:
try:
# Запускаем mppsolar и читаем JSON
res = subprocess.run(
MPPSOLAR_CMD,
capture_output=True,
text=True,
check=True,
)
logger.info(f"STDOUT: {res.stdout}")
data = json.loads(res.stdout)
error = data.get("error", False)
logger.info(f"DATA: {data} Error: {error}")
# Собираем переменные NUT
if error:
logger.info("ERROR DETECTED")
vars_dict = {
"battery.charge": int(data.get("battery_capacity")),
"battery.voltage": float(data.get("battery_voltage")),
"input.voltage": float(data.get("ac_input_voltage")),
"input.frequency": float(data.get("ac_input_frequency")),
"output.voltage": float(data.get("ac_output_voltage")),
"output.frequency": float(data.get("ac_output_frequency")),
"ups.load": int(data.get("ac_output_load")),
"ups.power": int(data.get("ac_output_apparent_power")),
"ups.realpower": int(data.get("ac_output_active_power")),
"inverter_heat_sink_temperature": int(data.get("inverter_heat_sink_temperature")),
"ups.status": build_ups_status(data),
# Можно добавить свои:
# "device.mfr": "MPP Solar",
# "device.model": "PI30",
}
write_state(vars_dict)
return
except Exception as e:
logger.info(f"Error during data collection: {e}")
logger.info(f"Sleep 30 sec. before next attempt")
time.sleep(30)
attempts = attempts - 1
# If exited on attempts < 0
vars_dict = {
"battery.charge": 0,
"battery.voltage": 0,
"input.voltage": 0,
"input.frequency": 0,
"output.voltage": 0,
"output.frequency": 0,
"ups.load": 0,
"ups.power": 0,
"ups.realpower": 0,
"inverter_heat_sink_temperature": 0,
"ups.status": "OFF"
}
write_state(vars_dict)
def main():
logger.info("Starting")
SLEEP = 20
while True:
try:
poll_once()
except Exception as e:
# При ошибке помечаем UPS как OFF
try:
logger.info(f"Error during poll: {e}")
write_state({"ups.status": "OFF"})
except Exception:
pass
logger.info(f"Sleeping {SLEEP} sec brfore next check")
time.sleep(SLEEP) # опрос каждые 5 секунд
if __name__ == "__main__":
main()
mpp-nut-bridge.service
/usr/local/virtualenvs/mppsolar/bin/python3- этот путь кvirtualenv
[Unit] Description=MPP Solar -> NUT bridge After=network.target [Service] User=nut Group=nut ExecStart=/usr/local/virtualenvs/mppsolar/bin/python3 /etc/nut/mpp_nut_bridge.py Restart=always RestartSec=3 [Install] WantedBy=multi-user.target
snmpd.conf
Часть конфига ответвенная за "проброс" запросов к snmpd
Тут три ветки
.1.3.6.1.2.1.33"Стандартный" MIB - просто для теста, у меня в заббиксе он не используется.1.3.6.1.4.1.318MIB специфичный для APC - все части взяты из конфига темплейта zabbix, возможно в оригинальном MIB больше данных.1.3.6.1.4.1.418- ветка выбрана от фонаря и используется для мониторинга BMS батареи (это отдельная задача - мониторинг батареи Daly BMS)ups-snmp-passpersist.pyимя файла который дергать на запрос (ему передается тип изапроса GET/GETNEXT и OID, другие типы игнорируем)
pass .1.3.6.1.2.1.33 /etc/nut/ups-snmp-passpersist.py pass .1.3.6.1.4.1.318 /etc/nut/ups-snmp-passpersist.py pass .1.3.6.1.4.1.418 /etc/nut/batt-dale-snmp.py
По сути этот конфиг на каждый запрос вызывает скрипт с параметрами (хорошо бы конечно сделать постоянно висящего демона что бы не форкать на каждый запрос но пока и так сойдет)
ups-snmp-passpersist.py
Тоже написано на скорую руку и с хардкодом - но за основу сойдет
#!/usr/bin/env python3
import sys
import subprocess
import logging
from logging.handlers import RotatingFileHandler
LOGFILE = "/var/log/snmp/snmp-ups.log"
UPS_NAME = "mpp"
def get_oid_data(nut, nut_key):
logger.debug(f"[get_oid_data] nut: {nut}. nut_key: {nut_key}")
val = nut.get(nut_key)
logger.debug(f"[get_oid_data] value: {val}")
return str(val)
def batt_status(nut, nut_key):
val = get_oid_data(nut, nut_key)
s = str(batt_status_code(val))
logger.debug("[batt_status] Status: {s}")
return s
# OID'ы из UPS-MIB (RFC1628)
OIDS = {
# upsIdent*
"1.3.6.1.2.1.33.1.1.1.0": {
"name": "UPS-MIB::upsIdentManufacturer.0",
"type": "string",
"get_oid_method": lambda: "VOLTRONIC"
},
"1.3.6.1.2.1.33.1.1.2.0": {
"name": "UPS-MIB::upsIdentModel.0",
"type": "string",
"get_oid_method": lambda: "2E-VP-5K48"
},
#1.3.6.1.2.1.33.1.1.3.0 UPS-MIB::upsIdentUPSSoftwareVersion.0
"1.3.6.1.2.1.33.1.1.3.0": {
"name": "UPS-MIB::upsIdentUPSSoftwareVersion.0",
"type": "string",
"get_oid_method": lambda: "1",
},
#1.3.6.1.2.1.33.1.1.4.0 UPS-MIB::upsIdentAgentSoftwareVersion.0
"1.3.6.1.2.1.33.1.1.5.0": {
"name": "UPS-MIB::upsIdentAgentSoftwareVersion.0",
"type": "string",
"get_oid_method": lambda: "UPS_ID_IS_NOT_SET"
},
"1.3.6.1.2.1.33.1.1.4.0": {
"name": "UPS-MIB::upsIdentName.0",
"type": "string",
"get_oid_method": lambda: "UPS_ID_IS_NOT_SET"
},
# Battery
"1.3.6.1.2.1.33.1.2.1.0": {
"name": "UPS-MIB::upsBatteryStatus.0",
"type": "integer", # enum
"nut_key": "ups.status",
"get_oid_method": batt_status
},
"1.3.6.1.2.1.33.1.2.2.0": {
"name": "UPS-MIB::upsSecondsOnBattery.0",
"type": "integer",
"get_oid_method": lambda: "0"
},
"1.3.6.1.2.1.33.1.2.3.0": {
"name": "UPS-MIB::upsEstimatedMinutesRemaining.0",
"type": "integer",
"get_oid_method": lambda: "0"
},
"1.3.6.1.2.1.33.1.2.4.0": {
"name": "UPS-MIB::upsEstimatedChargeRemaining.0",
"type": "integer",
#"nut_key": "battery.charge",
"get_oid_method": lambda: "0"
},
"1.3.6.1.4.1.318.1.1.1.2.3.1.0": {
"name": "UPS-MIB::upsEstimatedChargeRemaining.0",
"type": "integer",
#"nut_key": "battery.charge",
"get_oid_method": lambda: "0"
},
"1.3.6.1.2.1.33.1.2.5.0": {
"name": "UPS-MIB::upsBatteryVoltage.0",
"type": "integer", # 0.1 V
"custom_multiplier": 10,
"nut_key": "battery.voltage",
},
"1.3.6.1.2.1.33.1.2.7.0": {
"name": "UPS-MIB::upsBatteryTemperature.0",
"type": "integer",
"nut_key": "inverter_heat_sink_temperature"
},
# Input
"1.3.6.1.2.1.33.1.3.2.0": {
"name": "UPS-MIB::upsInputNumLines.0",
"type": "integer",
"get_oid_method": lambda: "1"
},
"1.3.6.1.2.1.33.1.3.3.1.1.1": {
"name": "UPS-MIB::upsInputLineBads.0",
"type": "integer",
"get_oid_method": lambda: "1"
},
"1.3.6.1.2.1.33.1.3.1.0": {
"name": "UPS-MIB::upsInputLineBads.0",
"type": "counter32",
"get_oid_method": lambda: "1"
},
"1.3.6.1.2.1.33.1.3.2.1": {
"name": "UPS-MIB::upsInputNumLines.1",
"type": "integer",
"get_oid_method": lambda: "1"
},
"1.3.6.1.2.1.33.1.3.3.1.2.1": {
"name": "UPS-MIB::upsInputFrequency.1",
"type": "integer", # 0.1 Hz
"custom_multiplier": 10,
"nut_key": "input.frequency",
},
"1.3.6.1.2.1.33.1.3.3.1.3.1": {
"name": "UPS-MIB::upsInputVoltage.1",
"type": "integer", # RMS Volts
"nut_key": "input.voltage",
},
"1.3.6.1.2.1.33.1.3.3.1.4.1": {
"name": "UPS-MIB::upsInputCurrent.1",
"type": "integer", # 0.1 A
"custom_multiplier": 10,
#"nut_key": "input.current"
"get_oid_method": lambda: "0"
},
"1.3.6.1.2.1.33.1.3.3.1.5.1": {
"name": "UPS-MIB::upsInputTruePower.1",
"type": "integer", # Watts
"get_oid_method": lambda: "0"
},
# Output
"1.3.6.1.2.1.33.1.4.1.0": {
"name": "UPS-MIB::upsOutputSource.0",
"type": "integer", # enum
"get_oid_method": lambda: "2"
},
"1.3.6.1.2.1.33.1.4.2.0": {
"name": "UPS-MIB::upsOutputFrequency.0",
"type": "integer", # 0.1 Hz
"custom_multiplier": 10,
"nut_key": "output.frequency",
},
"1.3.6.1.2.1.33.1.4.3.0": {
"name": "UPS-MIB::upsOutputNumLines.0",
"type": "integer",
"get_oid_method": lambda: "1"
},
"1.3.6.1.2.1.33.1.4.4.1.1.1": {
"name": "UPS-MIB::upsOutputLineIndex.1",
"type": "integer",
"get_oid_method": lambda: "1",
},
"1.3.6.1.2.1.33.1.4.4.1.2.1": {
"name": "UPS-MIB::upsOutputVoltage.1",
"type": "integer", # RMS
"nut_key": "output.voltage",
},
"1.3.6.1.2.1.33.1.4.4.1.3.1": {
"name": "UPS-MIB::upsOutputCurrent.1",
"type": "integer", # 0.1 A
"custom_multiplier": 10,
#"nut_key": "output_current",
"get_oid_method": lambda: "0"
},
"1.3.6.1.2.1.33.1.4.4.1.4.1": {
"name": "UPS-MIB::upsOutputPower.1",
"type": "integer", # Watts
"nut_key": "ups.power"
},
"1.3.6.1.2.1.33.1.4.4.1.5.1": {
"name": "UPS-MIB::upsOutputPercentLoad.1",
"type": "integer", # percent
"nut_key": "ups.load",
},
# Bypass
"1.3.6.1.2.1.33.1.5.1.0": {
"name": "UPS-MIB::upsBypassFrequency.0",
"type": "integer", # 0.1 Hz
"custom_multiplier": 10,
"nut_key": "input.frequency",
},
"1.3.6.1.2.1.33.1.5.2.0": {
"name": "UPS-MIB::upsBypassNumLines.0",
"type": "integer",
"get_oid_method": lambda: "1",
},
"1.3.6.1.2.1.33.1.5.3.1.1.1": {
"name": "UPS-MIB::upsBypassLineIndex.1",
"type": "integer",
"get_oid_method": lambda: "1",
},
"1.3.6.1.2.1.33.1.5.3.1.2.1": {
"name": "UPS-MIB::upsBypassVoltage.1",
"type": "integer", # RMS
"nut_key": "input.voltage",
},
"1.3.6.1.2.1.33.1.5.3.1.3.1": {
"name": "UPS-MIB::upsBypassCurrent.1",
"type": "integer", # 0.1 A
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.5.3.1.4.1": {
"name": "UPS-MIB::upsBypassPower.1",
"type": "integer", # Watts
"get_oid_method": lambda: "0",
},
# Alarms
"1.3.6.1.2.1.33.1.6.1.0": {
"name": "UPS-MIB::upsAlarmsPresent.0",
"type": "gauge", # Gauge32
"get_oid_method": lambda: "0",
},
# Tests
"1.3.6.1.2.1.33.1.7.1.0": {
"name": "UPS-MIB::upsTestId.0",
"type": "integer",
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.7.2.0": {
"name": "UPS-MIB::upsTestSpinLock.0",
"type": "integer",
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.7.3.0": {
"name": "UPS-MIB::upsTestResultsSummary.0",
"type": "integer", # enum
"get_oid_method": lambda: "1",
},
"1.3.6.1.2.1.33.1.7.4.0": {
"name": "UPS-MIB::upsTestResultsDetail.0",
"type": "string",
"get_oid_method": lambda: "OK PASS",
},
"1.3.6.1.2.1.33.1.7.5.0": {
"name": "UPS-MIB::upsTestStartTime.0",
"type": "timeticks",
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.7.6.0": {
"name": "UPS-MIB::upsTestElapsedTime.0",
"type": "integer", # в твоём выводе APC косячит, но по MIB INTEGER
"get_oid_method": lambda: "0",
},
# Shutdown / control
"1.3.6.1.2.1.33.1.8.1.0": {
"name": "UPS-MIB::upsShutdownType.0",
"type": "integer",
"get_oid_method": lambda: "2",
},
"1.3.6.1.2.1.33.1.8.2.0": {
"name": "UPS-MIB::upsShutdownAfterDelay.0",
"type": "integer",
"get_oid_method": lambda: "-1",
},
"1.3.6.1.2.1.33.1.8.3.0": {
"name": "UPS-MIB::upsStartupAfterDelay.0",
"type": "integer",
"get_oid_method": lambda: "-1",
},
"1.3.6.1.2.1.33.1.8.4.0": {
"name": "UPS-MIB::upsRebootWithDuration.0",
"type": "integer",
"get_oid_method": lambda: "-1",
},
"1.3.6.1.2.1.33.1.8.5.0": {
"name": "UPS-MIB::upsAutoRestart.0",
"type": "integer",
"get_oid_method": lambda: "2",
},
# Config
"1.3.6.1.2.1.33.1.9.1.0": {
"name": "UPS-MIB::upsConfigInputFreq.0",
"type": "integer", # 0.1 Hz
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.9.2.0": {
"name": "UPS-MIB::upsConfigOutputVoltage.0",
"type": "integer", # RMS Volts
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.9.3.0": {
"name": "UPS-MIB::upsConfigOutputFreq.0",
"type": "integer", # 0.1 Hz
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.9.4.0": {
"name": "UPS-MIB::upsConfigOutputVA.0",
"type": "integer",
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.9.5.0": {
"name": "UPS-MIB::upsConfigOutputPower.0",
"type": "integer",
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.9.6.0": {
"name": "UPS-MIB::upsConfigLowBattTime.0",
"type": "integer", # minutes
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.9.7.0": {
"name": "UPS-MIB::upsConfigAudibleStatus.0",
"type": "integer",
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.9.8.0": {
"name": "UPS-MIB::upsConfigLowVoltageTransferPoint.0",
"type": "integer", # RMS Volts
"get_oid_method": lambda: "0",
},
"1.3.6.1.2.1.33.1.9.9.0": {
"name": "UPS-MIB::upsConfigHighVoltageTransferPoint.0",
"type": "integer", # RMS Volts
"get_oid_method": lambda: "0",
},
#POWERNET-MIB
"1.3.6.1.4.1.318.1.1.1.2.3.1.0": {
"name": "POWERNET-MIB::upsHighPrecBatteryCapacity.0",
"type": "gauge32", # percent
"custom_multiplier": 10,
"nut_key": "battery.charge",
},
"1.3.6.1.4.1.318.1.1.1.2.1.3.0": {
"name": "POWERNET-MIB::upsBasicBatteryLastReplaceDate.0",
"type": "string",
"get_oid_method": lambda: "07/07/25",
},
"1.3.6.1.4.1.318.1.1.1.2.2.4.0": {
"name": "POWERNET-MIB::upsAdvBatteryReplaceIndicator.0",
"type": "integer",
"get_oid_method": lambda: "1",
},
"1.3.6.1.4.1.318.1.1.1.2.2.3.0": {
"name": "POWERNET-MIB::upsAdvBatteryRunTimeRemaining.0",
"type": "timeticks",
"get_oid_method": lambda: "1000",
},
"1.3.6.1.4.1.318.1.1.1.2.1.1.0": {
"name": "POWERNET-MIB::upsBasicBatteryStatus.0",
"type": "integer",
"get_oid_method": lambda: "2",
},
"1.3.6.1.4.1.318.1.1.1.2.3.2.0": {
"name": "POWERNET-MIB::upsHighPrecBatteryTemperature.0",
# "type": "gauge32",
# "get_oid_method": lambda: "100",
"custom_multiplier": 10,
"type": "gauge32",
"nut_key": "inverter_heat_sink_temperature"
},
"1.3.6.1.4.1.318.1.1.1.2.3.4.0": {
"name": "POWERNET-MIB::upsHighPrecBatteryActualVoltage.0",
"type": "integer", # 0.1 V
"custom_multiplier": 10,
"nut_key": "battery.voltage",
},
"1.3.6.1.4.1.318.1.1.1.2.2.5.0": {
"name": "POWERNET-MIB::upsAdvBatteryNumOfBattPacks.0",
"type": "integer",
"get_oid_method": lambda: "1",
},
"1.3.6.1.4.1.318.1.1.1.3.2.5.0": {
"name": "POWERNET-MIB::upsAdvInputLineFailCause.0",
"type": "integer",
"get_oid_method": lambda: "10",
},
"1.3.6.1.4.1.318.1.1.1.3.3.4.0": {
"name": "POWERNET-MIB::upsHighPrecInputFrequency.0",
"type": "gauge32", # 0.1 Hz
"custom_multiplier": 10,
"nut_key": "input.frequency",
},
"1.3.6.1.4.1.318.1.1.1.3.3.1.0": {
"name": "POWERNET-MIB::upsHighPrecInputLineVoltage.0",
"type": "gauge32", # RMS Volts
"custom_multiplier": 10,
"nut_key": "input.voltage",
},
"1.3.6.1.4.1.318.1.1.1.1.1.1.0": {
"name": "POWERNET-MIB::upsBasicIdentModel.0",
"type": "string",
"get_oid_method": lambda: "VOLTRONIC 2E-VP-5K48"
},
"1.3.6.1.4.1.318.1.1.1.4.3.4.0": {
"name": "POWERNET-MIB::upsHighPrecOutputCurrent.0",
"custom_multiplier": 10,
#"nut_key": "input.current"
"get_oid_method": lambda: "0",
"type": "gauge32",
},
"1.3.6.1.4.1.318.1.1.1.4.3.3.0": {
"name": "POWERNET-MIB::upsHighPrecOutputLoad.0",
"custom_multiplier": 10,
"type": "gauge32", # percent
"nut_key": "ups.load",
},
"1.3.6.1.4.1.318.1.1.1.4.1.1.0": {
"name": "POWERNET-MIB::upsBasicOutputStatus.0",
"get_oid_method": lambda: "2",
},
"1.3.6.1.4.1.318.1.1.1.4.3.1.0": {
"type": "gauge32", # RMS
"custom_multiplier": 10,
"nut_key": "output.voltage",
},
"1.3.6.1.4.1.318.1.1.1.1.2.3.0": {
"name": "POWERNET-MIB::upsHighPrecOutputVoltage.0",
"type": "string",
"get_oid_method": lambda: "FUCKOFF",
},
}
def get_logger():
logger = logging.getLogger("snmp_test_oid")
if logger.handlers:
return logger
# logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
handler = RotatingFileHandler(
LOGFILE,
maxBytes=200_000_000,
backupCount=3,
)
fmt = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
"%Y-%m-%dT%H:%M:%S",
)
handler.setFormatter(fmt)
logger.addHandler(handler)
return logger
logger = get_logger()
# Это нужно что бы сортировать OID правильно
# строковая сортировка в лоб дает неправильнй результат
# ".1.3.6.1.2.1.33.1.10" < ".1.3.6.1.2.1.33.1.2" как строки
# а по SNMP-логике должно быть наоборот (10 > 2).
def oid_key(oid: str):
return tuple(int(x) for x in oid.strip(".").split(".") if x)
def get_nut():
"""Читаем все переменные из upsc mpp@localhost"""
logger.debug(f"[get_nut] Start")
try:
out = subprocess.check_output(
["upsc", f"{UPS_NAME}@localhost"],
stderr=subprocess.DEVNULL,
text=True
)
logger.debug(f"[get_nut]: cmd out: {out}")
except Exception as e:
logger.debug(f"[get_nut]: {e}")
return {}
vals = {}
for line in out.splitlines():
logger.debug(f"[get_nut] Reading line {line}")
if ":" not in line:
continue
k, v = line.split(":", 1)
vals[k.strip()] = v.strip()
logger.debug(f"[get_nut] vals: {vals}")
return vals
def batt_status_code(status_str: str) -> int:
"""
UPS-MIB upsBatteryStatus:
1=unknown, 2=normal, 3=low, 4=depleted
"""
if "LB" in status_str:
return 3
if "OB" in status_str or "OL" in status_str:
return 2
return 1
def value_for_oid(oid: str):
"""Вернуть (type, value) или (None, None), если OID не знаем."""
nut = get_nut()
logger.debug(f"[value_for_oid] Data from NUT: {nut}")
logger.debug(f"[value_for_oid] oid: {oid}")
# По умолчанию
OID = OIDS.get(oid)
logger.debug(f"OID object: {OID}")
if OID:
logger.debug(f"[value_for_oid] OID = {OID.get('name')}: {oid}")
get_method = OID.get("get_oid_method", get_oid_data)
try:
oid_value = get_method(nut, OID.get("nut_key"))
except TypeError:
oid_value = get_method()
oid_type = OID.get("type", "integer")
custom_multiplier = OID.get("custom_multiplier", 1)
logger.debug(f"[value_for_oid] custom_multiplier: {custom_multiplier}")
if custom_multiplier != 1:
try:
oid_value_digit = float(oid_value)
oid_value = oid_value_digit * custom_multiplier
except Exception as e:
logger.debug(f"[value_for_oid] Error: {e}, ignoring. Original value: {oid_value}")
logger.debug(f"OID Data: {oid} {oid_type} {oid_value}")
return (oid_type, str(oid_value))
logger.debug(f"[value_for_oid] OID is not in the list of known OIDs")
return (None, None)
def handle_get(oid: str):
logger.debug(f"handle GET: oid: {oid}")
t, v = value_for_oid(oid)
if t is None:
logger.debug(f"handle GET: type: None")
# Ничего не печатаем -> NoSuchInstance
return
logger.debug(f"handle GET: oid: {oid} type: {t} value: {v}")
logger.debug("Data printed to stdout 1")
print(f".{oid}")
print(f"{t}")
print(f"{v}")
logger.debug("Data printed to stdout")
def handle_getnext(request_oid: str):
# Преобразовать строковый OID в tuple
req_key = oid_key(request_oid)
logger.debug(f"[handle_getnext] {request_oid} {req_key}")
# Сортировка по ключу tuple
SORTED_OIDS = sorted(OIDS.keys(), key=oid_key)
# сначала пробуем найти точное совпадение
try:
# так как ищем след от текущего то находим индекс текущего
# если он не последний -- if idx + 1 < len(SORTED_OIDS)
idx = SORTED_OIDS.index(request_oid)
logger.debug(f"[handle_getnext] {request_oid} has index {idx}")
next_oid = SORTED_OIDS[idx + 1] if idx + 1 < len(SORTED_OIDS) else None
logger.debug(f"[handle_getnext] Next OID {next_oid}")
except ValueError:
# если точного совпадения нет — ищем первый > request_oid
next_oid = None
for candidate in SORTED_OIDS:
#
if oid_key(candidate) > req_key:
next_oid = candidate
logger.debug(f"[handle_getnext] Next OID {next_oid}")
break
if not next_oid:
logger.debug(f"[handle_getnext] Next OID ont found")
return # NoSuchInstance (ничего не печатаем)
oid_type, oid_value = value_for_oid(next_oid)
logger.debug(f"handle GET: oid: {next_oid} type: {oid_type} value: {oid_value}")
logger.debug("Data printed to stdout")
print(f".{next_oid}")
print(f"{oid_type}")
print(f"{oid_value}")
logger.debug("Data printed to stdout")
def main():
logger.debug(f"sys.argv: {sys.argv}")
if len(sys.argv) < 3:
logger.debug(f"sys.argv should be 3 arguments, nothing to do, exiting")
return
cmd = sys.argv[1]
# Remove leading dot if any
oid = sys.argv[2]
oid = oid.lstrip('.')
logger.debug(f"cmd: {cmd}. OID: {oid}")
if cmd == "-g":
logger.debug("Handle get")
handle_get(oid)
elif cmd == "-n":
logger.debug("Handle get next")
handle_getnext(oid)
# SET не поддерживаем, ничего не печатаем
else:
logger.debug("Handle is not configured")
if __name__ == "__main__":
logger.info("Start")
main()