One Hat Cyber Team
Your IP :
3.135.215.148
Server IP :
192.145.235.60
Server :
Linux ngx365.inmotionhosting.com 5.14.0-427.33.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Aug 30 09:45:56 EDT 2024 x86_64
Server Software :
Apache
PHP Version :
8.2.27
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
opt
/
sharedrads
/
View File Name :
check_mailchannels_dns
#!/opt/imh-python/bin/python3
"""Check nameservers, SPF and _mailchannels auth TXT records for domains or
cPanel users, and optionally apply the suggested DNS fixes via whmapi1."""
from argparse import ArgumentParser, ArgumentTypeError as BadArg
import json
import re
import subprocess
import sys
import ipaddress
from typing import Literal
from collections.abc import Iterator
import time
import dns.exception
import dns.name
import dns.resolver
import dns.reversename
import dns.query
import dns.message
from dns.rcode import Rcode
from dns.rdatatype import RdataType
from cpapis import whmapi1, CpAPIError
import rads

IMH_NS = {
    "74.124.210.242",
    "173.231.218.151",
    "70.39.150.2",
    "213.165.240.102",
    "173.231.218.41",
    "213.165.240.101",
    "70.39.146.236",
    "216.194.168.112",
    "173.231.218.110",
}
WHH_NS = {"173.205.127.4", "209.182.197.185"}


def iter_system_ips():
    """Yield each globally-routable IPv4 address reported by
    ``ip addr show scope global``"""
    stdout = subprocess.check_output(
        ["ip", "addr", "show", "scope", "global"], encoding="utf-8"
    )
    for line in stdout.splitlines():
        line = line.strip()
        if not line.startswith("inet "):
            continue
        # "inet 1.2.3.4/24 ..." -> "1.2.3.4"
        addr = line.split()[1].split("/")[0]
        try:
            ipaddr = ipaddress.IPv4Address(addr)
        except ValueError:
            continue
        if ipaddr.is_global:
            yield addr


SYSTEM_IPS = set(iter_system_ips())
# Cache for ptr_is_shared(); pre-seeded so this server's own IPs count as
# shared without needing a PTR lookup.
PTR_SHARED = {x: True for x in SYSTEM_IPS}
if rads.IMH_CLASS == "hub":
    THIS_BRAND = WHH_NS
    WRONG_BRAND = IMH_NS
elif rads.IMH_CLASS == "reseller":
    THIS_BRAND = IMH_NS | SYSTEM_IPS
    WRONG_BRAND = WHH_NS
else:
    THIS_BRAND = IMH_NS
    WRONG_BRAND = WHH_NS

SHARED_RE = re.compile(
    r"(?:(?:ec|ams)?(biz|ld|res|ngx)(?:dev)?\d+\."
    r"(?:inmotionhosting|servconfig)|[ew]hub(?:dev)?\d+"
    r"\.webhostinghub)\.com$"
)


def cpuser_safe_arg(user: str) -> str:
    """Argparse type: checks rads.cpuser_safe"""
    if not rads.cpuser_safe(user):
        raise BadArg("user does not exist or is restricted")
    return user


class Output:
    """Handles printing to stdout and collects results.

    ``data`` accumulates the per-zone results dumped by --json and
    ``proposed`` accumulates the DNS edits offered at the end of main().
    Every print helper is silent when JSON output was requested.
    """

    def __init__(self, use_json: bool, verbose: bool, no_edits: bool):
        self.use_json = use_json
        self.use_verbose = verbose
        self.no_edits = no_edits
        self.proposed: list[dict] = []
        self.data = {}

    def verbose(self, msg: str, end: str = "\n"):
        """Print if --verbose was set"""
        if self.use_verbose and not self.use_json:
            print(msg, end=end)

    def head(self, msg: str, end: str = "\n"):
        """Print in blue"""
        if not self.use_json:
            print(rads.color.blue(msg), end=end)

    def error(self, msg: str, end: str = "\n"):
        """Print in red"""
        if not self.use_json:
            print(rads.color.red(msg), end=end)

    def warn(self, msg: str, end: str = "\n"):
        """Print in yellow"""
        if not self.use_json:
            print(rads.color.yellow(msg), end=end)

    def okay(self, msg: str, end: str = "\n"):
        """Print in green"""
        if not self.use_json:
            print(rads.color.green(msg), end=end)

    def info(self, msg: str, end: str = "\n"):
        """Print non-colored"""
        if not self.use_json:
            print(msg, end=end)


def parse_args() -> tuple[Output, dict[str, set[str]]]:
    """Parse CLI args.

    Returns:
        The Output handler and a dict mapping each domain to check to the
        set of its subdomains (empty when domains were given with --domain).
    """
    parser = ArgumentParser(description=__doc__)
    output = parser.add_mutually_exclusive_group()
    # fmt: off
    output.add_argument('--verbose', action='store_true')
    output.add_argument('--json', action='store_true')
    parser.add_argument(
        '--reseller', '-r', action='store_true',
        help='Enable reseller mode'
    )
    parser.add_argument(
        '--no-edits', '-n', action='store_true',
        help="Do not offer to make automated changes",
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--domain', '-d', nargs='+', dest='domains')
    group.add_argument(
        '--user', '-u', nargs='+', type=cpuser_safe_arg, dest='users'
    )
    # fmt: on
    args = parser.parse_args()
    if args.reseller and not args.users:
        # args.users is None in --domain mode; iterating it used to crash
        # with a TypeError instead of a usage message.
        parser.error("--reseller/-r requires --user/-u")
    out = Output(args.json, args.verbose, args.no_edits)
    if not args.users:
        return out, {x: set() for x in args.domains}
    if args.reseller:
        allusers = []
        for user in args.users:
            allusers.append(user)
            allusers.extend(rads.get_children(user))
    else:
        allusers = args.users
    domains = {}
    for user in allusers:
        udata = rads.UserData(user)
        domains[udata.primary.domain] = {x.domain for x in udata.subs}
        for dom in [x.domain for x in udata.addons + udata.parked]:
            if dom not in domains:
                domains[dom] = set()
    return out, domains


def get_ns_ips(domain: str, out: Output) -> tuple[str, dict[str, str]]:
    """Walk the delegation chain from the TLD down to ``domain``.

    Each step queries the server found authoritative in the previous step
    for the NS records of the next-longer suffix of ``domain``.

    Returns:
        The deepest name that yielded nameservers, and a dict mapping each
        nameserver's IP to its hostname for that name.

    Raises:
        RuntimeError: on a non-NOERROR rcode, an empty response, or when no
            nameservers are found for the first label queried.
    """
    name = dns.name.from_text(domain)
    depth = 2
    default = dns.resolver.get_default_resolver()
    # On shared, the first resolver is our ecmp one.
    # The second is a public resolver. Use ours.
    resolver = default.nameservers[0]
    ns_ips = {}
    last_split = False
    last_success = ""
    while not last_split:
        name_split = name.split(depth)
        # An empty prefix renders as "@": the suffix is the full domain
        last_split = name_split[0].to_unicode() == "@"
        sub_name = name_split[1]
        out.verbose(f"Looking up {sub_name} on {resolver}")
        try:
            ans = dns.resolver.resolve_at(
                resolver, sub_name, 'NS', lifetime=8, raise_on_no_answer=False
            )
        except dns.exception.Timeout:
            out.verbose(f"Timed out querying {domain} NS, waiting 10 seconds.")
            time.sleep(10)
            # Single retry; a second timeout propagates to the caller
            ans = dns.resolver.resolve_at(
                resolver, sub_name, 'NS', lifetime=8, raise_on_no_answer=False
            )
        rcode = ans.response.rcode()
        if rcode != Rcode.NOERROR:
            if rcode == Rcode.NXDOMAIN:
                raise RuntimeError(
                    f"{sub_name} doesn't appear to be registered"
                )
            raise RuntimeError(
                f"Error looking up {sub_name}: {Rcode.to_text(rcode)}"
            )
        if len(ans.response.authority) > 0:
            rrset = ans.response.authority[0]
        elif len(ans.response.answer) > 0:
            rrset = ans.response.answer[0]
        else:
            # Previously surfaced as a bare "list index out of range"
            raise RuntimeError(f"Empty response looking up {sub_name} NS")
        for line in rrset.to_text().splitlines():
            out.verbose(f" => {line}")
        if not getattr(rrset[0], "target", None):
            # First record carries no target hostname, so there is no
            # delegation to follow at this depth
            out.verbose(f"Stopped at {name}. last lookup {sub_name} gave none")
            if depth < 3:
                raise RuntimeError(
                    f"{domain} doesn't appear to have its NS records setup"
                )
            return last_success, ns_ips
        rdtype: RdataType = rrset[0].rdtype
        if rdtype.name == 'SOA':
            out.verbose(f' ** Same server is authoritative for {sub_name}')
        else:
            auth = rrset[0].target
            # Query the delegated server directly on the next iteration
            resolver = default.resolve(auth).rrset[0].to_text()
            out.verbose(
                f' ** {auth} ({resolver}) is authoritative for {sub_name}'
            )
        last_success = sub_name
        ns_ips.clear()
        for result in rrset:
            hostname = result.target
            ipaddr = default.resolve(hostname).rrset[0].to_text()
            ns_ips[ipaddr] = hostname
        if last_split:
            return last_success, ns_ips
        depth += 1
    raise RuntimeError(f"{domain} doesn't appear to be registered")


def get_zone(domain: str) -> list[dict]:
    """Return the records of ``domain``'s local zone via whmapi1 dumpzone"""
    ret = whmapi1("dumpzone", {"domain": domain}, check=True)
    return ret["data"]["zone"][0]["record"]


def check_spf(
    out: Output, zone: str, txt_records: list, external: bool
) -> bool:
    """Validate every SPF record among ``txt_records``.

    Findings are appended to ``out.data[zone]["spf"]["records"]``. When a
    rewrite is suggested and DNS is local, an ``editzonerecord`` change is
    queued on ``out.proposed``.

    Args:
        out: output/result collector
        zone: zone name the records belong to
        txt_records: dicts with a "txtdata" key (and "Line" for local zones)
        external: True if the records were resolved from external DNS

    Returns:
        True if at least one record starting with "v=spf1 " was seen
    """
    found = False
    records: list = out.data[zone]["spf"]["records"]
    for record in txt_records:
        txt: str = record["txtdata"]
        if not txt.startswith("v=spf1 "):
            continue
        # add, then edit by reference later if needed
        errors = []
        data = {"txt": txt, "new": None, "errors": errors}
        records.append(data)
        out.info(f"Found SPF record: {txt}")
        found = True
        try:
            parsed = SpfRecord(txt)
        except Exception as exc:
            msg = f"Failed to parse above SPF record: {exc}"
            out.error(msg)
            errors.append(msg)
            continue
        if errs := parsed.errors | parsed.fatal:
            out.error("Errors in above SPF record:")
            # sorted: set order would make the output vary between runs
            for err in sorted(errs):
                out.error(f" - {err}")
                errors.append(err)
        if parsed.orig_had_mailchan:
            out.okay("SPF include found")
            data["had_mailchan"] = 1
        else:
            out.warn("SPF include not found")
            data["had_mailchan"] = 0
        # Compare token-wise so whitespace-only differences are ignored
        if not parsed.fatal and str(parsed).split() != txt.split():
            out.warn("Suggested SPF record change:")
            out.info(f"Old: {txt}")
            new = str(parsed)
            data["new"] = new
            out.info(f"New: {new}")
            if external:
                out.warn("Cannot automatically fix as DNS is external.")
            else:
                out.proposed.append(
                    {
                        "domain": zone,
                        "record": "@",
                        "data": new,
                        "func": "editzonerecord",
                        "args": {
                            "domain": zone,
                            "line": record["Line"],
                            "type": "TXT",
                            "txtdata": new,
                        },
                    }
                )
    if not found:
        out.info(f"No SPF records Found for {zone}")
    return found


def check_spf_external(out: Output, zone: str) -> bool:
    """Check SPF by resolving the DNS"""
    txt_records = []
    try:
        for result in dns.resolver.resolve(zone, "TXT"):
            # str(rdata) is the quoted presentation form ('"v=spf1 ..."'),
            # which never matched check_spf()'s startswith("v=spf1 ") test.
            # Join the raw character-strings instead; this also reassembles
            # records split into multiple strings.
            txt = b"".join(result.strings).decode("utf-8", "replace")
            txt_records.append({"txtdata": txt})
    except Exception as exc:
        # Best-effort: a failed lookup is treated the same as "no records"
        out.verbose(f"TXT lookup for {zone} failed: {exc}")
    return check_spf(out, zone, txt_records, external=True)


def check_spf_local(out: 'Output', zone: str) -> bool:
    """Check SPF by dumping the local zone"""
    try:
        zone_file = get_zone(zone)
    except Exception as exc:
        out.data[zone]["spf"]["error"] = str(exc)
        out.warn(f"Unable to find zone for {zone}")
        return False
    # Only TXT records at the zone apex can be the zone's SPF record
    txt_records = [
        record
        for record in zone_file
        if record["type"] == "TXT" and record["name"] == f"{zone}."
    ]
    if not txt_records:
        out.info(f"No SPF records Found for {zone}")
        return False
    return check_spf(out, zone, txt_records, external=False)


def check_auth(
    out: Output, zone: str, require: bool, txt_records: list, external: bool
):
    """Compare the ``_mailchannels`` TXT records against the expected auth.

    The records are appended to ``out.data[zone]["auth"]["records"]``. When
    no record exists but one is required and DNS is local, an
    ``addzonerecord`` change is queued on ``out.proposed``.

    Args:
        out: output/result collector
        zone: zone name being checked
        require: whether a missing record is a problem (the zone has SPF)
        txt_records: dicts with a "txtdata" key
        external: True if the records were resolved from external DNS
    """
    name = f"_mailchannels.{zone}."
    if rads.IMH_CLASS == "hub":
        auth = "v=mc1 auth=webhostinghub"
    else:
        auth = "v=mc1 auth=inmotionhosting"
    found = False
    records: list = out.data[zone]["auth"]["records"]
    records.extend(txt_records)
    for record in txt_records:
        # Externally-resolved records arrive in quoted presentation form
        txt: str = record["txtdata"].strip('"')
        found = True
        if txt == auth:
            out.okay("_mailchannels auth matches:", end=" ")
            out.data[zone]["auth"]['match'] = 1
            out.info(auth)
        else:
            out.error("_mailchannels has unexpected auth:", end=" ")
            out.info(txt)
            out.info(f"Expected: {auth}")
    if found:
        return
    if not require:
        out.info(f"{name} not found (OK: no SPF either)")
        return
    out.warn(f"{name} not found")
    if external:
        out.warn("Cannot automatically fix as DNS is external.")
    else:
        out.proposed.append(
            {
                "domain": zone,
                "record": "_mailchannels",
                "data": auth,
                "func": "addzonerecord",
                "args": {
                    "domain": zone,
                    "name": "_mailchannels",
                    "type": "TXT",
                    "txtdata": auth,
                },
            }
        )


def check_auth_external(out: Output, zone: str, require: bool):
    """Check Auth by resolving the DNS"""
    name = f"_mailchannels.{zone}."
    txt_records = []
    try:
        for result in dns.resolver.resolve(name, "TXT"):
            txt_records.append({"txtdata": str(result)})
    except Exception as exc:
        # Best-effort: a failed lookup is treated the same as "no records"
        out.verbose(f"TXT lookup for {name} failed: {exc}")
    check_auth(out, zone, require, txt_records, external=True)


def check_auth_local(out: 'Output', zone: str, require: bool):
    """Check Auth by dumping the local zone"""
    name = f"_mailchannels.{zone}."
    try:
        zone_file = get_zone(zone)
    except Exception:
        # No local zone: report as missing, but nothing can be proposed
        if require:
            out.warn(f"{name} not found")
        else:
            out.info(f"{name} not found (OK: no SPF either)")
        return
    txt_records = [
        record
        for record in zone_file
        if record["type"] == "TXT" and record["name"] == name
    ]
    check_auth(out, zone, require, txt_records, external=False)


class SpfRecord:
    """Parses an SPF record.

    After construction, ``errors`` holds non-fatal findings, ``fatal`` holds
    findings that prevent an automatic rewrite, ``orig_had_mailchan`` tells
    whether the original record already had the mailchannels include, and
    ``str(record)`` is the (possibly fixed) record text.

    Raises:
        ValueError: if the record has no parsable clauses
    """

    def __init__(self, line: str):
        self.errors = set()
        self.fatal = set()
        self.orig_had_mailchan = False
        items = line.split()
        if not items or items[0] != "v=spf1":
            self.errors.add("Invalid or missing SPF version")
            items.insert(0, "v=spf1")
        self._items = []
        for item in items[1:]:
            if "v=spf" in item:
                self.errors.add("v=spf1 must be in the beginning only")
                continue
            try:
                spf_item = SpfItem(self, item)
            except Exception as exc:
                self.fatal.add(f"Error parsing {item}: {exc}")
            else:
                self._items.append(spf_item)
        self.pre_check()
        self.orig_had_mailchan = self.has_mailchan()
        # Never rewrite a record containing a term we could not parse
        if not self.fatal:
            self.fix()
        self.post_check()

    def pre_check(self):
        """Pre self.fix() validation checks"""
        if len([x for x in self._items if x.mechanism == "include"]) > 10:
            self.errors.add("exceeds 10 include statements")
        if not self._items:
            raise ValueError("No clauses in SPF record")
        if len(str(self)) > 255:
            self.errors.add("SPF record length is too long")
        if len([x for x in self._items if x.mechanism == "all"]) > 1:
            self.errors.add("More than one 'all' clause")
        if self._items[-1].mechanism != "all":
            self.errors.add("Last statement is not an 'all' clause")

    def post_check(self):
        """Post self.fix() validation checks"""
        if len(str(self)) > 255:
            self.fatal.add("Post auto-fix SPF length is too long")
        if len([x for x in self._items if x.mechanism == "include"]) > 10:
            self.fatal.add("exceeds 10 include statements")

    def _iter_str_rules(self):
        """Yield each clause rendered as text"""
        for item in self._items:
            yield str(item)

    def __str__(self) -> str:
        return f"v=spf1 {' '.join(self._iter_str_rules())}"

    def fix(self):
        """Try to fix the SPF record shared -> mailchannels"""
        for index, item in enumerate(self._items):
            if item.is_shared():
                self._items[index] = SpfItem(
                    self, "include:relay.mailchannels.net"
                )
        self._items = list(
            self._dedupe(self._fix_alls(self._add_record(self._items)))
        )

    def _dedupe(self, items: Iterator["SpfItem"]):
        """Drop 'ptr' clauses and repeated clauses, keeping first occurrences"""
        found = set()
        for item in items:
            if item.mechanism == "ptr":
                self.errors.add("The 'ptr' clause is no longer in the SPF spec")
                continue
            val = str(item)
            if val in found:
                continue
            yield item
            found.add(val)

    def _fix_alls(self, items: Iterator["SpfItem"]):
        """Yield the non-'all' clauses, then exactly one trailing 'all'"""
        first_all = None
        for item in items:
            if item.mechanism == "all":
                if first_all is None:
                    first_all = item
                continue
            yield item
        if first_all is None:
            # This is a soft fail. Ideally it should be a hard fail, -all, but
            # when editing customer records, let's be more cautious.
            yield SpfItem(self, "~all")
            return
        # Tuple membership, not substring: '"" not in "-~"' is always False,
        # which let a bare "all" (equivalent to "+all") through unflagged.
        if first_all.qualifier not in ("-", "~"):
            self.errors.add(f"{str(first_all)} would allow all")
            first_all.qualifier = "~"
        yield first_all

    def _add_record(self, items: Iterator["SpfItem"]):
        """Insert the mailchannels include ahead of any 'all' clause"""
        # We want to add it last, but before any "all" statements
        added = False
        for item in items:
            if item.mechanism == "all":
                yield SpfItem(self, "include:relay.mailchannels.net")
                added = True
            yield item
        if not added:
            yield SpfItem(self, "include:relay.mailchannels.net")

    def has_mailchan(self) -> bool:
        """Return whether this SPF record contains the mailchan include"""
        for item in self._items:
            if str(item) == "include:relay.mailchannels.net":
                return True
        return False


class SpfItem:
    """Parses an SPF record column.

    Attributes set by __init__: ``qualifier`` (one of "", "+", "-", "~",
    "?"), ``mechanism`` (mechanism or modifier name) and ``extra`` (the rest
    of the term including its leading ":", "=" or "/").

    Raises:
        ValueError: if the term is not a recognized mechanism or modifier
    """

    qualifier: Literal["", "+", "-", "~", "?"]
    mechanism: str
    extra: str

    def __init__(self, parent: "SpfRecord", data: str):
        # Qualifier:
        # "+" Pass
        # "-" Fail
        # "~" SoftFail
        # "?" Neutral
        self.parent = parent
        self.mechanism = ""
        self.extra = ""
        if data[0] in "+-~?":
            self.qualifier = data[0]
            data = data[1:]
        else:
            # unspecified means '+' but we're not looking to edit those
            self.qualifier = ""
        # Mechanism:
        if data.startswith("a/"):
            # a/<prefix-length>
            self.mechanism = "a"
            self.extra = data[1:]
            return
        if data.startswith("mx/"):
            # mx/<prefix-length>
            self.mechanism = "mx"
            self.extra = data[2:]
            return
        for bare_mech in ("all", "mx", "a", "ptr"):
            # mechanisms that can be specified with nothing following
            if data == bare_mech:
                self.mechanism = data
                self.extra = ""
                return
        # mx:<domain>
        # mx:<domain>/<prefix-length>
        # a:<domain>
        # a:<domain>/<prefix-length>
        # ip4:<ip4-address>
        # ip4:<ip4-network>/<prefix-length>
        # ip6:<ip6-address>
        # ip6:<ip6-network>/<prefix-length>
        # include:<domain>
        # exists:<domain>
        # ptr:<domain> (deprecated)
        mechanism_re = re.compile(
            r"(exists|include|ip4|ip6|ptr|a|mx)(:[^\s]+)$"
        )
        # exp=<domain>
        # redirect=<domain>
        modifier_re = re.compile(r"(exp|redirect)(=[^\s]+)")
        for regex in (mechanism_re, modifier_re):
            if match := regex.match(data):
                self.mechanism, self.extra = match.groups()
        if not self.mechanism:
            # mechanism starts as "" and is never None, so the former
            # "is None" test could not fire; unknown terms were rendered as
            # an empty string and silently lost from the rewritten record.
            raise ValueError("Unknown mechanism")
        if self.mechanism in ("exp", "redirect"):
            # Qualifiers are invalid here.
            self.qualifier = ""
        if (
            self.mechanism == "include"
            and self.extra == ":relay.mailchannels.net"
        ) and self.qualifier != "":
            self.parent.errors.add(
                "include:relay.mailchannels.net should not have a prefix"
            )
            self.qualifier = ""
        if self.mechanism == "include" and self.qualifier not in ("", "?"):
            self.parent.errors.add(
                f"{self.qualifier} is not a valid prefix for include:"
            )
            self.qualifier = ""

    def __str__(self) -> str:
        return f"{self.qualifier}{self.mechanism}{self.extra}"

    def is_shared(self) -> bool:
        """Return True if this SPF rule looks like it's pointing to
        a shared server"""
        if self.mechanism in ("a", "include"):
            host = self.extra.lower().rstrip(".")
            if host in (
                ":servconfig.com",
                ":smtp.servconfig.com",
            ):
                return True
        if not self.extra.startswith(":"):
            # The remaining checks all need a ":<value>" argument
            return False
        value = self.extra[1:]
        if not value:
            return False
        if self.mechanism == "a":
            return bool(SHARED_RE.match(value.lower()))
        if self.mechanism == "ip4":
            return ptr_is_shared(value)
        if self.mechanism == "mx":
            host = value.lower()
            return bool(
                SHARED_RE.match(host)
                or host.endswith(("inmotionhosting.com", "servconfig.com"))
            )
        return False


def ptr_is_shared(ipaddr: str) -> bool:
    """Check if an IP has the PTR of a shared server.

    Results are cached in PTR_SHARED, which starts out seeded with this
    server's own IPs marked as shared. Lookup failures count as not shared.
    """
    cached = PTR_SHARED.get(ipaddr)
    if cached is not None:
        # The cache used to be written but never consulted
        return cached
    shared = False
    try:
        rev = dns.reversename.from_address(ipaddr)
        for result in dns.resolver.resolve(rev, "PTR"):
            host = str(result).rstrip(".")
            if SHARED_RE.match(host):
                shared = True
                break
    except Exception:
        # Best-effort: bad addresses (e.g. with a /prefix) or failed
        # lookups simply mean "not shared"
        pass
    PTR_SHARED[ipaddr] = shared
    return shared


def main():
    """Check every requested domain, print or dump the results, then offer
    to commit the proposed DNS changes"""
    out, domains = parse_args()
    if rads.IMH_ROLE != "shared":
        out.error(
            "This script is meant for shared servers. "
            "Assuming this is a non-res IMH server."
        )
    for domain, subdomains in domains.items():
        out.head(f"== Checking domain: {domain} ==")
        out.data[domain] = {"ns": {"error": None, "msgs": [], "ns_here": None}}
        try:
            check_ns(out, domain)
        except Exception as exc:
            out.error(str(exc))
            out.data[domain]["ns"]["error"] = str(exc)
            # Without NS data, skip this domain and its subdomains
            continue
        check_records(out, domain)
        for subdomain in subdomains:
            out.head(f"== Checking subdomain: {subdomain} ==")
            out.data[subdomain] = {}
            check_records(out, subdomain)
    if out.use_json:
        print(json.dumps(out.data, indent=4))
    elif out.proposed:
        out.info("PROPOSED CHANGES")
        for change in out.proposed:
            domain = change["domain"]
            record = change["record"]
            data = change["data"]
            out.info(f"{domain} {record} {data}")
        if out.no_edits or not rads.prompt_y_n("Approve & commit DNS changes?"):
            return
        for change in out.proposed:
            try:
                whmapi1(change['func'], change['args'], check=True)
            except CpAPIError as exc:
                out.error(str(exc))


def check_ns(out: Output, domain: str):
    """Classify the domain's nameservers and record whether DNS is here.

    Sets ``out.data[domain]["ns"]["ns_here"]`` to 1 when every nameserver IP
    belongs to this brand, 0 when any is wrong-brand or external, and None
    when no nameservers were returned.
    """
    ns_dom, ns_ips = get_ns_ips(domain, out)
    ns_msgs: list = out.data[domain]["ns"]['msgs']
    here = None
    # Compare by IPs for ns here because of vanity nameservers
    for ns_ip, ns_name in ns_ips.items():
        if ns_ip in THIS_BRAND:
            # Only the first result may set 1; any later 0 must stick
            if here is None:
                here = 1
            msg = f"{ns_dom} has correct ns - {ns_name} ({ns_ip})"
            out.okay(msg)
        elif ns_ip in WRONG_BRAND:
            here = 0
            msg = f"{ns_dom} has ns of wrong brand - {ns_name} ({ns_ip})"
            out.error(msg)
        else:
            here = 0
            msg = f"{ns_dom} has external ns - {ns_name} ({ns_ip})"
            out.error(msg)
        ns_msgs.append(msg)
    out.data[domain]["ns"]["ns_here"] = here


def check_records(out: 'Output', zone: str):
    """Run the SPF and _mailchannels auth checks for ``zone``.

    The local zone is used when the nameservers are here or unknown
    (``ns_here`` truthy or None); otherwise records are resolved over DNS.
    """
    ns_data = out.data[zone].get("ns", {})
    ns_here = ns_data.get('ns_here')
    use_local = bool(ns_here) or ns_here is None
    out.data[zone]["spf"] = {"error": None, "records": []}
    try:
        if use_local:
            has_spf = check_spf_local(out, zone)
        else:
            has_spf = check_spf_external(out, zone)
    except Exception as exc:
        out.data[zone]["spf"]["error"] = str(exc)
        out.error(f"Could not lookup TXT records for {zone}: {exc}")
        return
    # Key is lowercase "error" to match the failure path below and the
    # "spf" section (it was initialised as "Error").
    out.data[zone]["auth"] = {"error": None, "match": 0, "records": []}
    try:
        if use_local:
            check_auth_local(out, zone, require=has_spf)
        else:
            check_auth_external(out, zone, require=has_spf)
    except Exception as exc:
        out.data[zone]["auth"]["error"] = str(exc)
        out.error(f"Error looking up _mailchannels auth for {zone}: {exc}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Exited on SIGINT", file=sys.stderr)
    except BrokenPipeError:
        print("Exited with broken pipe", file=sys.stderr)