# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -# -*- coding: utf-8 -*- import os import sys from stem.control import Controller from stem.util.tor_tools import * from urllib.parse import urlparse import requests import datetime try: from unbound import ub_ctx,RR_TYPE_TXT,RR_CLASS_IN except: ub_ctx = RR_TYPE_TXT = RR_CLASS_IN = None global LOG import logging LOG = logging.getLogger() # download this python library from # https://github.com/erans/torcontactinfoparser #sys.path.append('/home/....') try: from torcontactinfo import TorContactInfoParser except: TorContactInfoParser = None # tor ControlPort IP controller_address = '127.0.0.1' dnssec_DS_file = 'dnssec-root-trust' # this is not the system wide /etc/resolv.conf # use dnscrypt-proxy to encrypt your DNS and route it via tor's SOCKSPort libunbound_resolv_file = 'resolv.conf' # for now we support max_depth = 0 only # this PoC version has no support for recursion # https://github.com/nusenu/tor-relay-operator-ids-trust-information#trust-information-consumers supported_max_depths = ['0'] # https://github.com/nusenu/ContactInfo-Information-Sharing-Specification#ciissversion accepted_ciissversions = ['2'] # https://github.com/nusenu/ContactInfo-Information-Sharing-Specification#proof accepted_proof_types = ['uri-rsa','dns-rsa'] # https://stackoverflow.com/questions/2532053/validate-a-hostname-string # FIXME this check allows non-fqdn names def is_valid_hostname(hostname): if len(hostname) > 255: return False if hostname[-1] == ".": hostname = hostname[:-1] # strip exactly one dot from the right, if present allowed = re.compile("(?!-)[A-Z\d-]{1,63}(? 0: if 'ciissversion' in parsed_ci and 'proof' in parsed_ci and 'url' in parsed_ci: prooftype = parsed_ci['proof'] ciurl = parsed_ci['url'] if parsed_ci['ciissversion'] in accepted_ciissversions and prooftype in accepted_proof_types: if ciurl.startswith('http://') or ciurl.startswith('https://'): try: domain=urlparse(ciurl).netloc except: LOG.warning('failed to parse domain %s' % ciurl) domain='error' continue else: domain=ciurl if not is_valid_hostname(domain): domain='error' continue # we can ignore relays that do not claim to be operated by a trusted operator # if we do not accept all if domain not in trusted_domains and not accept_all: continue if domain in result.keys(): if prooftype in result[domain].keys(): result[domain][prooftype].append(fingerprint) else: result[domain] = { prooftype : [fingerprint] } # mixed proof types are not allowd as per spec but we are not strict here LOG.warning('%s is using mixed prooftypes %s' % (domain, prooftype)) else: result[domain] = {prooftype : [fingerprint]} return result def lDownloadUrlFps(domain, timeout=20, host='127.0.0.1', port=9050): uri="https://"+domain+"/.well-known/tor-relay/rsa-fingerprint.txt" # socks proxy used for outbound web requests (for validation of proofs) proxy = {'https': 'socks5h://' +host +':' +str(port)} # we use this UA string when connecting to webservers to fetch rsa-fingerprint.txt proof files # https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'} LOG.debug("fetching %s...." % uri) try: head = requests.head(uri, timeout=timeout, proxies=proxy, headers=headers) except Exception as e: print("HTTP HEAD request failed for %s" % uri) print(e) head = None return [] if head.status_code != 200: return [] if not head.headers['Content-Type'].startswith('text/plain'): return [] try: fullfile = requests.get(uri, proxies=proxy, timeout=10, headers=headers) except: print("HTTP GET request failed for %s" % uri) return [] if fullfile.status_code != 200 or not fullfile.headers['Content-Type'].startswith('text/plain'): return [] #check for redirects (not allowed as per spec) if fullfile.url != uri: LOG.error('Redirect detected %s vs %s (final)' % (uri, fullfile.url)) return [] well_known_content = [i.strip() for i in fullfile.text.upper().split('\n')] return well_known_content def validate_proofs(candidates, validation_cache_file): ''' This function takes the return value of find_validation_candidates() and validated them according to their proof type (uri-rsa, dns-rsa) and writes properly validated relay fingerprints to the local validation cache ''' dt_utc = datetime.datetime.now(datetime.timezone.utc).date() f = open(validation_cache_file, mode='a') count = 0 for domain in candidates.keys(): for prooftype in candidates[domain].keys(): if prooftype == 'uri-rsa': well_known_content = lDownloadUrlFps(domain, timeout=20, host='127.0.0.1', port=9050) for fingerprint in candidates[domain][prooftype]: if fingerprint in well_known_content: # write cache entry count += 1 f.write('%s:%s:%s:%s\n' % (domain, fingerprint, prooftype, dt_utc)) else: LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype)) elif prooftype == 'dns-rsa' and ub_ctx: for fingerprint in candidates[domain][prooftype]: fp_domain = fingerprint+'.'+domain if dns_validate(fp_domain): count += 1 f.write('%s:%s:%s:%s\n' % (domain, fingerprint, prooftype, dt_utc)) else: LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype)) f.close() LOG.info('successfully validated %s new (not yet validated before) relays' % count) def dns_validate(domain): ''' performs DNS TXT lookups and verifies the reply - is DNSSEC valid and - contains only a single TXT record - the DNS record contains a hardcoded string as per specification https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#dns-rsa ''' if not ub_ctx: return False ctx = ub_ctx() if (os.path.isfile(libunbound_resolv_file)): ctx.resolvconf(libunbound_resolv_file) else: LOG.error('libunbound resolv config file: "%s" is missing, aborting!' % libunbound_resolv_file) sys.exit(5) if (os.path.isfile(dnssec_DS_file)): ctx.add_ta_file(dnssec_DS_file) else: LOG.error('DNSSEC trust anchor file "%s" is missing, aborting!' % dnssec_DS_file) sys.exit(6) status, result = ctx.resolve(domain, RR_TYPE_TXT, RR_CLASS_IN) if status == 0 and result.havedata: if len(result.rawdata) == 1 and result.secure: # ignore the first byte, it is the TXT length if result.data.as_raw_data()[0][1:] == b'we-run-this-tor-relay': return True return False def configure_tor(controller, trusted_fingerprints, exitonly=True): ''' takes the list of trusted fingerprints and configures a tor client to only use trusted relays in a certain position for now we only set exits. we refuse to set the configuration if there are less then 40 trusted relays ''' relay_count = len(trusted_fingerprints) if relay_count < 41: print('Too few trusted relays (%s), aborting!' % relay_count) sys.exit(15) try: controller.set_conf('ExitNodes', trusted_fingerprints) print('limited exits to %s relays' % relay_count) except Exception as e: print('Failed to set ExitNodes tor config to trusted relays') print(e) sys.exit(20) if __name__ == '__main__': trust_config = 'trust_config' assert os.path.exists(trust_config) trusted_domains = read_local_trust_config(trust_config) validation_cache_file = 'validation_cache' trusted_fingerprints = read_local_validation_cache(validation_cache_file, trusted_domains=trusted_domains) # tor ControlPort password controller_password='' controller = get_controller(address=controller_address,password=controller_password) r = find_validation_candidates(controller,validation_cache=trusted_fingerprints,trusted_domains=trusted_domains) validate_proofs(r, validation_cache_file) # refresh list with newly validated fingerprints trusted_fingerprints = read_local_validation_cache(trusted_domains=trusted_domains) configure_tor(controller, trusted_fingerprints)