#!/usr/bin/env python3
"""
Tor Contact Info Parser - A tool/Python class for parsing Tor ContactInfo Information Sharing v2 specification contacts
Written by Eran Sandler (https://twitter.com/erans) (C) 2018

Turned into a proper command-line tool with sub-commands and flags by @Someguy123 at Privex Inc. (C) 2021
(https://www.privex.io) (https://github.com/PrivexInc)

This is a parser for the Tor ContactInfo Information Sharing Specification v2 (https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/).

The parser can parse the ContactInfo field of Tor relays based on the specification.

Official Repo: https://github.com/erans/torcontactinfoparser
Privex Fork: https://github.com/Privex/torcontactinfoparser

Released under the MIT License.
"""
import argparse
|
||
import os
|
||
import re
|
||
import sys
|
||
import json
|
||
import requests
|
||
import textwrap
|
||
# Prefer rich's pretty printer; fall back to a small stand-in built on the
# builtin print() when rich is not installed.
try:
    from rich import print as rprint
    HAS_RICH = True
except ImportError:
    def _json_fallback(obj):
        """Convert sets to sorted lists for ``json.dumps``, which rejects sets by default."""
        if isinstance(obj, (set, frozenset)):
            # key=repr keeps the ordering deterministic even for mixed element types.
            return sorted(obj, key=repr)
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    def rprint(value='', *args, **kwargs):
        """
        Fallback for ``rich.print``.

        dict / list / set / tuple values are rendered as JSON indented by four
        spaces; every other value is handed to the builtin ``print`` unchanged.
        Extra positional and keyword arguments are forwarded to ``print``.

        :return: whatever ``print`` returns (``None``).
        """
        if isinstance(value, (dict, list, set, tuple)):
            value = json.dumps(value, indent=4, default=_json_fallback)
        return print(value, *args, **kwargs)

    HAS_RICH = False
import logging
|
||
import warnings
|
||
warnings.filterwarnings('ignore')
|
||
|
||
from exclude_utils import vsetup_logging
|
||
|
||
class TorContactInfoParser(object):
    """
    Parser for Tor relay ContactInfo strings made of whitespace-separated
    ``name:value`` tokens.

    Each supported field name maps, in ``_supported_fields_parsers``, to a
    validator function plus the keyword arguments describing its constraints.
    The table is shared by every instance and is only ever read by ``parse``.
    """

    # Anchored pattern used to validate e-mail style values once "[]" has been
    # turned back into "@".
    email_regex = r"^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*$"

    def _parse_string_value(self, value, min_length, max_length, valid_chars,
                            raise_exception=False, field_name=None, deobfuscate_email=False):
        """
        Validate a plain string field.

        :param value: raw text found after ``name:``.
        :param min_length: minimum accepted length (inclusive).
        :param max_length: maximum accepted length (inclusive).
        :param valid_chars: regular expression the WHOLE value must match, or
            ``"*"`` to skip the character check.
        :param raise_exception: raise ``ValueError`` instead of returning ``None``.
        :param field_name: field name used in error messages.
        :param deobfuscate_email: unused here; accepted so every field parser
            can be called with the same keyword arguments.
        :return: ``value`` when valid, otherwise ``None``.
        :raises ValueError: when invalid and ``raise_exception`` is true.
        """
        value_length = len(value)
        if value_length < min_length:
            if raise_exception:
                raise ValueError("value of field '{0}' is too short".format(field_name))
            return None

        if value_length > max_length:
            if raise_exception:
                raise ValueError("value of field '{0}' is too long".format(field_name))
            return None

        # fullmatch, not search: a single valid character somewhere in the
        # value must not be enough to accept the whole value.
        if valid_chars != "*" and not re.fullmatch(valid_chars, value):
            if raise_exception:
                raise ValueError("value of field '{0}' doesn't match valid chars restrictions".format(field_name))
            return None

        return value

    def _parse_email_value(self, value, field_name, raise_exception, deobfuscate_email):
        """
        Validate an e-mail style field whose "@" may be written as "[]".

        :param value: raw text found after ``name:``.
        :param field_name: field name used in error messages.
        :param raise_exception: raise ``ValueError`` instead of returning ``None``.
        :param deobfuscate_email: return the address with "@" when true,
            with "[]" when false.
        :return: the address, or ``None`` when empty or invalid.
        :raises ValueError: when empty/invalid and ``raise_exception`` is true.
        """
        if value:
            address = value.replace("[]", "@")
            if re.fullmatch(self.email_regex, address):
                return address if deobfuscate_email else address.replace("@", "[]")

        if raise_exception:
            raise ValueError("value of field '{0}' is not a valid address".format(field_name))
        return None

    # field name -> {"fn": validator, "args": validator keyword arguments}
    _supported_fields_parsers = {
        "email": {
            "fn": _parse_email_value,
            "args": {},
        },
        "url": {
            "fn": _parse_string_value,
            "args": {"min_length": 4, "max_length": 399, "valid_chars": "[_%/:a-zA-Z0-9.-]+"},
        },
        "proof": {
            "fn": _parse_string_value,
            "args": {"min_length": 7, "max_length": 7, "valid_chars": "[adinrsu-]+"},
        },
        "ciissversion": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 1, "valid_chars": "[12]+"},
        },
        "pgp": {
            "fn": _parse_string_value,
            "args": {"min_length": 40, "max_length": 40, "valid_chars": "[a-zA-Z0-9]+"},
        },
        "abuse": {
            "fn": _parse_email_value,
            "args": {},
        },
        "keybase": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 50, "valid_chars": "[a-zA-Z0-9]+"},
        },
        "twitter": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 15, "valid_chars": "[a-zA-Z0-9_]+"},
        },
        "mastodon": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"},
        },
        "matrix": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"},
        },
        "xmpp": {
            "fn": _parse_email_value,
            "args": {},
        },
        "otr3": {
            "fn": _parse_string_value,
            "args": {"min_length": 40, "max_length": 40, "valid_chars": "[a-z0-9]+"},
        },
        "hoster": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 254, "valid_chars": "[a-zA-Z0-9.-]+"},
        },
        "cost": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 13, "valid_chars": "[A-Z0-9.]+"},
        },
        "uplinkbw": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 7, "valid_chars": "[0-9]+"},
        },
        "trafficacct": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 9, "valid_chars": "[unmetrd0-9]+"},
        },
        "memory": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 10, "valid_chars": "[0-9]+"},
        },
        "cpu": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 50, "valid_chars": "[a-zA-Z0-9_-]+"},
        },
        "virtualization": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 15, "valid_chars": "[a-z-]+"},
        },
        "donationurl": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 254, "valid_chars": "*"},
        },
        "btc": {
            "fn": _parse_string_value,
            "args": {"min_length": 26, "max_length": 99, "valid_chars": "[a-zA-Z0-9]+"},
        },
        "zec": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 95, "valid_chars": "[a-zA-Z0-9]+"},
        },
        "xmr": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 99, "valid_chars": "[a-zA-Z0-9]+"},
        },
        "offlinemasterkey": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"},
        },
        "signingkeylifetime": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 6, "valid_chars": "[0-9]+"},
        },
        # NOTE(review): max_length 2 is kept from the original table, but the
        # pattern admits a single character only - confirm the intended limit.
        "sandbox": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 2, "valid_chars": "[yn]"},
        },
        "os": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 20, "valid_chars": "[A-Za-z0-9/.]+"},
        },
        "tls": {
            "fn": _parse_string_value,
            "args": {"min_length": 0, "max_length": 14, "valid_chars": "[a-z]+"},
        },
        "aesni": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"},
        },
        "autoupdate": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"},
        },
        # "+" added: these two fields allow more than one character, so a
        # single-character class could never match a whole value.
        "confmgmt": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 15, "valid_chars": "[a-zA-Z-]+"},
        },
        "dnslocation": {
            "fn": _parse_string_value,
            "args": {"min_length": 5, "max_length": 100, "valid_chars": "[a-z,]+"},
        },
        "dnsqname": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"},
        },
        "dnssec": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"},
        },
        "dnslocalrootzone": {
            "fn": _parse_string_value,
            "args": {"min_length": 1, "max_length": 1, "valid_chars": "[yn]"},
        },
    }

    def __init__(self):
        pass

    def parse(self, value: str, raise_exception_on_invalid_value=False, deobfuscate_email=True) -> "dict | None":
        """
        Parse a ContactInfo string into a ``{field name: value}`` dict.

        Tokens without a ":" and tokens whose name is not in
        ``_supported_fields_parsers`` are ignored. A field that fails
        validation is stored as ``None``. When a field occurs more than once,
        the first truthy value wins.

        :param value: the ContactInfo string.
        :param raise_exception_on_invalid_value: raise ``ValueError`` on the
            first invalid field instead of storing ``None`` for it.
        :param deobfuscate_email: return e-mail style fields with "@" (true)
            or with "[]" (false).
        :return: dict of parsed fields, or ``None`` when ``value`` is empty or
            does not contain the mandatory ``ciissversion:`` field.
        :raises ValueError: see ``raise_exception_on_invalid_value``.
        """
        # the ciissversion field is mandatory
        if not value or 'ciissversion:' not in value:
            return None

        result = {}
        # split() with no argument also splits on newlines/tabs, so input
        # piped from stdin does not leave a trailing "\n" inside the last value.
        for token in value.split():
            name, separator, data = token.partition(":")
            if not separator or name not in self._supported_fields_parsers:
                continue

            field_parser = self._supported_fields_parsers[name]
            if field_parser is None:
                result[name] = data
                continue

            if callable(field_parser):
                parsed = field_parser(self, data)
            else:
                # Build the call arguments in a fresh dict: the table is
                # class-level state shared by all instances and must stay untouched.
                call_args = dict(
                    field_parser["args"],
                    field_name=name,
                    value=data,
                    raise_exception=raise_exception_on_invalid_value,
                    deobfuscate_email=deobfuscate_email,
                )
                parsed = field_parser["fn"](self, **call_args)

            if not result.get(name, None):
                result[name] = parsed

        return result
def cmd_parse(opts: argparse.Namespace):
    """
    ArgParser function for parsing a single ContactInfo string, and outputting it as JSON (or python-style dict's)

    The contact is read from stdin when no positional argument is given or the
    first one is ``-``; otherwise the positional arguments are joined with
    single spaces.

    Output follows the same rules as ``cmd_scan``: ``opts.json`` converts the
    result to real JSON (indented when ``opts.pretty`` is set), and
    ``opts.pretty`` selects ``rprint`` over the builtin ``print``.

    :param opts: parsed arguments; reads ``contact``, ``pretty`` and ``json``.
    """
    if not opts.contact or opts.contact[0] == '-':
        contact = sys.stdin.read()
    else:
        contact = ' '.join(opts.contact)

    # strip() so the newline that ends piped input does not reach the parser.
    res = TorContactInfoParser().parse(contact.strip())

    if opts.json:
        res = json.dumps(res, indent=4 if opts.pretty else None)

    if opts.pretty:
        rprint(res)
    else:
        print(res)
def cmd_scan(opts: argparse.Namespace, adata=None) -> int:
    """
    ArgParser function for scanning all ContactInfo strings from ``https://onionoo.torproject.org/details`` ,
    and outputting each one as a Python-style Dict, or JSON.

    :param opts: parsed arguments; reads ``json`` and ``pretty``.
    :param adata: optional relay data to use instead of downloading it: either
        JSON text or an already decoded dict with a ``relays`` list.
    :return: 0 on success, 1 when the relay data could not be decoded or
        contains no relays.
    """
    # Root logger, i.e. the same logger the script entry point configures;
    # looked up here so the function also works when the module is imported.
    log = logging.getLogger()
    parser = TorContactInfoParser()
    surl = "https://onionoo.torproject.org/details"

    if not adata:
        log.info(f"Getting relays from {surl}")
        # A timeout keeps an unresponsive server from hanging the scan forever.
        response = requests.get(surl, timeout=60)
        try:
            adata = response.json()
        except ValueError as e:
            # JSON decode errors raised by requests/json/simplejson all derive from ValueError
            log.exception(f"JSON error {e}")
            return 1
    elif not isinstance(adata, dict):
        try:
            adata = json.loads(adata)
        except ValueError as e:
            log.exception(f"JSON error {e}")
            return 1

    elts = adata.get("relays") if isinstance(adata, dict) else None
    if not elts:
        log.warning(f"NO relays - are we connected?")
        return 1

    log.info(f"{len(elts)} relays")
    for relay in elts:
        if 'fingerprint' not in relay:
            log.warning(f"fingerprint not in relay for {relay}")
            continue
        fp = relay['fingerprint']
        verified_host_names = relay.get('verified_host_names', [])
        contact = relay.get("contact", None)
        if not contact:
            log.warning(f"No contact for {fp} {verified_host_names}")
            continue
        if 'ciissversion' not in contact:
            log.debug(f"No ciissversion in contact in {fp}")
            continue

        log.debug(f"parsing {fp}")
        result = parser.parse(contact, False)
        if not result:
            log.warning(f"No result for {contact} in {fp}")
            continue

        if opts.json:
            result = json.dumps(result, indent=4 if opts.pretty else None)
        if opts.pretty:
            rprint(result)
        else:
            print(result)

    return 0
ETC_DIR = '/etc/tor/yaml'


def oparser():
    """
    Build the command-line parser.

    Without a sub-command the parser defaults to ``func=cmd_scan``. The
    ``parse`` sub-command selects ``cmd_parse`` and pretty-prints unless
    ``-np`` is given; ``scan`` pretty-prints only with ``-p``. ``relays_output``
    is defined for every invocation so callers can always read it.

    :return: the configured ``argparse.ArgumentParser``.
    """
    default_relays = os.path.join(ETC_DIR, 'relays.json')

    # NOTE(review): the sample contacts below omit the ciissversion field that
    # TorContactInfoParser.parse treats as mandatory - confirm and refresh the examples.
    # The "\\n" is escaped on purpose: a real newline inside the f-string would
    # put a line at column 0 and stop textwrap.dedent from removing the indent.
    cparser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(f"""
            Examples:

                # 'scan' is the original behaviour of this script. It iterates over the data
                # from https://onionoo.torproject.org/details , parses each contact, and prints it as Python dict-style JSON.
                {sys.argv[0]} scan

                # Same as previous. With no arguments, it's equivalent to running 'scan'.
                {sys.argv[0]}

                # If you pass '-p' after scan, it will enable pretty printing. For best pretty printing,
                # make sure you have 'rich' installed from pypi.
                {sys.argv[0]} scan -p

                # If you need real JSON with double quotes, rather than Python dict-style JSON, you can
                # use the '-j' flag to enable "real JSON" mode (you can combine with '-p' if you want pretty printed real json)
                {sys.argv[0]} scan -j

                # Using 'parse', you can parse an arbitrary ContactInfo string, and it will output the parsed result
                # with pretty printing by default.

                {sys.argv[0]} parse "contact Privex Inc. email:noc[]privex.io url:https://www.privex.io " \\
                    "proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc hoster:www.privex.io " \\
                    "uplinkbw:500 memory:4096 virtualization:kvm btc:bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2 " \\
                    "xmr:89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps"

                {{
                    'email': 'noc@privex.io',
                    'url': 'https://www.privex.io',
                    'proof': 'uri-rsa',
                    'pgp': None,
                    'keybase': 'privexinc',
                    'twitter': 'PrivexInc',
                    'hoster': 'www.privex.io',
                    'uplinkbw': '500',
                    'memory': '4096',
                    'virtualization': 'kvm',
                    'btc': 'bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2',
                    'xmr': '89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps'
                }}

                # You can also pipe a contact string into 'parse', and it will work just the same.

                echo "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc" | {sys.argv[0]} parse
                {{'email': 'noc@privex.io', 'url': 'https://www.privex.io', 'proof': 'uri-rsa', 'pgp': None, 'keybase': 'privexinc', 'twitter': 'PrivexInc\\n'}}

                # If you need real JSON outputted, rather than Python dict-style output, you can pass -j to either 'parse' or 'scan'

                {sys.argv[0]} parse -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
                {{
                    "email": "noc@privex.io",
                    "url": "https://www.privex.io",
                    "proof": "uri-rsa",
                    "pgp": null,
                    "keybase": "privexinc",
                    "twitter": "PrivexInc"
                }}

                # You can use '-np' to disable pretty printing for 'parse' - you can combine it with '-j' to get flat, plain JSON.

                {sys.argv[0]} parse -np -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
                {{"email": "noc@privex.io", "url": "https://www.privex.io", "proof": "uri-rsa", "pgp": null, "keybase": "privexinc", "twitter": "PrivexInc"}}
            """))
    # relays_output gets a top-level default so the attribute exists even when
    # no sub-command is given.
    cparser.set_defaults(func=cmd_scan, json=False, pretty=False, relays_output=default_relays)
    subparse = cparser.add_subparsers()
    subparse.required = False

    sp_parse = subparse.add_parser('parse',
                                   help="Parse a single contact string, either as an argument, or piped into stdin")
    sp_parse.add_argument('contact', nargs='*')
    # store_false needs default=True; with default=False the flag could never
    # change anything and 'parse' would never pretty print.
    sp_parse.add_argument('-np', '--no-pretty',
                          action='store_false', default=True, dest='pretty',
                          help="Disable pretty printing JSON")
    # Kept on 'parse' so existing command lines that pass it keep working.
    sp_parse.add_argument('--relays_output', type=str,
                          dest='relays_output',
                          default=default_relays,
                          help="Write the download relays in json to a file")
    sp_parse.add_argument('-j', '--json', action='store_true',
                          default=False, dest='json',
                          help="Output real JSON, not Python dict format.")
    sp_parse.set_defaults(func=cmd_parse)

    sp_scan = subparse.add_parser('scan', help="Parse all contacts from https://onionoo.torproject.org/details")
    sp_scan.add_argument('-p', action='store_true', default=False, dest='pretty', help="Enable pretty printing JSON")
    sp_scan.add_argument('-j', '--json', action='store_true', default=False, dest='json', help="Output real JSON, not Python dict format.")
    sp_scan.add_argument('--relays_output', type=str,
                         dest='relays_output',
                         default=default_relays,
                         help="JSON file of already downloaded relays; when it exists it is read instead of downloading")

    return cparser
if __name__ == "__main__":
    # Script entry point: configure logging, parse the command line, run the
    # selected sub-command and exit with its status.
    log_level = logging.DEBUG if os.environ.get('DEBUG', '') else logging.INFO
    LOG = logging.getLogger()
    vsetup_logging(LOG, log_level)
    try:
        cparser = oparser()
        opts = cparser.parse_args(sys.argv[1:])
        if opts.func is cmd_scan:
            data = None
            # getattr: the attribute is absent when the option was not
            # registered for the chosen sub-command.
            relays_file = getattr(opts, 'relays_output', None)
            if relays_file and os.path.exists(relays_file):
                with open(relays_file, 'rt', encoding='utf-8') as fh:
                    data = fh.read()
            i = cmd_scan(opts, data)
        else:
            # Dispatch to whatever the sub-command registered (e.g. cmd_parse).
            i = opts.func(opts)
    except KeyboardInterrupt:
        i = 0
    except Exception as e:
        LOG.error(f"{e}")
        # Non-zero status so callers can tell that the run failed.
        i = 1

    # Commands that return None count as success.
    sys.exit(i or 0)