From aac3793b350f3a8e3fd595036e91089ade865064 Mon Sep 17 00:00:00 2001 From: emdee Date: Thu, 17 Nov 2022 08:58:45 +0000 Subject: [PATCH] pep8 isort --- exclude_badExits.py | 218 +++++++++++++++++++++----------------------- support_onions.py | 68 ++++++++------ trustor_poc.py | 94 +++++++++---------- 3 files changed, 192 insertions(+), 188 deletions(-) diff --git a/exclude_badExits.py b/exclude_badExits.py index 406e65c..3c5a247 100644 --- a/exclude_badExits.py +++ b/exclude_badExits.py @@ -79,24 +79,21 @@ For usage, do ```python3 exclude_badExits.py --help` """ -import sys - -import os -import re -import socket -import time import argparse -import string +import os +import sys +import time from io import StringIO -import ipaddress -# list(ipaddress._find_address_range(ipaddress.IPv4Network('172.16.0.0/12')) -from urllib3.util.ssl_match_hostname import CertificateError import stem +import urllib3 from stem import InvalidRequest -from stem.control import Controller from stem.connection import IncorrectPassword from stem.util.tor_tools import is_valid_fingerprint +from urllib3.util.ssl_match_hostname import CertificateError + +# list(ipaddress._find_address_range(ipaddress.IPv4Network('172.16.0.0/12')) + try: from ruamel.yaml import YAML yaml = YAML(typ='rt') @@ -112,20 +109,24 @@ if yaml is None: yaml = None try: - from unbound import ub_ctx,RR_TYPE_TXT,RR_CLASS_IN + from unbound import RR_CLASS_IN, RR_TYPE_TXT, ub_ctx except: ub_ctx = RR_TYPE_TXT = RR_CLASS_IN = None global LOG import logging import warnings + warnings.filterwarnings('ignore') LOG = logging.getLogger() +from support_onions import (bAreWeConnected, icheck_torrc, lIntroductionPoints, + oGetStemController, vwait_for_controller, + yKNOWN_NODNS, zResolveDomain) from support_phantompy import vsetup_logging +from trustor_poc import TrustorError, idns_validate from trustor_poc import oDownloadUrlUrllib3 as oDownloadUrl -from trustor_poc import idns_validate, TrustorError -from support_onions import icheck_torrc, bAreWeConnected, lIntroductionPoints, zResolveDomain, vwait_for_controller, yKNOWN_NODNS + LOG.info("imported HTTPSAdapter") ETC_DIR = '/etc/tor/yaml' @@ -138,17 +139,6 @@ sEXCLUDE_EXIT_KEY = 'ExcludeNodes' sINCLUDE_EXIT_KEY = 'ExitNodes' sINCLUDE_GUARD_KEY = 'EntryNodes' -def oMakeController(sSock='', port=9051): - import getpass - if sSock and os.path.exists(sSock): - controller = Controller.from_socket_file(path=sSock) - else: - controller = Controller.from_port(port=port) - sys.stdout.flush() - p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr) - controller.authenticate(p) - return controller - oBAD_NODES = {} oBAD_ROOT = 'BadNodes' oBAD_NODES[oBAD_ROOT] = {} @@ -163,8 +153,8 @@ def lYamlBadNodes(sFile, global lKNOWN_NODNS global lMAYBE_NODNS - l = [] - if not yaml: return l + if not yaml: + return [] if os.path.exists(sFile): with open(sFile, 'rt') as oFd: oBAD_NODES = safe_load(oFd) @@ -188,7 +178,6 @@ oGOOD_NODES = {} oGOOD_ROOT = 'GoodNodes' def lYamlGoodNodes(sFile='/etc/tor/torrc-goodnodes.yaml'): global oGOOD_NODES - root = oGOOD_ROOT l = [] if not yaml: return l if os.path.exists(sFile): @@ -220,18 +209,18 @@ def bdomain_is_bad(domain, fp): tBAD_URLS = set() lATS = ['abuse', 'email'] lINTS = ['ciissversion', 'uplinkbw', 'signingkeylifetime', 'memory'] -lBOOLS = ['dnssec', 'dnsqname', 'aesni', 'autoupdate', 'dnslocalrootzone', - 'sandbox', 'offlinemasterkey'] +lBOOLS = ['dnssec', 'dnsqname', 'aesni', 'autoupdate', 'dnslocalrootzone', + 'sandbox', 'offlinemasterkey'] def aVerifyContact(a, fp, https_cafile, timeout=20, host='127.0.0.1', port=9050): global tBAD_URLS global lKNOWN_NODNS - # cleanups for yaml + # cleanups for elt in lINTS: if elt in a: a[elt] = int(a[elt]) for elt in lBOOLS: if elt in a: - if a[elt] in ['y','yes', 'true', 'True']: + if a[elt] in ['y', 'yes', 'true', 'True']: a[elt] = True else: a[elt] = False @@ -291,7 +280,7 @@ def aVerifyContact(a, fp, https_cafile, timeout=20, host='127.0.0.1', port=9050) if a['proof'] not in ['uri-rsa']: # only support uri for now if False and ub_ctx: - fp_domain = fp +'.'+domain + fp_domain = fp + '.' + domain if idns_validate(fp_domain, libunbound_resolv_file='resolv.conf', dnssec_DS_file='dnssec-root-trust', @@ -301,7 +290,7 @@ def aVerifyContact(a, fp, https_cafile, timeout=20, host='127.0.0.1', port=9050) return a LOG.debug(f"{len(keys)} contact fields for {fp}") - url="https://"+domain+"/.well-known/tor-relay/rsa-fingerprint.txt" + url = f"https://{domain}/.well-known/tor-relay/rsa-fingerprint.txt" try: LOG.debug(f"Downloading from {domain} for {fp}") o = oDownloadUrl(url, https_cafile, @@ -319,7 +308,10 @@ def aVerifyContact(a, fp, https_cafile, timeout=20, host='127.0.0.1', port=9050) else: LOG.warn(f"TrustorError downloading from {domain} {e.args}") tBAD_URLS.add(a['url']) - except (BaseException ) as e: + except urllib3.exceptions.MaxRetryError as e: # noqa + # maybe offline - not bad + LOG.warn(f"MaxRetryError downloading from {domain} {e}") + except (BaseException) as e: LOG.error(f"Exception {type(e)} downloading from {domain} {e}") else: if hasattr(o, 'status'): @@ -344,7 +336,7 @@ def aVerifyContact(a, fp, https_cafile, timeout=20, host='127.0.0.1', port=9050) if not l: LOG.warn(f"Downloading from {domain} empty for {fp}") else: - a['fps'] = [elt for elt in l if elt and len(elt) == 40 + a['fps'] = [elt for elt in l if elt and len(elt) == 40 \ and not elt.startswith('#')] LOG.info(f"Downloaded from {domain} {len(a['fps'])} FPs") return a @@ -379,7 +371,7 @@ def aParseContact(contact, fp): if ':' in line] LOG.debug(f"{fp} {len(l)} fields") s = f'"{fp}":\n' - s += '\n'.join([f" {line}\"".replace(':',': \"', 1) + s += '\n'.join([f" {line}\"".replace(':', ': \"', 1) for line in l]) oFd = StringIO(s) a = safe_load(oFd) @@ -434,8 +426,7 @@ def oMainArgparser(_=None): default=os.path.join(ETC_DIR, 'badcontacts.yaml'), help="Yaml file of bad contacts that bad FPs are using") - parser.add_argument('--strict_nodes', type=int, default=0, - choices=[0,1], + parser.add_argument('--strict_nodes', type=int, default=0, choices=[0, 1], help="Set StrictNodes: 1 is less anonymous but more secure, although some sites may be unreachable") parser.add_argument('--wait_boot', type=int, default=120, help="Seconds to wait for Tor to booststrap") @@ -456,29 +447,29 @@ def oMainArgparser(_=None): help="Write the proof data of the included nodes to a YAML file") return parser -def vwrite_badnodes(oArgs, oBAD_NODES, slen): - if oArgs.bad_nodes: - tmp = oArgs.bad_nodes +'.tmp' - bak = oArgs.bad_nodes +'.bak' +def vwrite_badnodes(oargs, oBAD_NODES, slen): + if oargs.bad_nodes: + tmp = oargs.bad_nodes +'.tmp' + bak = oargs.bad_nodes +'.bak' with open(tmp, 'wt') as oFYaml: yaml.dump(oBAD_NODES, oFYaml) - LOG.info(f"Wrote {slen} to {oArgs.bad_nodes}") + LOG.info(f"Wrote {slen} to {oargs.bad_nodes}") oFYaml.close() - if os.path.exists(oArgs.bad_nodes): - os.rename(oArgs.bad_nodes, bak) - os.rename(tmp, oArgs.bad_nodes) + if os.path.exists(oargs.bad_nodes): + os.rename(oargs.bad_nodes, bak) + os.rename(tmp, oargs.bad_nodes) -def vwrite_goodnodes(oArgs, oGOOD_NODES, ilen): - if oArgs.good_nodes: - tmp = oArgs.good_nodes +'.tmp' - bak = oArgs.good_nodes +'.bak' +def vwrite_goodnodes(oargs, oGOOD_NODES, ilen): + if oargs.good_nodes: + tmp = oargs.good_nodes +'.tmp' + bak = oargs.good_nodes +'.bak' with open(tmp, 'wt') as oFYaml: yaml.dump(oGOOD_NODES, oFYaml) - LOG.info(f"Wrote {ilen} good relays to {oArgs.good_nodes}") + LOG.info(f"Wrote {ilen} good relays to {oargs.good_nodes}") oFYaml.close() - if os.path.exists(oArgs.good_nodes): - os.rename(oArgs.good_nodes, bak) - os.rename(tmp, oArgs.good_nodes) + if os.path.exists(oargs.good_nodes): + os.rename(oargs.good_nodes, bak) + os.rename(tmp, oargs.good_nodes) def iMain(lArgs): global aTRUST_DB @@ -487,18 +478,18 @@ def iMain(lArgs): global oGOOD_NODES global lKNOWN_NODNS parser = oMainArgparser() - oArgs = parser.parse_args(lArgs) + oargs = parser.parse_args(lArgs) - vsetup_logging(oArgs.log_level) + vsetup_logging(oargs.log_level) if bAreWeConnected() is False: raise SystemExit("we are not connected") - sFile = oArgs.torrc + sFile = oargs.torrc if sFile and os.path.exists(sFile): - icheck_torrc(sFile, oArgs) + icheck_torrc(sFile, oargs) twhitelist_set = set() - sFile = oArgs.good_contacts + sFile = oargs.good_contacts if sFile and os.path.exists(sFile): try: with open(sFile, 'rt') as oFd: @@ -506,7 +497,7 @@ def iMain(lArgs): LOG.info(f"{len(aTRUST_DB.keys())} trusted contacts from {sFile}") # reverse lookup of fps to contacts # but... - for k,v in aTRUST_DB.items(): + for (k, v,) in aTRUST_DB.items(): if 'modified' not in v.keys(): v['modified'] = int(time.time()) aTRUST_DB_INDEX[k] = v @@ -520,19 +511,19 @@ def iMain(lArgs): except Exception as e: LOG.exception(f"Error reading YAML TrustDB {sFile} {e}") - if os.path.exists(oArgs.proxy_ctl): - controller = oMakeController(sSock=oArgs.proxy_ctl) + if os.path.exists(oargs.proxy_ctl): + controller = oGetStemController(log_level=oargs.log_level, sock_or_pair=oargs.proxy_ctl) else: - port =int(oArgs.proxy_ctl) - controller = oMakeController(port=port) + port =int(oargs.proxy_ctl) + controller = oGetStemController(port=port) - vwait_for_controller(controller, oArgs.wait_boot) + vwait_for_controller(controller, oargs.wait_boot) - if oArgs.good_contacts: - good_contacts_tmp = oArgs.good_contacts + '.tmp' + if oargs.good_contacts: + good_contacts_tmp = oargs.good_contacts + '.tmp' elt = controller.get_conf('UseMicrodescriptors') - if elt != '0' : + if elt != '0': LOG.error('"UseMicrodescriptors 0" is required in your /etc/tor/torrc. Exiting.') controller.set_conf('UseMicrodescriptors', 0) # does it work dynamically? @@ -542,8 +533,8 @@ def iMain(lArgs): if elt and elt != '{??}': LOG.warn(f"{sEXCLUDE_EXIT_KEY} is in use already") - twhitelist_set.update(set(lYamlGoodNodes(oArgs.good_nodes))) - LOG.info(f"lYamlGoodNodes {len(twhitelist_set)} GuardNodes from {oArgs.good_nodes}") + twhitelist_set.update(set(lYamlGoodNodes(oargs.good_nodes))) + LOG.info(f"lYamlGoodNodes {len(twhitelist_set)} GuardNodes from {oargs.good_nodes}") global oGOOD_NODES t = set() @@ -556,23 +547,23 @@ def iMain(lArgs): pass if 'Onions' in oGOOD_NODES[oGOOD_ROOT].keys(): # Provides the descriptor for a hidden service. The **address** is the - # '.onion' address of the hidden service + # '.onion' address of the hidden service w = set(oGOOD_NODES[oGOOD_ROOT]['Onions']) - if oArgs.white_onions: - w.update(oArgs.white_onions.split(',')) - if oArgs.points_timeout > 0: + if oargs.white_onions: + w.update(oargs.white_onions.split(',')) + if oargs.points_timeout > 0: LOG.info(f"{len(w)} services will be checked from IntroductionPoints") - t.update(lIntroductionPoints(controller, w, itimeout=oArgs.points_timeout)) + t.update(lIntroductionPoints(controller, w, itimeout=oargs.points_timeout)) if len(t) > 0: LOG.info(f"IntroductionPoints {len(t)} relays from {len(w)} services") twhitelist_set.update(t) texclude_set = set() - if oArgs.bad_nodes and os.path.exists(oArgs.bad_nodes): - if False and oArgs.bad_sections: + if oargs.bad_nodes and os.path.exists(oargs.bad_nodes): + if False and oargs.bad_sections: # BROKEN - sections = oArgs.bad_sections.split(',') - texclude_set = set(lYamlBadNodes(oArgs.bad_nodes, + sections = oargs.bad_sections.split(',') + texclude_set = set(lYamlBadNodes(oargs.bad_nodes, lWanted=sections, section=sEXCLUDE_EXIT_KEY)) LOG.info(f"Preloaded {len(texclude_set)} bad fps") @@ -583,7 +574,7 @@ def iMain(lArgs): iTotalContacts = 0 aBadContacts = {} - lConds = oArgs.contact.split(',') + lConds = oargs.contact.split(',') iR = 0 relays = controller.get_server_descriptors() @@ -612,17 +603,17 @@ def iMain(lArgs): relay.contact = str(relay.contact, 'UTF-8') if ('Empty' in lConds and not relay.contact) or \ - ('NoEmail' in lConds and relay.contact and not 'email:' in relay.contact): + ('NoEmail' in lConds and relay.contact and 'email:' not in relay.contact): texclude_set.add(relay.fingerprint) continue - if not relay.contact or not 'ciissversion:' in relay.contact: + if not relay.contact or 'ciissversion:' not in relay.contact: # should be unreached 'Empty' should always be in lConds continue iTotalContacts += 1 fp = relay.fingerprint - if relay.contact and not 'url:' in relay.contact: + if relay.contact and 'url:' not in relay.contact: LOG.info(f"{fp} skipping bad contact - no url: {sofar}") LOG.debug(f"{fp} {relay.contact} {sofar}") texclude_set.add(fp) @@ -632,7 +623,7 @@ def iMain(lArgs): # first rough cut i = c.find('url:') if i >=0: - c = c[i+4:] + c = c[i + 4:] i = c.find(' ') if i >=0: c = c[:i] c = c.lstrip('https://').lstrip('http://').strip('/') @@ -682,15 +673,14 @@ def iMain(lArgs): texclude_set.add(relay.fingerprint) continue - b = aVerifyContact(list(a.values())[0], relay.fingerprint, - oArgs.https_cafile, - timeout=oArgs.timeout, - host=oArgs.proxy_host, - port=oArgs.proxy_port) - - if not b or not 'fps' in b or not b['fps'] or not b['url']: + oargs.https_cafile, + timeout=oargs.timeout, + host=oargs.proxy_host, + port=oargs.proxy_port) + # need to skip urllib3.exceptions.MaxRetryError + if not b or 'fps' not in b or not b['fps'] or not b['url']: LOG.warn(f"{relay.fingerprint} did NOT VERIFY {sofar}") LOG.debug(f"{relay.fingerprint} {b} {sofar}") # If it's giving contact info that doesnt check out @@ -712,7 +702,7 @@ def iMain(lArgs): aTRUST_DB[relay.fingerprint] = b for elt in b['fps']: aTRUST_DB_INDEX[elt] = b - if oArgs.good_contacts and oArgs.log_level <= 20: + if oargs.good_contacts and oargs.log_level <= 20: # as we go along then clobber with open(good_contacts_tmp, 'wt') as oFYaml: yaml.dump(aTRUST_DB, oFYaml) @@ -724,38 +714,38 @@ def iMain(lArgs): texclude_set = texclude_set.difference(tdns_urls) LOG.info(f"{len(list(aTRUST_DB.keys()))} good contacts out of {iTotalContacts}") - if oArgs.torrc_output and texclude_set: - with open(oArgs.torrc_output, 'wt') as oFTorrc: + if oargs.torrc_output and texclude_set: + with open(oargs.torrc_output, 'wt') as oFTorrc: oFTorrc.write(f"{sEXCLUDE_EXIT_KEY} {','.join(texclude_set)}\n") oFTorrc.write(f"{sINCLUDE_EXIT_KEY} {','.join(aTRUST_DB_INDEX.keys())}\n") oFTorrc.write(f"{sINCLUDE_GUARD_KEY} {','.join(oGOOD_NODES[oGOOD_ROOT]['GuardNodes'])}\n") - LOG.info(f"Wrote tor configuration to {oArgs.torrc_output}") + LOG.info(f"Wrote tor configuration to {oargs.torrc_output}") oFTorrc.close() - if oArgs.bad_contacts and aBadContacts: + if oargs.bad_contacts and aBadContacts: # for later analysis - with open(oArgs.bad_contacts, 'wt') as oFYaml: + with open(oargs.bad_contacts, 'wt') as oFYaml: yaml.dump(aBadContacts, oFYaml) oFYaml.close() - if oArgs.good_contacts != '' and aTRUST_DB: + if oargs.good_contacts != '' and aTRUST_DB: with open(good_contacts_tmp, 'wt') as oFYaml: yaml.dump(aTRUST_DB, oFYaml) oFYaml.close() - if os.path.exists(oArgs.good_contacts): - bak = oArgs.good_contacts +'.bak' - os.rename(oArgs.good_contacts, bak) - os.rename(good_contacts_tmp, oArgs.good_contacts) - LOG.info(f"Wrote {len(list(aTRUST_DB.keys()))} good contact details to {oArgs.good_contacts}") + if os.path.exists(oargs.good_contacts): + bak = oargs.good_contacts +'.bak' + os.rename(oargs.good_contacts, bak) + os.rename(good_contacts_tmp, oargs.good_contacts) + LOG.info(f"Wrote {len(list(aTRUST_DB.keys()))} good contact details to {oargs.good_contacts}") oBAD_NODES[oBAD_ROOT]['ExcludeNodes']['BadExit'] = list(texclude_set) oBAD_NODES[oBAD_ROOT]['ExcludeDomains'] = lKNOWN_NODNS - vwrite_badnodes(oArgs, oBAD_NODES, str(len(texclude_set))) + vwrite_badnodes(oargs, oBAD_NODES, str(len(texclude_set))) oGOOD_NODES['GoodNodes']['Relays']['ExitNodes'] = list(aTRUST_DB_INDEX.keys()) # GuardNodes are readonl - vwrite_goodnodes(oArgs, oGOOD_NODES, len(aTRUST_DB_INDEX.keys())) - + vwrite_goodnodes(oargs, oGOOD_NODES, len(aTRUST_DB_INDEX.keys())) + retval = 0 try: logging.getLogger('stem').setLevel(30) @@ -764,7 +754,7 @@ def iMain(lArgs): LOG.info(f"{sEXCLUDE_EXIT_KEY} {len(texclude_set)} net bad exit relays") controller.set_conf(sEXCLUDE_EXIT_KEY, texclude_set) - except stem.SocketClosed as e: + except stem.SocketClosed as e: # noqa LOG.error(f"Failed setting {sEXCLUDE_EXIT_KEY} bad exit relays in Tor") retval += 1 @@ -772,7 +762,7 @@ def iMain(lArgs): if aTRUST_DB_INDEX.keys(): LOG.info(f"{sINCLUDE_EXIT_KEY} {len(aTRUST_DB_INDEX.keys())} good relays") controller.set_conf(sINCLUDE_EXIT_KEY, aTRUST_DB_INDEX.keys()) - except stem.SocketClosed as e: + except stem.SocketClosed as e: # noqa LOG.error(f"Failed setting {sINCLUDE_EXIT_KEY} good exit nodes in Tor") retval += 1 @@ -783,12 +773,12 @@ def iMain(lArgs): controller.set_conf(sINCLUDE_GUARD_KEY, oGOOD_NODES[oGOOD_ROOT]['GuardNodes']) cur = controller.get_conf('StrictNodes') - if oArgs.strict_nodes and int(cur) != oArgs.strict_nodes: - LOG.info(f"OVERRIDING StrictNodes to {oArgs.strict_nodes}") - controller.set_conf('StrictNodes', oArgs.strict_nodes) + if oargs.strict_nodes and int(cur) != oargs.strict_nodes: + LOG.info(f"OVERRIDING StrictNodes to {oargs.strict_nodes}") + controller.set_conf('StrictNodes', oargs.strict_nodes) else: LOG.info(f"StrictNodes is set to {cur}") - except stem.SocketClosed as e: + except stem.SocketClosed as e: # noqa LOG.errro(f"Failed setting {sINCLUDE_EXIT_KEY} good exit nodes in Tor") retval += 1 @@ -815,7 +805,7 @@ def iMain(lArgs): controller.close() except Exception as e: LOG.warn(str(e)) - + sys.stdout.write("dns-rsa domains:\n" +'\n'.join(tdns_urls) +'\n') return retval diff --git a/support_onions.py b/support_onions.py index d42feba..0c463a1 100644 --- a/support_onions.py +++ b/support_onions.py @@ -1,29 +1,29 @@ # -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- +import getpass import os -import sys import re -import traceback +import select import shutil import socket -import select +import sys import time -import getpass if False: import cepa as stem - from cepa.control import Controller from cepa.connection import MissingPassword + from cepa.control import Controller from cepa.util.tor_tools import is_valid_fingerprint else: import stem - from stem.control import Controller from stem.connection import MissingPassword + from stem.control import Controller from stem.util.tor_tools import is_valid_fingerprint global LOG import logging import warnings + warnings.filterwarnings('ignore') LOG = logging.getLogger() @@ -69,13 +69,24 @@ yKNOWN_NODNS = """ - w.cccs.de """ +def oMakeController(sSock='', port=9051): + import getpass + if sSock and os.path.exists(sSock): + controller = Controller.from_socket_file(path=sSock) + else: + controller = Controller.from_port(port=port) + sys.stdout.flush() + p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr) + controller.authenticate(p) + return controller + oSTEM_CONTROLER = None def oGetStemController(log_level=10, sock_or_pair='/run/tor/control'): global oSTEM_CONTROLER if oSTEM_CONTROLER: return oSTEM_CONTROLER - from stem.util.log import Runlevel - Runlevel = log_level + import stem.util.log + stem.util.log.Runlevel = log_level if os.path.exists(sock_or_pair): LOG.info(f"controller from socket {sock_or_pair}") @@ -154,18 +165,21 @@ def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10) try: from cryptography.utils import int_from_bytes except ImportError: + import cryptography.utils + # guessing - not in the current cryptography but stem expects it def int_from_bytes(**args): return int.to_bytes(*args) cryptography.utils.int_from_bytes = int_from_bytes # this will fai if the trick above didnt work from stem.prereq import is_crypto_available - is_crypto_available(ed25519 = True) + is_crypto_available(ed25519=True) - from stem.descriptor.hidden_service import HiddenServiceDescriptorV3 - from stem.client.datatype import LinkByFingerprint - from stem import Timeout from queue import Empty + from stem import Timeout + from stem.client.datatype import LinkByFingerprint + from stem.descriptor.hidden_service import HiddenServiceDescriptorV3 + if type(lOnions) not in [set, tuple, list]: lOnions = list(lOnions) if controller is None: @@ -194,13 +208,13 @@ def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10) for introduction_point in n: for linkspecifier in introduction_point.link_specifiers: if isinstance(linkspecifier, LinkByFingerprint): - # LOG.log(40, f"Getting fingerprint for {linkspecifier}") + # LOG.log(40, f"Getting fingerprint for {linkspecifier}") if hasattr(linkspecifier, 'fingerprint'): assert len(linkspecifier.value) == 20 lp += [bin_to_hex(linkspecifier.value)] LOG.info(f"{len(lp)} introduction points for {elt}") l += lp - except (Empty, Timeout, ) as e: + except (Empty, Timeout,) as e: # noqa LOG.warn(f"Timed out getting introduction points for {elt}") continue except Exception as e: @@ -210,13 +224,13 @@ def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10) def zResolveDomain(domain): try: ip = sTorResolve(domain) - except Exception as e: + except Exception as e: # noqa ip = '' if ip == '': try: lpair = getaddrinfo(domain, 443) except Exception as e: - LOG.warn("{e}") + LOG.warn(f"{e}") lpair = None if lpair is None: LOG.warn(f"TorResolv and getaddrinfo failed for {domain}") @@ -233,13 +247,12 @@ def sTorResolve(target, ): MAX_INFO_RESPONSE_PACKET_LENGTH = 8 if '@' in target: - LOG.warn(f"sTorResolve failed invalid hostname {target}" ) + LOG.warn(f"sTorResolve failed invalid hostname {target}") return '' target = target.strip('/') - seb = b"\o004\o360\o000\o000\o000\o000\o000\o001\o000" seb = b"\x04\xf0\x00\x00\x00\x00\x00\x01\x00" seb += bytes(target, 'US-ASCII') + b"\x00" - assert len(seb) == 10+len(target), str(len(seb))+repr(seb) + assert len(seb) == 10 + len(target), str(len(seb)) + repr(seb) # LOG.debug(f"0 Sending {len(seb)} to The TOR proxy {seb}") @@ -247,7 +260,7 @@ def sTorResolve(target, sock.connect((sHost, iPort)) sock.settimeout(SOCK_TIMEOUT_SECONDS) - oRet = sock.sendall(seb) + oRet = sock.sendall(seb) # noqa i = 0 data = '' @@ -261,8 +274,7 @@ def sTorResolve(target, flags=socket.MSG_WAITALL data = sock.recv(MAX_INFO_RESPONSE_PACKET_LENGTH, flags) except socket.timeout: - LOG.warn("4 The TOR proxy " \ - +repr((sHost, iPort)) \ + LOG.warn(f"4 The TOR proxy {(sHost, iPort)}" \ +" didnt reply in " + str(SOCK_TIMEOUT_SECONDS) + " sec." +" #" +str(i)) except Exception as e: @@ -271,7 +283,7 @@ def sTorResolve(target, +" errored with " + str(e) +" #" +str(i)) sock.close() - raise SystemExit(4) + return '' else: if len(data) > 0: break @@ -280,9 +292,9 @@ def sTorResolve(target, sLabel = "5 No reply #" else: sLabel = "5 No data #" - LOG.info(sLabel +f"{i} on {sHost}:{iPort}" ) + LOG.warn(f"sTorResolve: {sLabel} {i} on {sHost}:{iPort}") sock.close() - raise SystemExit(5) + return '' assert len(data) >= 8 packet_sf = data[1] @@ -292,7 +304,7 @@ def sTorResolve(target, return f"{data[4]}.{data[5]}.{data[6]}.{data[7]}" else: # 91 - LOG.warn(f"tor-resolve failed for {target} on {sHost}:{iPort}" ) + LOG.warn(f"tor-resolve failed for {target} on {sHost}:{iPort}") os.system(f"tor-resolve -4 {target} > /tmp/e 2>/dev/null") # os.system("strace tor-resolve -4 "+target+" 2>&1|grep '^sen\|^rec'") @@ -321,8 +333,8 @@ def icheck_torrc(sFile, oArgs): a = {} for elt in l: elt = elt.strip() - if not elt or not ' ' in elt: continue - k,v = elt.split(' ', 1) + if not elt or ' ' not in elt: continue + (k, v,) = elt.split(' ', 1) a[k] = v keys = a diff --git a/trustor_poc.py b/trustor_poc.py index 4ddc6c1..5445ee5 100644 --- a/trustor_poc.py +++ b/trustor_poc.py @@ -3,31 +3,32 @@ # from https://github.com/nusenu/trustor-poc # with minor refactoring to make the code more Pythonic. -import os -import sys import datetime +import os +import re +import sys import requests from stem.control import Controller -from stem.util.tor_tools import * -# from urllib.parse import urlparse +# from stem.util.tor_tools import * from urllib3.util import parse_url as urlparse try: # unbound is not on pypi - from unbound import ub_ctx,RR_TYPE_TXT,RR_CLASS_IN + from unbound import RR_CLASS_IN, RR_TYPE_TXT, ub_ctx except: ub_ctx = RR_TYPE_TXT = RR_CLASS_IN = None global LOG import logging import warnings + warnings.filterwarnings('ignore') LOG = logging.getLogger() # download this python library from # https://github.com/erans/torcontactinfoparser -#sys.path.append('/home/....') +# sys.path.append('/home/....') try: from torcontactinfo import TorContactInfoParser except: @@ -42,7 +43,7 @@ def is_valid_hostname(hostname): return False if hostname[-1] == ".": hostname = hostname[:-1] # strip exactly one dot from the right, if present - allowed = re.compile("(?!-)[A-Z\d-]{1,63}(? len(elt.split('.')) and \ - hostname.endswith(elt): + if len(hostname.split('.')) > len(elt.split('.')) and hostname.endswith(elt): # parent return True return False -from urllib3.util.ssl_match_hostname import (CertificateError, - match_hostname, - _dnsname_match, - _ipaddress_match, - ) +from urllib3.util.ssl_match_hostname import (CertificateError, _dnsname_match, + _ipaddress_match) + + def my_match_hostname(cert, hostname): """Verify that *cert* (in decoded format as returned by SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125 @@ -341,10 +343,10 @@ def my_match_hostname(cert, hostname): raise CertificateError( "no appropriate commonName or subjectAltName fields were found" ) -match_hostname = my_match_hostname -from urllib3.util.ssl_ import ( - is_ipaddress, -) +urllib3.util.ssl_match_hostname.match_hostname = my_match_hostname +from urllib3.util.ssl_ import is_ipaddress + + def _my_match_hostname(cert, asserted_hostname): # Our upstream implementation of ssl.match_hostname() # only applies this normalization to IP addresses so it doesn't @@ -364,11 +366,12 @@ def _my_match_hostname(cert, asserted_hostname): # the cert when catching the exception, if they want to e._peer_cert = cert raise -from urllib3.connection import _match_hostname, HTTPSConnection urllib3.connection._match_hostname = _my_match_hostname from urllib3.contrib.socks import SOCKSProxyManager -from urllib3 import Retry + + +# from urllib3 import Retry def oDownloadUrlUrllib3(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050): """Theres no need to use requests here and it adds too many layers on the SSL to be able to get at things @@ -384,7 +387,7 @@ def oDownloadUrlUrllib3(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050): # we use this UA string when connecting to webservers to fetch rsa-fingerprint.txt proof files # https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa - headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'} + headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'} LOG.debug("fetching %s...." % uri) try: @@ -419,7 +422,7 @@ def oDownloadUrlUrllib3(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050): if not oReqResp.headers['Content-Type'].startswith('text/plain'): raise TrustorError(f"HTTP Content-Type != text/plain") - #check for redirects (not allowed as per spec) + # check for redirects (not allowed as per spec) if oReqResp.geturl() != uri: LOG.error(f'Redirect detected %s vs %s (final)' % (uri, oReqResp.geturl())) raise TrustorError(f'Redirect detected %s vs %s (final)' % (uri, oReqResp.geturl())) @@ -427,10 +430,12 @@ def oDownloadUrlUrllib3(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050): return oReqResp import urllib3.connectionpool +from urllib3.connection import HTTPSConnection + urllib3.connectionpool.VerifiedHTTPSConnection = HTTPSConnection def lDownloadUrlFps(domain, sCAfile, timeout=30, host='127.0.0.1', port=9050): - uri="https://"+domain+"/.well-known/tor-relay/rsa-fingerprint.txt" + uri = f"https://{domain}/.well-known/tor-relay/rsa-fingerprint.txt" o = oDownloadUrlRequests(uri, sCAfile, timeout=timeout, host=host, port=port) well_known_content = o.text.upper().strip().split('\n') well_known_content = [i for i in well_known_content if i and len(i) == 40] @@ -460,7 +465,7 @@ def validate_proofs(candidates, validation_cache_file, timeout=20, host='127.0.0 LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype)) elif prooftype == 'dns-rsa' and ub_ctx: for fingerprint in candidates[domain][prooftype]: - fp_domain = fingerprint+'.'+domain + fp_domain = fingerprint + '.' + domain if idns_validate(fp_domain, libunbound_resolv_file='resolv.conf', dnssec_DS_file='dnssec-root-trust', @@ -488,7 +493,6 @@ def idns_validate(domain, # this is not the system wide /etc/resolv.conf # use dnscrypt-proxy to encrypt your DNS and route it via tor's SOCKSPort - ctx = ub_ctx() if (os.path.isfile(libunbound_resolv_file)): ctx.resolvconf(libunbound_resolv_file) @@ -526,12 +530,10 @@ def configure_tor(controller, trusted_fingerprints, exitonly=True): try: controller.set_conf('ExitNodes', trusted_fingerprints) LOG.error('limited exits to %s relays' % relay_count) - except Exception as e: + except Exception as e: # noqa LOG.exception('Failed to set ExitNodes tor config to trusted relays') sys.exit(20) - - if __name__ == '__main__': CAfile = '/etc/ssl/certs/ca-certificates.crt' trust_config = 'trust_config' @@ -542,12 +544,12 @@ if __name__ == '__main__': trusted_fingerprints = read_local_validation_cache(validation_cache_file, trusted_domains=trusted_domains) # tor ControlPort password - controller_password='' + controller_password = '' # tor ControlPort IP controller_address = '127.0.0.1' timeout = 20 port = 9050 - controller = get_controller(address=controller_address,password=controller_password) + controller = get_controller(address=controller_address, password=controller_password) r = find_validation_candidates(controller, validation_cache=trusted_fingerprints,