One Hat Cyber Team
Your IP :
3.23.59.187
Server IP :
192.145.235.60
Server :
Linux ngx365.inmotionhosting.com 5.14.0-427.33.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Aug 30 09:45:56 EDT 2024 x86_64
Server Software :
Apache
PHP Version :
8.2.27
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
opt
/
sharedrads
/
View File Name :
check_max_children
#!/opt/imh-python/bin/python3 import argparse from datetime import datetime import glob import os import re import json import platform import subprocess import yaml from prettytable import PrettyTable def set_environment_vars(): """ Sets the value of the global variable log_glob based on hostname and server type. Returns: tuple: - log_glob (str or None): The glob pattern for log file paths based on the server type. - srv_type (str or None): The environment name based on the server type. """ # Get the hostname hostname = platform.node() # Check if it's a Shared 'ngx' server if re.match(r"^(amsngx|ecngx|ngx|ngxdev)[0-9]+\.inmotionhosting\.com$", hostname): log_glob = "/var/ngxconf/phpfpm/logs/*.log" srv_type = "shared_ngx" # Check if it's a cPanel server elif os.path.exists("/opt/cpanel"): log_glob = "/opt/cpanel/ea-php*/root/usr/var/log/php-fpm/error.log" srv_type = "cpanel_fpm" # Check if it's a Platform i VPS elif os.path.exists("/etc/ansible/wordpress-ultrastack"): log_glob = "/var/log/php-fpm/error.log*" srv_type = "ultrastack" # Check if it's a CWP Server elif os.path.exists("/usr/local/cwpsrv/bin/cwpsrv"): log_glob = "/opt/alt/php-fpm*/usr/var/log/php-fpm.log" srv_type = "cwp" else: log_glob = None # Default value if neither condition matches srv_type = None # Default value if neither condition matches return log_glob, srv_type def parse_args(srv_type): """ Parses command-line arguments for the script. Args: srv_type (str): An identifier for the server type. Returns: argparse.Namespace: Parsed command-line arguments. """ parser = argparse.ArgumentParser( description="Parse log files for max_children errors and generate reports." ) if srv_type != "ultrastack": parser.add_argument( "-u", "--username", required=True, help="The username to search for." ) parser.add_argument( "--no-truncate", action="store_true", help="Disable truncation of log messages for pools with more than 10 errors.", ) parser.add_argument( "--no-pager", action="store_true", default=False, help="Disable pager and print output directly to the screen. Default is False.", ) return parser.parse_args() def get_pools_for_user(srv_type, username=None): """ Generates a list of pool names for a given username. Args: srv_type (str): An identifier for the server type. username (str): The username to process. Returns: list: A list of pool names. """ all_domains = [] # Check if the environment is "ultrastack" if srv_type == "ultrastack": # Define the path to the JSON file json_file_path = "/root/.ansible/logs/wp3_run/latest/variables" try: # Open and parse the JSON file with open(json_file_path, "r", encoding="utf-8") as file: data = json.load(file) # Extract the "site_domain" value and set it as the all_domains list all_domains = [data.get("site_domain", "")] except FileNotFoundError: print(f"Error: JSON file '{json_file_path}' not found.") except json.JSONDecodeError: print(f"Error: Failed to decode JSON file '{json_file_path}'.") except Exception as e: print(f"Unexpected error: {e}") # Check if the environment is "cwp" elif srv_type == "cwp": # FPM Pools are named as the CWP user all_domains = [username] if username else [] # Check if the environment is "shared_ngx" elif srv_type == "shared_ngx": yaml_file_path = f"/var/cpanel/userdata/{username}/main" # Check if the file exists if not os.path.exists(yaml_file_path): raise FileNotFoundError( f"YAML file not found for user '{username}' at {yaml_file_path}" ) try: with open(yaml_file_path, "r", encoding="utf-8") as file: data = yaml.safe_load(file) # Extract domains main_domain = data.get("main_domain", "") sub_domains = data.get("sub_domains", []) # Combine main_domain and sub_domains into a single list all_domains = [main_domain] + sub_domains except yaml.YAMLError as e: raise RuntimeError( f"Error reading YAML file for user '{username}': {e}" ) from e except KeyError as e: raise RuntimeError( f"Missing expected key in YAML file for user '{username}': {e}" ) from e # It's a regular cPanel server else: yaml_file_path = f"/var/cpanel/userdata/{username}/main" # Check if the file exists if not os.path.exists(yaml_file_path): raise FileNotFoundError( f"YAML file not found for user '{username}' at {yaml_file_path}" ) try: with open(yaml_file_path, "r", encoding="utf-8") as file: data = yaml.safe_load(file) # Extract domains main_domain = data.get("main_domain", "") addon_domains = list(data["addon_domains"].keys()) sub_domains = [ item for item in data["sub_domains"] if item not in set(data["addon_domains"].values()) ] # Combine main_domain addon_domains and sub_domains into a single list all_domains = [main_domain] + addon_domains + sub_domains except yaml.YAMLError as e: raise RuntimeError( f"Error reading YAML file for user '{username}': {e}" ) from e except KeyError as e: raise RuntimeError( f"Missing expected key in YAML file for user '{username}': {e}" ) from e # Convert domains to pool names by replacing '.' with '_' pools = [domain.replace(".", "_") for domain in all_domains] return pools def search_logs_for_max_children(pools, log_glob): """ Searches log files for instances of pools reaching max_children. Args: pools (list): List of pool names to search for. log_glob (str): Glob pattern for log file paths. Returns: list: A list of log messages where pools reached max_children. """ log_messages = [] # Escape pool names for regex (in case they have special characters) pool_regex = "|".join(re.escape(pool) for pool in pools) max_children_pattern = re.compile( rf"\[pool ({pool_regex})\] server reached (?:pm\.)?max_children setting" ) # Use glob to find matching log files log_files = glob.glob(log_glob) print("Searching log files... This may take a few minutes if logs are large.") for log_file in log_files: try: with open(log_file, "r", encoding="utf-8") as file: for line in file: # Search for lines matching the max_children pattern if max_children_pattern.search(line): log_messages.append(line.strip()) except Exception as e: print(f"Error reading file {log_file}: {e}") return log_messages def analyze_log_messages(pools, log_messages): """ Analyzes log messages for max_children errors related to specific pools. Args: pools (list): List of pool names to search for. log_messages (list): List of log messages to analyze. Returns: str: JSON-formatted results containing error counts, timespan, and log messages per pool. """ results = { pool: {"error_count": 0, "timespan": None, "log_messages": []} for pool in pools } # Escape pool names for regex pool_regex = "|".join(re.escape(pool) for pool in pools) max_children_pattern = re.compile( rf"\[(?P<timestamp>[^\]]+)\] WARNING: \[pool (?P<pool>{pool_regex})\] server reached (?:pm\.)?max_children setting" ) for line in log_messages: match = max_children_pattern.search(line) if match: timestamp_str = match.group("timestamp") pool_name = match.group("pool") timestamp = datetime.strptime(timestamp_str, "%d-%b-%Y %H:%M:%S") # Update results for the pool pool_data = results[pool_name] pool_data["error_count"] += 1 pool_data["log_messages"].append(line.strip()) # Update timespan if pool_data["timespan"] is None: pool_data["timespan"] = {"start": timestamp, "end": timestamp} else: pool_data["timespan"]["start"] = min( pool_data["timespan"]["start"], timestamp ) pool_data["timespan"]["end"] = max( pool_data["timespan"]["end"], timestamp ) # Convert datetime objects to strings for JSON serialization for pool_data in results.values(): if pool_data["timespan"]: pool_data["timespan"]["start"] = pool_data["timespan"]["start"].strftime( "%Y-%m-%d %H:%M:%S" ) pool_data["timespan"]["end"] = pool_data["timespan"]["end"].strftime( "%Y-%m-%d %H:%M:%S" ) return json.dumps(results, indent=4) def generate_human_readable_report(error_data, truncate_logs=True): """ Generates a human-readable report from the JSON output of analyze_log_messages(), including a tabular summary report aggregating data from all pools. Allows control over log message truncation. Args: error_data (str): JSON string generated by analyze_log_messages(). truncate_logs (bool): Whether to truncate log messages for pools with more than 10 errors. Returns: str: Formatted human-readable report with a summary table section. """ try: data = json.loads(error_data) except json.JSONDecodeError: return "Error: Invalid JSON data." report_lines = [] # Variables for summary data total_error_count = 0 overall_timespan_start = None overall_timespan_end = None pool_error_percentages = [] for pool, details in data.items(): # Update total error count and record percentage breakdown total_error_count += details["error_count"] pool_error_percentages.append( { "pool": pool, "error_count": details["error_count"], "percentage": 0, # Placeholder for now } ) # Update overall timespan if details["timespan"]: pool_start = details["timespan"]["start"] pool_end = details["timespan"]["end"] if overall_timespan_start is None or pool_start < overall_timespan_start: overall_timespan_start = pool_start if overall_timespan_end is None or pool_end > overall_timespan_end: overall_timespan_end = pool_end # Calculate percentages for entry in pool_error_percentages: if total_error_count > 0: entry["percentage"] = (entry["error_count"] / total_error_count) * 100 # Sort pools by percentage in descending order pool_error_percentages.sort(key=lambda x: x["percentage"], reverse=True) # Add summary section report_lines.append("Summary Report") report_lines.append("=" * 30) report_lines.append(f"Total Pools: {len(data)}") report_lines.append(f"Total Errors: {total_error_count}") if overall_timespan_start and overall_timespan_end: report_lines.append( f"Overall Timespan: {overall_timespan_start} to {overall_timespan_end}" ) else: report_lines.append("Overall Timespan: No errors recorded across pools") # Create tabular summary using PrettyTable table = PrettyTable() table.field_names = ["Pool", "Errors", "% of Total Errors"] for entry in pool_error_percentages: table.add_row( [entry["pool"], entry["error_count"], f"{entry['percentage']:.2f}"] ) table.align["Pool"] = "l" # Left-align pool names for better readability report_lines.append("\nError Breakdown by Pool:") report_lines.append(table.get_string()) # Add detailed report for each pool report_lines.append("\nMax Children Errors Report") report_lines.append("=" * 30) for pool, details in data.items(): report_lines.append(f"\nPool: {pool}") report_lines.append("-" * 30) report_lines.append(f"Error Count: {details['error_count']}") if details["timespan"]: report_lines.append( f"Timespan: {details['timespan']['start']} to {details['timespan']['end']}" ) else: report_lines.append("Timespan: No errors recorded") # Handle log messages if details["log_messages"]: details["log_messages"] = sorted( details["log_messages"], key=extract_timestamp ) log_count = len(details["log_messages"]) if truncate_logs and log_count > 10: report_lines.append("\nLog Messages (truncated):") report_lines.extend(f" - {msg}" for msg in details["log_messages"][:5]) report_lines.append(" ...") report_lines.extend( f" - {msg}" for msg in details["log_messages"][-5:] ) report_lines.append( " (Note: Log messages truncated. Use `--no-truncate` to see all.)" ) else: report_lines.append("\nLog Messages:") report_lines.extend(f" - {msg}" for msg in details["log_messages"]) else: report_lines.append("Log Messages: None") return "\n".join(report_lines) # Function to extract and parse the timestamp def extract_timestamp(log_entry): timestamp_str = log_entry.split("]")[0][ 1: ] # Extract the part inside square brackets return datetime.strptime(timestamp_str, "%d-%b-%Y %H:%M:%S") def main(): """Main function""" # Set variables for the FPM log paths and server type log_glob, srv_type = set_environment_vars() # Parse args args = parse_args(srv_type) # Generate a list of FPM pool names if srv_type != "ultrastack": pools = get_pools_for_user(srv_type, args.username) else: pools = get_pools_for_user(srv_type) # Generate a list of log messages containing errors for user's FPM pools log_msgs = search_logs_for_max_children(pools, log_glob) # Aggregate max_children errors from log messages for pools error_data = analyze_log_messages(pools, log_msgs) # Generate human readable report from the aggregated ata report = generate_human_readable_report(error_data, not args.no_truncate) if args.no_pager: # Print the output directly to the screen print(report) elif args.no_truncate: # Use a pager to display the output if --no-truncate is True pager = os.getenv("PAGER", "less") # Default to 'less' if PAGER is not set try: with subprocess.Popen( [pager, "-S"], stdin=subprocess.PIPE, text=True ) as pager_process: pager_process.communicate(report) except FileNotFoundError: # If the pager is not found, fall back to printing to the screen print("Pager not found. Printing output to the screen:") print(report) else: # Default behavior: print output to the screen print(report) if __name__ == "__main__": main()