# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
import os
import re
import sys
import datetime
import requests
from stem.control import Controller
from stem.util.tor_tools import *
from urllib.parse import urlparse

try:
    # unbound is not on pypi
    from unbound import ub_ctx, RR_TYPE_TXT, RR_CLASS_IN
except ImportError:
    ub_ctx = RR_TYPE_TXT = RR_CLASS_IN = None

global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()

# download this python library from
# https://github.com/erans/torcontactinfoparser
# sys.path.append('/home/....')
try:
    from torcontactinfo import TorContactInfoParser
except ImportError:
    TorContactInfoParser = None

# for now we support max_depth = 0 only
# this PoC version has no support for recursion
# https://github.com/nusenu/tor-relay-operator-ids-trust-information#trust-information-consumers
supported_max_depths = ['0']

# https://github.com/nusenu/ContactInfo-Information-Sharing-Specification#ciissversion
accepted_ciissversions = ['2']

# proof types we can validate (see validate_proofs() below)
accepted_proof_types = ['uri-rsa', 'dns-rsa']

# https://stackoverflow.com/questions/2532053/validate-a-hostname-string
# FIXME this check allows non-fqdn names
def is_valid_hostname(hostname):
    if len(hostname) > 255:
        return False
    if hostname[-1] == ".":
        hostname = hostname[:-1]  # strip exactly one dot from the right, if present
    allowed = re.compile(r"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
    return all(allowed.match(x) for x in hostname.split("."))

def find_validation_candidates(controller, trusted_domains=[], validation_cache=[], accept_all=False):
    '''
    returns the relays that claim to be operated by a trusted domain but are
    not validated yet, grouped by domain and proof type:
    { 'example.com' : { 'uri-rsa' : [fingerprint, fingerprint, ...] } }
    '''
    result = {}
    if TorContactInfoParser is None:
        LOG.error('TorContactInfoParser is not available, cannot parse ContactInfo strings')
        return result
    ci_parser = TorContactInfoParser()
    # walk all relay descriptors known to the tor client, skip relays that were
    # validated before and parse the ContactInfo string of the remaining ones
    # (assumption: TorContactInfoParser.parse() accepts the raw ContactInfo string)
    for relay in controller.get_server_descriptors():
        fingerprint = relay.fingerprint
        if fingerprint in validation_cache or not relay.contact:
            continue
        parsed_ci = ci_parser.parse(relay.contact.decode('utf-8', 'replace'))
        if len(parsed_ci) > 0:
            if 'ciissversion' in parsed_ci and 'proof' in parsed_ci and 'url' in parsed_ci:
                prooftype = parsed_ci['proof']
                ciurl = parsed_ci['url']
                if parsed_ci['ciissversion'] in accepted_ciissversions and prooftype in accepted_proof_types:
                    if ciurl.startswith('http://') or ciurl.startswith('https://'):
                        try:
                            domain = urlparse(ciurl).netloc
                        except Exception:
                            LOG.warning('failed to parse domain %s' % ciurl)
                            domain = 'error'
                            continue
                    else:
                        domain = ciurl
                    if not is_valid_hostname(domain):
                        domain = 'error'
                        continue
                    # we can ignore relays that do not claim to be operated by
                    # a trusted operator if we do not accept all
                    if domain not in trusted_domains and not accept_all:
                        continue
                    if domain in result.keys():
                        if prooftype in result[domain].keys():
                            result[domain][prooftype].append(fingerprint)
                        else:
                            result[domain] = {prooftype: [fingerprint]}
                            # mixed proof types are not allowed as per spec but we are not strict here
                            LOG.warning('%s is using mixed prooftypes %s' % (domain, prooftype))
                    else:
                        result[domain] = {prooftype: [fingerprint]}
    return result
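
# --- helper routines referenced from __main__ below ---------------------------
# The full definitions of read_local_trust_config(), read_local_validation_cache()
# and get_controller() are not shown in this copy; the following are minimal
# sketches of what __main__ expects from them. The 'domain:max_depth' line format
# of the trust_config file, the ControlPort default 9151 and the exit codes used
# here are assumptions, not taken from the script itself.

def read_local_trust_config(trust_config):
    '''
    sketch: read the list of trusted operator domains from a local file,
    assumed to contain one 'domain:max_depth' entry per line
    '''
    result = []
    if not os.path.isfile(trust_config):
        LOG.error('trust config file "%s" is missing, aborting!' % trust_config)
        sys.exit(3)
    with open(trust_config) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            domain, _, max_depth = line.partition(':')
            if max_depth not in supported_max_depths:
                LOG.error('unsupported max_depth in trust config line "%s", aborting!' % line)
                sys.exit(4)
            if is_valid_hostname(domain) and domain not in result:
                result.append(domain)
    return result

def read_local_validation_cache(validation_cache_file, trusted_domains=[]):
    '''
    sketch: return the fingerprints of relays that were validated before,
    read from the 'domain:fingerprint:prooftype:date' lines written by
    validate_proofs()
    '''
    result = []
    if not os.path.isfile(validation_cache_file):
        return result
    with open(validation_cache_file) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            domain, fingerprint, prooftype, date = line.split(':')
            if domain in trusted_domains:
                result.append(fingerprint)
    return result

def get_controller(address='127.0.0.1', port=9151, password=''):
    '''
    sketch: connect to a tor client via its ControlPort and authenticate
    '''
    try:
        controller = Controller.from_port(address=address, port=port)
        controller.authenticate(password=password)
    except Exception as e:
        LOG.error('Cannot connect to the tor ControlPort: %s' % e)
        sys.exit(1)
    return controller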

def lDownloadUrlFps(domain, sCAfile=None, timeout=30, host='127.0.0.1', port=9050):
    uri = "https://" + domain + "/.well-known/tor-relay/rsa-fingerprint.txt"
    # socks proxy used for outbound web requests (for validation of proofs)
    proxy = {'https': 'socks5h://' + host + ':' + str(port)}
    # we use this UA string when connecting to webservers to fetch
    # rsa-fingerprint.txt proof files
    # https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}
    LOG.debug("fetching %s...." % uri)
    try:
        # grr. fix urllib3
        # urllib3.connection WARNING Certificate did not match expected hostname:
        head = requests.head(uri, timeout=timeout, proxies=proxy, headers=headers)
    except Exception as e:
        LOG.warning(f"HTTP HEAD request failed for {uri} {e}")
        return []
    if head.status_code != 200:
        return []
    if not head.headers.get('Content-Type', '').startswith('text/plain'):
        return []
    if sCAfile:
        assert os.path.exists(sCAfile), sCAfile
    try:
        from https_adapter import HTTPSAdapter
    except Exception as e:
        LOG.warning(f"Could not import HTTPSAdapter {e}")
        HTTPSAdapter = None
    try:
        with requests.sessions.Session() as session:
            if HTTPSAdapter:
                # FixMe: upgrade to TLS1.3
                session.mount("https://", HTTPSAdapter(pool_maxsize=1, max_retries=3))
            fullfile = session.request(method="get", url=uri,
                                       proxies=proxy, timeout=timeout,
                                       headers=headers, allow_redirects=False,
                                       verify=True)
    except Exception as e:
        LOG.warning("HTTP GET request failed for %s %s" % (uri, e))
        return []
    if fullfile.status_code != 200 or not fullfile.headers.get('Content-Type', '').startswith('text/plain'):
        return []
    # check for redirects (not allowed as per spec)
    if fullfile.url != uri:
        LOG.error('Redirect detected %s vs %s (final)' % (uri, fullfile.url))
        return []
    well_known_content = fullfile.text.upper().strip().split('\n')
    well_known_content = [i for i in well_known_content if i and len(i) == 40]
    return well_known_content

def validate_proofs(candidates, validation_cache_file, timeout=20, host='127.0.0.1', port=9050):
    '''
    takes the return value of find_validation_candidates(), validates the
    candidates according to their proof type (uri-rsa, dns-rsa) and writes
    successfully validated relay fingerprints to the local validation cache
    '''
    dt_utc = datetime.datetime.now(datetime.timezone.utc).date()
    f = open(validation_cache_file, mode='a')
    count = 0
    for domain in candidates.keys():
        for prooftype in candidates[domain].keys():
            if prooftype == 'uri-rsa':
                well_known_content = lDownloadUrlFps(domain, timeout=timeout, host=host, port=port)
                for fingerprint in candidates[domain][prooftype]:
                    if fingerprint in well_known_content:
                        # write cache entry
                        count += 1
                        f.write('%s:%s:%s:%s\n' % (domain, fingerprint, prooftype, dt_utc))
                    else:
                        LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype))
            elif prooftype == 'dns-rsa' and ub_ctx:
                for fingerprint in candidates[domain][prooftype]:
                    fp_domain = fingerprint + '.' + domain
                    if idns_validate(fp_domain,
                                     libunbound_resolv_file='resolv.conf',
                                     dnssec_DS_file='dnssec-root-trust',
                                     ) == 0:
                        count += 1
                        f.write('%s:%s:%s:%s\n' % (domain, fingerprint, prooftype, dt_utc))
                    else:
                        LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype))
    f.close()
    LOG.info('successfully validated %s new (not yet validated before) relays' % count)
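
# For the dns-rsa proof type, idns_validate() below expects the operator to
# publish one DNSSEC-signed TXT record per relay under its domain. Illustration
# only: 'example-operator.org' and the fingerprint are made-up placeholders.
#
#   A0B1C2D3E4F5A6B7C8D9E0F1A2B3C4D5E6F7A8B9.example-operator.org. IN TXT "we-run-this-tor-relay"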

def idns_validate(domain,
                  libunbound_resolv_file='resolv.conf',
                  dnssec_DS_file='dnssec-root-trust',
                  ):
    '''
    performs a DNS TXT lookup and verifies that the reply
    - is DNSSEC valid and
    - contains only a single TXT record and
    - the DNS record contains a hardcoded string as per specification
    https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#dns-rsa
    '''
    if not ub_ctx:
        return -1
    # this is not the system wide /etc/resolv.conf
    # use dnscrypt-proxy to encrypt your DNS and route it via tor's SOCKSPort
    ctx = ub_ctx()
    if os.path.isfile(libunbound_resolv_file):
        ctx.resolvconf(libunbound_resolv_file)
    else:
        LOG.error('libunbound resolv config file: "%s" is missing, aborting!' % libunbound_resolv_file)
        return 5
    if os.path.isfile(dnssec_DS_file):
        ctx.add_ta_file(dnssec_DS_file)
    else:
        LOG.error('DNSSEC trust anchor file "%s" is missing, aborting!' % dnssec_DS_file)
        return 6
    status, result = ctx.resolve(domain, RR_TYPE_TXT, RR_CLASS_IN)
    if status == 0 and result.havedata:
        if len(result.rawdata) == 1 and result.secure:
            # ignore the first byte, it is the TXT length
            if result.data.as_raw_data()[0][1:] == b'we-run-this-tor-relay':
                return 0
    return 1

def configure_tor(controller, trusted_fingerprints, exitonly=True):
    '''
    takes the list of trusted fingerprints and configures a tor client
    to only use trusted relays in a certain position
    for now we only set exits.
    we refuse to set the configuration if there are fewer than 41 trusted relays
    '''
    relay_count = len(trusted_fingerprints)
    if relay_count < 41:
        LOG.error('Too few trusted relays (%s), aborting!' % relay_count)
        sys.exit(15)
    try:
        controller.set_conf('ExitNodes', trusted_fingerprints)
        LOG.info('limited exits to %s relays' % relay_count)
    except Exception as e:
        LOG.exception('Failed to set ExitNodes tor config to trusted relays')
        sys.exit(20)

if __name__ == '__main__':
    trust_config = 'trust_config'
    assert os.path.exists(trust_config)
    trusted_domains = read_local_trust_config(trust_config)

    validation_cache_file = 'validation_cache'
    trusted_fingerprints = read_local_validation_cache(validation_cache_file,
                                                       trusted_domains=trusted_domains)
    # tor ControlPort password
    controller_password = ''
    # tor ControlPort IP
    controller_address = '127.0.0.1'
    timeout = 20
    port = 9050
    controller = get_controller(address=controller_address, password=controller_password)

    r = find_validation_candidates(controller,
                                   validation_cache=trusted_fingerprints,
                                   trusted_domains=trusted_domains)
    validate_proofs(r, validation_cache_file,
                    timeout=timeout,
                    host=controller_address,
                    port=port)

    # refresh list with newly validated fingerprints
    trusted_fingerprints = read_local_validation_cache(validation_cache_file,
                                                       trusted_domains=trusted_domains)
    configure_tor(controller, trusted_fingerprints)
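
# Successful validations end up in the local 'validation_cache' file, one
# colon-separated line per relay as written by validate_proofs():
#
#   domain:fingerprint:prooftype:date
#
# e.g. (placeholder values):
#
#   example-operator.org:A0B1C2D3E4F5A6B7C8D9E0F1A2B3C4D5E6F7A8B9:uri-rsa:2021-12-01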