# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- # https://github.com/nusenu/noContactInfo_Exit_Excluder # https://github.com/TheSmashy/TorExitRelayExclude """ This extends nusenu's basic idea of using the stem library to dynamically exclude nodes that are likely to be bad by putting them on the ExcludeNodes or ExcludeExitNodes setting of a running Tor. * https://github.com/nusenu/noContactInfo_Exit_Excluder * https://github.com/TheSmashy/TorExitRelayExclude The basic cut is to exclude Exit nodes that do not have a contact. That can be extended to nodes that do not have an email in the contact etc. But there's a problem, and your Tor notice.log will tell you about it: you could exclude the nodes needed to access hidden services or directorues. So we need to add to the process the concept of a whitelist. In addition, we may have our own blacklist of nodes we want to exclude, or use these lists for other applications like selektor. So we make two files that are structured in YAML: ``` /etc/tor/torrc-goodnodes.yaml Nodes: IntroductionPoints: - $NODEFINGERPRINT ... By default all sections of the goodnodes.yaml are used as a whitelist. /etc/tor/torrc-badnodes.yaml Nodes: ExcludeExitNodes: BadExit: # $0000000000000000000000000000000000000007 ``` That part requires [PyYAML](https://pyyaml.org/wiki/PyYAML) https://github.com/yaml/pyyaml/ Right now only the ExcludeExitNodes section is used by we may add ExcludeNodes later, and by default all sub-sections of the badnodes.yaml are used as a ExcludeExitNodes but it can be customized with the lWanted commandline arg. The original idea has also been extended to add different conditions for exclusion: the ```--contact``` commandline arg is a comma sep list of conditions: * Empty - no contact info * NoEmail - no @ sign in the contact', More may be added later. Because you don't want to exclude the introduction points to any onion you want to connect to, ```--white_onions``` should whitelist the introduction points to a comma sep list of onions, but is currently broken in stem 1.8.0: see: * https://github.com/torproject/stem/issues/96 * https://gitlab.torproject.org/legacy/trac/-/issues/25417 ```--bad_output``` will write the torrc configuration to a file. ```--details_output``` will write the lookup URLs of the excluded nodes to a file For usage, do ```python3 exclude_badExits.py --help` """ import sys from stem.control import Controller from stem.util.tor_tools import is_valid_fingerprint import os import getpass import re import time import argparse from stem.control import Controller try: import yaml except: yaml = None try: import coloredlogs if 'COLOREDLOGS_LEVEL_STYLES' not in os.environ: os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red' # https://pypi.org/project/coloredlogs/ except ImportError as e: coloredlogs = False global LOG import logging LOG = logging.getLogger() sDETAILS_URL = "https://metrics.torproject.org/rs.html#details/" # You can call this while bootstrapping def oMakeController(sSock='/run/tor/control', port=9051): if os.path.exists(sSock): controller = Controller.from_socket_file(path=sSock) else: controller = Controller.from_port(port=port) sys.stdout.flush() p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr) controller.authenticate(p) return controller def lYamlBadNodes(sFile='/etc/tor/torrc-badnodes.yaml', section='ExcludeExitNodes', lWanted=['Hetzner','BadExit']): root = 'ExcludeNodes' l = [] if not yaml: return l if os.path.exists(sFile): with open(sFile, 'rt') as oFd: o = yaml.safe_load(oFd) for elt in o[root][section].keys(): if lWanted and elt not in lWanted: continue l += o[root][section][elt] # yq '.ExcludeNodes.Hetzner' < /etc/tor/torrc-badnodes.yaml |sed -e 's/^[[]/ExcludeNodesHetzner = [/' # yq '.ExcludeNodes.Hetzner|.[]' < /etc/tor/torrc-badnodes.yaml # yq '.ExcludeNodes.BadExit|.[]' < /etc/tor/torrc-badnodes.yaml return l def lYamlGoodNodes(sFile='/etc/tor/torrc-goodnodes.yaml'): root='IncludeNodes' l = [] if not yaml: return l if os.path.exists(sFile): with open(sFile, 'rt') as oFd: o = yaml.safe_load(oFd) for elt in o[root].keys(): l += o[root][elt] # yq '.Nodes.IntroductionPoints|.[]' < /etc/tor/torrc-goodnodes.yaml return l def lIntroductionPoints(lOnions): """not working in stem 1.8.3""" l = [] for elt in lOnions: desc = controller.get_hidden_service_descriptor(elt, await_result=True, timeout=None) l = desc.introduction_points() if l: LOG.warn(f"{elt} NO introduction points\n") continue LOG.info(f"{elt} introduction points are...\n") for introduction_point in l: LOG.info(' %s:%s => %s' % (introduction_point.address, introduction_point.port, introduction_point.identifier)) l += [introduction_point.address] return l def oMainArgparser(_=None): # 'Mode: 0=chat 1=chat+audio 2=chat+audio+video default: 0' if not os.path.exists('/proc/sys/net/ipv6'): bIpV6 = 'False' else: bIpV6 = 'True' lIpV6Choices=[bIpV6, 'False'] parser = argparse.ArgumentParser(add_help=True) parser.add_argument('--proxy_host', '--proxy-host', type=str, default='127.0.0.1', help='proxy host') parser.add_argument('--proxy_port', '--proxy-port', default=9051, type=int, help='proxy control port') parser.add_argument('--proxy_ctl', '--proxy-ctl', default='/run/tor/control', type=str, help='control socket - takes precedence over proxy_port') parser.add_argument('--good_nodes', type=str, default='/etc/tor/torrc-goodnodes.yaml', help="Yaml file of good nodes that should not be excluded") parser.add_argument('--bad_nodes', type=str, default='/etc/tor/torrc-badnodes.yaml', help="Yaml file of bad nodes that should also be excluded") parser.add_argument('--contact', type=str, default='Empty,NoEmail', help="comma sep list of conditions - Empty,NoEmail") parser.add_argument('--wait_boot', type=int, default=120, help="Seconds to wait for Tor to booststrap") parser.add_argument('--log_level', type=int, default=20, help="10=debug 20=info 30=warn 40=error") parser.add_argument('--bad_sections', type=str, default='Hetzner,BadExit', help="sections of the badnodes.yaml to use, comma separated, '' defaults to all") parser.add_argument('--white_onions', type=str, default='', help="comma sep. list of onions to whitelist their introduction points - BROKEN") parser.add_argument('--bad_output', type=str, default='', help="Write the torrc configuration to a file") parser.add_argument('--details_output', type=str, default='', help="Write the lookup URLs of the excluded nodes to a file") return parser def iMain(lArgs): global oTOX_OARGS parser = oMainArgparser() oArgs = parser.parse_args(lArgs) aKw = dict(level=oArgs.log_level, format='%(name)s %(levelname)-4s %(message)s', stream=sys.stdout, force=True) logging.basicConfig(**aKw) logging.getLogger('stem').setLevel(oArgs.log_level) controller = oMakeController(oArgs.proxy_ctl, oArgs.proxy_port) elt = controller.get_conf('UseMicrodescriptors') if elt != '0' : LOG.error('"UseMicrodescriptors 0" is required in your /etc/tor/torrc. Exiting.') return 2 percent = i = 0 # You can call this while boostrapping while percent < 100 and i < oArgs.wait_boot: bootstrap_status = controller.get_info("status/bootstrap-phase") progress_percent = re.match('.* PROGRESS=([0-9]+).*', bootstrap_status) percent = int(progress_percent.group(1)) LOG.info(f"Bootstrapping {percent}%") time.sleep(5) i += 5 elt = controller.get_conf('ExcludeExitNodes') if elt and elt != '{??}': LOG.warn(f'ExcludeExitNodes is in use already') lGood = lYamlGoodNodes(oArgs.good_nodes) LOG.info(f'lYamlGoodNodes {len(lGood)}') if oArgs.white_onions: l = lIntroductionPoints(oArgs.white_onions.split(,)) lGood += l relays = controller.get_server_descriptors() if oArgs.bad_sections: sections = oArgs.bad_sections.split(',') exit_excludelist = lYamlBadNodes(lWanted=sections) else: exit_excludelist = lYamlBadNodes() LOG.info(f'lYamlBadNodes {len(exit_excludelist)}') if oArgs.details_output: oFd = open(oArgs.details_output, 'wt') else: oFd = None lConds = oArgs.contact.split(',') for relay in relays: if not relay.exit_policy.is_exiting_allowed(): continue if ('Empty' in lConds and not relay.contact) or \ ('NoEmail' in lConds and relay.contact and not b'@' in relay.contact): if is_valid_fingerprint(relay.fingerprint): exit_excludelist.append(relay.fingerprint) if oFd: oFd.write(sDETAILS_URL +relay.fingerprint +"\n") else: LOG.warn('Invalid Fingerprint: %s' % relay.fingerprint) if oFd: LOG.info(f"Wrote details URLs to {oArgs.details_output}") oFd.close() exit_excludelist = list(set(exit_excludelist).difference(set(lGood))) LOG.info(f'ExcludeExitNodes {len(exit_excludelist)} net bad exit nodes') controller.set_conf('ExcludeExitNodes', exit_excludelist) elt = controller.get_conf('ExcludeExitNodes') if oArgs.bad_output: with open(oArgs.bad_output, 'wt') as oFd: oFd.write(f"ExcludeExitNodes {','.join(exit_excludelist)}\n") LOG.info(f"Wrote tor configuration to {oArgs.bad_output}") logging.getLogger('stem').setLevel(40) for elt in controller._event_listeners: controller.remove_event_listener(elt) controller.close() return(0) if __name__ == '__main__': try: i = iMain(sys.argv[1:]) except Exception as e: LOG.exception(e) i = 1 sys.exit(i)