Ping the services and create a dropdown report. It sends a notification when a service fails and can show the error from the server in an editor.
#!/usr/bin/env python3
# coding=utf-8
#
# <xbar.title>Service Monitor</xbar.title>
# <xbar.version>v1.0</xbar.version>
# <xbar.author>Cristian</xbar.author>
# <xbar.author.github>cmaluend</xbar.author.github>
# <xbar.desc>Ping the services and create a dropdown report. It sends a notification when a service fails and can show the error from the server in an editor.</xbar.desc>
# <xbar.image>https://cmaluend.github.io/images/xbar/servicemonitor1.0.png</xbar.image>
# <xbar.dependencies>python3</xbar.dependencies>
# <xbar.var>string(TITLE="xbar Service Monitor"): Menu bar title.</xbar.var>
# <xbar.var>number(LIMIT_TOTAL_ISSUES=5): Max issues shown in the menu.</xbar.var>
# <xbar.var>number(RECOVERY_TIME=4): Recovery time in hours.</xbar.var>
# <xbar.var>number(TIMEOUT=10): Timeout for the request in seconds.</xbar.var>
# <xbar.var>boolean(ALLOW_NOTIFICATIONS=true): Allow notifications.</xbar.var>
#
# by Cristian
import os
import sqlite3
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from functools import partial
from threading import Thread
try:
import requests
except:
subprocess.check_call([sys.executable, "-m", "pip", "install", "requests"])
TITLE = os.getenv("TITLE", "xbar")
LIMIT_TOTAL_ISSUES= int(os.getenv("LIMIT_TOTAL_ISSUES", 5))
RECOVERY_TIME= int(os.getenv("RECOVERY_TIME",24))
TIMEOUT= int(os.getenv("TIMEOUT",10))
ALLOW_NOTIFICATIONS = bool(os.getenv("ALLOW_NOTIFICATIONS",True))
#Documentation
# 🟢: Healthy service
# 🔴: Service with issues
# ⚠️: Service recovered.
'''
SERVICES = {
"tier: [
{
"name": "Service name",
"endpoint": "https://httpstat.us/200",
"headers": {} #optional
"method": "GET", #optional
"body": {} #optional
"status_code": 200
}
]
}
'''
SERVICES = {
"dev": [
{
"name": "service 1",
"endpoint": "https://httpstat.us/200",
"headers": {
"Content-type": "text/html"
},
"status_code": 200
},
{
"name": "service 2",
"endpoint": "https://httpstat.us/400",
"status_code": 200
},
{
"name": "service 3",
"endpoint": "https://httpstat.us/200",
"headers": {
"Content-type": "text/html"
},
"status_code": 200
},
],
"uat": [
{
"name": "service 1",
"endpoint": "https://httpstat.us/200",
"status_code": 200
},
]
}
class ServiceMonitor:
def __init__(self, reporter) -> None:
self.reporter = reporter
self.notificator = Notificator(reporter)
def process_environments(self, environments):
envs = environments.keys()
result = {}
for env in envs:
result[env] = self.process_services(env, environments[env])
return result
def process_services(self, env, services):
servicesnames = [ x["name"] for x in services]
with ThreadPoolExecutor(max_workers=5) as executor:
result = executor.map(partial(self.call_service), services)
serviceStatus = dict(zip(servicesnames, result))
#Add issues to the reporter
for service in serviceStatus:
if serviceStatus[service]["healthy"] == False:
self.reporter.add_issue(env, service, serviceStatus[service]["httpStatus"], serviceStatus[service]["errorMessage"])
self.notificator.send_message(env, service)
else:
self.reporter.update_notification_status(env, service, False)
return serviceStatus
def call_service(self, service):
method = service["method"] if "method" in service.keys() else "GET"
data = service["body"] if "body" in service.keys() else None
headers = {"user-agent":"xbar"}
if "headers" in service.keys():
headers.update(service["headers"])
healthy = False
errorMessage = ""
httpStatus = ""
try:
response = requests.request(method=method, url=service["endpoint"], headers=headers, data=data, timeout=TIMEOUT)
if response.status_code == service["status_code"]:
healthy = True
else:
errorMessage = response.text
httpStatus = response.status_code
except Exception as err:
errorMessage = err
httpStatus = "NA"
return { 'healthy': healthy, 'errorMessage': errorMessage, "httpStatus": httpStatus}
class Reporter:
def __init__(self) -> None:
[path, filename] = os.path.split(sys.argv[0])
db = path+"/."+filename+".db"
self.conn = sqlite3.connect(db, isolation_level=None)
self.conn.execute("CREATE TABLE IF NOT EXISTS issues (env NOT NULL, service NOT NULL, http_status, error_message, timestamp timestamp)")
self.conn.execute("CREATE TABLE IF NOT EXISTS notifications (env NOT NULL, service NOT NULL, triggered BOOLEAN, timestamp timestamp, PRIMARY KEY (env, service))")
def __del__(self):
self.clean_old_records()
def add_issue(self, env, service, http_status, message):
self.conn.execute("INSERT INTO issues VALUES(?, ?, ?, ?, ?)", (env, service, http_status, message, datetime.now()))
def read_issues_by_service(self, env, service):
fromDate = datetime.now() - timedelta(hours=RECOVERY_TIME)
r = self.conn.execute("SELECT * FROM issues WHERE env=? AND service=? AND timestamp > ? ORDER BY timestamp DESC LIMIT ?", (env, service, fromDate, LIMIT_TOTAL_ISSUES))
return r.fetchall()
def clean_old_records(self):
fromDate = datetime.now() - timedelta(hours=RECOVERY_TIME)
self.conn.execute("DELETE FROM issues WHERE timestamp < ?", [fromDate])
def update_notification_status(self, env, service, hasBeenNotified):
try:
self.conn.execute("INSERT INTO notifications VALUES(?, ?, ?, ?) ON CONFLICT (env, service) DO UPDATE SET triggered = ?, timestamp = ?", (env, service, hasBeenNotified, datetime.now(), hasBeenNotified, datetime.now()))
self.conn.commit()
except Exception as err:
# print(err)
pass
def read_notification_status_by_service(self, env, service):
r = self.conn.execute("SELECT * FROM notifications WHERE env=? AND service=?", (env, service))
values = r.fetchone()
return values
def read_notification_has_been_triggered(self, env, service):
result = self.read_notification_status_by_service(env, service)
if not result:
return False
return bool(result[2])
class Notificator:
def __init__(self, reporter):
self.reporter = reporter
def send_alert(self, message):
subprocess.check_call(["osascript", "-e", "tell application (path to frontmost application as text) to display dialog \""+message+"\" buttons {\"OK\"} with icon stop"])
def send_notification(sefl, title, message):
subprocess.check_call(["osascript", "-e", f"display notification \"{message}\" with title \"{title}\""])
def send_message(self, env, service):
title = f"{TITLE}"
message = f"[{env}] '{service}' has issues."
if not ALLOW_NOTIFICATIONS:
return
hasBeenTriggered = self.reporter.read_notification_has_been_triggered(env, service)
if hasBeenTriggered:
return
self.reporter.update_notification_status(env, service, True)
if ALLOW_NOTIFICATIONS:
t = Thread(target=self.send_notification, args=[title, message])
t.start()
class MenuGenerator:
# green: \x1b[42m
# red: \x1b[41m
# yellow: \x1b[43m
HEALTHY='\x1b[42m'
UNHEALTHY='\x1b[41m'
ALERTED='\x1b[43m'
NC='\x1b[0m'
def __init__(self, reporter) -> None:
self.reporter = reporter
def create_dropdown_report(self, values):
summary = ""
menu = "\n---"
for env in values.keys():
healthyEnv = True
alertedEnv = False
envmenu = ""
servicemenu = ""
for servicename in values[env].keys():
healthy = True
service = values[env][servicename]
alerted = len(self.reporter.read_issues_by_service(env,servicename)) > 0
if not service["healthy"]:
healthy = False
servicemenu += f'\n--{self.get_icon(healthy, alerted)} {servicename}'
servicemenu += self.create_issues_report(env, servicename)
if not alertedEnv:
alertedEnv = alerted
if healthyEnv:
healthyEnv = healthy
envmenu = f'\n{self.get_icon(healthyEnv, alertedEnv)} {env}{servicemenu}'
menu+= envmenu
summary += self.get_env_colored(env, healthyEnv, alertedEnv)
return f'{summary}{menu}'
def create_issues_report(self, env, service):
issues = self.reporter.read_issues_by_service(env, service)
menu = ""
for issue in issues:
date = issue[4][:issue[4].rfind(".")]
message = issue[3].replace("\"", "\\\"")
menu += f'\n----[{date}] ({issue[2]}) : {message} | length=100 shell="/bin/bash" param1="-c" param2=\'echo "{message}" | open -a TextEdit -f\''
return menu
def get_icon(self, healthy, alerted):
if not healthy:
return "🔴"
elif healthy and alerted:
return "⚠️ "
else:
return "🟢"
def get_env_colored(self, env, healthy, alerted):
color = MenuGenerator.HEALTHY
if not healthy:
color = MenuGenerator.UNHEALTHY
elif healthy and alerted:
color = MenuGenerator.ALERTED
return f'{color} {env} {MenuGenerator.NC}'
def print(self, servicesStatus):
report = self.create_dropdown_report(servicesStatus)
print(f'{TITLE}: {report}')
if __name__ == "__main__":
reporter = Reporter()
servicesStatus = ServiceMonitor(reporter).process_environments(SERVICES)
MenuGenerator(reporter).print(servicesStatus)