One Hat Cyber Team
Your IP :
3.135.215.148
Server IP :
192.145.235.60
Server :
Linux ngx365.inmotionhosting.com 5.14.0-427.33.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Aug 30 09:45:56 EDT 2024 x86_64
Server Software :
Apache
PHP Version :
8.2.27
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
opt
/
imh-spf-verify
/
View File Name :
spf_coordinator.py
#!/opt/imh-python/bin/python3 import json import sys import subprocess import re import rads import signal import logging import uuid NS_RE = re.compile( r"(.+) has (?P<status>correct|external) ns - (?P<ns>.+) \((?P<ns_ip>.+)\)" ) ALREADY_STDIN = None class DNSCheck: def __init__(self, check_name): self.check_name = check_name self.res_icon = "fa-question" self.res_class = "text-error" self.res_title = "An unknown error has occurred." self.error_tag = None self.current = [] self.recommended = {} self.has_problem = True self.is_error = False self.is_empty = False self.extra = {} def error(self, message): """Considers this DNSCheck an error with the passed in message.""" self.res_icon = "fa-exclamation-circle" self.res_class = "text-error" self.res_title = message self.has_problem = True self.is_error = True def warning(self, message, important = False): """Considers this DNSCheck a warning with the passed in message. If marked as important, uses a circle exclamation instead of triangle. """ self.res_icon = "fa-exclamation-triangle" if important: self.res_icon = "fa-exclamation-circle" self.res_class = "text-warning" self.res_title = message self.has_problem = True def success(self, message, important = False): """Considers this DNSCheck a success with the passed in message. If marked as important, uses a circle exclamation instead of triangle. """ self.res_class = "text-success" self.res_icon = "fa-check" if important: self.res_icon = "fa-exclamation-circle" self.res_title = message self.has_problem = False def recommend(self, name, value, message): """Provide recommended values for this DNSCheck""" self.recommended = { "name": name, "value": value, "message": message } def add_extra(self, name, value): """Adds extra data to the DNSCheck to be used by frontend""" self.extra[name] = value def build(self): """Builds the data required for this DNSCheck to be rendered on the frontend.""" if self.is_empty: return {} return { "title": self.check_name, "status": { "title": self.res_title, "icon": self.res_icon, "class": self.res_class }, "current": self.current, "recommended": self.recommended, "extra": self.extra } def parse_nameservers(ns_data: dict) -> DNSCheck: """Parses out the nameserver check data from check_mailchannels_dns""" ns_check = DNSCheck("Nameservers") ns_error = ns_data.get("error") ns_msgs = ns_data.get("msgs") ns_here = ns_data.get("ns_here") if ns_error: if "query name does not exist" in ns_error: ns_check.error_tag = new_tag( "Domain Not Registered", "Your domain is not registered.", "label-danger" ) else: ns_check.error_tag = new_tag( "Nameserver Error", "An error was encountered with your domain's nameservers.", "label-danger" ) ns_check.error("A problem was found with your domain") ns_check.current.append(ns_error) elif ns_msgs: ns_check.add_extra("external", not ns_here) for msg in ns_msgs: if ns_match := NS_RE.match(msg): ns_match_data = ns_match.groupdict() ns_ns = ns_match_data.get('ns') ns_ip = ns_match_data.get('ns_ip') ns_check.current.append(f"{ns_ns} ({ns_ip})") ns_check.has_problem = False if ns_here: ns_check.success("This domain's DNS is managed here.") else: ns_check.success( "While not required, this domain's DNS is not managed here.", important = True ) return ns_check def parse_spf(spf_data: dict, domain: str) -> DNSCheck: """Parses out the SPF check data from check_mailchannels_dns""" spf_check = DNSCheck("SPF") if not spf_data: spf_check.is_empty = True return spf_check spf_error = spf_data.get("error") spf_records = spf_data.get("records") if spf_records: if len(spf_records) > 1: spf_error = "Conflicting SPF records found, only one is needed." for spf in spf_records: spf_txt = spf.get("txt") spf_new = spf.get("new") spf_err = spf.get("errors") spf_check.current.append(spf_txt) if spf_new: spf_check.recommend( domain, spf_new, "Change your existing DNS record to:" ) spf_mailchan = spf.get("had_mailchan") if spf_mailchan: if "smtp.servconfig.com" in spf_txt: spf_check.error( "Please replace your SPF with what is recommended as smtp.servconfig.com is no longer valid.", ) else: spf_check.warning( "Changing this is recommended for a more concise SPF record but not required." ) else: if "smtp.servconfig.com" in spf_txt: spf_check.error( "Please replace your SPF with what is recommended as smtp.servconfig.com is no longer valid.", ) else: spf_check.error( "Not addressing this will result in poor email deliverability." ) else: spf_check.success("Your current SPF is valid and includes what we recommend.") else: spf_check.current.append("[missing]") spf_check.error( "Not having an SPF record will result in poor email deliverability." ) spf_check.recommended = { "name": domain, "value": "v=spf1 include:relay.mailchannels.net ~all", "message": "Add the following DNS record to your domain:" } if spf_error: spf_check.error( f"There is a problem with your SPF: {spf_error}" ) return spf_check def parse_auth(auth_data: dict, domain: str) -> DNSCheck: """Parses out the auth check data from check_mailchannels_dns""" auth_check = DNSCheck("MailChannels Domain Lockdown") if not auth_data: auth_check.is_empty = True return auth_check if rads.IMH_CLASS == "hub": rec_auth = "v=mc1 auth=webhostinghub" else: rec_auth = "v=mc1 auth=inmotionhosting" mailchan_domain = f"_mailchannels.{domain}." auth_error = auth_data.get("error") auth_match = auth_data.get("match") auth_records = auth_data.get("records") if auth_records: for record in auth_records: record_txt = record.get("txtdata") auth_check.current.append(record_txt) if auth_match: auth_check.success( "Your domain's MailChannel lockdown is valid and locked down." ) else: auth_check.error( "This server is unable to send email on your behalf unless this is addressed." ) auth_check.recommend( mailchan_domain, rec_auth, "Change your existing DNS record to:" ) else: auth_check.current.append("[missing]") auth_check.warning( "Adding this is recommended to prevent spoofing." ) auth_check.recommend( mailchan_domain, rec_auth, "Add the following DNS record to your domain:" ) if auth_error: auth_check.error( f"There is a problem with your MailChannel Lockdown record: {auth_error}" ) return auth_check def new_tag(name, description, tag_class): """Simple wrapper for simple dict containing tag data""" return { "name": name, "description": description, "class": tag_class, } def parse_domain(domain, data): """Takes all of the data from check_mailchannels_dns and builds out the information needed for the cPanel plugin to render.""" domain_tags = [] if ns_data := data.get("ns"): ns_check = parse_nameservers(ns_data) else: return None spf_check = parse_spf(data.get("spf"), domain) auth_check = parse_auth(data.get("auth"), domain) if ns_check.has_problem: domain_tags.append(ns_check.error_tag) else: if spf_check.has_problem: if spf_check.is_error: domain_tags.append( new_tag( "SPF Error", "This domain's SPF record is invalid and will have poor email deliverability.", "label-danger" ) ) else: domain_tags.append( new_tag( "SPF Recommendation", "There is an SPF recommendation available for this domain.", "label-warning" ) ) else: domain_tags.append( new_tag( "SPF Verified", "This domain's SPF is valid.", "label-success" ) ) if auth_check.has_problem: if auth_check.is_error: domain_tags.append( new_tag( "Domain Lockdown Error", "This server is unable to send email on behalf of this domain.", "label-danger" ) ) else: domain_tags.append( new_tag( "Domain Lockdown Recommendation", "There is a domain lockdown recommendation available.", "label-warning" ) ) else: domain_tags.append( new_tag( "Domain Lockdown Verified", "This domain's MailChannels Lockdown is valid.", "label-success" ) ) if ns_check.extra.get("external"): domain_tags.append( new_tag( "External DNS", "This domain's DNS is managed elsewhere.", "label-default" ) ) return { "domain": { "name": domain, "html_friendly": domain.replace(".", "_"), "tags": domain_tags, }, "dnsCheck": [ ns_check.build(), spf_check.build(), auth_check.build() ] } def run_check(): """Calls out to check_mailchannels_dns with the authenticated user""" # In the event cPanel doesn't pass stdin for some reason it will # cause this to stall. Let's try to self-correct. signal.signal(signal.SIGALRM, raise_stdin_fail) signal.alarm(5) stdin_data = json.loads(sys.stdin.read()) signal.signal(signal.SIGALRM, raise_timeout) signal.alarm(120) user = stdin_data[0] logging.info("Running mailchannels check for %s", user) proc = subprocess.run( ['/opt/sharedrads/check_mailchannels_dns', '-u', user, '--json'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, encoding='utf-8', check=True ) signal.alarm(0) domains = [] proc_data = json.loads(proc.stdout) for domain in proc_data: domains.append( parse_domain(domain, proc_data.get(domain)) ) print(json.dumps({ "domains": domains })) class TimeoutAlarm(Exception): pass def raise_stdin_fail(signum, frame): print(json.dumps({"error": "stdin_fail"})) raise TimeoutAlarm def raise_timeout(signum, frame): print(json.dumps({"error": "timeout"})) raise TimeoutAlarm if __name__ == '__main__': request_uuid = uuid.uuid4() rads.setup_logging( path='/var/log/spf-coordinator.log', loglevel=logging.INFO, print_out=False, fmt=f'%(asctime)s spf_coordinator req_id={request_uuid} %(levelname)s %(message)s', chown=(0, 0), chmod=0o600, ) run_check()