Initial Commit

This commit is contained in:
Exil Productions
2025-12-04 14:47:12 +01:00
commit 640301e4e3
13 changed files with 968 additions and 0 deletions

3
src/statsman/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
__version__ = "0.1.2"
__author__ = "ExilProductions"
__email__ = "exil.productions.business@gmail.com"

9
src/statsman/__main__.py Normal file
View File

@@ -0,0 +1,9 @@
try:
from .cli import main
except ImportError:
print("Error: Could not import CLI module")
import sys
sys.exit(1)
if __name__ == "__main__":
main()

102
src/statsman/app.py Normal file
View File

@@ -0,0 +1,102 @@
import sys
import time
import signal
import threading
import click
from rich.console import Console
from rich.live import Live
from .system_monitor import SystemMonitor
from .ui.dashboard import Dashboard
class StatsManApp:
def __init__(self, refresh_rate: float = 1.0, no_color: bool = False):
self.refresh_rate = refresh_rate
self.no_color = no_color
self.console = Console(color_system=None if no_color else "auto")
self.dashboard = Dashboard(self.console, no_color)
self.live = None
self.running = False
self.paused = False
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
self.running = False
if self.live:
self.live.stop()
def _handle_keyboard_input(self):
try:
import select
import termios
import tty
old_settings = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
while self.running:
if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
char = sys.stdin.read(1)
if char.lower() == 'q':
self.running = False
break
elif char.lower() == 'p':
self.paused = not self.paused
elif char.lower() == 'c':
self.dashboard.set_process_sort('cpu')
elif char.lower() == 'm':
self.dashboard.set_process_sort('memory')
elif char.lower() == 'r':
self.dashboard.set_process_sort('cpu')
time.sleep(0.1)
except (ImportError, OSError):
pass
finally:
try:
if 'old_settings' in locals():
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
except:
pass
def run(self):
self.running = True
keyboard_thread = threading.Thread(target=self._handle_keyboard_input, daemon=True)
keyboard_thread.start()
try:
with Live(
self.dashboard.render(),
console=self.console,
refresh_per_second=1.0 / self.refresh_rate,
screen=True,
auto_refresh=True,
) as self.live:
self.live = self.live
while self.running:
if not self.paused:
layout = self.dashboard.render()
self.live.update(layout)
time.sleep(self.refresh_rate)
except KeyboardInterrupt:
pass
finally:
self.running = False
if self.live:
self.live.stop()
self.console.clear()
self.console.print("StatsMan - Goodbye!", justify="center")

26
src/statsman/cli.py Normal file
View File

@@ -0,0 +1,26 @@
import click
from .app import StatsManApp
@click.command()
@click.option(
"--refresh-rate",
"-r",
default=1.0,
type=float,
help="Refresh rate in seconds (default: 1.0)",
)
@click.option(
"--no-color",
is_flag=True,
default=False,
help="Disable colored output",
)
@click.version_option(version="0.1.0", prog_name="statsman")
def main(refresh_rate: float, no_color: bool) -> None:
app = StatsManApp(refresh_rate=refresh_rate, no_color=no_color)
app.run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,220 @@
import psutil
import time
from dataclasses import dataclass
from typing import List, Dict, Any
from collections import deque
@dataclass
class CPUInfo:
percent: float
percent_per_core: List[float]
frequency: float
load_avg: List[float]
count: int
count_logical: int
@dataclass
class MemoryInfo:
total: int
available: int
used: int
percent: float
swap_total: int
swap_used: int
swap_percent: float
@dataclass
class DiskInfo:
total: int
used: int
free: int
percent: float
read_bytes: int
write_bytes: int
read_count: int
write_count: int
@dataclass
class NetworkInfo:
bytes_sent: int
bytes_recv: int
packets_sent: int
packets_recv: int
interfaces: Dict[str, Dict[str, int]]
@dataclass
class ProcessInfo:
"""Process information."""
pid: int
name: str
cpu_percent: float
memory_percent: float
memory_rss: int
status: str
cmdline: List[str]
class SystemMonitor:
def __init__(self, history_size: int = 60):
self.history_size = history_size
self.cpu_history = deque(maxlen=history_size)
self.memory_history = deque(maxlen=history_size)
self.network_history = deque(maxlen=history_size)
self._last_net_io = psutil.net_io_counters()
self._last_disk_io = psutil.disk_io_counters()
self._last_time = time.time()
def get_cpu_info(self) -> CPUInfo:
cpu_percent = psutil.cpu_percent(interval=0.1)
cpu_percent_per_core = psutil.cpu_percent(interval=0.1, percpu=True)
try:
freq = psutil.cpu_freq().current if psutil.cpu_freq() else 0.0
except (AttributeError, OSError):
freq = 0.0
#linux only
try:
load_avg = list(psutil.getloadavg())
except (AttributeError, OSError):
load_avg = [0.0, 0.0, 0.0]
return CPUInfo(
percent=cpu_percent,
percent_per_core=cpu_percent_per_core,
frequency=freq,
load_avg=load_avg,
count=psutil.cpu_count(logical=False) or 0,
count_logical=psutil.cpu_count(logical=True) or 0,
)
def get_memory_info(self) -> MemoryInfo:
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
return MemoryInfo(
total=memory.total,
available=memory.available,
used=memory.used,
percent=memory.percent,
swap_total=swap.total,
swap_used=swap.used,
swap_percent=swap.percent,
)
def get_disk_info(self) -> DiskInfo:
disk_usage = psutil.disk_usage('/')
current_disk_io = psutil.disk_io_counters()
if current_disk_io and self._last_disk_io:
time_delta = time.time() - self._last_time
read_bytes = max(0, current_disk_io.read_bytes - self._last_disk_io.read_bytes)
write_bytes = max(0, current_disk_io.write_bytes - self._last_disk_io.write_bytes)
read_count = max(0, current_disk_io.read_count - self._last_disk_io.read_count)
write_count = max(0, current_disk_io.write_count - self._last_disk_io.write_count)
else:
read_bytes = write_bytes = read_count = write_count = 0
self._last_disk_io = current_disk_io
return DiskInfo(
total=disk_usage.total,
used=disk_usage.used,
free=disk_usage.free,
percent=disk_usage.percent,
read_bytes=read_bytes,
write_bytes=write_bytes,
read_count=read_count,
write_count=write_count,
)
def get_network_info(self) -> NetworkInfo:
current_net_io = psutil.net_io_counters()
if current_net_io and self._last_net_io:
bytes_sent = max(0, current_net_io.bytes_sent - self._last_net_io.bytes_sent)
bytes_recv = max(0, current_net_io.bytes_recv - self._last_net_io.bytes_recv)
packets_sent = max(0, current_net_io.packets_sent - self._last_net_io.packets_sent)
packets_recv = max(0, current_net_io.packets_recv - self._last_net_io.packets_recv)
else:
bytes_sent = bytes_recv = packets_sent = packets_recv = 0
self._last_net_io = current_net_io
interfaces = {}
try:
net_if_addrs = psutil.net_if_addrs()
net_io_stats = psutil.net_io_counters(pernic=True)
for interface_name in net_if_addrs:
if interface_name in net_io_stats:
interfaces[interface_name] = {
'bytes_sent': net_io_stats[interface_name].bytes_sent,
'bytes_recv': net_io_stats[interface_name].bytes_recv,
'packets_sent': net_io_stats[interface_name].packets_sent,
'packets_recv': net_io_stats[interface_name].packets_recv,
}
except (OSError, AttributeError):
pass
return NetworkInfo(
bytes_sent=bytes_sent,
bytes_recv=bytes_recv,
packets_sent=packets_sent,
packets_recv=packets_recv,
interfaces=interfaces,
)
def get_process_info(self, limit: int = 10) -> List[ProcessInfo]:
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent',
'memory_info', 'status', 'cmdline']):
try:
pinfo = proc.info
if pinfo['cpu_percent'] is None:
pinfo['cpu_percent'] = 0.0
if pinfo['memory_percent'] is None:
pinfo['memory_percent'] = 0.0
processes.append(ProcessInfo(
pid=pinfo['pid'],
name=pinfo['name'] or 'Unknown',
cpu_percent=pinfo['cpu_percent'],
memory_percent=pinfo['memory_percent'],
memory_rss=pinfo['memory_info'].rss if pinfo['memory_info'] else 0,
status=pinfo['status'] or 'Unknown',
cmdline=pinfo['cmdline'] or [],
))
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
processes.sort(key=lambda p: p.cpu_percent, reverse=True)
return processes[:limit]
def update_history(self) -> None:
cpu_info = self.get_cpu_info()
memory_info = self.get_memory_info()
network_info = self.get_network_info()
self.cpu_history.append(cpu_info.percent)
self.memory_history.append(memory_info.percent)
self.network_history.append({
'bytes_sent': network_info.bytes_sent,
'bytes_recv': network_info.bytes_recv,
})
self._last_time = time.time()
def get_cpu_history(self) -> List[float]:
return list(self.cpu_history)
def get_memory_history(self) -> List[float]:
return list(self.memory_history)
def get_network_history(self) -> List[Dict[str, int]]:
return list(self.network_history)

View File

157
src/statsman/ui/charts.py Normal file
View File

@@ -0,0 +1,157 @@
from rich.console import Console, Group
from rich.progress import Progress, BarColumn, TextColumn
from rich.table import Table
from rich.panel import Panel
from rich.text import Text
from rich.align import Align
from typing import List, Dict, Any
import math
class ChartRenderer:
def __init__(self, console: Console):
self.console = console
def create_sparkline(self, data: List[float], width: int = 80, height: int = 8) -> str:
if not data:
return " " * width
min_val = min(data)
max_val = max(data)
range_val = max_val - min_val if max_val != min_val else 1
spark_chars = [' ', '', '', '', '', '', '', '', '']
recent_data = data[-width:] if len(data) > width else data
sparkline = ""
for value in recent_data:
normalized = (value - min_val) / range_val
char_index = min(int(normalized * len(spark_chars)), len(spark_chars) - 1)
sparkline += spark_chars[max(0, char_index)]
return sparkline
def create_vertical_bars(self, data: Dict[str, float], height: int = 12, width: int = 60) -> Panel:
if not data:
return Panel("No data", border_style="red")
max_val = max(data.values()) if data.values() else 1
labels = list(data.keys())
values = list(data.values())
bar_widths = [int((v / max_val) * width) if max_val > 0 else 0 for v in values]
lines = []
for level in range(height, 0, -1):
line = ""
for bar_width in bar_widths:
threshold = (level / height) * width
if bar_width >= threshold:
line += ""
else:
line += " "
line += " "
lines.append(line)
label_line = ""
for label in labels:
label_line += f"{label[:8]:8} "
chart_text = "\n".join(lines + [label_line])
return Panel(chart_text, border_style="blue")
def create_horizontal_bars(self, data: Dict[str, float], max_width: int = 70) -> Panel:
if not data:
return Panel("No data", border_style="red")
max_val = max(data.values()) if data.values() else 1
lines = []
for label, value in data.items():
bar_width = int((value / max_val) * max_width) if max_val > 0 else 0
bar = "" * bar_width + "" * (max_width - bar_width)
lines.append(f"{label:6} {bar} {value:5.1f}%")
return Panel("\n".join(lines), border_style="green")
def create_mini_process_table(self, processes: List[Any], limit: int = 12) -> Panel:
if not processes:
return Panel("No processes", border_style="red")
sorted_processes = sorted(processes, key=lambda p: p.cpu_percent, reverse=True)
lines = ["PID Name CPU MEM ", "=" * 55]
for proc in sorted_processes[:limit]:
cpu_bar = self._create_mini_bar(proc.cpu_percent, 10)
mem_bar = self._create_mini_bar(proc.memory_percent, 10)
name = proc.name[:14] + ".." if len(proc.name) > 16 else proc.name.ljust(16)
lines.append(f"{proc.pid:<7} {name} {cpu_bar} {mem_bar}")
return Panel("\n".join(lines), title="Top Processes", border_style="magenta")
def _create_mini_bar(self, percentage: float, width: int = 10) -> str:
filled = int((percentage / 100) * width)
bar = "" * filled + "" * (width - filled)
return bar
def create_system_gauges(self, cpu_info: Any, memory_info: Any, disk_info: Any) -> Panel:
gauges = []
cpu_gauge = self._create_gauge(cpu_info.percent, "CPU")
gauges.append(cpu_gauge)
mem_gauge = self._create_gauge(memory_info.percent, "MEM")
gauges.append(mem_gauge)
disk_gauge = self._create_gauge(disk_info.percent, "DSK")
gauges.append(disk_gauge)
return Panel(Group(*gauges), border_style="cyan")
def _create_gauge(self, percentage: float, label: str, width: int = 30) -> Text:
filled = int((percentage / 100) * width)
bar = "" * filled + "" * (width - filled)
return Text.from_markup(f"[bold]{label}:[/bold] {bar} {percentage:5.1f}%")
def create_network_visualization(self, network_info: Any) -> Panel:
sent_mb = network_info.bytes_sent / (1024 * 1024)
recv_mb = network_info.bytes_recv / (1024 * 1024)
network_data = {
"UPLOAD": min(sent_mb * 10, 100),
"DOWNLOAD": min(recv_mb * 10, 100),
}
return self.create_horizontal_bars(network_data)
def create_cpu_core_visualization(self, cpu_info: Any) -> Panel:
if not cpu_info.percent_per_core:
return Panel("No core data", border_style="red")
core_data = {}
for i, core_percent in enumerate(cpu_info.percent_per_core):
core_data[f"C{i:02d}"] = core_percent
return self.create_vertical_bars(core_data, height=8, width=40)
def create_memory_breakdown(self, memory_info: Any) -> Panel:
used_gb = memory_info.used / (1024**3)
total_gb = memory_info.total / (1024**3)
memory_data = {
"USED": (used_gb / total_gb) * 100,
"FREE": ((total_gb - used_gb) / total_gb) * 100,
}
return self.create_horizontal_bars(memory_data)
def format_bytes(self, bytes_value: int) -> str:
bytes_float = float(bytes_value)
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_float < 1024.0:
return f"{bytes_float:.1f} {unit}"
bytes_float /= 1024.0
return f"{bytes_float:.1f} PB"

View File

@@ -0,0 +1,133 @@
from rich.console import Console, Group
from rich.layout import Layout
from rich.panel import Panel
from rich.live import Live
from rich.text import Text
from rich.align import Align
from typing import Optional
import time
from ..system_monitor import SystemMonitor
from .charts import ChartRenderer
class Dashboard:
def __init__(self, console: Optional[Console] = None, no_color: bool = False):
self.console = console or Console(color_system=None if no_color else "auto")
self.monitor = SystemMonitor()
self.charts = ChartRenderer(self.console)
self.layout = Layout()
self.sort_processes_by = "cpu"
self._setup_visual_layout()
def _setup_visual_layout(self) -> None:
self.layout.split(
Layout(name="header", size=3),
Layout(name="main"),
Layout(name="footer", size=3),
)
self.layout["main"].split_column(
Layout(name="top", size=16),
Layout(name="middle", size=14),
Layout(name="bottom", ratio=1),
)
self.layout["top"].split_row(
Layout(name="gauges", ratio=1),
Layout(name="cores", ratio=1),
)
self.layout["middle"].split_row(
Layout(name="memory", ratio=1),
Layout(name="network", ratio=1),
)
self.layout["bottom"].split_row(
Layout(name="processes"),
)
def _create_header(self) -> Panel:
header_text = Text.from_markup(
"[bold blue]StatsMan[/bold blue] - System Monitor",
justify="center"
)
return Panel(
Align.center(header_text),
border_style="blue"
)
def _create_footer(self) -> Panel:
controls = Text.from_markup(
"[cyan]q:quit p:pause c:cpu m:mem r:reset[/cyan]",
justify="center"
)
return Panel(
Align.center(controls),
border_style="cyan"
)
def _create_system_gauges(self) -> Panel:
cpu_info = self.monitor.get_cpu_info()
memory_info = self.monitor.get_memory_info()
disk_info = self.monitor.get_disk_info()
return self.charts.create_system_gauges(cpu_info, memory_info, disk_info)
def _create_cpu_cores(self) -> Panel:
cpu_info = self.monitor.get_cpu_info()
cpu_history = self.monitor.get_cpu_history()
sparkline = self.charts.create_sparkline(cpu_history, width=60, height=8)
sparkline_text = Text.from_markup(f"[cyan]CPU History:[/cyan] {sparkline}")
cores_panel = self.charts.create_cpu_core_visualization(cpu_info)
return Panel(
Group(sparkline_text, cores_panel),
title=f"CPU: {cpu_info.percent:.1f}%",
border_style="red"
)
def _create_memory_visual(self) -> Panel:
memory_info = self.monitor.get_memory_info()
memory_history = self.monitor.get_memory_history()
sparkline = self.charts.create_sparkline(memory_history, width=50, height=6)
sparkline_text = Text.from_markup(f"[green]Memory History:[/green] {sparkline}")
breakdown_panel = self.charts.create_memory_breakdown(memory_info)
return Panel(
Group(sparkline_text, breakdown_panel),
title=f"Memory: {memory_info.percent:.1f}%",
border_style="green"
)
def _create_network_visual(self) -> Panel:
network_info = self.monitor.get_network_info()
return self.charts.create_network_visualization(network_info)
def _create_processes_visual(self) -> Panel:
processes = self.monitor.get_process_info(limit=20)
return self.charts.create_mini_process_table(processes, limit=16)
def update_layout(self) -> None:
self.layout["header"].update(self._create_header())
self.layout["footer"].update(self._create_footer())
self.layout["top"]["gauges"].update(self._create_system_gauges())
self.layout["top"]["cores"].update(self._create_cpu_cores())
self.layout["middle"]["memory"].update(self._create_memory_visual())
self.layout["middle"]["network"].update(self._create_network_visual())
self.layout["bottom"]["processes"].update(self._create_processes_visual())
def render(self) -> Layout:
self.monitor.update_history()
self.update_layout()
return self.layout
def set_process_sort(self, sort_by: str) -> None:
if sort_by in ['cpu', 'memory']:
self.sort_processes_by = sort_by