#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TuneVault Oracle HTTP Proxy
============================

Runs on your Oracle server. Exposes an HTTP endpoint that TuneVault calls
through your HTTPS proxy to collect Oracle health metrics.

Architecture:
    TuneVault (Render) -> HTTPS -> Outbound Proxy -> localhost:3100 -> Oracle :1521

Quick Start:
    export TUNEVAULT_API_KEY="your-secret-key-here"
    python3 oracle-proxy.py

With systemd (recommended for production):
    systemctl start tunevault-proxy
    systemctl enable tunevault-proxy

Requirements:
    - Python 3.6+
    - cx_Oracle (pip3 install cx_Oracle)
    - ORACLE_HOME and LD_LIBRARY_PATH set to include Oracle client libs
    - TUNEVAULT_API_KEY environment variable

The proxy listens on 127.0.0.1:3100 only. Your HTTPS proxy should point to
http://localhost:3100

Update your proxy config to route:
    hostname: oracledb.yourdomain.com
    service:  http://localhost:3100   (was: tcp://localhost:1521)
"""

from __future__ import print_function

import hashlib
import json
import os
import shutil
import sys
import tempfile
import threading
import time
import traceback
from datetime import datetime, timedelta

# Python 3.6 compatible HTTP server
try:
    from http.server import HTTPServer, BaseHTTPRequestHandler
except ImportError:
    print("FATAL: Python 3 is required. http.server module not found.")
    sys.exit(1)

try:
    from urllib.request import urlopen, Request
    from urllib.error import URLError, HTTPError
except ImportError:
    print("FATAL: Python 3 is required. urllib.request module not found.")
    sys.exit(1)

try:
    import cx_Oracle
except ImportError:
    print("FATAL: cx_Oracle is not installed.")
    print("Install it with: pip3 install cx_Oracle")
    sys.exit(1)

# ============================================================
# Config
# ============================================================

PORT = int(os.environ.get("PORT", "3100"))
HOST = os.environ.get("BIND_HOST", "127.0.0.1")
API_KEY = os.environ.get("TUNEVAULT_API_KEY", "")

if not API_KEY:
    print("FATAL: TUNEVAULT_API_KEY environment variable is required.")
    print('Set it with: export TUNEVAULT_API_KEY="your-secret-key-here"')
    sys.exit(1)

VERSION = "3.2.0"

# TuneVault server URL for version checks and downloads
TUNEVAULT_URL = os.environ.get("TUNEVAULT_URL", "https://tunevault.app").rstrip("/")

# ============================================================
# Auto-Update
# ============================================================

def _ts():
    """Return ISO timestamp prefix for log lines."""
    return datetime.now().strftime("[%Y-%m-%dT%H:%M:%S]")


def _compute_sha256(path):
    """Compute sha256 hex digest of a file."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def check_for_update():
    """
    Contact TuneVault to check for a newer proxy version.
    Returns (needs_update, remote_version, download_url, expected_checksum) or raises.
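
    The endpoint is expected to return JSON shaped roughly like this (values are
    illustrative; the field names are the ones the parsing code below reads):

        {
            "version": "3.2.1",
            "checksum": "sha256:<hex digest of the new oracle-proxy.py>",
            "download_url": "/downloads/oracle-proxy.py"
        }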
""" version_url = TUNEVAULT_URL + "/api/proxy/version" req = Request(version_url, headers={"User-Agent": "TuneVault-Proxy/%s" % VERSION}) try: resp = urlopen(req, timeout=30) data = json.loads(resp.read().decode("utf-8")) except HTTPError as e: raise RuntimeError("Version check HTTP error: %s" % str(e)) except URLError as e: raise RuntimeError("Version check network error: %s" % str(e)) remote_version = data.get("version", "") expected_checksum = data.get("checksum", "") download_path = data.get("download_url", "/downloads/oracle-proxy.py") download_url = TUNEVAULT_URL + download_path if not remote_version: raise RuntimeError("Version check returned empty version") # Simple tuple comparison for semver (works for x.y.z) def parse(v): try: return tuple(int(x) for x in v.split(".")) except Exception: return (0,) needs_update = parse(remote_version) > parse(VERSION) return needs_update, remote_version, download_url, expected_checksum def perform_update(remote_version, download_url, expected_checksum): """ Download the new proxy, validate checksum, replace self, restart via os.execv. Never raises — logs and returns False on any failure. """ self_path = os.path.abspath(__file__) print("%s [auto-update] New version %s available (running %s) — downloading..." % ( _ts(), remote_version, VERSION)) # Download to a temp file in the same directory so rename is atomic tmp_dir = os.path.dirname(self_path) try: tmp_fd, tmp_path = tempfile.mkstemp(dir=tmp_dir, suffix=".py.tmp") os.close(tmp_fd) except Exception as e: print("%s [auto-update] WARN: Could not create temp file: %s — keeping current version." % (_ts(), e)) return False try: req = Request(download_url, headers={"User-Agent": "TuneVault-Proxy/%s" % VERSION}) try: resp = urlopen(req, timeout=120) except (HTTPError, URLError) as e: print("%s [auto-update] WARN: Download failed: %s — keeping current version." % (_ts(), e)) return False with open(tmp_path, "wb") as f: shutil.copyfileobj(resp, f) # Validate checksum actual_sha256 = _compute_sha256(tmp_path) if expected_checksum: expected_hex = expected_checksum.replace("sha256:", "") if actual_sha256 != expected_hex: print("%s [auto-update] WARN: Checksum mismatch (got %s, expected %s) — keeping current version." % ( _ts(), actual_sha256, expected_hex)) return False # Preserve permissions (executable bit etc.) try: st = os.stat(self_path) os.chmod(tmp_path, st.st_mode) except Exception: pass # Atomic replace shutil.move(tmp_path, self_path) print("%s [auto-update] Updated to v%s — restarting..." % (_ts(), remote_version)) # Restart in-place: replace current process image # os.execv preserves PID; systemd/PM2 won't notice a restart try: os.execv(sys.executable, [sys.executable] + sys.argv) except Exception as e: print("%s [auto-update] WARN: os.execv failed (%s) — restart manually." % (_ts(), e)) return False except Exception as e: print("%s [auto-update] WARN: Update failed: %s — keeping current version." % (_ts(), e)) return False finally: # Clean up temp if still exists (shouldn't be after shutil.move or execv) if os.path.exists(tmp_path): try: os.unlink(tmp_path) except Exception: pass return True def auto_update_loop(interval_seconds=21600): """ Background thread: check for updates at startup, then every `interval_seconds` (default 6h). Runs as daemon so it doesn't block clean shutdown. 
""" # Initial check after 30 seconds (let server fully start first) time.sleep(30) while True: try: needs_update, remote_version, download_url, expected_checksum = check_for_update() if needs_update: perform_update(remote_version, download_url, expected_checksum) # If we get here, update failed (execv would have replaced us) else: print("%s [auto-update] v%s is current." % (_ts(), VERSION)) except Exception as e: print("%s [auto-update] WARN: Version check failed: %s" % (_ts(), e)) time.sleep(interval_seconds) # ============================================================ # Oracle Error Formatting # ============================================================ def format_oracle_error(err): msg = str(err) if "ORA-12154" in msg: return "TNS name could not be resolved. Check service name." if "ORA-12541" in msg: return "No listener at the specified host and port. Check host and port." if "ORA-12514" in msg: return "Service name not found. Check the service name or SID." if "ORA-12170" in msg: return "Connection timed out. Host may be unreachable." if "ORA-01017" in msg: return "Invalid username or password." if "ORA-28000" in msg: return "Account is locked." if "ORA-28001" in msg: return "Password has expired." if "ORA-01031" in msg: return "Insufficient privileges. User needs SELECT_CATALOG_ROLE or explicit grants." if "ORA-00942" in msg: return "Table or view does not exist. User may need additional grants." return msg # ============================================================ # Oracle Query Functions # ============================================================ def safe_int(val, default=0): """Safely convert to int.""" if val is None: return default try: return int(val) except (ValueError, TypeError): return default def safe_float(val, default=0.0): """Safely convert to float.""" if val is None: return default try: return float(val) except (ValueError, TypeError): return default def query_instance_info(cursor): try: cursor.execute(""" SELECT d.NAME, i.INSTANCE_NAME, i.HOST_NAME, i.VERSION, d.PLATFORM_NAME, TO_CHAR(i.STARTUP_TIME, 'YYYY-MM-DD HH24:MI:SS'), ROUND(SYSDATE - i.STARTUP_TIME), (SELECT VALUE FROM v$parameter WHERE name = 'cpu_count'), ROUND((SELECT TO_NUMBER(VALUE)/1024/1024/1024 FROM v$parameter WHERE name = 'sga_target'), 1), ROUND((SELECT TO_NUMBER(VALUE)/1024/1024/1024 FROM v$parameter WHERE name = 'pga_aggregate_target'), 1), (SELECT TO_NUMBER(VALUE) FROM v$parameter WHERE name = 'db_block_size') FROM v$database d, v$instance i """) row = cursor.fetchone() if not row: raise Exception("No instance data") return { "db_name": row[0] or "UNKNOWN", "instance_name": row[1] or "unknown", "host_name": row[2] or "unknown", "version": row[3] or "Unknown", "platform": row[4] or "Unknown", "startup_time": row[5] or "", "uptime_days": safe_int(row[6]), "rac": False, "cpus": safe_int(row[7], 1), "sga_target_gb": safe_float(row[8]), "pga_aggregate_target_gb": safe_float(row[9]), "db_block_size": safe_int(row[10], 8192), } except Exception as e: print("Instance query failed: %s" % str(e)) # Fallback to simpler queries try: cursor.execute("SELECT name FROM v$database") r1 = cursor.fetchone() cursor.execute("SELECT instance_name, host_name, version FROM v$instance") r2 = cursor.fetchone() return { "db_name": (r1[0] if r1 else "UNKNOWN"), "instance_name": (r2[0] if r2 else "unknown"), "host_name": (r2[1] if r2 else "unknown"), "version": (r2[2] if r2 else "Unknown"), "platform": "Unknown", "startup_time": "", "uptime_days": 0, "rac": False, "cpus": 1, "sga_target_gb": 0, 
"pga_aggregate_target_gb": 0, "db_block_size": 8192, } except Exception: return { "db_name": "UNKNOWN", "instance_name": "unknown", "host_name": "unknown", "version": "Unknown", "platform": "Unknown", "startup_time": "", "uptime_days": 0, "rac": False, "cpus": 1, "sga_target_gb": 0, "pga_aggregate_target_gb": 0, "db_block_size": 8192, } def query_tablespaces(cursor): try: cursor.execute(""" SELECT ts.TABLESPACE_NAME, ROUND(um.USED_SPACE * ts_block.BLOCK_SIZE / 1024 / 1024 / 1024, 1), ROUND(um.TABLESPACE_SIZE * ts_block.BLOCK_SIZE / 1024 / 1024 / 1024, 1), ROUND(um.USED_PERCENT, 1), CASE WHEN df.autoext > 0 THEN 1 ELSE 0 END FROM DBA_TABLESPACE_USAGE_METRICS um JOIN DBA_TABLESPACES ts ON ts.TABLESPACE_NAME = um.TABLESPACE_NAME LEFT JOIN (SELECT TABLESPACE_NAME, BLOCK_SIZE FROM DBA_TABLESPACES) ts_block ON ts_block.TABLESPACE_NAME = um.TABLESPACE_NAME LEFT JOIN ( SELECT TABLESPACE_NAME, SUM(CASE WHEN AUTOEXTENSIBLE = 'YES' THEN 1 ELSE 0 END) as autoext FROM DBA_DATA_FILES GROUP BY TABLESPACE_NAME ) df ON df.TABLESPACE_NAME = um.TABLESPACE_NAME ORDER BY um.USED_PERCENT DESC """) rows = cursor.fetchall() results = [] for row in rows: pct = safe_float(row[3]) if pct > 90: status = "critical" elif pct > 80: status = "warning" else: status = "ok" results.append({ "name": row[0], "used_gb": safe_float(row[1]), "total_gb": safe_float(row[2]), "pct_used": pct, "autoextend": safe_int(row[4]) > 0, "status": status, }) return results except Exception as e: print("Tablespace primary query failed: %s" % str(e)) # Fallback query try: cursor.execute(""" SELECT df.TABLESPACE_NAME, ROUND(SUM(df.BYTES) / 1024 / 1024 / 1024, 1), ROUND((SUM(df.BYTES) - NVL(fs.free_bytes, 0)) / 1024 / 1024 / 1024, 1), ROUND((1 - NVL(fs.free_bytes, 0) / SUM(df.BYTES)) * 100, 1), MAX(CASE WHEN df.AUTOEXTENSIBLE = 'YES' THEN 1 ELSE 0 END) FROM DBA_DATA_FILES df LEFT JOIN ( SELECT TABLESPACE_NAME, SUM(BYTES) as free_bytes FROM DBA_FREE_SPACE GROUP BY TABLESPACE_NAME ) fs ON fs.TABLESPACE_NAME = df.TABLESPACE_NAME GROUP BY df.TABLESPACE_NAME, fs.free_bytes ORDER BY 4 DESC """) rows = cursor.fetchall() results = [] for row in rows: pct = safe_float(row[3]) if pct > 90: status = "critical" elif pct > 80: status = "warning" else: status = "ok" results.append({ "name": row[0], "used_gb": safe_float(row[2]), "total_gb": safe_float(row[1]), "pct_used": pct, "autoextend": safe_int(row[4]) > 0, "status": status, }) return results except Exception: return [] def query_wait_events(cursor): try: cursor.execute(""" SELECT EVENT, WAIT_CLASS, TOTAL_WAITS, ROUND(TIME_WAITED / 100, 1), CASE WHEN TOTAL_WAITS > 0 THEN ROUND((TIME_WAITED / 100 / TOTAL_WAITS) * 1000, 2) ELSE 0 END FROM V$SYSTEM_EVENT WHERE WAIT_CLASS NOT IN ('Idle') AND TOTAL_WAITS > 0 ORDER BY TIME_WAITED DESC FETCH FIRST 15 ROWS ONLY """) rows = cursor.fetchall() total_time = sum(safe_float(r[3]) for r in rows) results = [] for row in rows: time_waited = safe_float(row[3]) pct = round(time_waited / total_time * 100, 1) if total_time > 0 else 0 results.append({ "event": row[0], "wait_class": row[1], "total_waits": safe_int(row[2]), "time_waited_s": time_waited, "avg_wait_ms": safe_float(row[4]), "pct_db_time": pct, }) return results except Exception as e: print("Wait events query failed: %s" % str(e)) return [] def query_top_sql(cursor): try: cursor.execute(""" SELECT SQL_ID, SUBSTR(SQL_TEXT, 1, 500), EXECUTIONS, ROUND(ELAPSED_TIME / 1000000, 1), ROUND(CPU_TIME / 1000000, 1), BUFFER_GETS, DISK_READS, ROWS_PROCESSED, CASE WHEN EXECUTIONS > 0 THEN ROUND(ELAPSED_TIME / EXECUTIONS / 1000, 
2) ELSE 0 END, CASE WHEN EXECUTIONS > 0 THEN ROUND(BUFFER_GETS / EXECUTIONS) ELSE 0 END, PLAN_HASH_VALUE FROM V$SQL WHERE EXECUTIONS > 0 AND ELAPSED_TIME > 0 AND PARSING_SCHEMA_NAME NOT IN ( 'SYS','SYSTEM','DBSNMP','SYSMAN','OUTLN','MDSYS', 'ORDSYS','EXFSYS','WMSYS','APPQOSSYS','DBSFWUSER' ) AND SQL_TEXT NOT LIKE '%v$%' AND SQL_TEXT NOT LIKE '%V$%' AND COMMAND_TYPE IN (2, 3, 6, 7, 189) ORDER BY ELAPSED_TIME DESC FETCH FIRST 10 ROWS ONLY """) rows = cursor.fetchall() results = [] for row in rows: elapsed_per_exec = safe_float(row[8]) buffer_gets_per_exec = safe_int(row[9]) disk_reads = safe_int(row[6]) buffer_gets = safe_int(row[5]) issue = "Normal operation" if elapsed_per_exec > 20: issue = "Very slow execution - check execution plan" elif elapsed_per_exec > 5: issue = "Slow execution - review query and indexes" if buffer_gets_per_exec > 1000: issue = "High buffer gets - possible full table scan or missing index" if disk_reads > buffer_gets * 0.1 and disk_reads > 10000: issue = "High disk reads relative to buffer gets - data not in cache" results.append({ "sql_id": row[0], "sql_text": row[1] or "", "executions": safe_int(row[2]), "elapsed_time_s": safe_float(row[3]), "cpu_time_s": safe_float(row[4]), "buffer_gets": buffer_gets, "disk_reads": disk_reads, "rows_processed": safe_int(row[7]), "elapsed_per_exec_ms": elapsed_per_exec, "buffer_gets_per_exec": buffer_gets_per_exec, "plan_hash": str(row[10] or "0"), "issue": issue, }) return results except Exception as e: print("Top SQL query failed: %s" % str(e)) return [] def query_index_analysis(cursor): try: cursor.execute(""" SELECT i.OWNER, i.INDEX_NAME, i.TABLE_NAME, ROUND(s.LEAF_BLOCKS * (SELECT TO_NUMBER(VALUE) FROM v$parameter WHERE name = 'db_block_size') / 1024 / 1024), i.BLEVEL, s.LEAF_BLOCKS, i.CLUSTERING_FACTOR, NVL(s.PCT_DIRECT_ACCESS, 100), i.STATUS, CASE WHEN i.STATUS != 'VALID' THEN 'unusable' WHEN i.BLEVEL > 4 THEN 'critical' WHEN NVL(s.PCT_DIRECT_ACCESS, 100) < 50 THEN 'critical' WHEN i.BLEVEL > 3 THEN 'fragmented' WHEN NVL(s.PCT_DIRECT_ACCESS, 100) < 70 THEN 'fragmented' ELSE 'ok' END FROM DBA_INDEXES i LEFT JOIN DBA_IND_STATISTICS s ON s.OWNER = i.OWNER AND s.INDEX_NAME = i.INDEX_NAME AND s.PARTITION_NAME IS NULL WHERE i.OWNER NOT IN ( 'SYS','SYSTEM','DBSNMP','SYSMAN','OUTLN','MDSYS','ORDSYS', 'EXFSYS','WMSYS','XDB','CTXSYS','APPQOSSYS','DBSFWUSER', 'APEX_040000','APEX_040200','APEX_050000','FLOWS_FILES' ) AND i.INDEX_TYPE = 'NORMAL' AND NVL(s.LEAF_BLOCKS, 0) > 100 ORDER BY CASE WHEN i.STATUS != 'VALID' THEN 1 WHEN i.BLEVEL > 4 THEN 2 WHEN i.BLEVEL > 3 THEN 3 ELSE 4 END, s.LEAF_BLOCKS DESC NULLS LAST FETCH FIRST 20 ROWS ONLY """) rows = cursor.fetchall() results = [] for row in rows: blevel = safe_int(row[4]) pct_direct = safe_int(row[7], 100) extra = (blevel - 3) * 15 if blevel > 3 else 0 est_pct_deleted = max(0, min(100, round(100 - pct_direct + extra))) results.append({ "owner": row[0], "index_name": row[1], "table_name": row[2], "size_mb": safe_int(row[3]), "blevel": blevel, "leaf_blocks": safe_int(row[5]), "clustering_factor": safe_int(row[6]), "pct_deleted": est_pct_deleted, "status": row[9] or "ok", }) return results except Exception as e: print("Index analysis query failed: %s" % str(e)) return [] def query_sga_stats(cursor): try: cursor.execute("SELECT ROUND(SUM(VALUE) / 1024 / 1024 / 1024, 1) FROM V$SGA") sga_size = safe_float((cursor.fetchone() or [0])[0]) cursor.execute(""" SELECT ROUND( (1 - (phys.VALUE / (db_gets.VALUE + con_gets.VALUE))) * 100, 1) FROM (SELECT VALUE FROM V$SYSSTAT WHERE NAME = 'physical 
reads') phys, (SELECT VALUE FROM V$SYSSTAT WHERE NAME = 'db block gets') db_gets, (SELECT VALUE FROM V$SYSSTAT WHERE NAME = 'consistent gets') con_gets """) buffer_hit = safe_float((cursor.fetchone() or [0])[0]) cursor.execute(""" SELECT ROUND(SUM(PINS - RELOADS) / NULLIF(SUM(PINS), 0) * 100, 1) FROM V$LIBRARYCACHE """) lib_hit = safe_float((cursor.fetchone() or [0])[0]) cursor.execute(""" SELECT ROUND(SUM(GETS - GETMISSES) / NULLIF(SUM(GETS), 0) * 100, 1) FROM V$ROWCACHE """) dict_hit = safe_float((cursor.fetchone() or [0])[0]) cursor.execute(""" SELECT ROUND(free_bytes.val / total_bytes.val * 100, 1) FROM (SELECT SUM(BYTES) as val FROM V$SGASTAT WHERE POOL = 'shared pool' AND NAME = 'free memory') free_bytes, (SELECT SUM(BYTES) as val FROM V$SGASTAT WHERE POOL = 'shared pool') total_bytes """) sp_free = safe_float((cursor.fetchone() or [0])[0]) cursor.execute(""" SELECT ROUND(hp.VALUE / NULLIF(GREATEST(uptime.VALUE, 1), 0), 1), ROUND((tp.VALUE - hp.VALUE) / NULLIF(GREATEST(uptime.VALUE, 1), 0), 1) FROM (SELECT VALUE FROM V$SYSSTAT WHERE NAME = 'parse count (hard)') hp, (SELECT VALUE FROM V$SYSSTAT WHERE NAME = 'parse count (total)') tp, (SELECT (SYSDATE - STARTUP_TIME) * 86400 as VALUE FROM V$INSTANCE) uptime """) parse_row = cursor.fetchone() or [0, 0] hard_parses = safe_float(parse_row[0]) soft_parses = safe_float(parse_row[1]) cursor.execute("SELECT NAME, ROUND(VALUE / 1024 / 1024 / 1024, 1) FROM V$SGA") comp_rows = cursor.fetchall() components = {} for r in comp_rows: name = (r[0] or "").lower() val = safe_float(r[1]) if "buffer" in name: components["buffer_cache_gb"] = val if "shared" in name: components["shared_pool_gb"] = val if "large" in name: components["large_pool_gb"] = val if "java" in name: components["java_pool_gb"] = val if "stream" in name: components["streams_pool_gb"] = val return { "sga_size_gb": sga_size, "buffer_cache_gb": components.get("buffer_cache_gb", 0), "shared_pool_gb": components.get("shared_pool_gb", 0), "large_pool_gb": components.get("large_pool_gb", 0), "java_pool_gb": components.get("java_pool_gb", 0), "streams_pool_gb": components.get("streams_pool_gb", 0), "buffer_cache_hit_ratio": buffer_hit, "library_cache_hit_ratio": lib_hit, "dictionary_cache_hit_ratio": dict_hit, "shared_pool_free_pct": sp_free, "hard_parses_per_sec": hard_parses, "soft_parses_per_sec": soft_parses, } except Exception as e: print("SGA stats query failed: %s" % str(e)) return { "sga_size_gb": 0, "buffer_cache_gb": 0, "shared_pool_gb": 0, "large_pool_gb": 0, "java_pool_gb": 0, "streams_pool_gb": 0, "buffer_cache_hit_ratio": 0, "library_cache_hit_ratio": 0, "dictionary_cache_hit_ratio": 0, "shared_pool_free_pct": 0, "hard_parses_per_sec": 0, "soft_parses_per_sec": 0, } def query_pga_stats(cursor): try: cursor.execute(""" SELECT ROUND((SELECT TO_NUMBER(VALUE)/1024/1024/1024 FROM v$parameter WHERE name = 'pga_aggregate_target'), 1), ROUND((SELECT VALUE/1024/1024/1024 FROM V$PGASTAT WHERE NAME = 'total PGA allocated'), 1), ROUND((SELECT VALUE/1024/1024/1024 FROM V$PGASTAT WHERE NAME = 'maximum PGA allocated'), 1), (SELECT VALUE FROM V$PGASTAT WHERE NAME = 'over allocation count'), ROUND((SELECT VALUE FROM V$PGASTAT WHERE NAME = 'cache hit percentage'), 1) FROM DUAL """) row = cursor.fetchone() or [0, 0, 0, 0, 0] cursor.execute(""" SELECT ROUND(optimal.cnt / NULLIF(total.cnt, 0) * 100, 1), ROUND(onepass.cnt / NULLIF(total.cnt, 0) * 100, 1), ROUND(multipass.cnt / NULLIF(total.cnt, 0) * 100, 1) FROM (SELECT SUM(OPTIMAL_EXECUTIONS + ONEPASS_EXECUTIONS + MULTIPASSES_EXECUTIONS) as cnt FROM 
V$SQL_WORKAREA_HISTOGRAM WHERE LOW_OPTIMAL_SIZE > 0) total, (SELECT SUM(OPTIMAL_EXECUTIONS) as cnt FROM V$SQL_WORKAREA_HISTOGRAM WHERE LOW_OPTIMAL_SIZE > 0) optimal, (SELECT SUM(ONEPASS_EXECUTIONS) as cnt FROM V$SQL_WORKAREA_HISTOGRAM WHERE LOW_OPTIMAL_SIZE > 0) onepass, (SELECT SUM(MULTIPASSES_EXECUTIONS) as cnt FROM V$SQL_WORKAREA_HISTOGRAM WHERE LOW_OPTIMAL_SIZE > 0) multipass """) wa_row = cursor.fetchone() or [0, 0, 0] return { "pga_target_gb": safe_float(row[0]), "pga_allocated_gb": safe_float(row[1]), "pga_max_allocated_gb": safe_float(row[2]), "over_allocation_count": safe_int(row[3]), "cache_hit_pct": safe_float(row[4]), "optimal_executions_pct": safe_float(wa_row[0]), "onepass_executions_pct": safe_float(wa_row[1]), "multipass_executions_pct": safe_float(wa_row[2]), } except Exception as e: print("PGA stats query failed: %s" % str(e)) return { "pga_target_gb": 0, "pga_allocated_gb": 0, "pga_max_allocated_gb": 0, "over_allocation_count": 0, "cache_hit_pct": 0, "optimal_executions_pct": 0, "onepass_executions_pct": 0, "multipass_executions_pct": 0, } def query_os_stats(cursor): try: cursor.execute(""" SELECT STAT_NAME, VALUE FROM V$OSSTAT WHERE STAT_NAME IN ( 'NUM_CPUS','IDLE_TIME','BUSY_TIME','USER_TIME', 'SYS_TIME','IOWAIT_TIME','PHYSICAL_MEMORY_BYTES','FREE_MEMORY_BYTES' ) """) rows = cursor.fetchall() stats = {} for r in rows: stats[r[0]] = safe_float(r[1]) total_cpu_time = stats.get("IDLE_TIME", 0) + stats.get("BUSY_TIME", 0) cpu_pct = round(stats.get("BUSY_TIME", 0) / total_cpu_time * 100, 1) if total_cpu_time > 0 else 0 io_pct = round(stats.get("IOWAIT_TIME", 0) / total_cpu_time * 100, 1) if total_cpu_time > 0 else 0 return { "cpu_count": safe_int(stats.get("NUM_CPUS", 1), 1), "avg_cpu_utilization_pct": cpu_pct, "max_cpu_utilization_pct": min(cpu_pct * 1.3, 100), "avg_io_wait_pct": io_pct, "physical_memory_gb": round(stats.get("PHYSICAL_MEMORY_BYTES", 0) / 1024 / 1024 / 1024, 1), "free_memory_gb": round(stats.get("FREE_MEMORY_BYTES", 0) / 1024 / 1024 / 1024, 1), "swap_used_gb": 0, "avg_disk_read_ms": 0, "avg_disk_write_ms": 0, } except Exception as e: print("OS stats query failed: %s" % str(e)) return { "cpu_count": 1, "avg_cpu_utilization_pct": 0, "max_cpu_utilization_pct": 0, "avg_io_wait_pct": 0, "physical_memory_gb": 0, "free_memory_gb": 0, "swap_used_gb": 0, "avg_disk_read_ms": 0, "avg_disk_write_ms": 0, } # ============================================================ # Wave A: Undo, Temp, Alert Log, Resource Limits, SGA/PGA History # ============================================================ def query_undo_stats(cursor): """Undo tablespace stats — current usage from V$UNDOSTAT + DBA_DATA_FILES.""" try: # Current undo stats from V$UNDOSTAT (last 10-min interval) cursor.execute(""" SELECT UNDOBLKS, TXNCOUNT, MAXQUERYLEN, MAXCONCURRENCY, TUNED_UNDORETENTION, EXPIREDBLKS, UNEXPIREDBLKS, ACTIVEBLKS FROM V$UNDOSTAT WHERE ROWNUM = 1 ORDER BY END_TIME DESC """) undo_row = cursor.fetchone() or [0] * 8 # Undo tablespace size from DBA_TABLESPACES + DBA_DATA_FILES cursor.execute(""" SELECT d.TABLESPACE_NAME, SUM(d.BYTES) / 1073741824, SUM(d.BYTES - NVL(f.FREE_BYTES, 0)) / 1073741824, ROUND(SUM(d.BYTES - NVL(f.FREE_BYTES, 0)) / SUM(d.BYTES) * 100, 1), t.RETENTION FROM DBA_DATA_FILES d JOIN DBA_TABLESPACES t ON t.TABLESPACE_NAME = d.TABLESPACE_NAME LEFT JOIN ( SELECT FILE_ID, SUM(BYTES) AS FREE_BYTES FROM DBA_FREE_SPACE GROUP BY FILE_ID ) f ON f.FILE_ID = d.FILE_ID WHERE t.CONTENTS = 'UNDO' GROUP BY d.TABLESPACE_NAME, t.RETENTION """) ts_row = cursor.fetchone() or [None] * 5 
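        # Combine the V$UNDOSTAT interval counters with the undo tablespace capacity figures.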
current = { "undo_blocks": safe_int(undo_row[0]), "transaction_count": safe_int(undo_row[1]), "max_query_length_s": safe_int(undo_row[2]), "max_concurrency": safe_int(undo_row[3]), "tuned_undo_retention_s": safe_int(undo_row[4], 900), "expired_blocks": safe_int(undo_row[5]), "unexpired_blocks": safe_int(undo_row[6]), "active_blocks": safe_int(undo_row[7]), "tablespace_name": ts_row[0] or "UNDOTBS1", "total_gb": round(safe_float(ts_row[1]), 2), "used_gb": round(safe_float(ts_row[2]), 2), "pct_used": safe_float(ts_row[3]), "retention_mode": ts_row[4] or "NOGUARANTEE", } return { "current": current, "historical": {"peak_pct_used": None, "peak_time": None, "lookback_days": 30}, "awr_available": False, } except Exception as e: print("Undo stats query failed: %s" % str(e)) return { "current": { "tablespace_name": "UNDOTBS1", "total_gb": 0, "used_gb": 0, "pct_used": 0, "tuned_undo_retention_s": 900, "max_query_length_s": 0, "retention_mode": "NOGUARANTEE", }, "historical": {"peak_pct_used": None, "peak_time": None, "lookback_days": 30}, "awr_available": False, } def query_temp_stats(cursor): """Temp tablespace stats — current usage from DBA_TEMP_FREE_SPACE.""" try: # Current temp usage cursor.execute(""" SELECT TABLESPACE_NAME, ROUND(TABLESPACE_SIZE / 1073741824, 2), ROUND(FREE_SPACE / 1073741824, 2), ROUND((TABLESPACE_SIZE - FREE_SPACE) / NULLIF(TABLESPACE_SIZE, 0) * 100, 1) FROM DBA_TEMP_FREE_SPACE """) free_row = cursor.fetchone() or [None] * 4 total_gb = safe_float(free_row[1]) free_gb = safe_float(free_row[2]) used_gb = max(0, total_gb - free_gb) # Top temp consumers by session sessions = [] try: cursor.execute(""" SELECT s.SID, s.SERIAL#, NVL(p.USERNAME, s.USERNAME), ROUND(s.BLOCKS * t.BLOCK_SIZE / 1048576, 1), s.TABLESPACE FROM V$TEMPSEG_USAGE s JOIN DBA_TABLESPACES t ON t.TABLESPACE_NAME = s.TABLESPACE JOIN V$SESSION p ON p.SID = s.SESSION_ADDR ORDER BY s.BLOCKS DESC FETCH FIRST 10 ROWS ONLY """) sessions = [ { "sid": safe_int(r[0]), "serial": safe_int(r[1]), "username": r[2] or "UNKNOWN", "temp_mb": safe_float(r[3]), "tablespace": r[4] or "", } for r in cursor.fetchall() ] except Exception: # V$TEMPSEG_USAGE join may not work on all versions try: cursor.execute(""" SELECT SID, SERIAL#, USERNAME, ROUND(BLOCKS * 8192 / 1048576, 1), TABLESPACE FROM V$TEMPSEG_USAGE ORDER BY BLOCKS DESC FETCH FIRST 10 ROWS ONLY """) sessions = [ { "sid": safe_int(r[0]), "serial": safe_int(r[1]), "username": r[2] or "UNKNOWN", "temp_mb": safe_float(r[3]), "tablespace": r[4] or "", } for r in cursor.fetchall() ] except Exception: pass current = { "tablespace_name": free_row[0] or "TEMP", "total_gb": total_gb, "used_gb": used_gb, "free_gb": free_gb, "pct_used": safe_float(free_row[3]), "top_sessions": sessions, } return { "current": current, "historical": {"peak_gb": None, "peak_pct": None, "peak_time": None, "lookback_days": 30}, "awr_available": False, } except Exception as e: print("Temp stats query failed: %s" % str(e)) return { "current": {"tablespace_name": "TEMP", "total_gb": 0, "used_gb": 0, "free_gb": 0, "pct_used": 0, "top_sessions": []}, "historical": {"peak_gb": None, "peak_pct": None, "peak_time": None, "lookback_days": 30}, "awr_available": False, } def query_alert_log(cursor): """Alert log — last 24 hours from V$DIAG_ALERT_EXT.""" try: cursor.execute(""" SELECT TO_CHAR(ORIGINATING_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS'), MESSAGE_TEXT FROM V$DIAG_ALERT_EXT WHERE ORIGINATING_TIMESTAMP > SYSDATE - 1 AND ( MESSAGE_TEXT LIKE 'ORA-%%' OR MESSAGE_TEXT LIKE '%%checkpoint%%' OR MESSAGE_TEXT LIKE 
'%%corruption%%' OR MESSAGE_TEXT LIKE '%%recovery%%' OR MESSAGE_TEXT LIKE '%%error%%' OR MESSAGE_TEXT LIKE '%%warning%%' OR MESSAGE_TEXT LIKE '%%TNS-%%' OR MESSAGE_TEXT LIKE '%%instance%%' OR MESSAGE_TEXT LIKE 'Thread%%' ) ORDER BY ORIGINATING_TIMESTAMP DESC FETCH FIRST 200 ROWS ONLY """) rows = cursor.fetchall() import re entries = [] for r in rows: ts = r[0] or "" msg = (r[1] or "").strip() # Classify severity severity = "info" if re.search(r"ORA-600|ORA-7445|ORA-1578|ORA-04031|ORA-01555", msg): severity = "critical" elif re.search(r"ORA-\d{4,5}", msg): severity = "warning" elif re.search(r"checkpoint not complete|cannot allocate new log|block corruption|instance termination", msg, re.IGNORECASE): severity = "critical" elif re.search(r"checkpoint|redo log switch|archiv|TNS-1\d{4}", msg, re.IGNORECASE): severity = "warning" elif re.search(r"TNS-12560|TNS-12537|opiodr aborting|Fatal NI", msg): severity = "noise" entries.append({"ts": ts, "message": msg, "severity": severity}) summary = { "total": len(entries), "critical": len([e for e in entries if e["severity"] == "critical"]), "warning": len([e for e in entries if e["severity"] == "warning"]), "info": len([e for e in entries if e["severity"] == "info"]), "noise": len([e for e in entries if e["severity"] == "noise"]), } return {"entries": entries[:100], "summary": summary} except Exception as e: msg = str(e) # V$DIAG_ALERT_EXT requires an explicit grant even with SELECT_CATALOG_ROLE missing_grant = "ORA-00942" in msg or "ORA-01031" in msg error_msg = ( "Alert log access requires GRANT SELECT ON V_$DIAG_ALERT_EXT TO your_user" if missing_grant else msg ) if not missing_grant: print("Alert log query failed: %s" % msg) return {"entries": [], "summary": {"total": 0, "critical": 0, "warning": 0, "info": 0, "noise": 0}, "error": error_msg} def query_resource_limits(cursor): """Resource limits — current from V$RESOURCE_LIMIT.""" try: cursor.execute(""" SELECT RESOURCE_NAME, CURRENT_UTILIZATION, MAX_UTILIZATION, INITIAL_ALLOCATION, LIMIT_VALUE FROM V$RESOURCE_LIMIT WHERE RESOURCE_NAME IN ( 'sessions', 'processes', 'enqueue_locks', 'enqueue_resources', 'dml_locks', 'temporary_table_locks', 'transactions', 'max_rollback_segments', 'sort_segment_locks' ) ORDER BY CASE RESOURCE_NAME WHEN 'sessions' THEN 1 WHEN 'processes' THEN 2 WHEN 'transactions' THEN 3 WHEN 'enqueue_locks' THEN 4 WHEN 'enqueue_resources' THEN 5 WHEN 'dml_locks' THEN 6 ELSE 9 END """) rows = cursor.fetchall() current = [] for r in rows: limit_val = None if r[4] == "UNLIMITED" else safe_int(r[4], None) max_util = safe_int(r[2]) pct_used = round(max_util / limit_val * 100) if limit_val else None status = "ok" if pct_used is not None: if pct_used >= 90: status = "critical" elif pct_used >= 80: status = "warning" current.append({ "resource": r[0] or "", "current_utilization": safe_int(r[1]), "max_utilization": max_util, "initial_allocation": r[3] or "0", "limit_value": limit_val, "limit_display": r[4] or "0", "pct_max_used": pct_used, "status": status, }) return {"current": current, "historical": {}, "awr_available": False} except Exception as e: print("Resource limits query failed: %s" % str(e)) return {"current": [], "historical": {}, "awr_available": False} def query_sga_pga_history(cursor): """SGA/PGA sizing — current targets from V$PARAMETER + recent resize ops.""" try: # Current SGA/PGA targets from V$PARAMETER cursor.execute(""" SELECT NAME, VALUE FROM V$PARAMETER WHERE NAME IN ('sga_target', 'pga_aggregate_target', 'sga_max_size', 'memory_target', 'memory_max_target') """) 
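        # Fold the name/value rows into a dict; values arrive in bytes and are converted to GB below.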
params = {} for r in cursor.fetchall(): params[r[0]] = safe_int(r[1]) sga_target_gb = round(params.get("sga_target", 0) / 1073741824, 1) pga_target_gb = round(params.get("pga_aggregate_target", 0) / 1073741824, 1) sga_max_gb = round(params.get("sga_max_size", 0) / 1073741824, 1) mem_target_gb = round(params.get("memory_target", 0) / 1073741824, 1) # Recent ASMM resize operations resize_ops = [] try: cursor.execute(""" SELECT TO_CHAR(START_TIME, 'YYYY-MM-DD HH24:MI'), COMPONENT, OPER_TYPE, ROUND(INITIAL_SIZE / 1073741824, 2), ROUND(FINAL_SIZE / 1073741824, 2), STATUS FROM V$SGA_RESIZE_OPS ORDER BY START_TIME DESC FETCH FIRST 20 ROWS ONLY """) resize_ops = [ { "op_time": r[0] or "", "component": r[1] or "", "oper_type": r[2] or "", "from_gb": safe_float(r[3]), "to_gb": safe_float(r[4]), "status": r[5] or "", } for r in cursor.fetchall() ] except Exception: pass return { "current": { "sga_target_gb": sga_target_gb, "pga_target_gb": pga_target_gb, "sga_max_gb": sga_max_gb, "memory_target_gb": mem_target_gb, }, "resize_ops": resize_ops, "pga_history": {"peak_allocated_gb": None, "peak_time": None}, "sga_component_history": [], "awr_available": False, } except Exception as e: print("SGA/PGA history query failed: %s" % str(e)) return { "current": {"sga_target_gb": 0, "pga_target_gb": 0, "sga_max_gb": 0, "memory_target_gb": 0}, "resize_ops": [], "pga_history": {"peak_allocated_gb": None, "peak_time": None}, "sga_component_history": [], "awr_available": False, } # ============================================================ # Wave B: Backup & Recovery Health Checks # ============================================================ def query_backup_stats(cursor): """Collect backup and recovery health data — RMAN, FRA, archivelog, validation.""" rman_backup = query_rman_backup(cursor) fra_usage = query_fra_usage(cursor) archivelog_rate = query_archivelog_rate(cursor) backup_validation = query_backup_validation(cursor) statuses = [c.get("status", "unknown") for c in [rman_backup, fra_usage, archivelog_rate, backup_validation]] if "critical" in statuses: overall_status = "critical" elif "warning" in statuses: overall_status = "warning" elif all(s == "ok" for s in statuses): overall_status = "ok" else: overall_status = "unknown" return { "rman_backup": rman_backup, "fra_usage": fra_usage, "archivelog_rate": archivelog_rate, "backup_validation": backup_validation, "overall_status": overall_status, } def query_rman_backup(cursor): """RMAN Backup Freshness — 🔴 >48h, 🟡 >24h, 🟢 <24h""" try: cursor.execute(""" SELECT INPUT_TYPE, STATUS, TO_CHAR(START_TIME, 'YYYY-MM-DD HH24:MI:SS'), TO_CHAR(END_TIME, 'YYYY-MM-DD HH24:MI:SS'), ROUND((SYSDATE - END_TIME) * 24, 1), ROUND(OUTPUT_BYTES / 1073741824, 2), ELAPSED_SECONDS FROM ( SELECT INPUT_TYPE, STATUS, START_TIME, END_TIME, OUTPUT_BYTES, ELAPSED_SECONDS, ROW_NUMBER() OVER (PARTITION BY INPUT_TYPE ORDER BY END_TIME DESC) AS RN FROM V$RMAN_BACKUP_JOB_DETAILS WHERE STATUS = 'COMPLETED' ) WHERE RN = 1 ORDER BY CASE INPUT_TYPE WHEN 'DB FULL' THEN 1 WHEN 'DB INCR' THEN 2 WHEN 'ARCHIVELOG' THEN 3 ELSE 4 END """) last_by_type_rows = cursor.fetchall() cursor.execute(""" SELECT INPUT_TYPE, STATUS, TO_CHAR(START_TIME, 'YYYY-MM-DD HH24:MI:SS'), TO_CHAR(END_TIME, 'YYYY-MM-DD HH24:MI:SS'), ROUND((SYSDATE - END_TIME) * 24, 1), ROUND(OUTPUT_BYTES / 1073741824, 2), ELAPSED_SECONDS FROM V$RMAN_BACKUP_JOB_DETAILS ORDER BY START_TIME DESC FETCH FIRST 10 ROWS ONLY """) recent_rows = cursor.fetchall() def row_to_job(r): return { "input_type": r[0] or "", "status": r[1] or "", "start_time": 
r[2] or "", "end_time": r[3] or "", "hours_ago": safe_float(r[4]), "size_gb": safe_float(r[5]), "elapsed_seconds": safe_int(r[6]), } last_by_type = [row_to_job(r) for r in last_by_type_rows] recent_jobs = [row_to_job(r) for r in recent_rows] full_backup = next((b for b in last_by_type if b["input_type"] == "DB FULL"), None) incr_backup = next((b for b in last_by_type if b["input_type"] == "DB INCR"), None) arch_backup = next((b for b in last_by_type if b["input_type"] == "ARCHIVELOG"), None) full_hours_ago = full_backup["hours_ago"] if full_backup else None if not last_by_type and not recent_jobs: status = "unknown" elif full_hours_ago is None: status = "critical" elif full_hours_ago > 48: status = "critical" elif full_hours_ago > 24: status = "warning" else: status = "ok" return { "status": status, "rman_available": len(recent_jobs) > 0 or len(last_by_type) > 0, "full_backup_hours_ago": full_hours_ago, "last_full_backup": full_backup, "last_incremental_backup": incr_backup, "last_archivelog_backup": arch_backup, "last_by_type": last_by_type, "recent_jobs": recent_jobs, } except Exception as e: print("RMAN backup query failed: %s" % str(e)) return {"status": "unknown", "rman_available": False, "last_by_type": [], "recent_jobs": [], "error": str(e)} def query_fra_usage(cursor): """FRA Usage — 🔴 >90% used and <10% reclaimable, 🟡 >80%, 🟢 <80%""" try: cursor.execute(""" SELECT NAME, ROUND(SPACE_LIMIT / 1073741824, 2), ROUND(SPACE_USED / 1073741824, 2), ROUND(SPACE_RECLAIMABLE / 1073741824, 2), NUMBER_OF_FILES FROM V$RECOVERY_FILE_DEST """) dest_row = cursor.fetchone() or [None, 0, 0, 0, 0] cursor.execute(""" SELECT FILE_TYPE, ROUND(PERCENT_SPACE_USED, 1), ROUND(PERCENT_SPACE_RECLAIMABLE, 1), NUMBER_OF_FILES FROM V$FLASH_RECOVERY_AREA_USAGE ORDER BY PERCENT_SPACE_USED DESC """) usage_rows = cursor.fetchall() cursor.execute(""" SELECT ROUND(SUM(BLOCKS * BLOCK_SIZE) / 1073741824, 2) FROM V$ARCHIVED_LOG WHERE COMPLETION_TIME > SYSDATE - 1 AND STANDBY_DEST = 'NO' """) gen_row = cursor.fetchone() or [0] limit_gb = safe_float(dest_row[1]) used_gb = safe_float(dest_row[2]) reclaimable_gb = safe_float(dest_row[3]) fra_location = dest_row[0] or "" pct_used = round((used_gb / limit_gb) * 100, 1) if limit_gb > 0 else 0 pct_reclaimable = round((reclaimable_gb / limit_gb) * 100, 1) if limit_gb > 0 else 0 archivelogs_24h_gb = safe_float(gen_row[0]) available_gb = limit_gb - used_gb + reclaimable_gb hourly_rate_gb = archivelogs_24h_gb / 24 if archivelogs_24h_gb > 0 else 0 hours_until_full = round(available_gb / hourly_rate_gb) if hourly_rate_gb > 0 and limit_gb > 0 else None file_type_breakdown = [{ "file_type": r[0] or "", "pct_used": safe_float(r[1]), "pct_reclaimable": safe_float(r[2]), "number_of_files": safe_int(r[3]), } for r in usage_rows] if limit_gb == 0: status = "unknown" elif pct_used > 90 and pct_reclaimable < 10: status = "critical" elif pct_used > 80: status = "warning" else: status = "ok" return { "status": status, "fra_configured": limit_gb > 0, "location": fra_location, "limit_gb": limit_gb, "used_gb": used_gb, "reclaimable_gb": reclaimable_gb, "pct_used": pct_used, "pct_reclaimable": pct_reclaimable, "archivelogs_24h_gb": archivelogs_24h_gb, "hours_until_full": hours_until_full, "file_type_breakdown": file_type_breakdown, } except Exception as e: print("FRA usage query failed: %s" % str(e)) return {"status": "unknown", "fra_configured": False, "error": str(e)} def query_archivelog_rate(cursor): """Archivelog generation rate — 🟡 >20 switches/hr, 🔴 not in archivelog mode""" try: 
cursor.execute("SELECT LOG_MODE FROM V$DATABASE") mode_row = cursor.fetchone() or ["ARCHIVELOG"] log_mode = mode_row[0] or "ARCHIVELOG" cursor.execute(""" SELECT TO_CHAR(COMPLETION_TIME, 'YYYY-MM-DD HH24') AS HOUR, COUNT(*), ROUND(SUM(BLOCKS * BLOCK_SIZE) / 1048576, 1) FROM V$ARCHIVED_LOG WHERE COMPLETION_TIME > SYSDATE - 1 AND STANDBY_DEST = 'NO' GROUP BY TO_CHAR(COMPLETION_TIME, 'YYYY-MM-DD HH24') ORDER BY HOUR DESC """) arch_rows = cursor.fetchall() cursor.execute(""" SELECT l.GROUP#, l.MEMBERS, ROUND(l.BYTES/1048576,0), l.STATUS, l.ARCHIVED FROM V$LOG l ORDER BY l.GROUP# """) log_rows = cursor.fetchall() cursor.execute(""" SELECT ROUND(COUNT(*) / 24.0, 1), COUNT(*) FROM V$LOG_HISTORY WHERE FIRST_TIME > SYSDATE - 1 """) switch_row = cursor.fetchone() or [0, 0] hourly_breakdown = [{"hour": r[0] or "", "log_count": safe_int(r[1]), "size_mb": safe_float(r[2])} for r in arch_rows] log_groups = [{"group_num": safe_int(r[0]), "members": safe_int(r[1]), "size_mb": safe_int(r[2]), "status": r[3] or "", "archived": r[4] or ""} for r in log_rows] switches_per_hour = safe_float(switch_row[0]) switches_24h = safe_int(switch_row[1]) archivelogs_24h = sum(h["log_count"] for h in hourly_breakdown) total_size_mb_24h = sum(h["size_mb"] for h in hourly_breakdown) if log_mode != "ARCHIVELOG": status = "critical" elif switches_per_hour > 20: status = "warning" else: status = "ok" return { "status": status, "log_mode": log_mode, "archivelog_mode": log_mode == "ARCHIVELOG", "switches_per_hour": switches_per_hour, "switches_24h": switches_24h, "archivelogs_24h": archivelogs_24h, "total_size_mb_24h": total_size_mb_24h, "hourly_breakdown": hourly_breakdown[:24], "log_groups": log_groups, } except Exception as e: print("Archivelog rate query failed: %s" % str(e)) return {"status": "unknown", "archivelog_mode": None, "error": str(e)} def query_backup_validation(cursor): """Backup validation — 🔴 corruption found or last 3 jobs failed""" try: cursor.execute(""" SELECT OPERATION, STATUS, TO_CHAR(START_TIME, 'YYYY-MM-DD HH24:MI:SS'), TO_CHAR(END_TIME, 'YYYY-MM-DD HH24:MI:SS'), MBYTES_PROCESSED, SUBSTR(NVL(OUTPUT,''), 1, 300) FROM V$RMAN_STATUS WHERE OPERATION IN ('BACKUP','RESTORE','RECOVER','DELETE','VALIDATE') AND START_TIME > SYSDATE - 7 ORDER BY START_TIME DESC FETCH FIRST 20 ROWS ONLY """) ops_rows = cursor.fetchall() cursor.execute("SELECT COUNT(*), NVL(SUM(BLOCKS),0) FROM V$BACKUP_CORRUPTION") bc_row = cursor.fetchone() or [0, 0] cursor.execute("SELECT COUNT(*), NVL(SUM(BLOCKS),0) FROM V$COPY_CORRUPTION") cc_row = cursor.fetchone() or [0, 0] recent_ops = [{ "operation": r[0] or "", "status": r[1] or "", "start_time": r[2] or "", "end_time": r[3] or "", "mbytes_processed": safe_float(r[4]), "output": (r[5] or "")[:300], } for r in ops_rows] backup_corruptions = safe_int(bc_row[0]) backup_corrupt_blocks = safe_int(bc_row[1]) copy_corruptions = safe_int(cc_row[0]) copy_corrupt_blocks = safe_int(cc_row[1]) total_corruptions = backup_corruptions + copy_corruptions recent_backups = [op for op in recent_ops if op["operation"] == "BACKUP"][:3] last_3_failed = len(recent_backups) > 0 and all(b["status"] == "FAILED" for b in recent_backups) if total_corruptions > 0: status = "critical" elif last_3_failed: status = "critical" elif any(b["status"] == "FAILED" for b in recent_backups): status = "warning" else: status = "ok" return { "status": status, "backup_corruptions": backup_corruptions, "backup_corrupt_blocks": backup_corrupt_blocks, "copy_corruptions": copy_corruptions, "copy_corrupt_blocks": copy_corrupt_blocks, 
"total_corruptions": total_corruptions, "last_3_backups_failed": last_3_failed, "recent_operations": recent_ops, } except Exception as e: print("Backup validation query failed: %s" % str(e)) return {"status": "unknown", "total_corruptions": 0, "error": str(e)} # ============================================================ # Wave E: EBS Apps Health Checks # All EBS tables are schema-qualified with APPS. so queries # resolve correctly when connected as SYSTEM user. # ============================================================ def query_apps_health(cursor): """Query Oracle E-Business Suite application tier health. Returns None for non-EBS databases (graceful fallback). OACore, Forms Server, OHS checks require OS Agent (not via DB). All table references use APPS. schema prefix so they resolve when connected as SYSTEM (with SELECT ANY TABLE privilege). """ try: # EBS detection try: cursor.execute('SELECT COUNT(*) FROM APPS.FND_APPLICATION WHERE ROWNUM = 1') cursor.fetchone() except Exception: return None # ── Concurrent Manager queues (verified against EBS 12.2.12) ── managers = [] try: cursor.execute(''' SELECT DISTINCT b.user_concurrent_queue_name AS manager_name, a.target_node AS node, a.running_processes AS actual_procs, a.max_processes AS target_procs, DECODE(b.control_code, 'D', 'Deactivating', 'E', 'Deactivated', 'N', 'Node unavai', 'A', 'Activating', 'X', 'Terminated', 'T', 'Terminating', 'V', 'Verifying', 'O', 'Suspending', 'P', 'Suspended', 'Q', 'Resuming', 'R', 'Restarting', 'Running') AS status_label FROM apps.fnd_concurrent_queues a, apps.fnd_concurrent_queues_vl b WHERE a.concurrent_queue_id = b.concurrent_queue_id AND a.max_processes != 0 ORDER BY a.max_processes DESC ''') for r in cursor.fetchall(): managers.append({ 'display_name': r[0] or '', 'node': r[1] or '', 'actual': safe_int(r[2]), 'target': safe_int(r[3]), 'status_label': r[4] or 'Running', }) except Exception as e: print('CM query failed (non-fatal): %s' % str(e)) # ── Pending / running request counts ────────────────────── pending_count = 0 running_count = 0 try: cursor.execute(''' SELECT PHASE_CODE, COUNT(*) FROM APPS.FND_CONCURRENT_REQUESTS WHERE PHASE_CODE IN ('P', 'R') AND HOLD_FLAG = 'N' GROUP BY PHASE_CODE ''') for r in cursor.fetchall(): if r[0] == 'P': pending_count = safe_int(r[1]) elif r[0] == 'R': running_count = safe_int(r[1]) except Exception as e: print('Request count query failed (non-fatal): %s' % str(e)) # ── OPP (dedicated query) ────────────────────────────────────── opp_data = {'manager_name': 'Output Post Processor', 'actual': 0, 'target': 0, 'status': 'ok', 'recommendation': 'OPP healthy: 0/0 process(es) running.'} try: cursor.execute(''' SELECT b.user_concurrent_queue_name AS manager_name, a.running_processes AS actual, a.max_processes AS target FROM apps.fnd_concurrent_queues a, apps.fnd_concurrent_queues_vl b WHERE a.concurrent_queue_id = b.concurrent_queue_id AND a.concurrent_queue_name = 'FNDCPOPP' ''') row = cursor.fetchone() if row: opp_actual = safe_int(row[1]) opp_target = safe_int(row[2]) if opp_actual == 0 and opp_target > 0: opp_st = 'critical' opp_rec = 'OPP is not running — PDF/output generation will fail. Start OPP from the CM admin page.' elif opp_actual < opp_target: opp_st = 'warning' opp_rec = 'OPP has %d/%d processes running — throughput may be degraded.' % (opp_actual, opp_target) else: opp_st = 'ok' opp_rec = 'OPP healthy: %d/%d process(es) running.' 
% (opp_actual, opp_target) opp_data = { 'manager_name': row[0] or 'Output Post Processor', 'actual': opp_actual, 'target': opp_target, 'status': opp_st, 'recommendation': opp_rec, } except Exception as e: print('OPP query failed (non-fatal): %s' % str(e)) # ── All Workflow components (WF% types — all 13 components) ── wf_services = [] try: cursor.execute(''' SELECT component_type, component_name, component_status, startup_mode FROM apps.fnd_svc_components WHERE component_type LIKE 'WF%%' ORDER BY 1, 2 ''') for r in cursor.fetchall(): wf_services.append({ 'type': r[0] or '', 'name': r[1] or '', 'status': r[2] or 'UNKNOWN', 'startup_mode': r[3] or '', }) except Exception as e: print('WF components query failed (non-fatal): %s' % str(e)) # ── Stuck notifications ─────────────────────────────────── stuck_notif = 0 try: cursor.execute(''' SELECT COUNT(*) FROM APPS.WF_NOTIFICATIONS WHERE STATUS = 'OPEN' AND MAIL_STATUS = 'MAIL' AND BEGIN_DATE < SYSDATE - 1/24 ''') row = cursor.fetchone() stuck_notif = safe_int(row[0]) if row else 0 except Exception as e: print('WF stuck notif query failed (non-fatal): %s' % str(e)) # ── WF_ERROR queue depth ────────────────────────────────── wf_err_count = 0 try: cursor.execute('SELECT COUNT(*) FROM APPS.WF_ERROR') row = cursor.fetchone() wf_err_count = safe_int(row[0]) if row else 0 except Exception as e: print('WF error query failed (non-fatal): %s' % str(e)) # ── Process CM status ──────────────────────────────────── icm = None for m in managers: if 'internal concurrent manager' in (m.get('display_name') or '').lower(): icm = m break icm_down = not icm or (icm.get('actual', 0) == 0 and icm.get('target', 0) > 0) cm_status = 'critical' if icm_down else 'ok' if not icm_down: under_staffed = any(m.get('target', 0) > 0 and m.get('actual', 0) < m.get('target', 0) for m in managers) if under_staffed or pending_count > 50: cm_status = 'warning' # ── Workflow status (all WF components) ───────────────── mailer = None for s in wf_services: if s.get('type') == 'WF_MAILER' or 'mailer' in (s.get('name') or '').lower(): mailer = s break mailer_running = bool(mailer and mailer.get('status') == 'RUNNING') critical_comps = [s for s in wf_services if s.get('status') in ('DEACTIVATED_SYSTEM', 'STOPPED')] warning_comps = [s for s in wf_services if s.get('status') not in ('RUNNING', 'DEACTIVATED_SYSTEM', 'STOPPED', 'UNKNOWN', '')] if mailer and not mailer_running: wf_status = 'critical' elif critical_comps: wf_status = 'critical' elif stuck_notif > 50 or wf_err_count > 20: wf_status = 'critical' elif warning_comps or stuck_notif > 5 or wf_err_count > 0: wf_status = 'warning' else: wf_status = 'ok' # ── Wave F: APPS_ENV and ADOP_STATUS ──────────────────────── apps_env_result = query_apps_env(cursor) adop_result = query_adop_status() # ── Overall apps status ─────────────────────────────────── statuses = [cm_status, wf_status, opp_data['status'], apps_env_result['status'], adop_result['status']] if 'critical' in statuses: overall_status = 'critical' elif 'warning' in statuses: overall_status = 'warning' else: overall_status = 'ok' return { 'is_ebs': True, 'status': overall_status, 'concurrent_managers': { 'status': cm_status, 'icm_down': icm_down, 'managers': managers, 'pending_requests': pending_count, 'running_requests': running_count, }, 'opp': opp_data, 'workflow': { 'status': wf_status, 'mailer_running': mailer_running, 'mailer_component': mailer, 'services': wf_services, 'stuck_notifications': stuck_notif, 'error_count': wf_err_count, }, 'apps_env': apps_env_result, 
'adop_status': adop_result, # oacore and forms: not available via DB — display placeholder via OS Agent } except Exception as e: print('EBS apps health query failed: %s' % str(e)) return None # ============================================================ # Wave F: APPS_ENV Check (Python proxy — OS + DB sources) # ============================================================ def query_apps_env(cursor): """Query EBS environment variables from OS environment and Oracle DB. Since the proxy runs on the EBS app tier, process.environ holds the variables set when the EBS context file is sourced. DB sources supplement APPS_JDBC_URL and ORACLE_SID. """ ALL_VARS = [ 'CONTEXT_FILE', 'ADMIN_SCRIPTS_HOME', 'INST_TOP', 'COMMON_TOP', 'FMW_HOME', 'EBS_DOMAIN_HOME', 'APPS_JDBC_URL', 'TWO_TASK', 'ORACLE_HOME', 'ORACLE_SID', 'TNS_ADMIN', ] CRITICAL_VARS = frozenset(['CONTEXT_FILE', 'INST_TOP', 'COMMON_TOP', 'ADMIN_SCRIPTS_HOME']) env_vars = {} # Step 1: OS environment (highest priority) for v in ALL_VARS: val = os.environ.get(v, '') if val: env_vars[v] = val # Step 2: Supplement from Oracle DB try: cursor.execute( "SELECT METVAL_CLOB FROM APPS.FND_OAM_METVAL WHERE METNAME = 'APPS_JDBC_URL' AND ROWNUM = 1" ) row = cursor.fetchone() if row and row[0] and 'APPS_JDBC_URL' not in env_vars: env_vars['APPS_JDBC_URL'] = str(row[0])[:200] except Exception: pass try: cursor.execute("SELECT INSTANCE_NAME FROM v$instance") row = cursor.fetchone() if row and row[0] and 'ORACLE_SID' not in env_vars: env_vars['ORACLE_SID'] = str(row[0]) except Exception: pass try: cursor.execute("SELECT VALUE FROM v$parameter WHERE name = 'tns_admin' AND ROWNUM = 1") row = cursor.fetchone() if row and row[0] and 'TNS_ADMIN' not in env_vars: env_vars['TNS_ADMIN'] = str(row[0]) except Exception: pass # Step 3: Build rows rows = [] for v in ALL_VARS: val = env_vars.get(v) rows.append({ 'variable': v, 'value': val, 'state': 'OK' if val else 'MISSING', 'critical': v in CRITICAL_VARS, }) missing_critical = [r for r in rows if r['critical'] and r['state'] == 'MISSING'] missing_any = [r for r in rows if r['state'] == 'MISSING'] if missing_critical: status = 'critical' message = '%d critical EBS env var(s) missing: %s' % ( len(missing_critical), ', '.join(r['variable'] for r in missing_critical)) elif missing_any: status = 'warning' message = '%d EBS env var(s) not detected (non-critical): %s' % ( len(missing_any), ', '.join(r['variable'] for r in missing_any)) else: status = 'ok' message = 'All %d EBS environment variables are present.' % len(rows) os_env_available = any(os.environ.get(v) for v in ALL_VARS) return { 'check_id': 'APPS_ENV', 'status': status, 'severity': 'critical' if missing_critical else ('warning' if missing_any else 'ok'), 'message': message, 'os_env_available': os_env_available, 'display': { 'columns': ['Variable', 'State', 'Value'], 'rows': [[r['variable'], r['state'], (r['value'] or '')[:120] or '\u2014'] for r in rows], }, 'variables': rows, } # ============================================================ # Wave F: ADOP_STATUS Check (Python proxy — OS command) # ============================================================ def query_adop_status(): """Run `adop -status -detail` on the EBS app tier and parse output. Requires APPS_PWD in environment. SKIP if not set. 
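
    _parse_adop_output() expects lines of roughly this shape (illustrative; real
    adop output varies by EBS release):

        Session Id: 42
        Node: appnode1
        Phase: apply
        Status: COMPLETED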
""" import subprocess apps_pwd = os.environ.get('APPS_PWD', '') or os.environ.get('APPS_PASSWORD', '') if not apps_pwd: return { 'check_id': 'ADOP_STATUS', 'status': 'skip', 'severity': 'info', 'message': 'Skipped: APPS_PWD not set in proxy environment.', 'display': {'columns': ['Field', 'Value'], 'rows': [['Status', 'SKIPPED \u2014 APPS_PWD not available']]}, 'sessions': [], } admin_scripts_home = os.environ.get('ADMIN_SCRIPTS_HOME', '') adop_bin = os.path.join(admin_scripts_home, 'adop') if admin_scripts_home else 'adop' try: proc = subprocess.run( [adop_bin, '-status', '-detail'], env=dict(os.environ, APPS_PWD=apps_pwd), capture_output=True, text=True, timeout=30, ) output = (proc.stdout or '') + (proc.stderr or '') except subprocess.TimeoutExpired: return { 'check_id': 'ADOP_STATUS', 'status': 'warning', 'severity': 'warning', 'message': 'adop -status timed out after 30s.', 'display': {'columns': ['Field', 'Value'], 'rows': [['Status', 'TIMEOUT']]}, 'sessions': [], } except (OSError, Exception) as e: msg = str(e)[:200] return { 'check_id': 'ADOP_STATUS', 'status': 'warning', 'severity': 'warning', 'message': 'adop command failed: %s' % msg, 'display': {'columns': ['Field', 'Value'], 'rows': [['Error', msg]]}, 'sessions': [], } sessions = _parse_adop_output(output) failed_sessions = [s for s in sessions if (s.get('status') or '').upper() == 'FAILED'] active_sessions = [s for s in sessions if (s.get('status') or '').upper() in ('INPROGRESS', 'IN_PROGRESS', 'RUNNING', 'ACTIVE')] if not sessions: status = 'ok' message = 'No ADOP sessions found \u2014 no active or recent patching activity.' elif failed_sessions: status = 'critical' message = '%d ADOP session(s) in FAILED state. Review before next patch cycle.' % len(failed_sessions) elif active_sessions: status = 'warning' message = '%d ADOP session(s) currently active.' % len(active_sessions) else: status = 'ok' message = 'ADOP: %d historical session(s) found, none in FAILED state.' 
% len(sessions) return { 'check_id': 'ADOP_STATUS', 'status': status, 'severity': 'critical' if failed_sessions else ('warning' if active_sessions else 'ok'), 'message': message, 'display': { 'columns': ['Session ID', 'Node', 'Phase', 'Status'], 'rows': [[s.get('session_id', '\u2014'), s.get('node', '\u2014'), s.get('phase', '\u2014'), s.get('status', '\u2014')] for s in sessions], }, 'sessions': sessions, 'raw_output': output[:2000], } def _parse_adop_output(text): """Parse `adop -status -detail` output into session dicts.""" sessions = [] # Split by blank lines to get per-session blocks import re blocks = re.split(r'\n\s*\n', text) for block in blocks: session = {} for line in block.splitlines(): line = line.strip() m = re.match(r'^(SESSION\s*ID|NODE|PHASE|STATUS)\s*[:\-]\s*(.+)', line, re.IGNORECASE) if m: key = m.group(1).strip().lower().replace(' ', '_') value = m.group(2).strip() if key == 'session_id': session['session_id'] = value elif key == 'node': session['node'] = value elif key == 'phase': session['phase'] = value elif key == 'status': session['status'] = value if session.get('session_id') or session.get('phase') or session.get('status'): sessions.append(session) return sessions # ============================================================ # Wave G: Database Objects, Sessions, Security, Schema Stats # ============================================================ def query_db_objects(cursor): """Collect invalid objects, stale stats, SCN headroom, SPFILE, recycle bin, control files.""" EXCLUDED_OWNERS = "('SYS','SYSTEM','DBSNMP','SYSMAN','OUTLN','MDSYS','ORDSYS','XDB','CTXSYS','WMSYS','EXFSYS','APPQOSSYS','DBSFWUSER','OJVMSYS','DVSYS','LBACSYS')" # Invalid objects try: cursor.execute(""" SELECT COUNT(*) AS invalid_count, COUNT(CASE WHEN OBJECT_TYPE IN ('PACKAGE BODY','PACKAGE') THEN 1 END), COUNT(CASE WHEN OBJECT_TYPE = 'PROCEDURE' THEN 1 END), COUNT(CASE WHEN OBJECT_TYPE = 'VIEW' THEN 1 END), COUNT(CASE WHEN OBJECT_TYPE = 'TRIGGER' THEN 1 END) FROM DBA_OBJECTS WHERE STATUS = 'INVALID' AND OWNER NOT IN """ + EXCLUDED_OWNERS) r = cursor.fetchone() or (0, 0, 0, 0, 0) invalid = {"count": safe_int(r[0]), "packages": safe_int(r[1]), "procedures": safe_int(r[2]), "views": safe_int(r[3]), "triggers": safe_int(r[4])} except Exception as e: print("Invalid objects query failed: %s" % str(e)) invalid = {"count": 0, "packages": 0, "procedures": 0, "views": 0, "triggers": 0} # Stale statistics try: cursor.execute(""" SELECT COUNT(*) AS stale_count, COUNT(CASE WHEN LAST_ANALYZED IS NULL THEN 1 END) FROM DBA_TAB_STATISTICS WHERE STALE_STATS = 'YES' AND OWNER NOT IN """ + EXCLUDED_OWNERS) r = cursor.fetchone() or (0, 0) stale = {"stale_count": safe_int(r[0]), "never_analyzed": safe_int(r[1])} except Exception as e: print("Stale stats query failed: %s" % str(e)) stale = {"stale_count": 0, "never_analyzed": 0} # SCN headroom try: cursor.execute(""" SELECT CURRENT_SCN, ROUND((TO_NUMBER(SYSDATE - TO_DATE('01-01-1988','DD-MM-YYYY')) * 24 * 3600 * 16384 * 1024 - CURRENT_SCN) / (24*3600*16384), 0) FROM V$DATABASE""") r = cursor.fetchone() or (None, None) scn = {"current_scn": safe_int(r[0]) if r[0] is not None else None, "days_remaining": safe_int(r[1]) if r[1] is not None else None} except Exception as e: print("SCN headroom query failed: %s" % str(e)) scn = {"current_scn": None, "days_remaining": None} # SPFILE try: cursor.execute("SELECT DECODE(COUNT(*),0,'PFILE','SPFILE') FROM V$PARAMETER WHERE NAME='spfile' AND VALUE IS NOT NULL") r = cursor.fetchone() pft = str(r[0]) if r else "UNKNOWN" spfile = 

# ============================================================
# Wave G: Database Objects, Sessions, Security, Schema Stats
# ============================================================

def query_db_objects(cursor):
    """Collect invalid objects, stale stats, SCN headroom, SPFILE, recycle bin, control files."""
    EXCLUDED_OWNERS = "('SYS','SYSTEM','DBSNMP','SYSMAN','OUTLN','MDSYS','ORDSYS','XDB','CTXSYS','WMSYS','EXFSYS','APPQOSSYS','DBSFWUSER','OJVMSYS','DVSYS','LBACSYS')"

    # Invalid objects
    try:
        cursor.execute("""
            SELECT COUNT(*) AS invalid_count,
                   COUNT(CASE WHEN OBJECT_TYPE IN ('PACKAGE BODY','PACKAGE') THEN 1 END),
                   COUNT(CASE WHEN OBJECT_TYPE = 'PROCEDURE' THEN 1 END),
                   COUNT(CASE WHEN OBJECT_TYPE = 'VIEW' THEN 1 END),
                   COUNT(CASE WHEN OBJECT_TYPE = 'TRIGGER' THEN 1 END)
            FROM DBA_OBJECTS
            WHERE STATUS = 'INVALID'
              AND OWNER NOT IN """ + EXCLUDED_OWNERS)
        r = cursor.fetchone() or (0, 0, 0, 0, 0)
        invalid = {"count": safe_int(r[0]), "packages": safe_int(r[1]),
                   "procedures": safe_int(r[2]), "views": safe_int(r[3]),
                   "triggers": safe_int(r[4])}
    except Exception as e:
        print("Invalid objects query failed: %s" % str(e))
        invalid = {"count": 0, "packages": 0, "procedures": 0, "views": 0, "triggers": 0}

    # Stale statistics
    # Tables that have never been analyzed report STALE_STATS as NULL (not 'YES'), so
    # include them explicitly; otherwise never_analyzed would always be zero.
    try:
        cursor.execute("""
            SELECT COUNT(CASE WHEN STALE_STATS = 'YES' THEN 1 END) AS stale_count,
                   COUNT(CASE WHEN LAST_ANALYZED IS NULL THEN 1 END) AS never_analyzed
            FROM DBA_TAB_STATISTICS
            WHERE (STALE_STATS = 'YES' OR LAST_ANALYZED IS NULL)
              AND OWNER NOT IN """ + EXCLUDED_OWNERS)
        r = cursor.fetchone() or (0, 0)
        stale = {"stale_count": safe_int(r[0]), "never_analyzed": safe_int(r[1])}
    except Exception as e:
        print("Stale stats query failed: %s" % str(e))
        stale = {"stale_count": 0, "never_analyzed": 0}

    # SCN headroom: days until the soft SCN ceiling, which the usual headroom
    # heuristic assumes grows at 16384 SCNs/sec from 01-Jan-1988.
    try:
        cursor.execute("""
            SELECT CURRENT_SCN,
                   ROUND((TO_NUMBER(SYSDATE - TO_DATE('01-01-1988','DD-MM-YYYY')) * 24 * 3600 * 16384
                          - CURRENT_SCN) / (24*3600*16384), 0)
            FROM V$DATABASE""")
        r = cursor.fetchone() or (None, None)
        scn = {"current_scn": safe_int(r[0]) if r[0] is not None else None,
               "days_remaining": safe_int(r[1]) if r[1] is not None else None}
    except Exception as e:
        print("SCN headroom query failed: %s" % str(e))
        scn = {"current_scn": None, "days_remaining": None}

    # SPFILE
    try:
        cursor.execute("SELECT DECODE(COUNT(*),0,'PFILE','SPFILE') FROM V$PARAMETER WHERE NAME='spfile' AND VALUE IS NOT NULL")
        r = cursor.fetchone()
        pft = str(r[0]) if r else "UNKNOWN"
        spfile = {"param_file_type": pft, "using_spfile": pft == "SPFILE"}
    except Exception as e:
        print("SPFILE query failed: %s" % str(e))
        spfile = {"param_file_type": "UNKNOWN", "using_spfile": False}

    # Recycle bin (SPACE is in database blocks; 8192 assumes the common 8K block size)
    try:
        cursor.execute("SELECT COUNT(*), ROUND(SUM(SPACE)*8192/1073741824, 2) FROM DBA_RECYCLEBIN")
        r = cursor.fetchone() or (0, 0)
        recyclebin = {"object_count": safe_int(r[0]), "size_gb": safe_float(r[1])}
    except Exception as e:
        print("Recyclebin query failed: %s" % str(e))
        recyclebin = {"object_count": 0, "size_gb": 0}

    # Control files
    # V$CONTROLFILE.STATUS is NULL for a healthy copy and 'INVALID' otherwise;
    # comparing against '' never matches in Oracle because the empty string is NULL.
    try:
        cursor.execute("""
            SELECT COUNT(*),
                   SUM(CASE WHEN STATUS IS NOT NULL THEN 1 ELSE 0 END)
            FROM V$CONTROLFILE""")
        r = cursor.fetchone() or (0, 0)
        controlfiles = {"count": safe_int(r[0]), "invalid_count": safe_int(r[1])}
    except Exception as e:
        print("Controlfile query failed: %s" % str(e))
        controlfiles = {"count": 0, "invalid_count": 0}

    return {
        "invalid_objects": invalid,
        "stale_stats": stale,
        "scn_headroom": scn,
        "spfile": spfile,
        "recyclebin": recyclebin,
        "controlfiles": controlfiles,
    }
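
# Worked example for the SCN headroom figure above (illustrative numbers, not taken
# from any real database, and the 16384 SCNs/sec rate is the common DBA heuristic
# rather than an official formula): with roughly 1.2e9 seconds elapsed since
# 01-Jan-1988, the soft ceiling is about 1.2e9 * 16384 ~= 1.97e13 SCNs. A database
# sitting at CURRENT_SCN = 5e12 would report
#     days_remaining ~= (1.97e13 - 5e12) / (86400 * 16384) ~= 10,400 days.
# A shrinking trend over time matters more than the absolute value.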

def query_session_stats(cursor):
    """Active/blocked session counts and long-running SQL."""
    # Session counts
    try:
        cursor.execute("""
            SELECT COUNT(*),
                   COUNT(CASE WHEN STATUS = 'ACTIVE' AND TYPE = 'USER' THEN 1 END),
                   COUNT(CASE WHEN TYPE = 'USER' THEN 1 END)
            FROM V$SESSION""")
        r = cursor.fetchone() or (0, 0, 0)
        total = safe_int(r[0])
        active = safe_int(r[1])
        user = safe_int(r[2])
    except Exception as e:
        print("Session count query failed: %s" % str(e))
        total, active, user = 0, 0, 0

    # Blocked sessions
    try:
        cursor.execute("SELECT COUNT(*) FROM V$SESSION WHERE BLOCKING_SESSION IS NOT NULL AND STATUS='ACTIVE'")
        r = cursor.fetchone()
        blocked = safe_int(r[0]) if r else 0
    except Exception as e:
        print("Blocked session query failed: %s" % str(e))
        blocked = 0

    # Long-running SQL (>5 min)
    try:
        cursor.execute("""
            SELECT COUNT(*),
                   ROUND(MAX((SYSDATE - SQL_EXEC_START) * 1440), 1)
            FROM V$SESSION
            WHERE STATUS = 'ACTIVE'
              AND TYPE = 'USER'
              AND SQL_EXEC_START IS NOT NULL
              AND (SYSDATE - SQL_EXEC_START) * 1440 > 5""")
        r = cursor.fetchone() or (0, 0)
        long_count = safe_int(r[0])
        max_min = safe_float(r[1])
    except Exception as e:
        print("Long SQL query failed: %s" % str(e))
        long_count, max_min = 0, 0

    return {
        "total_sessions": total,
        "active_sessions": active,
        "user_sessions": user,
        "blocked_sessions": blocked,
        "long_running_sql_count": long_count,
        "max_runtime_min": max_min,
    }


def query_security_stats(cursor):
    """Security posture checks: default passwords, public privs, audit, password policy."""
    # Default passwords
    # DBA_USERS_WITH_DEFPWD only exposes usernames, so join DBA_USERS to limit the
    # count to accounts that are actually open.
    try:
        cursor.execute("""
            SELECT COUNT(*)
            FROM DBA_USERS_WITH_DEFPWD d
            JOIN DBA_USERS u ON u.USERNAME = d.USERNAME
            WHERE u.ACCOUNT_STATUS = 'OPEN'""")
        r = cursor.fetchone()
        def_pwd = safe_int(r[0]) if r else 0
    except Exception as e:
        print("Default pwd query failed: %s" % str(e))
        def_pwd = 0

    # Dangerous PUBLIC grants
    try:
        cursor.execute("""
            SELECT COUNT(*)
            FROM DBA_SYS_PRIVS
            WHERE GRANTEE='PUBLIC'
              AND PRIVILEGE IN ('CREATE PROCEDURE','CREATE ANY PROCEDURE','CREATE ANY TRIGGER',
                                'ALTER SYSTEM','ALTER DATABASE','DROP ANY TABLE','EXECUTE ANY PROCEDURE')""")
        r = cursor.fetchone()
        pub_privs = safe_int(r[0]) if r else 0
    except Exception as e:
        print("PUBLIC privs query failed: %s" % str(e))
        pub_privs = 0

    # Open schema-only accounts (no password-based authentication)
    try:
        cursor.execute("""
            SELECT COUNT(*)
            FROM DBA_USERS
            WHERE ACCOUNT_STATUS='OPEN'
              AND AUTHENTICATION_TYPE='NONE'
              AND USERNAME NOT IN ('SYS','SYSTEM')""")
        r = cursor.fetchone()
        open_schema = safe_int(r[0]) if r else 0
    except Exception as e:
        print("Schema accounts query failed: %s" % str(e))
        open_schema = 0

    # Password verify function
    try:
        cursor.execute("SELECT LIMIT FROM DBA_PROFILES WHERE PROFILE='DEFAULT' AND RESOURCE_NAME='PASSWORD_VERIFY_FUNCTION'")
        r = cursor.fetchone()
        pwd_fn = str(r[0]) if r else "NULL"
    except Exception as e:
        print("Password profile query failed: %s" % str(e))
        pwd_fn = "UNKNOWN"

    # Audit trail
    try:
        cursor.execute("SELECT VALUE FROM V$PARAMETER WHERE NAME='audit_trail'")
        r = cursor.fetchone()
        audit = str(r[0]).upper() if r else "NONE"
    except Exception as e:
        print("Audit trail query failed: %s" % str(e))
        audit = "UNKNOWN"

    # DBA users
    # DBA is a role, not a system privilege, so grants of it live in DBA_ROLE_PRIVS
    # rather than DBA_SYS_PRIVS.
    try:
        cursor.execute("SELECT COUNT(DISTINCT GRANTEE) FROM DBA_ROLE_PRIVS WHERE GRANTED_ROLE='DBA' AND GRANTEE NOT IN ('SYS','SYSTEM','DBA','SYSMAN')")
        r = cursor.fetchone()
        dba_count = safe_int(r[0]) if r else 0
    except Exception as e:
        print("DBA users query failed: %s" % str(e))
        dba_count = 0

    return {
        "default_pwd_accounts": def_pwd,
        "dangerous_public_grants": pub_privs,
        "open_schema_accounts": open_schema,
        "password_verify_function": pwd_fn,
        "password_policy_active": pwd_fn not in ("NULL", "null", "UNKNOWN", None),
        "audit_trail": audit,
        "audit_enabled": audit not in ("NONE", "UNKNOWN"),
        "dba_user_count": dba_count,
    }
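
# Illustrative payload from query_security_stats() for a reasonably hardened instance
# (made-up values, shown only to document the shape TuneVault receives):
#     {
#         "default_pwd_accounts": 0,
#         "dangerous_public_grants": 0,
#         "open_schema_accounts": 2,
#         "password_verify_function": "ORA12C_VERIFY_FUNCTION",
#         "password_policy_active": True,
#         "audit_trail": "DB",
#         "audit_enabled": True,
#         "dba_user_count": 1,
#     }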

def query_schema_stats(cursor):
    """Top segments, datafile status, sort and full-scan ratios."""
    EXCLUDED_OWNERS = "('SYS','SYSTEM','DBSNMP','SYSMAN','OUTLN','MDSYS','ORDSYS','XDB','CTXSYS','WMSYS','EXFSYS')"

    # Top segments
    try:
        cursor.execute("""
            SELECT OWNER, SEGMENT_NAME, SEGMENT_TYPE, ROUND(SUM(BYTES)/1073741824, 2)
            FROM DBA_SEGMENTS
            WHERE OWNER NOT IN """ + EXCLUDED_OWNERS + """
            GROUP BY OWNER, SEGMENT_NAME, SEGMENT_TYPE
            ORDER BY 4 DESC
            FETCH FIRST 10 ROWS ONLY""")
        rows = cursor.fetchall() or []
        top_segs = [{"owner": r[0] or "", "segment_name": r[1] or "",
                     "segment_type": r[2] or "", "size_gb": safe_float(r[3])}
                    for r in rows]
    except Exception as e:
        print("Top segments query failed: %s" % str(e))
        top_segs = []

    # Problem datafiles
    # DBA_DATA_FILES.STATUS is only AVAILABLE/INVALID; the offline state is reported
    # in ONLINE_STATUS, so check both columns.
    try:
        cursor.execute("""
            SELECT COUNT(*),
                   COUNT(CASE WHEN ONLINE_STATUS IN ('OFFLINE','RECOVER') THEN 1 END)
            FROM DBA_DATA_FILES
            WHERE STATUS != 'AVAILABLE'
               OR ONLINE_STATUS IN ('OFFLINE','RECOVER')""")
        r = cursor.fetchone() or (0, 0)
        problem_df = safe_int(r[0])
        offline_df = safe_int(r[1])
    except Exception as e:
        print("Datafile status query failed: %s" % str(e))
        problem_df, offline_df = 0, 0

    # Sort ratios from V$SYSSTAT
    try:
        cursor.execute("SELECT VALUE FROM V$SYSSTAT WHERE NAME='sorts (disk)'")
        r = cursor.fetchone()
        disk_sorts = safe_int(r[0]) if r else 0
        cursor.execute("SELECT VALUE FROM V$SYSSTAT WHERE NAME='sorts (memory)'")
        r = cursor.fetchone()
        mem_sorts = safe_int(r[0]) if r else 0
        total_sorts = disk_sorts + mem_sorts
        disk_sort_pct = round(disk_sorts / total_sorts * 100, 2) if total_sorts > 0 else 0.0
    except Exception as e:
        print("Sort stats query failed: %s" % str(e))
        disk_sorts, mem_sorts, disk_sort_pct = 0, 0, 0.0

    # Full table scan ratio
    try:
        cursor.execute("SELECT VALUE FROM V$SYSSTAT WHERE NAME='table scans (long tables)'")
        r = cursor.fetchone()
        long_scans = safe_int(r[0]) if r else 0
        cursor.execute("SELECT VALUE FROM V$SYSSTAT WHERE NAME='table fetch by rowid'")
        r = cursor.fetchone()
        index_lookups = safe_int(r[0]) if r else 0
        total_lookups = long_scans + index_lookups
        ft_pct = round(long_scans / total_lookups * 100, 2) if total_lookups > 0 else 0.0
    except Exception as e:
        print("FT scan stats query failed: %s" % str(e))
        long_scans, ft_pct = 0, 0.0

    return {
        "top_segments": top_segs,
        "problem_datafiles": problem_df,
        "offline_datafiles": offline_df,
        "disk_sort_pct": disk_sort_pct,
        "disk_sorts": disk_sorts,
        "mem_sorts": mem_sorts,
        "full_table_scan_pct": ft_pct,
        "long_scans": long_scans,
    }
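
# Quick sanity check on the two ratios above (made-up numbers): with disk_sorts = 150
# and mem_sorts = 149,850, disk_sort_pct = 150 / 150000 * 100 = 0.1. With
# long_scans = 2,000 and table-fetch-by-rowid = 998,000,
# full_table_scan_pct = 2000 / 1000000 * 100 = 0.2. Both V$SYSSTAT counters are
# cumulative since instance startup, so trends matter more than a single snapshot.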

# ============================================================
# Oracle Metrics Collection
# ============================================================

def _safe_query(name, fn, cursor, fallback):
    """Run a query function with error handling — returns (result, error_or_None)."""
    try:
        return fn(cursor), None
    except Exception as e:
        err_msg = str(e)
        print("Query %s failed (non-fatal): %s" % (name, err_msg))
        # Return fallback + error detail so callers know which queries failed
        return fallback, err_msg


def collect_metrics(host, port, service_name, username, password):
    """Connect to Oracle and collect all health metrics."""
    dsn = cx_Oracle.makedsn(host, port, service_name=service_name)
    conn = None
    query_errors = []
    try:
        conn = cx_Oracle.connect(user=username, password=password, dsn=dsn)
        cursor = conn.cursor()

        instance_info, err = _safe_query("instance_info", query_instance_info, cursor, {})
        if err:
            query_errors.append({"query": "instance_info", "error": err})
        tablespaces, err = _safe_query("tablespaces", query_tablespaces, cursor, [])
        if err:
            query_errors.append({"query": "tablespaces", "error": err})
        wait_events, err = _safe_query("wait_events", query_wait_events, cursor, [])
        if err:
            query_errors.append({"query": "wait_events", "error": err})
        top_sql, err = _safe_query("top_sql", query_top_sql, cursor, [])
        if err:
            query_errors.append({"query": "top_sql", "error": err})
        index_analysis, err = _safe_query("index_analysis", query_index_analysis, cursor, [])
        if err:
            query_errors.append({"query": "index_analysis", "error": err})
        sga_stats, err = _safe_query("sga_stats", query_sga_stats, cursor, {})
        if err:
            query_errors.append({"query": "sga_stats", "error": err})
        pga_stats, err = _safe_query("pga_stats", query_pga_stats, cursor, {})
        if err:
            query_errors.append({"query": "pga_stats", "error": err})
        os_stats, err = _safe_query("os_stats", query_os_stats, cursor, {})
        if err:
            query_errors.append({"query": "os_stats", "error": err})

        # Wave A queries
        undo_stats, err = _safe_query("undo_stats", query_undo_stats, cursor, None)
        if err:
            query_errors.append({"query": "undo_stats", "error": err})
        temp_stats, err = _safe_query("temp_stats", query_temp_stats, cursor, None)
        if err:
            query_errors.append({"query": "temp_stats", "error": err})
        alert_log, err = _safe_query("alert_log", query_alert_log, cursor, None)
        if err:
            query_errors.append({"query": "alert_log", "error": err})
        resource_limits, err = _safe_query("resource_limits", query_resource_limits, cursor, None)
        if err:
            query_errors.append({"query": "resource_limits", "error": err})
        sga_pga_history, err = _safe_query("sga_pga_history", query_sga_pga_history, cursor, None)
        if err:
            query_errors.append({"query": "sga_pga_history", "error": err})

        # Wave B queries
        backup_stats, err = _safe_query("backup_stats", query_backup_stats, cursor, None)
        if err:
            query_errors.append({"query": "backup_stats", "error": err})

        # Wave E: EBS Apps Health (returns None for non-EBS databases)
        apps_health, err = _safe_query("apps_health", query_apps_health, cursor, None)
        if err:
            query_errors.append({"query": "apps_health", "error": err})

        # Wave G: DB Objects, Sessions, Security, Schema Stats
        db_objects, err = _safe_query("db_objects", query_db_objects, cursor, {})
        if err:
            query_errors.append({"query": "db_objects", "error": err})
        session_stats, err = _safe_query("session_stats", query_session_stats, cursor, {})
        if err:
            query_errors.append({"query": "session_stats", "error": err})
        security_stats, err = _safe_query("security_stats", query_security_stats, cursor, {})
        if err:
            query_errors.append({"query": "security_stats", "error": err})
        schema_stats, err = _safe_query("schema_stats", query_schema_stats, cursor, {})
        if err:
            query_errors.append({"query": "schema_stats", "error": err})

        cursor.close()

        now = datetime.utcnow()
        twelve_hours_ago = now - timedelta(hours=12)

        result = {
            "instance": instance_info,
            "tablespaces": tablespaces,
            "wait_events": wait_events,
            "top_sql": top_sql,
            "index_analysis": index_analysis,
            "sga_stats": sga_stats,
            "pga_stats": pga_stats,
            # Redo stats are not collected by this proxy version; zero placeholders
            # keep the payload shape consistent.
            "redo_stats": {
                "redo_size_mb_per_hour": 0,
                "log_switches_per_hour": 0,
                "log_file_size_mb": 0,
                "log_groups": 0,
                "avg_log_sync_ms": 0,
                "max_log_sync_ms": 0,
            },
            "os_stats": os_stats,
            "undo_stats": undo_stats,
            "temp_stats": temp_stats,
            "alert_log": alert_log,
            "resource_limits": resource_limits,
            "sga_pga_history": sga_pga_history,
            "backup_stats": backup_stats,
            "apps_health": apps_health,
            "db_objects": db_objects,
            "session_stats": session_stats,
            "security_stats": security_stats,
            "schema_stats": schema_stats,
            "proxy_version": VERSION,
            "snapshot_info": {
                "begin_snap_id": 0,
                "end_snap_id": 0,
                "begin_time": twelve_hours_ago.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
                "end_time": now.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
                "elapsed_time_min": 720,
                "db_time_min": 0,
            },
        }
        if query_errors:
            result["query_errors"] = query_errors
        return result
    finally:
        if conn:
            try:
                conn.close()
            except Exception:
                pass


def test_connection(host, port, service_name, username, password):
    """Quick connection test - returns version string."""
    dsn = cx_Oracle.makedsn(host, port, service_name=service_name)
    conn = None
    try:
        conn = cx_Oracle.connect(user=username, password=password, dsn=dsn)
        cursor = conn.cursor()
        cursor.execute("SELECT banner FROM v$version WHERE ROWNUM = 1")
        row = cursor.fetchone()
        version = row[0] if row else "Connected"
        cursor.close()
        return {"success": True, "message": "Connection successful", "version": version}
    except Exception as e:
        return {"success": False, "message": format_oracle_error(e)}
    finally:
        if conn:
            try:
                conn.close()
            except Exception:
                pass


# ============================================================
# HTTP Request Handler
# ============================================================

class ProxyHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the TuneVault Oracle Proxy."""

    # Replace the default stderr request logging with timestamped one-line entries
    def log_message(self, fmt, *args):
        timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        print("[%s] %s %s" % (timestamp, self.command, self.path))

    def send_json(self, status_code, data):
        """Send a JSON response."""
        body = json.dumps(data).encode("utf-8")
        self.send_response(status_code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("X-Powered-By", "TuneVault-Proxy")
        self.end_headers()
        self.wfile.write(body)

    def read_body(self):
        """Read the request body."""
        content_length = int(self.headers.get("Content-Length", 0))
        if content_length > 0:
            return self.rfile.read(content_length).decode("utf-8")
        return "{}"

    def check_auth(self):
        """Check API key authentication. Returns True if authorized."""
        api_key = self.headers.get("X-Api-Key", "")
        if not api_key:
            auth_header = self.headers.get("Authorization", "")
            if auth_header.startswith("Bearer "):
                api_key = auth_header[7:]
        if api_key != API_KEY:
            self.send_json(401, {"error": "Unauthorized - invalid or missing API key"})
            return False
        return True
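
    # check_auth() accepts the shared secret in either of two header forms, so both
    # of these work against the POST endpoints defined below (the key value is
    # whatever TUNEVAULT_API_KEY is set to on this server):
    #
    #     X-Api-Key: your-secret-key-here
    #     Authorization: Bearer your-secret-key-here
    #
    # GET /health below intentionally skips check_auth() so the HTTPS proxy and load
    # balancers can probe liveness without the key.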
    def do_GET(self):
        """Handle GET requests."""
        # Health ping - no auth required
        if self.path == "/health" or self.path == "/api/health":
            self.send_json(200, {
                "status": "healthy",
                "proxy": "TuneVault Oracle Proxy",
                "version": VERSION,
                "runtime": "python",
            })
            return
        self.send_json(404, {"error": "Not found"})

    def do_POST(self):
        """Handle POST requests."""
        # POST /api/healthcheck - run Oracle health queries
        if self.path == "/api/healthcheck":
            if not self.check_auth():
                return
            try:
                body = self.read_body()
                params = json.loads(body)
                service_name = params.get("service_name", "")
                username = params.get("username", "")
                password = params.get("password", "")
                host = params.get("host", "localhost")
                port = int(params.get("port", 1521))
                if not service_name or not username or not password:
                    self.send_json(400, {
                        "error": "service_name, username, and password are required"
                    })
                    return
                timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
                print("[%s] Health check: %s@%s:%d/%s" % (
                    timestamp, username, host, port, service_name))
                metrics = collect_metrics(host, port, service_name, username, password)
                self.send_json(200, {"success": True, "metrics": metrics})
            except Exception as e:
                print("Health check failed: %s" % str(e))
                traceback.print_exc()
                self.send_json(500, {
                    "success": False,
                    "error": format_oracle_error(e),
                })
            return

        # POST /api/test - quick connection test
        if self.path == "/api/test":
            if not self.check_auth():
                return
            try:
                body = self.read_body()
                params = json.loads(body)
                service_name = params.get("service_name", "")
                username = params.get("username", "")
                password = params.get("password", "")
                host = params.get("host", "localhost")
                port = int(params.get("port", 1521))
                if not service_name or not username or not password:
                    self.send_json(400, {
                        "error": "service_name, username, and password are required"
                    })
                    return
                result = test_connection(host, port, service_name, username, password)
                status_code = 200 if result.get("success") else 400
                self.send_json(status_code, result)
            except Exception as e:
                self.send_json(500, {
                    "success": False,
                    "error": format_oracle_error(e),
                })
            return

        self.send_json(404, {"error": "Not found"})
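
# Example calls for the two POST endpoints above. Connection details are
# placeholders; substitute a real host, service name, and monitoring account.
#
#   curl -s -X POST http://localhost:3100/api/test \
#        -H "X-Api-Key: $TUNEVAULT_API_KEY" \
#        -H "Content-Type: application/json" \
#        -d '{"host":"localhost","port":1521,"service_name":"ORCLPDB1","username":"tunevault_mon","password":"********"}'
#
#   # Same body shape for the full collection run:
#   curl -s -X POST http://localhost:3100/api/healthcheck \
#        -H "X-Api-Key: $TUNEVAULT_API_KEY" \
#        -H "Content-Type: application/json" \
#        -d @payload.json
#
# On success /api/test returns {"success": true, "message": "Connection successful",
# "version": "<banner>"}; /api/healthcheck returns {"success": true, "metrics": {...}}.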

# ============================================================
# Main
# ============================================================

def main():
    # ---- CLI argument parsing (Python 3.6 compatible, no argparse needed) ----
    args = sys.argv[1:]
    no_auto_update = "--no-auto-update" in args
    update_now = "--update-now" in args

    if update_now:
        # Immediate update check — perform update if available, then exit
        print("%s [auto-update] Checking for updates (--update-now)..." % _ts())
        try:
            needs_update, remote_version, download_url, expected_checksum = check_for_update()
            if needs_update:
                result = perform_update(remote_version, download_url, expected_checksum)
                # If perform_update returned False (download, checksum, or execv
                # failure), exit non-zero so callers can tell the update did not happen.
                sys.exit(1 if not result else 0)
            else:
                print("%s [auto-update] Already at latest version v%s." % (_ts(), VERSION))
                sys.exit(0)
        except Exception as e:
            print("%s [auto-update] ERROR: %s" % (_ts(), e))
            sys.exit(1)

    # ---- Normal startup ----
    if not no_auto_update:
        updater = threading.Thread(target=auto_update_loop, daemon=True)
        updater.start()
        print("Auto-update enabled (checks every 6 hours). Use --no-auto-update to disable.")
    else:
        print("Auto-update disabled (--no-auto-update).")

    server = HTTPServer((HOST, PORT), ProxyHandler)
    print("TuneVault Oracle Proxy v%s (Python) listening on %s:%d" % (VERSION, HOST, PORT))
    print("Health: http://localhost:%d/health" % PORT)
    print("Proxy is ready. Update your outbound HTTPS proxy to route to http://localhost:%d" % PORT)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down proxy...")
        server.shutdown()


if __name__ == "__main__":
    main()
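
# The two CLI flags parsed in main() can be exercised like this (port 3100 assumes
# the default PORT from the config section):
#
#   python3 oracle-proxy.py --update-now       # one-shot update check, then exit
#   python3 oracle-proxy.py --no-auto-update   # run the proxy without the updater thread
#   curl -s http://localhost:3100/health       # unauthenticated liveness probe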