New supervision

- using fastAPI
- offline/online working
- warning if ZABBIX API is too long
- showing settings
- showing last ack message
- showing procedure
- menu split by SU team
This commit is contained in:
sylvain.chateau
2024-02-05 15:44:27 +01:00
parent f493406b5b
commit f2235d4b45
60 changed files with 10017 additions and 221 deletions

17
schemas/__init__.py Normal file
View File

@ -0,0 +1,17 @@
from typing import Dict
from pydantic import BaseModel, Field, RootModel
class ZabbixServer(BaseModel):
ip: str
port: int
ServerStatuses = RootModel[Dict[str, bool]]
class HostGroupRequest(BaseModel):
output: str = "extend"
search: dict = Field(default_factory=lambda: {"name": ""})
searchWildcardsEnabled: bool = Field(default=True, alias="searchWildcardsEnabled")

58
schemas/alerts.py Normal file
View File

@ -0,0 +1,58 @@
from typing import List, Any, Dict, Union, Optional
from pydantic import BaseModel
from schemas.notes import Note
class Group(BaseModel):
groupid: str
name: str
class Host(BaseModel):
host: str
class Tag(BaseModel):
value: str
class Event(BaseModel):
eventid: str
source: str
object: str
objectid: str
clock: str
value: str
acknowledged: str
acknowledges: Optional[List[Any]] = []
ns: str
name: str
severity: str
class Trigger(BaseModel):
description: str
priority: str
triggerid: str
lastchange: str
hosts: List[Host]
groups: List[Group]
tags: List[Tag]
url: str
lastEvent: Event
class ContextModel(BaseModel):
zabbix_url: str
tv_mode: bool
hostgroups: Optional[List[str]] = []
check_servers: Optional[Dict] = {}
alerts: Optional[List] = []
total_alerts: Optional[int] = -1
notes: Any
request: Any
notes: List[Note]
teams: Union[List, Dict]
accepted_latency: Optional[bool] = False
config: Dict[str, Any]
zabbix_available: bool

51
schemas/notes.py Normal file
View File

@ -0,0 +1,51 @@
import time
from typing import Dict, List, Optional
from pydantic import BaseModel
from settings import settings
from utils import read_json_file, write_json_file
class Note(BaseModel):
name: str
msg: str
url: Optional[str] = None
lvl: str
team: str
save: Optional[bool] = None
ts: Optional[int] = None
class NoteManager:
def __init__(self):
self.file_path = f"{settings.DATA_DIR}/{settings.NOTES_JSON}"
async def add_note(self, note: Note) -> None:
note.ts = int(time.time())
new_note = {note.ts: [note.dict()]}
existing_data = await self.read_file()
existing_data.update(new_note)
await self.write_file(existing_data)
async def delete_note(self, note_id: str) -> None:
data = await self.read_file()
if note_id in data:
del data[note_id]
await self.write_file(data)
async def display_notes(self, teams: Optional[str]) -> List[Note]:
notes_data = await self.read_file()
return [
Note(**note)
for _, notes in notes_data.items()
for note in notes
if note["team"] in [teams, "all"]
]
async def read_file(self) -> Dict:
return await read_json_file(self.file_path)
async def write_file(self, data: Note) -> None:
await write_json_file(self.file_path, data)

96
schemas/zabbix_client.py Normal file
View File

@ -0,0 +1,96 @@
import asyncio
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
from settings import settings
import aiohttp
from utils.log import logger
@dataclass
class ZabbixConfig:
api_url: str
user: str
password: str
class ZabbixApiNotResponding(Exception):
pass
async def on_request_start(session, trace_config_ctx, params):
trace_config_ctx.start = asyncio.get_event_loop().time()
async def on_request_end(session, trace_config_ctx, params):
request_duration = asyncio.get_event_loop().time() - trace_config_ctx.start
if request_duration > 1:
logger.info(f"Request took {request_duration:.2f} seconds")
class ZabbixClient:
_instance = None
def __init__(self, config: ZabbixConfig):
self.config = config
self.token: Optional[str] = None
self.trace_config = aiohttp.TraceConfig()
self.trace_config.on_request_start.append(on_request_start)
self.trace_config.on_request_end.append(on_request_end)
async def login(self) -> None:
payload = self._construct_payload(
"user.login", {"user": self.config.user, "password": self.config.password}
)
response_data = await self._send_request(payload)
self.token = response_data.get("result")
def is_logged_in(self) -> bool:
return bool(self.token)
async def call(self, request: Dict[str, Any], method: str) -> Any:
if not self.token:
await self.login()
payload = self._construct_payload(method, request, auth=self.token)
return await self._send_request(payload)
def _construct_payload(
self, method: str, params: Dict[str, Any], auth: Optional[str] = None
) -> Dict[str, Any]:
return {
"jsonrpc": "2.0",
"method": method,
"params": params,
"auth": auth,
"id": 1,
}
async def _send_request(self, payload: Dict[str, Any]) -> Any:
timeout = aiohttp.ClientTimeout(total=settings.ZABBIX_API_TIMEOUT)
async with aiohttp.ClientSession(timeout=timeout, trace_configs=[self.trace_config]) as session:
for attempt in range(settings.ZABBIX_API_RETRY):
try:
async with session.post(
f"{self.config.api_url}/api_jsonrpc.php", json=payload
) as response:
if response.status == 200:
ret = await response.json()
ret["success"] = True
return ret
else:
raise Exception(f"HTTP Error: {response.status}")
except Exception as e:
logger.error(f"[ERR] - {datetime.now()} Retry #{attempt + 1}: {e}")
await asyncio.sleep(attempt + 1)
else:
break
else:
logger.error(
f"Failed after {settings.ZABBIX_API_RETRY} retries. Last method attempted: '{payload['method']}'."
)
raise ZabbixApiNotResponding("The Zabbix API is not respoding")