Filename: /opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/snapshots/reader.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import datetime
import json
import sys

import lvestats.lib.commons.decorators
import prettytable
from lvestats.lib import dbengine, uidconverter
from lvestats.lib.commons import dateutil
from lvestats.lib.jsonhandler import prepare_data_json
from lvestats.lib.parsers.lve_read_snapshot_argparse import lve_read_snapshot_parser
from lvestats.lib.snapshot import Snapshot
from lvestats.orm.incident import incident
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker

REPORT_HEADER = "Snapshots collected starting from %s to %s for lve id %d @ %s:\n"
REPORT_FOOTER = "Done..\n"
DEFAULT_SERVER_ID = "localhost"  # used when nothing is configured or specified in cmdline


def _calculate_period(opts):
    if opts.period:
        start, end = opts.period
    elif opts.timestamp:
        start = opts.timestamp
        end = start + 0.999999
    else:
        try:
            start = dateutil.parse_date(" ".join(opts.ffrom))
            end = dateutil.parse_date(" ".join(opts.to))
        except ValueError:
            print("please use [YY]YY-MM-DD[ HH:MM] format for --from and --to")
            return None, None
    return start, end


@lvestats.lib.commons.decorators.no_sigpipe
def snapshot_reader_main(config, argv_=None):
    parser = lve_read_snapshot_parser()
    opts = parser.parse_args(argv_)
    if not opts.id and not opts.user:
        parser.print_help()
        print("One of -u --user or -i --id should be specified")
        return 1
    if opts.snap_sql_item is not None and (not opts.timestamp or not opts.json):
        print("--snap-sql-item can only be used with --timestamp and --json options")
        return 1
    try:
        engine = dbengine.make_db_engine(config)
    except dbengine.MakeDbException as e:
        print(e)
        return 1
    server_id = config.get("server_id", DEFAULT_SERVER_ID)
    if opts.user:
        uid = uidconverter.username_to_uid(opts.user, server_id, server_id, engine)
        if uid is None:
            print(f"User {opts.user}@{server_id} not found")
            return 1
    else:
        uid = opts.id
    start, end = _calculate_period(opts)
    if start is None and end is None:
        return 1
    lve_read_snapshot = LVEReadSnaphot(
        engine,
        start,
        end,
        uid,
        server_id,
        opts.output,
        opts.json,
        opts.compact,
        opts.snap_sql_item,
    )
    if opts.list:
        lve_read_snapshot.list()  # show snapshots timestamps list
    elif opts.stats:
        lve_read_snapshot.stats(opts.unit)
    else:
        lve_read_snapshot.run()


def _try_convert_to_timestamp(o):
    """
    Convert a local datetime to a unix timestamp, or pass a unix timestamp through unchanged.

    :param o:
    :return:
    """
    if isinstance(o, datetime.datetime):
        return dateutil.gm_datetime_to_unixtimestamp(dateutil.local_to_gm(o))
    return o


class LVEReadSnaphot:
    def __init__(
        self,
        engine,
        start,
        end,
        uid,
        server_id,
        output_file,
        do_json,
        compact_mode=False,
        snap_sql_item=None,
    ):
        """
        :param start: datetime.datetime | int (unix timestamp)
        :param end: datetime.datetime | int (unix timestamp)
        :param uid:
        :param server_id:
        :param output_file: filename
        :param do_json: boolean
        :param compact_mode: boolean - truncate SQL queries for memory efficiency
        :param snap_sql_item: int - index of specific SQL query to return (only with --timestamp)
        """
        self.engine = engine
        self.do_json = do_json
        self.output_file = output_file
        self.uid = uid
        self.start = _try_convert_to_timestamp(start)
        self.end = _try_convert_to_timestamp(end)
        self.server_id = server_id
        self.compact_mode = compact_mode
        self.snap_sql_item = snap_sql_item
        # Fault names dictionary
        self.fault_names = {
            "cpu_fault": "CPU",
            "mem_fault": "Virtual memory",
            "mep_fault": "EP",
            "memphy_fault": "Physical memory",
            "nproc_fault": "NPROC",
            "io_fault": "IO",
            "iops_fault": "IOPS",
        }

    def truncate_sql_queries(self, snap_sql, max_length=200):
        """
        Truncate SQL queries for compact mode to reduce memory usage.

        :param snap_sql: List of SQL query arrays [cmd, time, sql_query]
        :param max_length: Maximum length for SQL query strings
        :return: List of truncated SQL query arrays
        """
        if not snap_sql:
            return snap_sql
        truncated = []
        for query_entry in snap_sql:
            # has SQL query at index 2
            if len(query_entry) >= 3:
                cmd, time, sql = query_entry[0], query_entry[1], query_entry[2]
                if len(sql) > max_length:
                    sql = sql[:max_length] + "..."
                truncated.append([cmd, time, sql])
            else:
                # preserve as-is if unexpected format
                truncated.append(query_entry)
        return truncated

    def _handle_sql_item_request(self, snapshots_generator, out):
        """
        Handle --snap-sql-item request: return specific SQL query by index.

        :param snapshots_generator: Generator of snapshot data
        :param out: Output stream
        :return: None (writes JSON directly to output)
        """
        snapshots_list = list(snapshots_generator)
        if not snapshots_list:
            out.write('{"success": 0, "error": "No snapshot found for the specified timestamp"}\n')
            out.flush()
            return
        if len(snapshots_list) > 1:
            out.write('{"success": 0, "error": "--snap-sql-item only works with --timestamp for single snapshots"}\n')
            out.flush()
            return
        snapshot_data = snapshots_list[0]
        snap_sql = snapshot_data.get("snap_sql", [])
        if self.snap_sql_item < 0 or self.snap_sql_item >= len(snap_sql):
            error_msg = f"SQL item index {self.snap_sql_item} out of range. Available indices: 0-{len(snap_sql) - 1}"
            out.write(f'{{"success": 0, "error": "{error_msg}"}}\n')
            out.flush()
            return
        sql_item = snap_sql[self.snap_sql_item]
        result = {"success": 1, "data": sql_item}
        out.write(json.dumps(result, indent=2))
        out.write("\n")
        out.flush()

    def get_incidents(self, session):
        return (
            session.query(incident)
            .filter(
                incident.uid == self.uid,
                incident.server_id == self.server_id,
                or_(
                    incident.incident_start_time.between(self.start, self.end),
                    incident.incident_end_time.between(self.start, self.end),
                ),
            )
            .order_by(incident.incident_start_time)
            .all()
        )

    def stats_by_incident(self, session):
        result = []
        for i in self.get_incidents(session):
            result.append(
                {
                    "from": i.incident_start_time,
                    "to": max(i.dump_time, i.incident_end_time or 0),
                    "incidents": 1,
                    "snapshots": i.snapshot_count,
                    "duration": self.get_duration(i),
                }
            )
        return result

    @staticmethod
    def get_duration(i, from_=0, to_=sys.maxsize):
        from_ = max(i.incident_start_time, from_)
        to_ = min(max(i.dump_time, i.incident_end_time or 0), to_)
        return to_ - from_

    @staticmethod
    def get_incident_count(incidents, pos, from_ts, to_ts):
        count = 0
        duration = 0
        while pos < len(incidents):
            i = incidents[pos]
            if i.dump_time < from_ts:
                pos += 1
                continue
            if i.incident_start_time > to_ts:
                break  # we are done
            count += 1
            pos += 1
            duration += LVEReadSnaphot.get_duration(i, from_ts, to_ts)
        if count == 0:
            return 0, 0, pos
        else:
            return count, duration, pos - 1

    def stats_by_time_unit(self, session, time_unit):
        incidents = self.get_incidents(session)
        snapshot_files = Snapshot({"uid": self.uid}).get_file_list()
        result = []
        from_ts = self.start
        pos = 0
        while from_ts < self.end:
            to_ts = min(from_ts + time_unit, self.end)
            incident_count, duration, pos = self.get_incident_count(incidents, pos, from_ts, to_ts)
            if incident_count == 0:
                # skip this one, we have nothing here
                from_ts = to_ts
                continue
            snapshots = Snapshot.snapshot_filter(snapshot_files, from_ts, to_ts)
            if len(snapshots) == 0:
                snapshots.append(0)  # always show like there is at least one snapshot
            result.append(
                {
                    "from": from_ts,
                    "to": to_ts,
                    "incidents": incident_count,
                    "snapshots": len(snapshots),
                    "duration": duration,
                }
            )
            from_ts = to_ts
        return result

    def print_stats_json(self, stats):
        data = {"from": self.start, "to": self.end, "stats": stats}
        out = self.open()
        out.write(prepare_data_json(data))
        out.write("\n")
        out.flush()

    def print_stats(self, stats):
        out = self.open()
        out.write(f"Stats from {dateutil.ts_to_iso(self.start)} to {dateutil.ts_to_iso(self.end)}\n")
        for stat in stats:
            out.write("---\n")
            out.write(f"\tfrom: {dateutil.ts_to_iso(stat['from'])}\n")
            out.write(f"\tto: {dateutil.ts_to_iso(stat['to'])}\n")
            out.write(f"\tincidents: {stat['incidents']}\n")
            out.write(f"\tsnapshots: {stat['snapshots']}\n")
            out.write(f"\tduration: {stat['duration']} sec.\n")
        out.flush()

    def stats(self, stats_unit_str):
        try:
            time_unit = dateutil.parse_period2(stats_unit_str)
            if self.end - self.start < time_unit:
                # this prevents situations when we get stats for the last 10 minutes, but group them by 1 day
                self.start = self.end - time_unit
            group_by_incident = False
        except ValueError:
            time_unit = None
            group_by_incident = stats_unit_str == "auto"
            if not group_by_incident:
                raise
        session = sessionmaker(bind=self.engine)()
        try:
            if group_by_incident:
                stats = self.stats_by_incident(session)
            else:
                stats = self.stats_by_time_unit(session, time_unit)
            session.expunge_all()
            if self.do_json:
                self.print_stats_json(stats)
            else:
                self.print_stats(stats)
        finally:
            session.close()

    def run(self):
        snapshots_obj = Snapshot({"uid": self.uid})
        self.report(snapshots_obj.get_snapshots(self.start, self.end))

    def list(self):
        snapshots = Snapshot({"uid": self.uid})
        snapshots_list = snapshots.get_ts_list(self.start, self.end)
        out = self.open()
        if self.do_json:
            out.write(prepare_data_json(snapshots_list))
        else:
            out.write(
                f"Snapshots timestamp list; from {dateutil.ts_to_iso(self.start)} "
                f"to {dateutil.ts_to_iso(self.end)} for lve id {self.uid}\n"
            )
            for ts in snapshots_list:
                out.write(dateutil.ts_to_iso(ts))
                out.write("\n")
            out.write(REPORT_FOOTER)
        out.flush()

    def report(self, snapshots_generator):
        out = self.open()
        if self.do_json:
            # Handle --snap-sql-item request (single SQL query by index)
            if self.snap_sql_item is not None:
                self._handle_sql_item_request(snapshots_generator, out)
                return
            # Normal snapshot list output
            out.write('{"success": 1, "data": {"snapshots": [')
            first_snapshot = True
            for snapshot_data in snapshots_generator:
                # Apply SQL truncation in compact mode
                if self.compact_mode and "snap_sql" in snapshot_data:
                    snapshot_data["snap_sql"] = self.truncate_sql_queries(snapshot_data["snap_sql"])
                if not first_snapshot:
                    out.write(", ")
                else:
                    first_snapshot = False
                out.write(json.dumps(snapshot_data))
            out.write("]}}")
            out.write("\n")
            out.flush()
            return
        out.write(
            REPORT_HEADER % (dateutil.ts_to_iso(self.start), dateutil.ts_to_iso(self.end), self.uid, self.server_id)
        )
        snapshot_count = 0
        for snapshot_data in snapshots_generator:
            # Apply SQL truncation in compact mode
            if self.compact_mode and "snap_sql" in snapshot_data:
                snapshot_data["snap_sql"] = self.truncate_sql_queries(snapshot_data["snap_sql"])
            self.format_snapshot(out, snapshot_data)
            snapshot_count += 1
        if snapshot_count == 0:
            out.write("No snapshots found for the specified time period.\n")
        out.write(REPORT_FOOTER)
        out.flush()

    def open(self):
        if self.output_file:
            try:
                return open(self.output_file, "w", encoding="utf-8")
            except IOError:
                pass  # maybe need error message
                # fixme --> if we are trying to write to a file, and cannot,
                # this is an error, we shouldn't write to stdout
        return sys.stdout

    @staticmethod
    def _process_data_aggregate(process_data):
        """
        Aggregate process data by PID by summing CPU % and MEM for same PIDs

        :param process_data: input data. Dictionary:
            {
                u'151048': {u'MEM': u'1', u'CMD': u'bash', u'PID': u'151048', u'CPU': u'0%'},
                u'151047': {u'MEM': u'1', u'CMD': u'su cltest1', u'PID': u'151047', u'CPU': u'0%'},
                u'153642': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'0%'},
                u'153641': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'0%'},
                u'153640': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'5%'}
            }
        :return: Output data - List of dictionaries:
            [
                {u'MEM': u'1', u'CMD': u'bash', u'PID': u'151048', u'CPU': u'0%'},
                {u'MEM': u'1', u'CMD': u'su cltest1', u'PID': u'151047', u'CPU': u'0%'},
                {u'MEM': u'3', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'5%'},
            ]
        """
        # 1. Build thread dictionary as
        #    pid: {'PID', 'CMD', 'MEM', 'CPU'}
        #    and aggregate data
        thread_dict = {}
        for _, proc_data in process_data.items():
            if "PID" not in proc_data:
                # old format snapshot, do not aggregate it
                # Example of old format snapshot:
                # {u'31228': {u'MEM': u'1', u'CMD': u'31228', u'IOPS': u'N/A', u'CPU': u'1%', u'IO': u'N/A'}}
                pid = proc_data["CMD"]
                process_data_new = {}
                process_data_new["PID"] = pid
                process_data_new["MEM"] = proc_data["MEM"]
                process_data_new["CMD"] = pid
                process_data_new["CPU"] = proc_data["CPU"]
                thread_dict[pid] = process_data_new
                continue
            pid = proc_data["PID"]
            # remove '%' from CPU value and convert CPU/MEM to integers
            if proc_data["CPU"] != "N/A":
                proc_data["CPU"] = int(proc_data["CPU"].replace("%", ""))
            if proc_data["MEM"] != "N/A":
                proc_data["MEM"] = int(proc_data["MEM"])
            if pid in thread_dict:
                # PID already present, add new data to it
                if proc_data["CPU"] != "N/A":
                    if thread_dict[pid]["CPU"] != "N/A":
                        thread_dict[pid]["CPU"] += proc_data["CPU"]
                    else:
                        thread_dict[pid]["CPU"] = proc_data["CPU"]
                if proc_data["MEM"] != "N/A":
                    if thread_dict[pid]["MEM"] != "N/A":
                        thread_dict[pid]["MEM"] += proc_data["MEM"]
                    else:
                        thread_dict[pid]["MEM"] = proc_data["MEM"]
            else:
                # PID absent, add it
                thread_dict[pid] = proc_data
        # 2. Build output list
        out_data = list(thread_dict.values())
        return out_data

    def format_snapshot(self, out, snapshot_data):
        out.write(f">>> {dateutil.ts_to_iso(snapshot_data['dump_time'])}, UID {snapshot_data['uid']}\n")
        out.write("\nFaults:\n")
        for k, v in snapshot_data["snap_faults"].items():
            out.write(f"\t* {self.fault_names.get(k, k)}: {v}\n")
        if snapshot_data["snap_sql"]:
            out.write("\nSQL Queries:\n")
            sql_table = prettytable.PrettyTable(["CMD", "Time", "SQL-query"])
            list(map(sql_table.add_row, snapshot_data["snap_sql"]))
            out.write(sql_table.get_string())
        out.write("\nProcesses:\n")
        # fields = ('PID', 'COM', 'SPEED', 'MEM', 'IO', 'IOPS')
        # table = prettytable.PrettyTable(fields=fields)
        fields = set()
        for data in list(snapshot_data["snap_proc"].values()):
            for key in list(data.keys()):
                fields.add(key)
        # Keys list for data extraction
        data_keys = list(["PID"])
        # Form table header: PID, CMD, Memory (Mb), CPU (%)
        table_columns = ["PID"]
        if "MEM" in fields:
            table_columns.append("Memory (Mb)")
            data_keys.append("MEM")
        if "CPU" in fields:
            table_columns.append("CPU (%)")
            data_keys.append("CPU")
        if "CMD" in fields:
            table_columns.append("CMD")
            data_keys.append("CMD")
        table = prettytable.PrettyTable(table_columns)
        # Left align the CMD column, if it is present
        if "CMD" in table_columns:
            table.align["CMD"] = "l"
        # Do process data aggregation (CPU/MEM summing for all threads of same process)
        snap_proc_aggr = self._process_data_aggregate(snapshot_data["snap_proc"])
        for data in snap_proc_aggr:
            table.add_row([data.get(k, "N/A") for k in data_keys])
        out.write(str(table))
        out.write("\n\n")
        if snapshot_data["snap_http"]:
            out.write("Http requests:\n")
            http_table = prettytable.PrettyTable(["Pid", "Domain", "Http type", "Path", "Http version", "Time"])
            list(map(http_table.add_row, snapshot_data["snap_http"]))
            out.write(str(http_table))
            out.write("\n\n")
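
For reference, a minimal sketch of how this reader could be driven programmatically, mirroring the snapshot_reader_main() flow above. The module path is inferred from the file location shown at the top of this page, the config dict contents are an assumption (only server_id and whatever dbengine.make_db_engine() expects are read here), and the exact argument shapes for --from/--to come from lve_read_snapshot_parser, so treat the token split as illustrative.

# hypothetical_driver.py - illustrative only, not part of reader.py
# Assumptions: the module imports as lvestats.snapshots.reader (taken from the
# displayed path) and `config` carries valid lvestats database settings.
from lvestats.snapshots.reader import snapshot_reader_main

config = {"server_id": "localhost"}  # placeholder; real keys depend on the installation

# Roughly equivalent to running:
#   lve-read-snapshot --id 10001 --from 2024-01-01 00:00 --to 2024-01-02 00:00 --json
rc = snapshot_reader_main(
    config,
    argv_=[
        "--id", "10001",
        "--from", "2024-01-01", "00:00",
        "--to", "2024-01-02", "00:00",
        "--json",
    ],
)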