rm stem_examples

This commit is contained in:
emdee 2024-02-02 15:02:50 +00:00
parent 6029ebfd77
commit 58937cfe7f
29 changed files with 0 additions and 4883 deletions

View File

@ -1,37 +0,0 @@
up:: up.phantompy up.badexits
up.phantompy::
cp -up phantompy.md /o/var/local/src/phantompy.git/README.md
cp -up phantompy.setup /o/var/local/src/phantompy.git/setup.py
cp -up setup.cfg lookupdns.py qasync_phantompy.py phantompy.py support_phantompy.py \
/o/var/local/src/phantompy.git/
up.badexits:: refresh
cp -up exclude_badExits.md /o/var/local/src/exclude_badExits.git/README.md
cp -up setup.cfg exclude_badExits.py exclude_badExits.bash \
support_onions.py trustor_poc.py \
/o/var/local/src/exclude_badExits.git
lint.phantompy::
/var/local/bin/pydev_flake8.bash lookupdns.py qasync_phantompy.py phantompy.py support_phantompy.py
lint.badexits::
/var/local/bin/pydev_flake8.bash exclude_badExits.py \
support_onions.py trustor_poc.py
isort -c -diff exclude_badExits.py \
support_onions.py trustor_poc.py
lint:: lint.badexits lint.phantompy
sh .pylint.sh
refresh::
/var/local/bin/python3.bash -c \
'import exclude_badExits; print(exclude_badExits.__doc__)' \
> exclude_badExits.md
echo "\n## Usage \n\`\`\`\n" \
>> exclude_badExits.md
/var/local/bin/python3.bash exclude_badExits.py --help \
| sed -e '/^[^uo ]/d' \
>> exclude_badExits.md
echo "\n\`\`\`\n" \
>> exclude_badExits.md

View File

@ -1,113 +0,0 @@
This extends nusenu's basic idea of using the stem library to
dynamically exclude nodes that are likely to be bad by putting them
on the ExcludeNodes or ExcludeExitNodes setting of a running Tor.
* https://github.com/nusenu/noContactInfo_Exit_Excluder
* https://github.com/TheSmashy/TorExitRelayExclude
The basic idea is to exclude Exit nodes that do not have ContactInfo:
* https://github.com/nusenu/ContactInfo-Information-Sharing-Specification
That can be extended to relays that do not have an email in the contact,
or to relays that do not have ContactInfo that is verified to include them.
But there's a problem, and your Tor notice.log will tell you about it:
you could exclude the relays needed to access hidden services or mirror
directories. So we need to add to the process the concept of a whitelist.
In addition, we may have our own blacklist of nodes we want to exclude,
or use these lists for other applications like selektor.
So we make two files that are structured in YAML:
```
/etc/tor/yaml/torrc-goodnodes.yaml
{sGOOD_NODES}
By default all sections of the goodnodes.yaml are used as a whitelist.
Use the GoodNodes/Onions list to list onion services you want the
Introduction Points whitelisted - these points may change daily
Look in tor's notice.log for warnings of 'Every introduction point for service'
```--hs_dir``` ```default='/var/lib/tor'``` will make the program
parse the files named ```hostname``` below this dir to find
Hidden Services to whitelist.
The Introduction Points can change during the day, so you may want to
rerun this program to freshen the list of Introduction Points. A full run
that processes all the relays from stem can take 30 minutes, or run with:
```--saved_only``` will run the program with just cached information
on the relats, but will update the Introduction Points from the Services.
/etc/tor/yaml/torrc-badnodes.yaml
{sBAD_NODES}
```
That part requires [PyYAML](https://pyyaml.org/wiki/PyYAML)
https://github.com/yaml/pyyaml/ or ```ruamel```: do
```pip3 install ruamel``` or ```pip3 install PyYAML```;
the advantage of the former is that it preserves comments.
(You may have to run this as the Tor user to get RW access to
/run/tor/control, in which case the directory for the YAML files must
be group Tor writeable, and its parent's directories group Tor RX.)
Because you don't want to exclude the introduction points to any onion
you want to connect to, ```--white_onions``` should whitelist the
introduction points to a comma sep list of onions; we fixed stem to do this:
* https://github.com/torproject/stem/issues/96
* https://gitlab.torproject.org/legacy/trac/-/issues/25417
Use the GoodNodes/Onions list in goodnodes.yaml to list onion services
you want the Introduction Points whitelisted - these points may change daily.
Look in tor's notice.log for 'Every introduction point for service'
```notice_log``` will parse the notice log for warnings about relays and
services that will then be whitelisted.
```--torrc``` will read a file like /etc/tor/torrc and make some
suggestions based on what it finds; it will not edit or change the file.
```--torrc_output``` will write the torrc ExcludeNodes configuration to a file.
```--good_contacts``` will write the contact info as a ciiss dictionary
to a YAML file. If the proof is uri-rsa, the well-known file of fingerprints
is downloaded and the fingerprints are added on a 'fps' field we create
of that fingerprint's entry of the YAML dictionary. This file is read at the
beginning of the program to start with a trust database, and only new
contact info from new relays are added to the dictionary.
Now for the final part: we lookup the Contact info of every relay
that is currently in our Tor, and check it the existence of the
well-known file that lists the fingerprints of the relays it runs.
If it fails to provide the well-know url, we assume its a bad
relay and add it to a list of nodes that goes on ```ExcludeNodes```
(not just ExcludeExitNodes```). If the Contact info is good, we add the
list of fingerprints to ```ExitNodes```, a whitelist of relays to use as exits.
```--bad_on``` We offer the users 3 levels of cleaning:
1. clean relays that have no contact ```=Empty```
2. clean relays that don't have an email in the contact (implies 1)
```=Empty,NoEmail```
3. clean relays that don't have "good' contactinfo. (implies 1)
```=Empty,NoEmail,NotGood```
The default is ```Empty,NoEmail,NotGood``` ; ```NoEmail``` is inherently imperfect
in that many of the contact-as-an-email are obfuscated, but we try anyway.
To be "good" the ContactInfo must:
1. have a url for the well-defined-file to be gotten
2. must have a file that can be gotten at the URL
3. must support getting the file with a valid SSL cert from a recognized authority
4. (not in the spec but added by Python) must use a TLS SSL > v1
5. must have a fingerprint list in the file
6. must have the FP that got us the contactinfo in the fingerprint list in the file.
```--wait_boot``` is the number of seconds to wait for Tor to booststrap
```--wellknown_output``` will make the program write the well-known files
(```/.well-known/tor-relay/rsa-fingerprint.txt```) to a directory.
```--relays_output write the download relays in json to a file. The relays
are downloaded from https://onionoo.torproject.org/details
For usage, do ```python3 exclude_badExits.py --help`
See [exclude_badExits.txt](./exclude_badExits.txt)

View File

@ -1,55 +0,0 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# http://vt5hknv6sblkgf22.onion/tutorials/examples/check_digests.html
import sys
import stem.descriptor.remote
import stem.util.tor_tools
def download_descriptors(fingerprint):
"""
Downloads the descriptors we need to validate this relay. Downloads are
parallelized, providing the caller with a tuple of the form...
(router_status_entry, server_descriptor, extrainfo_descriptor)
"""
conensus_query = stem.descriptor.remote.get_consensus()
server_desc_query = stem.descriptor.remote.get_server_descriptors(fingerprint)
extrainfo_query = stem.descriptor.remote.get_extrainfo_descriptors(fingerprint)
router_status_entries = filter(lambda desc: desc.fingerprint == fingerprint, conensus_query.run())
if len(router_status_entries) != 1:
raise IOError("Unable to find relay '%s' in the consensus" % fingerprint)
return (
router_status_entries[0],
server_desc_query.run()[0],
extrainfo_query.run()[0],
)
if __name__ == '__main__':
fingerprint = input("What relay fingerprint would you like to validate?\n")
print('') # blank line
if not stem.util.tor_tools.is_valid_fingerprint(fingerprint):
print("'%s' is not a valid relay fingerprint" % fingerprint)
sys.exit(1)
try:
router_status_entry, server_desc, extrainfo_desc = download_descriptors(fingerprint)
except Exception as exc:
print(exc)
sys.exit(1)
if router_status_entry.digest == server_desc.digest():
print("Server descriptor digest is correct")
else:
print("Server descriptor digest invalid, expected %s but is %s" % (router_status_entry.digest, server_desc.digest()))
if server_desc.extra_info_digest == extrainfo_desc.digest():
print("Extrainfo descriptor digest is correct")
else:
print("Extrainfo descriptor digest invalid, expected %s but is %s" % (server_desc.extra_info_digest, extrainfo_desc.digest()))

View File

@ -1,51 +0,0 @@
import collections
import stem.descriptor
import stem.descriptor.remote
import stem.directory
# Query all authority votes asynchronously.
downloader = stem.descriptor.remote.DescriptorDownloader(
document_handler = stem.descriptor.DocumentHandler.DOCUMENT,
)
# An ordered dictionary ensures queries are finished in the order they were
# added.
queries = collections.OrderedDict()
for name, authority in stem.directory.Authority.from_cache().items():
if authority.v3ident is None:
continue # authority doesn't vote if it lacks a v3ident
queries[name] = downloader.get_vote(authority)
# Wait for the votes to finish being downloaded, this produces a dictionary of
# authority nicknames to their vote.
votes = dict((name, query.run()[0]) for (name, query) in queries.items())
# Get a superset of all the fingerprints in all the votes.
all_fingerprints = set()
for vote in votes.values():
all_fingerprints.update(vote.routers.keys())
# Finally, compare moria1's votes to maatuska's votes.
for fingerprint in all_fingerprints:
moria1_vote = votes['moria1'].routers.get(fingerprint)
maatuska_vote = votes['maatuska'].routers.get(fingerprint)
if not moria1_vote and not maatuska_vote:
print("both moria1 and maatuska haven't voted about %s" % fingerprint)
elif not moria1_vote:
print("moria1 hasn't voted about %s" % fingerprint)
elif not maatuska_vote:
print("maatuska hasn't voted about %s" % fingerprint)
elif 'Running' in moria1_vote.flags and 'Running' not in maatuska_vote.flags:
print("moria1 has the Running flag but maatuska doesn't: %s" % fingerprint)
elif 'Running' in maatuska_vote.flags and 'Running' not in moria1_vote.flags:
print("maatuska has the Running flag but moria1 doesn't: %s" % fingerprint)

View File

@ -1,25 +0,0 @@
#!/bin/sh
# -*- mode: sh; fill-column: 75; tab-width: 8; coding: utf-8-unix -*-
ROLE=toxcore
PROG=exclude_badExits
build=build
dist=dist
# pyinstaller
if [ ! -e ${dist}/${PROG}.pyi -o ! ${dist}/${PROG}.pyi -nt ./${PROG}.py ] ; then
[ -f ${PROG}.spec ] || pyi-makespec ./${PROG}.py -F -c
[ -d ${build} ] || mkdir -p ${build}
[ -d ${dist} ] || mkdir -p ${dist}
[ -e ${dist}/${PROG}.pyi -a ${dist}/${PROG}.pyi -nt ./${PROG}.py ] || \
pyinstaller --distpath ${dist} --workpath ${build} \
--exclude tkinter --exclude matplotlib \
--exclude twisted --exclude jedi --exclude jaraco \
--exclude sphinx --exclude coverage --exclude nose \
--exclude PIL --exclude numpy --exclude OpenGL \
--exclude PySide2 --exclude PyQt5 --exclude IPython \
--onefile -c --ascii \
$PROG.py
# AttributeError: 'NoneType' object has no attribute 'groups'
# utils.py #400
fi
# cx_Freeze exclude_badExits.py

View File

@ -1,26 +0,0 @@
#!/bin/sh
# -*- mode: sh; fill-column: 75; tab-width: 8; coding: utf-8-unix -*-
ROLE=toxcore
PROG=exclude_badExits
build=build
dist=dist
# pyinstaller
if [ ! -e ${dist}/${PROG}.pyi -o ! ${dist}/${PROG}.pyi -nt ./${PROG}.py ] ; then
[ -f ${PROG}.spec ] || pyi-makespec ./${PROG}.py -F -c
[ -d ${build} ] || mkdir -p ${build}
[ -d ${dist} ] || mkdir -p ${dist}
[ -e ${dist}/${PROG}.pyi -a ${dist}/${PROG}.pyi -nt ./${PROG}.py ] || \
pyinstaller --distpath ${dist} --workpath ${build} \
--exclude tkinter --exclude matplotlib \
--exclude twisted --exclude jedi --exclude jaraco \
--exclude sphinx --exclude coverage --exclude nose \
--exclude PIL --exclude numpy --exclude OpenGL \
--exclude PySide2 --exclude PyQt5 --exclude IPython \
--onefile -c --ascii \
$PROG.py
# AttributeError: 'NoneType' object has no attribute 'groups'
# utils.py #400
fi
# cx_Freeze exclude_badExits.py

View File

@ -1,42 +0,0 @@
#!/bin/bash
# -*- mode: sh; fill-column: 75; tab-width: 8; coding: utf-8-unix -*-
PROG=exclude_badExits.py
SOCKS_PORT=9050
CAFILE=/etc/ssl/certs/ca-certificates.crt
ROLE=toxcore
# an example of running exclude_badExits with full debugging
# expected to take an hour or so
declare -a LARGS
LARGS=(
--log_level 10
)
# you may have a special python for installed packages
EXE=`which python3.bash`
LARGS+=(
--strict_nodes 1
--points_timeout 120
--proxy-host 127.0.0.1
--proxy-port $SOCKS_PORT
--https_cafile $CAFILE
)
if [ -f '/run/tor/control' ] ; then
LARGS+=(--proxy-ctl '/run/tor/control' )
else
LARGS+=(--proxy-ctl 9051 )
fi
ddg=duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad
# for example, whitelist the introduction points to DuckDuckGo
LARGS+=( --white_onions $ddg )
# you may need to be the tor user to read /run/tor/control
grep -q ^debian-tor /etc/group && TORU=debian-tor || {
grep -q ^tor /etc/group && TORU=tor
}
sudo -u $TORU $EXE exclude_badExits.py "${LARGS[@]}" \
2>&1|tee exclude_badExits6.log
# The DEBUG statements contain the detail of why the relay was considered bad.

View File

@ -1,151 +0,0 @@
This extends nusenu's basic idea of using the stem library to
dynamically exclude nodes that are likely to be bad by putting them
on the ExcludeNodes or ExcludeExitNodes setting of a running Tor.
* https://github.com/nusenu/noContactInfo_Exit_Excluder
* https://github.com/TheSmashy/TorExitRelayExclude
The basic idea is to exclude Exit nodes that do not have ContactInfo:
* https://github.com/nusenu/ContactInfo-Information-Sharing-Specification
That can be extended to relays that do not have an email in the contact,
or to relays that do not have ContactInfo that is verified to include them.
But there's a problem, and your Tor notice.log will tell you about it:
you could exclude the relays needed to access hidden services or mirror
directories. So we need to add to the process the concept of a whitelist.
In addition, we may have our own blacklist of nodes we want to exclude,
or use these lists for other applications like selektor.
So we make two files that are structured in YAML:
```
/etc/tor/yaml/torrc-goodnodes.yaml
GoodNodes:
Relays:
IntroductionPoints:
- NODEFINGERPRINT
...
By default all sections of the goodnodes.yaml are used as a whitelist.
/etc/tor/yaml/torrc-badnodes.yaml
BadNodes:
ExcludeExitNodes:
BadExit:
# $0000000000000000000000000000000000000007
```
That part requires [PyYAML](https://pyyaml.org/wiki/PyYAML)
https://github.com/yaml/pyyaml/ or ```ruamel```: do
```pip3 install ruamel``` or ```pip3 install PyYAML```;
the advantage of the former is that it preserves comments.
(You may have to run this as the Tor user to get RW access to
/run/tor/control, in which case the directory for the YAML files must
be group Tor writeable, and its parents group Tor RX.)
Because you don't want to exclude the introduction points to any onion
you want to connect to, ```--white_onions``` should whitelist the
introduction points to a comma sep list of onions; we fixed stem to do this:
* https://github.com/torproject/stem/issues/96
* https://gitlab.torproject.org/legacy/trac/-/issues/25417
```--torrc_output``` will write the torrc ExcludeNodes configuration to a file.
```--good_contacts``` will write the contact info as a ciiss dictionary
to a YAML file. If the proof is uri-rsa, the well-known file of fingerprints
is downloaded and the fingerprints are added on a 'fps' field we create
of that fingerprint's entry of the YAML dictionary. This file is read at the
beginning of the program to start with a trust database, and only new
contact info from new relays are added to the dictionary.
Now for the final part: we lookup the Contact info of every relay
that is currently in our Tor, and check it the existence of the
well-known file that lists the fingerprints of the relays it runs.
If it fails to provide the well-know url, we assume its a bad
relay and add it to a list of nodes that goes on ```ExcludeNodes```
(not just ExcludeExitNodes```). If the Contact info is good, we add the
list of fingerprints to ```ExitNodes```, a whitelist of relays to use as exits.
```--bad_on``` We offer the users 3 levels of cleaning:
1. clean relays that have no contact ```=Empty```
2. clean relays that don't have an email in the contact (implies 1)
```=Empty,NoEmail```
3. clean relays that don't have "good' contactinfo. (implies 1)
```=Empty,NoEmail,NotGood```
The default is ```=Empty,NotGood``` ; ```NoEmail``` is inherently imperfect
in that many of the contact-as-an-email are obfuscated, but we try anyway.
To be "good" the ContactInfo must:
1. have a url for the well-defined-file to be gotten
2. must have a file that can be gotten at the URL
3. must support getting the file with a valid SSL cert from a recognized authority
4. (not in the spec but added by Python) must use a TLS SSL > v1
5. must have a fingerprint list in the file
6. must have the FP that got us the contactinfo in the fingerprint list in the file,
For usage, do ```python3 exclude_badExits.py --help`
## Usage
```
usage: exclude_badExits.py [-h] [--https_cafile HTTPS_CAFILE]
[--proxy_host PROXY_HOST] [--proxy_port PROXY_PORT]
[--proxy_ctl PROXY_CTL] [--torrc TORRC]
[--timeout TIMEOUT] [--good_nodes GOOD_NODES]
[--bad_nodes BAD_NODES] [--bad_on BAD_ON]
[--bad_contacts BAD_CONTACTS]
[--strict_nodes {0,1}] [--wait_boot WAIT_BOOT]
[--points_timeout POINTS_TIMEOUT]
[--log_level LOG_LEVEL]
[--bad_sections BAD_SECTIONS]
[--white_onions WHITE_ONIONS]
[--torrc_output TORRC_OUTPUT]
[--relays_output RELAYS_OUTPUT]
[--good_contacts GOOD_CONTACTS]
optional arguments:
-h, --help show this help message and exit
--https_cafile HTTPS_CAFILE
Certificate Authority file (in PEM)
--proxy_host PROXY_HOST, --proxy-host PROXY_HOST
proxy host
--proxy_port PROXY_PORT, --proxy-port PROXY_PORT
proxy control port
--proxy_ctl PROXY_CTL, --proxy-ctl PROXY_CTL
control socket - or port
--torrc TORRC torrc to check for suggestions
--timeout TIMEOUT proxy download connect timeout
--good_nodes GOOD_NODES
Yaml file of good info that should not be excluded
--bad_nodes BAD_NODES
Yaml file of bad nodes that should also be excluded
--bad_on BAD_ON comma sep list of conditions - Empty,NoEmail,NotGood
--bad_contacts BAD_CONTACTS
Yaml file of bad contacts that bad FPs are using
--strict_nodes {0,1} Set StrictNodes: 1 is less anonymous but more secure,
although some sites may be unreachable
--wait_boot WAIT_BOOT
Seconds to wait for Tor to booststrap
--points_timeout POINTS_TIMEOUT
Timeout for getting introduction points - must be long
>120sec. 0 means disabled looking for IPs
--log_level LOG_LEVEL
10=debug 20=info 30=warn 40=error
--bad_sections BAD_SECTIONS
sections of the badnodes.yaml to use, comma separated,
'' BROKEN
--white_onions WHITE_ONIONS
comma sep. list of onions to whitelist their
introduction points - BROKEN
--torrc_output TORRC_OUTPUT
Write the torrc configuration to a file
--relays_output RELAYS_OUTPUT
Write the download relays in json to a file
--good_contacts GOOD_CONTACTS
Write the proof data of the included nodes to a YAML
file
```

View File

@ -1,52 +0,0 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# https://stem.torproject.org/tutorials/examples/exit_used.html
import functools
import sys
import os
import getpass
from stem import StreamStatus
from stem.control import EventType, Controller
def main():
print("Tracking requests for tor exits. Press 'enter' to end.")
print("")
if os.path.exists('/var/run/tor/control'):
controller = Controller.from_socket_file(path='/var/run/tor/control')
else:
controller = Controller.from_port(port=9051)
try:
sys.stdout.flush()
p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr)
controller.authenticate(p)
stream_listener = functools.partial(stream_event, controller)
controller.add_event_listener(stream_listener, EventType.STREAM)
print('Press Enter')
input() # wait for user to press enter
except Exception as e:
print(e)
finally:
del controller
def stream_event(controller, event):
if event.status == StreamStatus.SUCCEEDED and event.circ_id:
circ = controller.get_circuit(event.circ_id)
exit_fingerprint = circ.path[-1][0]
exit_relay = controller.get_network_status(exit_fingerprint)
print("Exit relay for our connection to %s" % (event.target))
print(" address: %s:%i" % (exit_relay.address, exit_relay.or_port))
print(" fingerprint: %s" % exit_relay.fingerprint)
print(" nickname: %s" % exit_relay.nickname)
print(" locale: %s" % controller.get_info("ip-to-country/%s" % exit_relay.address, 'unknown'))
print("")
if __name__ == '__main__':
main()

View File

@ -1,42 +0,0 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
# http://vt5hknv6sblkgf22.onion/tutorials/over_the_river.html
import sys
import os
import getpass
from stem.control import Controller
from stem.connection import MissingPassword
if len(sys.argv) <= 1:
sys.argv += ['']
if os.path.exists('/run/tor/control'):
controller = Controller.from_socket_file(path='/run/tor/control')
else:
controller = Controller.from_port(port=9051)
try:
controller.authenticate()
except (Exception, MissingPassword):
sys.stdout.flush()
p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr)
controller.authenticate(p)
try:
for elt in sys.argv[1:]:
desc = controller.get_hidden_service_descriptor(elt, await_result=True, timeout=None)
print(f"{desc} get_hidden_service_descriptor\n")
l = desc.introduction_points()
if l:
print(f"{elt} NO introduction points\n")
continue
print(f"{elt} introduction points are...\n")
for introduction_point in l:
print(' %s:%s => %s' % (introduction_point.address,
introduction_point.port,
introduction_point.identifier))
except Exception as e:
print(e)
finally:
del controller

View File

@ -1,41 +0,0 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# http://vt5hknv6sblkgf22.onion/tutorials/examples/list_circuits.html
import sys
import os
import getpass
from stem import CircStatus
from stem.control import Controller
# port(port = 9051)
if os.path.exists('/var/run/tor/control'):
controller = Controller.from_socket_file(path='/var/run/tor/control')
else:
controller = Controller.from_port(port=9051)
try:
sys.stdout.flush()
p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr)
controller.authenticate(p)
for circ in sorted(controller.get_circuits()):
if circ.status != CircStatus.BUILT:
continue
print("")
print("Circuit %s (%s)" % (circ.id, circ.purpose))
for i, entry in enumerate(circ.path):
div = '+' if (i == len(circ.path) - 1) else '|'
fingerprint, nickname = entry
desc = controller.get_network_status(fingerprint, None)
address = desc.address if desc else 'unknown'
print(" %s- %s (%s, %s)" % (div, fingerprint, nickname, address))
except Exception as e:
print(e)
finally:
del controller

View File

@ -1,84 +0,0 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
"""
Looks for urls https://dns.google/resolve?
https://dns.google/resolve?name=domain.name&type=TXT&cd=true&do=true
and parses them to extract a magic field.
A good example of how you can parse json embedded in HTML with phantomjs.
"""
import sys
import os
from phantompy import Render
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
class LookFor(Render):
def __init__(self, app, do_print=True, do_save=False):
app.lfps = []
self._app = app
self.do_print = do_print
self.do_save = do_save
self.progress = 0
self.we_run_this_tor_relay = None
Render.__init__(self, app, do_print, do_save)
def _exit(self, val):
Render._exit(self, val)
self.percent = 100
LOG.debug(f"phantom.py: Exiting with val {val}")
i = self.uri.find('name=')
fp = self.uri[i+5:]
i = fp.find('.')
fp = fp[:i]
# threadsafe?
self._app.lfps.append(fp)
def _html_callback(self, *args):
"""print(self, QPrinter, Callable[[bool], None])"""
if type(args[0]) is str:
self._save(args[0])
i = self.ilookfor(args[0])
self._onConsoleMessage(i, "__PHANTOM_PY_SAVED__", 0 , '')
def ilookfor(self, html):
import json
marker = '<pre style="word-wrap: break-word; white-space: pre-wrap;">'
if marker not in html: return -1
i = html.find(marker) + len(marker)
html = html[i:]
assert html[0] == '{', html
i = html.find('</pre')
html = html[:i]
assert html[-1] == '}', html
LOG.debug(f"Found {len(html)} json")
o = json.loads(html)
if "Answer" not in o.keys() or type(o["Answer"]) != list:
LOG.warn(f"FAIL {self.uri}")
return 1
for elt in o["Answer"]:
assert type(elt) == dict, elt
assert 'type' in elt, elt
if elt['type'] != 16: continue
assert 'data' in elt, elt
if elt['data'] == 'we-run-this-tor-relay':
LOG.info(f"OK {self.uri}")
self.we_run_this_tor_relay = True
return 0
self.we_run_this_tor_relay = False
LOG.warn(f"BAD {self.uri}")
return 2
def _loadFinished(self, result):
LOG.debug(f"phantom.py: Loading finished {self.uri}")
self.toHtml(self._html_callback)

View File

@ -1,42 +0,0 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# https://stem.torproject.org/tutorials/examples/exit_used.html
import functools
import sys
import getpass
import os
from stem import StreamStatus
from stem.control import EventType, Controller
global LOG
import logging
LOG = logging.getLogger('app.'+'tox_factory')
def sMapaddressResolv(target, iPort=9051):
if os.path.exists('/var/run/tor/control'):
controller = Controller.from_socket_file(path='/var/run/tor/control')
else:
controller = Controller.from_port(port=iPort)
try:
sys.stdout.flush()
p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr)
controller.authenticate(p)
map_dict = {"0.0.0.0": target}
map_ret = controller.map_address(map_dict)
return map_ret
except Exception as e:
LOG.exception(e)
finally:
del controller
if __name__ == '__main__':
if len(sys.argv) < 2:
target = "l2ct3xnuaiwwtoybtn46qp2av4ndxcguwupzyv6xrsmnwi647vvmwtqd"
else:
target = sys.argv[1]
print(sMapaddressResolv(target))

View File

@ -1,21 +0,0 @@
from stem.descriptor.remote import DescriptorDownloader
from stem.version import Version
downloader = DescriptorDownloader()
count, with_contact = 0, 0
print("Checking for outdated relays...")
print("")
for desc in downloader.get_server_descriptors():
if desc.tor_version < Version('0.2.3.0'):
count += 1
if desc.contact:
print(' %-15s %s' % (desc.tor_version, desc.contact.decode("utf-8", "replace")))
with_contact += 1
print("")
print("%i outdated relays found, %i had contact information" % (count, with_contact))
# http://vt5hknv6sblkgf22.onion/tutorials/examples/outdated_relays.htmlhttp://vt5hknv6sblkgf22.onion/tutorials/examples/outdated_relays.html

View File

@ -1,97 +0,0 @@
# phantompy
A simple replacement for phantomjs using PyQt.
This code is based on a brilliant idea of
[Michael Franzl](https://gist.github.com/michaelfranzl/91f0cc13c56120391b949f885643e974/raw/a0601515e7a575bc4c7d4d2a20973b29b6c6f2df/phantom.py)
that he wrote up in his blog:
* https://blog.michael.franzl.name/2017/10/16/phantomjs-alternative-write-short-pyqt-scripts-instead-phantom-py/
* https://blog.michael.franzl.name/2017/10/16/phantom-py/
## Features
* Generate a PDF screenshot of the web page after it is completely loaded.
* Optionally execute a local JavaScript file specified by the argument
```javascript-file``` after the web page is completely loaded, and before the
PDF is generated. (YMMV - it segfaults for me. )
* Generate a HTML save file screenshot of the web page after it is
completely loaded and the javascript has run.
* console.log's will be printed to stdout.
* Easily add new features by changing the source code of this script,
without compiling C++ code. For more advanced applications, consider
attaching PyQt objects/methods to WebKit's JavaScript space by using
QWebFrame::addToJavaScriptWindowObject().
If you execute an external ```javascript-file```, phantompy has no
way of knowing when that script has finished doing its work. For this
reason, the external script should execute at the end
```console.log("__PHANTOM_PY_DONE__");``` when done. This will trigger
the PDF generation or the file saving, after which phantompy will exit.
If you do not want to run any javascipt file, this trigger is provided
in the code by default.
It is important to remember that since you're just running WebKit, you can
use everything that WebKit supports, including the usual JS client
libraries, CSS, CSS @media types, etc.
Qt picks up proxies from the environment, so this will respect
```https_proxy``` or ```http_proxy``` if set.
## Dependencies
* Python3
* PyQt5 (this should work with PySide2 and PyQt6 - let us know.)
* [qasnyc](https://github.com/CabbageDevelopment/qasync) for the
standalone program ```qasync_phantompy.py```
## Standalone
A standalone program is a little tricky as PyQt PyQt5.QtWebEngineWidgets'
QWebEnginePage uses callbacks at each step of the way:
1) loading the page = ```Render.run```
2) running javascript in and on the page = ```Render._loadFinished```
3) saving the page = ```Render.toHtml and _html_callback```
4) printing the page = ```Render._print```
The steps get chained by printing special messages to the Python
renderer of the JavaScript console: ```Render. _onConsoleMessage```
So it makes it hard if you want the standalone program to work without
a GUI, or in combination with another Qt program that is responsible
for the PyQt ```app.exec``` and the exiting of the program.
We've decided to use the best of the shims that merge the Python
```asyncio``` and Qt event loops:
[qasyc](https://github.com/CabbageDevelopment/qasync). This is seen as
the successor to the sorta abandoned [quamash](https://github.com/harvimt/quamash).
The code is based on a
[comment](https://github.com/CabbageDevelopment/qasync/issues/35#issuecomment-1315060043)
by [Alex March](https://github.com/hosaka) who's excellent code helped me.
As this is my first use of ```asyncio``` and ```qasync``` I may have
introduced some errors and it may be improved on, but it works, and
it not a monolithic Qt program, so it can be used as a library.
## Usage
The standalone program is ```quash_phantompy.py```
### Arguments
```
--js_input (optional) Path and name of a JavaScript file to execute on the HTML
--html_output <html-file> (optional) Path a HTML output file to generate after JS is applied
--pdf_output <pdf-file> (optional) Path and name of PDF file to generate after JS is applied
--log_level 10=debug 20=info 30=warn 40=error
html_or_url - required argument, a http(s) URL or a path to a local file.
```
Setting ```DEBUG=1``` in the environment will give debugging messages
on ```stderr```.
## Postscript
When I think of all the trouble people went to compiling and
maintaining the tonnes of C++ code that went into
[phantomjs](https://github.com/ariya/phantomjs), I am amazed that it
can be replaced with a couple of hundred lines of Python!

View File

@ -1,275 +0,0 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 2; coding: utf-8 -*-
# https://gist.github.com/michaelfranzl/91f0cc13c56120391b949f885643e974/raw/a0601515e7a575bc4c7d4d2a20973b29b6c6f2df/phantom.py
# https://blog.michael.franzl.name/2017/10/16/phantomjs-alternative-write-short-pyqt-scripts-instead-phantom-py/
# https://blog.michael.franzl.name/2017/10/16/phantom-py/
 """
# phantom.py
Simple but fully scriptable headless QtWebKit browser using PyQt5 in Python3,
specialized in executing external JavaScript and generating PDF files. A lean
replacement for other bulky headless browser frameworks.
## Usage
If you have a display attached:
./phantom.py [--pdf_output <pdf-file>] [--js_input <javascript-file>] <url-or-html-file>
If you don't have a display attached (i.e. on a remote server), you can use
xvfb-run, or don't add --show_gui - it should work without a display.
Arguments:
[--pdf_output <pdf-file>] (optional) Path and name of PDF file to generate
[--html_output <html-file>] (optional) Path and name of HTML file to generate
[--js_input <javascript-file>] (optional) Path and name of a JavaScript file to execute
--log_level 10=debug 20=info 30=warn 40=error
<url> Can be a http(s) URL or a path to a local file
## Features
* Generate a PDF screenshot of the web page after it is completely loaded.
* Optionally execute a local JavaScript file specified by the argument
<javascript-file> after the web page is completely loaded, and before
the PDF is generated.
* console.log's will be printed to stdout.
* Easily add new features by changing the source code of this script, without
compiling C++ code. For more advanced applications, consider attaching
PyQt objects/methods to WebKit's JavaScript space by using
`QWebFrame::addToJavaScriptWindowObject()`.
If you execute an external <javascript-file>, phantom.py has no way of knowing
when that script has finished doing its work. For this reason, the external
script should execute `console.log("__PHANTOM_PY_DONE__");` when done. This will
trigger the PDF generation, after which phantom.py will exit. If no
`__PHANTOM_PY_DONE__` string is seen on the console for 10 seconds, phantom.py
will exit without doing anything. This behavior could be implemented more
elegantly without console.log's but it is the simplest solution.
It is important to remember that since you're just running WebKit, you can use
everything that WebKit supports, including the usual JS client libraries, CSS,
CSS @media types, etc.
## Dependencies
* Python3
* PyQt5
* [qasnyc](https://github.com/CabbageDevelopment/qasync) for the
standalone program ```qasnyc_phantompy.py```
* xvfb (optional for display-less machines)
Installation of dependencies in Debian Stretch is easy:
apt-get install xvfb python3-pyqt5 python3-pyqt5.qtwebkit
Finding the equivalent for other OSes is an exercise that I leave to you.
## Examples
Given the following file /tmp/test.html
<html>
<body>
<p>foo <span id="id1">foo</span> <span id="id2">foo</span></p>
</body>
<script>
document.getElementById('id1').innerHTML = "bar";
</script>
</html>
... and the following file /tmp/test.js:
document.getElementById('id2').innerHTML = "baz";
console.log("__PHANTOM_PY_DONE__");
... and running this script (without attached display) ...
xvfb-run python3 phantom.py /tmp/test.html /tmp/out.pdf /tmp/test.js
... you will get a PDF file /tmp/out.pdf with the contents "foo bar baz".
Note that the second occurrence of "foo" has been replaced by the web page's own
script, and the third occurrence of "foo" by the external JS file.
## License
Copyright 2017 Michael Karl Franzl
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
import importlib
import os
import sys # noqa
from qasync import QtModuleName
from qasync.QtCore import QUrl
QPrinter = importlib.import_module(QtModuleName + ".QtPrintSupport.QPrinter", package=QtModuleName)
QWebEnginePage = importlib.import_module(QtModuleName + ".QtWebEngineWidgets.QWebEnginePage", package=QtModuleName)
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
def prepare(sdir='/tmp'):
sfile = os.path.join(sdir, 'test.js')
if not os.path.exists(sfile):
with open(sfile, 'wt') as ofd:
ofd.write("""
document.getElementById('id2').innerHTML = "baz";
console.log("__PHANTOM_PY_DONE__");
""")
LOG.debug(f"wrote {sfile} ")
sfile = os.path.join(sdir, 'test.html')
if not os.path.exists(sfile):
with open(sfile, 'wt') as ofd:
ofd.write("""
<html>
<body>
<p>foo <span id="id1">foo</span> <span id="id2">foo</span></p>
</body>
<script>
document.getElementById('id1').innerHTML = "bar";
</script>
</html>
""")
LOG.debug(f"wrote {sfile} ")
class Render(QWebEnginePage):
def __init__(self, app, do_print=False, do_save=True):
app.ldone = []
self._app = app
self.do_print = do_print
self.do_save = do_save
self.percent = 0
self.uri = None
self.jsfile = None
self.htmlfile = None
self.pdffile = None
QWebEnginePage.__init__(self)
def run(self, url, pdffile, htmlfile, jsfile):
self._app.lstart.append(id(self))
self.percent = 10
self.uri = url
self.jsfile = jsfile
self.htmlfile = htmlfile
self.pdffile = pdffile
self.outfile = pdffile or htmlfile
LOG.debug(f"phantom.py: URL={url} htmlfile={htmlfile} pdffile={pdffile} JSFILE={jsfile}")
qurl = QUrl.fromUserInput(url)
# The PDF generation only happens when the special string __PHANTOM_PY_DONE__
# is sent to console.log(). The following JS string will be executed by
# default, when no external JavaScript file is specified.
self.js_contents = "setTimeout(function() { console.log('__PHANTOM_PY_DONE__') }, 5000);"
if jsfile:
try:
with open(self.jsfile, 'rt') as f:
self.js_contents = f.read()
except Exception as e: # noqa
LOG.exception(f"error reading jsfile {self.jsfile}")
self.loadFinished.connect(self._loadFinished)
self.percent = 20
self.load(qurl)
self.javaScriptConsoleMessage = self._onConsoleMessage
LOG.debug(f"phantom.py: loading 10")
def _onConsoleMessage(self, *args):
if len(args) > 3:
level, txt, lineno, filename = args
else:
level = 1
txt, lineno, filename = args
LOG.debug(f"CONSOLE {lineno} {txt} {filename}")
if "__PHANTOM_PY_DONE__" in txt:
self.percent = 40
# If we get this magic string, it means that the external JS is done
if self.do_save:
self.toHtml(self._html_callback)
return
# drop through
txt = "__PHANTOM_PY_SAVED__"
if "__PHANTOM_PY_SAVED__" in txt:
self.percent = 50
if self.do_print:
self._print()
return
txt = "__PHANTOM_PY_PRINTED__"
if "__PHANTOM_PY_PRINTED__" in txt:
self.percent = 60
self._exit(level)
def _loadFinished(self, result):
# RenderProcessTerminationStatus ?
self.percent = 30
LOG.info(f"phantom.py: _loadFinished {result} {self.percent}")
LOG.debug(f"phantom.py: Evaluating JS from {self.jsfile}")
self.runJavaScript("document.documentElement.contentEditable=true")
self.runJavaScript(self.js_contents)
def _html_callback(self, *args):
"""print(self, QPrinter, Callable[[bool], None])"""
if type(args[0]) is str:
self._save(args[0])
self._onConsoleMessage(0, "__PHANTOM_PY_SAVED__", 0, '')
def _save(self, html):
sfile = self.htmlfile
# CompleteHtmlSaveFormat SingleHtmlSaveFormat MimeHtmlSaveFormat
with open(sfile, 'wt') as ofd:
ofd.write(html)
LOG.debug(f"Saved {sfile}")
def _printer_callback(self, *args):
"""print(self, QPrinter, Callable[[bool], None])"""
if args[0] is False:
i = 1
else:
i = 0
self._onConsoleMessage(i, "__PHANTOM_PY_PRINTED__", 0, '')
def _print(self):
sfile = self.pdffile
printer = QPrinter()
printer.setPageMargins(10, 10, 10, 10, QPrinter.Millimeter)
printer.setPaperSize(QPrinter.A4)
printer.setCreator("phantom.py by Michael Karl Franzl")
printer.setOutputFormat(QPrinter.PdfFormat)
printer.setOutputFileName(sfile)
self.print(printer, self._printer_callback)
LOG.debug("phantom.py: Printed")
def _exit(self, val):
self.percent = 100
LOG.debug(f"phantom.py: Exiting with val {val}")
# threadsafe?
self._app.ldone.append(self.uri)

View File

@ -1,128 +0,0 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
import asyncio
import os
import sys
# let qasync figure out what Qt we are using - we dont care
from qasync import QApplication, QEventLoop, QtWidgets
from phantompy import Render
# if you want an example of looking for things in downloaded HTML:
# from lookupdns import LookFor as Render
from support_phantompy import omain_argparser, vsetup_logging
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
try:
import shtab
except:
shtab = None
class Widget(QtWidgets.QWidget):
def __init__(self):
QtWidgets.QWidget.__init__(self)
self._label = QtWidgets.QLabel()
box = QtWidgets.QHBoxLayout()
self.setLayout(box)
box.addWidget(self._label)
self.progress = QtWidgets.QProgressBar()
self.progress.setRange(0, 99)
box.addWidget(self.progress)
def update(self, text):
i = len(asyncio.all_tasks())
self._label.setText(str(i))
self.progress.setValue(int(text))
class ContextManager:
def __init__(self) -> None:
self._seconds = 0
async def __aenter__(self):
LOG.debug("ContextManager enter")
return self
async def __aexit__(self, *args):
LOG.debug("ContextManager exit")
async def tick(self):
await asyncio.sleep(1)
self._seconds += 1
return self._seconds
async def main(widget, app, ilen):
LOG.debug("Task started")
try:
async with ContextManager() as ctx:
for i in range(1, 120):
seconds = await ctx.tick()
if widget:
widget.update(str(i))
if len(app.ldone) == ilen:
LOG.info(f"Finished with {app.ldone}")
print('\n'.join(app.ldone))
app.exit()
# raise asyncio.CancelledError
return
LOG.debug(f"{app.ldone} {seconds}")
except asyncio.CancelledError as ex: # noqa
LOG.debug("Task cancelled")
def iMain(largs):
parser = omain_argparser()
if shtab:
shtab.add_argument_to(parser, ["-s", "--print-completion"]) # magic!
oargs = parser.parse_args(largs)
bgui = oargs.show_gui
try:
d = int(os.environ.get('DEBUG', 0))
if d > 0:
oargs.log_level = 10
vsetup_logging(oargs.log_level, logfile='', stream=sys.stderr)
except: pass
app = QApplication([])
app.lstart = []
if bgui:
widget = Widget()
widget._app = app
widget.show()
else:
widget = None
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
url = oargs.html_url
htmlfile = oargs.html_output
pdffile = oargs.html_output
jsfile = oargs.js_input
# run only starts the url loading
r = Render(app,
do_print=True if pdffile else False,
do_save=True if htmlfile else False)
uri = url.strip()
r.run(uri, pdffile, htmlfile, jsfile)
LOG.debug(f"{r.percent} {app.lstart}")
LOG.info(f"queued {len(app.lstart)} urls")
task = loop.create_task(main(widget, app, 1))
loop.run_forever()
# cancel remaining tasks and wait for them to complete
task.cancel()
tasks = asyncio.all_tasks()
loop.run_until_complete(asyncio.gather(*tasks))
if __name__ == '__main__':
iMain(sys.argv[1:])

View File

@ -1,140 +0,0 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
import sys
import os
import atexit
import traceback
import functools
import asyncio
import time
import qasync
import threading
from PyQt5.QtWidgets import (QProgressBar, QWidget, QVBoxLayout)
# from PySide2.QtWidgets import QApplication, QProgressBar
from qasync import QEventLoop, QThreadExecutor
from qasync import asyncSlot, asyncClose, QApplication
from phantompy import Render
from lookupdns import LookFor
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
class MainWindow(QWidget):
"""Main window."""
def __init__(self):
super().__init__()
self.setLayout(QVBoxLayout())
self.progress = QProgressBar()
self.progress.setRange(0, 99)
self.layout().addWidget(self.progress)
async def main(app):
def close_future(future, loop):
loop.call_later(10, future.cancel)
future.cancel()
loop = asyncio.get_running_loop()
future = asyncio.Future()
app.ldone = []
getattr(app, "aboutToQuit").connect(
functools.partial(close_future, future, loop)
)
if False:
progress = QProgressBar()
progress.setRange(0, 99)
progress.show()
else:
mw = MainWindow()
progress = mw.progress
mw.show()
# LOG.info(f"calling first_50 {r}")
# await first_50(progress, r)
LOG.info(f"calling last_50 {r}")
o = QThreadExecutor(max_workers=1)
app.o = o
with o as executor:
await loop.run_in_executor(executor, functools.partial(last_50, progress, sys.argv[1:], app), loop)
LOG.info(f" {dir(o)}")
LOG.info(f"awaiting {future}")
await future
return True
async def first_50(progress, r=None):
progress.setValue(5)
LOG.info(f"first_50 {r}")
if r is not None:
# loop = asyncio.get_running_loop()
# LOG.info(f"first_50.r.run {r}")
# loop.call_soon_threadsafe(r.run, r.url, r.outfile, r.jsfile)
# r.run( r.url, r.outfile, r.jsfile)
for i in range(50):
# LOG.info(f"first_50 {r.progress} {i}")
# if r.progress >= 100: break
# progress.setValue(max(r.progress,i))
progress.setValue(i)
await asyncio.sleep(.1)
return
for i in range(50):
LOG.info(f"first_50 {r} {i}")
loop.call_soon_threadsafe(progress.setValue, i)
time.sleep(1)
def last_50(progress, largs, app, loop):
url = largs[0]
outfile = largs[1]
jsfile = largs[2] if len(largs) > 2 else None
r = Render(app, do_print=False, do_save=True)
uri = url.strip()
loop.call_soon_threadsafe(r.run, uri, outfile, jsfile)
time.sleep(1)
for i in range(50, 100):
j = len(app.ldone) # r.progress
if j == 100:
LOG.info(f"last_50 None {i} {j}")
else:
LOG.debug(f"last_50 None {i} {j}")
loop.call_soon_threadsafe(progress.setValue, i)
time.sleep(1)
if __name__ == '__main__':
url = 'https://dns.google/resolve?name=6D6EC2A2E2ED8BFF2D4834F8D669D82FC2A9FA8D.for-privacy.net&type=TXT&cd=true&do=true'
outfile = '/tmp/test1.pdf'
jsfile = '/tmp/test1.js'
from exclude_badExits import vsetup_logging
vsetup_logging(10)
app = QApplication([])
#?
loop = qasync.QEventLoop(app)
#NOT loop = asyncio.get_event_loop()
asyncio._set_running_loop(loop)
asyncio.events._set_running_loop(loop)
r = Render(app, do_print=False, do_save=True)
#loop.call_soon_threadsafe(r.run, url, outfile, jsfile)
r.run(url, outfile, jsfile)
app.rs = [r]
for i in range(20):
for elt in app.rs:
print (elt.percent)
time.sleep(2)
try:
qasync.run(main(app))
except asyncio.exceptions.CancelledError:
sys.exit(0)
except RuntimeError as e:
LOG.debug('Fixme')
sys.exit(0)
except KeyboardInterrupt:
sys.exit(0)
else:
val = 0
sys.exit(val)

View File

@ -1,49 +0,0 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
import sys
import os
import traceback
from phantompy import Render
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
import sys
import asyncio
import time
from PyQt5.QtWidgets import QApplication, QProgressBar
from quamash import QEventLoop, QThreadExecutor
app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop) # NEW must set the event loop
asyncio.events._set_running_loop(loop)
progress = QProgressBar()
progress.setRange(0, 99)
progress.show()
async def master():
await first_50()
with QThreadExecutor(1) as executor:
await loop.run_in_executor(exec, last_50)
# TODO announce completion?
async def first_50():
for i in range(50):
progress.setValue(i)
await asyncio.sleep(.1)
def last_50():
for i in range(50,100):
loop.call_soon_threadsafe(progress.setValue, i)
time.sleep(.1)
with loop: ## context manager calls .close() when loop completes, and releases all resources
loop.run_until_complete(master())

View File

@ -1,137 +0,0 @@
# -*-mode: python; py-indent-offset: 2; indent-tabs-mode: nil; coding: utf-8-unix -*-
# http://vt5hknv6sblkgf22.onion/tutorials/examples/relay_connections.html
import argparse
import collections
import time
import stem.connection
import stem.util.system
import stem.util.str_tools
from stem.control import Listener
from stem.control import Controller
from stem.util.connection import get_connections, port_usage, is_valid_ipv4_address
HEADER_LINE = " {version} uptime: {uptime} flags: {flags}\n"
DIV = '+%s+%s+%s+' % ('-' * 30, '-' * 6, '-' * 6)
COLUMN = '| %-28s | %4s | %4s |'
INBOUND_ORPORT = 'Inbound to our ORPort'
INBOUND_DIRPORT = 'Inbound to our DirPort'
INBOUND_CONTROLPORT = 'Inbound to our ControlPort'
OUTBOUND_ORPORT = 'Outbound to a relay'
OUTBOUND_EXIT = 'Outbound exit traffic'
OUTBOUND_UNKNOWN = 'Outbound uncategorized'
def main(controller):
parser = argparse.ArgumentParser()
parser.add_argument("--ctrlport", help="default: 9051 or 9151")
parser.add_argument("--resolver", help="default: autodetected")
args = parser.parse_args()
control_port = int(args.ctrlport) if args.ctrlport else 'default'
controller = stem.connection.connect(control_port = ('127.0.0.1', control_port))
if not controller:
return
desc = controller.get_network_status(default = None)
pid = controller.get_pid()
version = str(controller.get_version()).split()[0],
uptime = stem.util.str_tools.short_time_label(time.time() - stem.util.system.start_time(pid))
print(HEADER_LINE.format(
version=version,
uptime=uptime,
flags = ', '.join(desc.flags if desc else ['none']),
))
policy = controller.get_exit_policy()
relays = {} # address => [orports...]
for desc in controller.get_network_statuses():
relays.setdefault(desc.address, []).append(desc.or_port)
# categorize our connections
categories = collections.OrderedDict((
(INBOUND_ORPORT, []),
(INBOUND_DIRPORT, []),
(INBOUND_CONTROLPORT, []),
(OUTBOUND_ORPORT, []),
(OUTBOUND_EXIT, []),
(OUTBOUND_UNKNOWN, []),
))
exit_connections = {} # port => [connections]
for conn in get_connections(resolver = args.resolver, process_pid = pid):
if conn.protocol == 'udp':
continue
if conn.local_port in controller.get_ports(Listener.OR, []):
categories[INBOUND_ORPORT].append(conn)
elif conn.local_port in controller.get_ports(Listener.DIR, []):
categories[INBOUND_DIRPORT].append(conn)
elif conn.local_port in controller.get_ports(Listener.CONTROL, []):
categories[INBOUND_CONTROLPORT].append(conn)
elif conn.remote_port in relays.get(conn.remote_address, []):
categories[OUTBOUND_ORPORT].append(conn)
elif policy.can_exit_to(conn.remote_address, conn.remote_port):
categories[OUTBOUND_EXIT].append(conn)
exit_connections.setdefault(conn.remote_port, []).append(conn)
else:
categories[OUTBOUND_UNKNOWN].append(conn)
print(DIV)
print(COLUMN % ('Type', 'IPv4', 'IPv6'))
print(DIV)
total_ipv4, total_ipv6 = 0, 0
for label, connections in categories.items():
if len(connections) == 0:
continue
ipv4_count = len([conn for conn in connections if is_valid_ipv4_address(conn.remote_address)])
ipv6_count = len(connections) - ipv4_count
total_ipv4, total_ipv6 = total_ipv4 + ipv4_count, total_ipv6 + ipv6_count
print(COLUMN % (label, ipv4_count, ipv6_count))
print(DIV)
print(COLUMN % ('Total', total_ipv4, total_ipv6))
print(DIV)
print('')
if exit_connections:
print(DIV)
print(COLUMN % ('Exit Port', 'IPv4', 'IPv6'))
print(DIV)
total_ipv4, total_ipv6 = 0, 0
for port in sorted(exit_connections):
connections = exit_connections[port]
ipv4_count = len([conn for conn in connections if is_valid_ipv4_address(conn.remote_address)])
ipv6_count = len(connections) - ipv4_count
total_ipv4, total_ipv6 = total_ipv4 + ipv4_count, total_ipv6 + ipv6_count
usage = port_usage(port)
label = '%s (%s)' % (port, usage) if usage else port
print(COLUMN % (label, ipv4_count, ipv6_count))
print(DIV)
print(COLUMN % ('Total', total_ipv4, total_ipv6))
print(DIV)
print('')
if __name__ == '__main__':
with Controller.from_socket_file(path='/var/run/tor/control') as controller:
main(controller)

View File

@ -1,58 +0,0 @@
[metadata]
classifiers =
License :: OSI Approved
License :: OSI Approved :: BSD 1-clause
Intended Audience :: Web Developers
Operating System :: Microsoft :: Windows
Operating System :: POSIX :: BSD :: FreeBSD
Operating System :: POSIX :: Linux
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: Implementation :: CPython
Framework :: AsyncIO
[options]
zip_safe = false
python_requires = ~=3.6
packages = find:
include_package_data = false
install_requires =
qasync
cryptography
rsa
stem
[options.entry_points]
console_scripts =
phantompy = phantompy.__main__:iMain
[easy_install]
zip_ok = false
[flake8]
jobs = 1
max-line-length = 88
ignore =
E111
E114
E128
E225
E225
E261
E302
E305
E402
E501
E502
E541
E701
E704
E722
E741
F508
F541
W503

View File

@ -1,89 +0,0 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
# https://stackoverflow.com/questions/5239797/python-smtplib-proxy-support
# https://stackoverflow.com/questions/19642726/testing-python-smtp-email-service
import socket
import smtplib
import socks
class ProxySMTP(smtplib.SMTP):
def __init__(self, host='', port=0, local_hostname=None,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
source_address=None, proxy_addr=None, proxy_port=None):
"""Initialize a new instance.
If specified, `host' is the name of the remote host to which to
connect. If specified, `port' specifies the port to which to connect.
By default, smtplib.SMTP_PORT is used. If a host is specified the
connect method is called, and if it returns anything other than a
success code an SMTPConnectError is raised. If specified,
`local_hostname` is used as the FQDN of the local host in the HELO/EHLO
command. Otherwise, the local hostname is found using
socket.getfqdn(). The `source_address` parameter takes a 2-tuple (host,
port) for the socket to bind to as its source address before
connecting. If the host is '' and port is 0, the OS default behavior
will be used.
"""
self._host = host
self.timeout = timeout
self.esmtp_features = {}
self.command_encoding = 'ascii'
self.source_address = source_address
self.proxy_addr = proxy_addr
self.proxy_port = proxy_port
if host:
(code, msg) = self.connect(host, port)
if code != 220:
self.close()
raise smtplib.SMTPConnectError(code, msg)
if local_hostname is not None:
self.local_hostname = local_hostname
else:
# RFC 2821 says we should use the fqdn in the EHLO/HELO verb, and
# if that can't be calculated, that we should use a domain literal
# instead (essentially an encoded IP address like [A.B.C.D]).
fqdn = socket.getfqdn()
if '.' in fqdn:
self.local_hostname = fqdn
else:
# We can't find an fqdn hostname, so use a domain literal
addr = '127.0.0.1'
try:
addr = socket.gethostbyname(socket.gethostname())
except socket.gaierror:
pass
self.local_hostname = '[%s]' % addr
def _get_socket(self, host, port, timeout):
# This makes it simpler for SMTP_SSL to use the SMTP connect code
# and just alter the socket connection bit.
if self.debuglevel > 0:
self._print_debug('connect: to', (host, port), self.source_address)
return socks.create_connection((host, port),
proxy_type=socks.PROXY_TYPE_SOCKS5,
timeout=timeout,
proxy_addr=self.proxy_addr,
proxy_port=self.proxy_port)
# And to use:
if __name__ == '__main__':
user_email, user_pass = 'foo', 'bar'
email_server = ProxySMTP('smtp.gmail.com', 587,
proxy_addr='127.0.0.1',
proxy_port=9050,
timeout=20)
email_server.starttls()
try:
email_server.login(user_email, user_pass)
except smtplib.SMTPAuthenticationError as e:
if len(e.args) > 1:
code = e.args[0]
if code = 535:
# 5.7.8 Username and Password not accepted
pass
raise
email_server.sendmail(user_email, recipient_list, msg.as_string())
email_server.quit()

View File

@ -1,445 +0,0 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
import getpass
import os
import re
import select
import shutil
import socket
import sys
import time
if False:
import cepa as stem
from cepa.connection import MissingPassword
from cepa.control import Controller
from cepa.util.tor_tools import is_valid_fingerprint
else:
import stem
from stem.connection import MissingPassword
from stem.control import Controller
from stem.util.tor_tools import is_valid_fingerprint
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
bHAVE_TORR = shutil.which('tor-resolve')
# we check these each time but we got them by sorting bad relays
# in the wild we'll keep a copy here so we can avoid restesting
yKNOWN_NODNS = """
---
- for-privacy.net
- backup.spekadyon.org
- verification-for-nusenu.net
- prsv.ch
- ezyn.de
- dfri.se
- dtf.contact
- galtland.network
- dotsrc.org
- nicdex.com
- unzane.com
- a9.wtf
- tor.skankhunt42.pw
- tor-exit-3.aa78i2efsewr0neeknk.xyz
- privacysvcs.net
- apt96.com
- mkg20001.io
- kryptonit.org
- sebastian-elisa-pfeifer.eu
- nx42.de
- www.defcon.org
- 0x0.is
- transliberation.today
- tor-exit-2.aa78i2efsewr0neeknk.xyz
- interfesse.net
- axims.net
- a9.wtf
- heraldonion.org
- linkspartei.org
- pineapple.cx
- privacylayer.xyz
- prsv.ch
- thingtohide.nl
- tor-exit-2.aa78i2efsewr0neeknk.xyz
- tor-exit-3.aa78i2efsewr0neeknk.xyz
- tor.dlecan.com
- tuxli.org
- verification-for-nusenu.net
"""
# - 0x0.is
# - aklad5.com
# - artikel5ev.de
# - arvanode.net
# - dodo.pm
# - erjan.net
# - galtland.network
# - lonet.sh
# - moneneis.de
# - olonet.sh
# - or-exit-2.aa78i2efsewr0neeknk.xyz
# - or.wowplanet.de
# - ormycloud.org
# - plied-privacy.net
# - rivacysvcs.net
# - redacted.org
# - rofl.cat
# - sv.ch
# - tikel10.org
# - tor.wowplanet.de
# - torix-relays.org
# - tse.com
# - w.digidow.eu
# - w.cccs.de
def oMakeController(sSock='', port=9051):
import getpass
if sSock and os.path.exists(sSock):
controller = Controller.from_socket_file(path=sSock)
else:
controller = Controller.from_port(port=port)
sys.stdout.flush()
p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr)
controller.authenticate(p)
return controller
oSTEM_CONTROLER = None
def oGetStemController(log_level=10, sock_or_pair='/run/tor/control'):
global oSTEM_CONTROLER
if oSTEM_CONTROLER: return oSTEM_CONTROLER
import stem.util.log
# stem.util.log.Runlevel = 'DEBUG' = 20 # log_level
if os.path.exists(sock_or_pair):
LOG.info(f"controller from socket {sock_or_pair}")
controller = Controller.from_socket_file(path=sock_or_pair)
else:
if type(sock_or_pair) == int:
port = sock_or_pair
elif ':' in sock_or_pair:
port = sock_or_pair.split(':')[1]
else:
port = sock_or_pair
try:
port = int(port)
except: port = 9051
LOG.info(f"controller from port {port}")
# stem.SocketError
controller = Controller.from_port(port=port)
try:
controller.authenticate()
except (Exception, MissingPassword):
sys.stdout.flush()
p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr)
controller.authenticate(p)
oSTEM_CONTROLER = controller
LOG.debug(f"{controller}")
return oSTEM_CONTROLER
def bAreWeConnected():
# FixMe: Linux only
sFile = f"/proc/{os.getpid()}/net/route"
if not os.path.isfile(sFile): return None
i = 0
for elt in open(sFile, "r").readlines():
if elt.startswith('Iface'): continue
if elt.startswith('lo'): continue
i += 1
return i > 0
def sMapaddressResolv(target, iPort=9051, log_level=10):
if not stem:
LOG.warn('please install the stem Python package')
return ''
try:
controller = oGetStemController(log_level=log_level)
map_dict = {"0.0.0.0": target}
map_ret = controller.map_address(map_dict)
return map_ret
except Exception as e:
LOG.exception(e)
return ''
def vwait_for_controller(controller, wait_boot=10):
if bAreWeConnected() is False:
raise SystemExit("we are not connected")
percent = i = 0
# You can call this while boostrapping
while percent < 100 and i < wait_boot:
bootstrap_status = controller.get_info("status/bootstrap-phase")
progress_percent = re.match('.* PROGRESS=([0-9]+).*', bootstrap_status)
percent = int(progress_percent.group(1))
LOG.info(f"Bootstrapping {percent}%")
time.sleep(5)
i += 5
def bin_to_hex(raw_id, length=None):
if length is None: length = len(raw_id)
res = ''.join('{:02x}'.format(raw_id[i]) for i in range(length))
return res.upper()
def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10):
"""now working !!! stem 1.8.x timeout must be huge >120
'Provides the descriptor for a hidden service. The **address** is the
'.onion' address of the hidden service '
What about Services?
"""
try:
from cryptography.utils import int_from_bytes
except ImportError:
import cryptography.utils
# guessing - not in the current cryptography but stem expects it
def int_from_bytes(**args): return int.to_bytes(*args)
cryptography.utils.int_from_bytes = int_from_bytes
# this will fai if the trick above didnt work
from stem.prereq import is_crypto_available
is_crypto_available(ed25519=True)
from queue import Empty
from stem import Timeout
from stem.client.datatype import LinkByFingerprint
from stem.descriptor.hidden_service import HiddenServiceDescriptorV3
if type(lOnions) not in [set, tuple, list]:
lOnions = list(lOnions)
if controller is None:
controller = oGetStemController(log_level=log_level)
l = []
for elt in lOnions:
LOG.info(f"controller.get_hidden_service_descriptor {elt}")
try:
desc = controller.get_hidden_service_descriptor(elt,
await_result=True,
timeout=itimeout)
# LOG.log(40, f"{dir(desc)} get_hidden_service_descriptor")
# timeouts 20 sec
# mistakenly a HSv2 descriptor
hs_address = HiddenServiceDescriptorV3.from_str(str(desc)) # reparse as HSv3
oInnerLayer = hs_address.decrypt(elt)
# LOG.log(40, f"{dir(oInnerLayer)}")
# IntroductionPointV3
n = oInnerLayer.introduction_points
if not n:
LOG.warn(f"NO introduction points for {elt}")
continue
LOG.info(f"{elt} {len(n)} introduction points")
lp = []
for introduction_point in n:
for linkspecifier in introduction_point.link_specifiers:
if isinstance(linkspecifier, LinkByFingerprint):
# LOG.log(40, f"Getting fingerprint for {linkspecifier}")
if hasattr(linkspecifier, 'fingerprint'):
assert len(linkspecifier.value) == 20
lp += [bin_to_hex(linkspecifier.value)]
LOG.info(f"{len(lp)} introduction points for {elt}")
l += lp
except (Empty, Timeout,) as e: # noqa
LOG.warn(f"Timed out getting introduction points for {elt}")
continue
except Exception as e:
LOG.exception(e)
return l
def zResolveDomain(domain):
try:
ip = sTorResolve(domain)
except Exception as e: # noqa
ip = ''
if ip == '':
try:
lpair = getaddrinfo(domain, 443)
except Exception as e:
LOG.warn(f"{e}")
lpair = None
if lpair is None:
LOG.warn(f"TorResolv and getaddrinfo failed for {domain}")
return ''
ip = lpair[0]
return ip
def sTorResolve(target,
verbose=False,
sHost='127.0.0.1',
iPort=9050,
SOCK_TIMEOUT_SECONDS=10.0,
SOCK_TIMEOUT_TRIES=3,
):
MAX_INFO_RESPONSE_PACKET_LENGTH = 8
if '@' in target:
LOG.warn(f"sTorResolve failed invalid hostname {target}")
return ''
target = target.strip('/')
seb = b"\x04\xf0\x00\x00\x00\x00\x00\x01\x00"
seb += bytes(target, 'US-ASCII') + b"\x00"
assert len(seb) == 10 + len(target), str(len(seb)) + repr(seb)
# LOG.debug(f"0 Sending {len(seb)} to The TOR proxy {seb}")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((sHost, iPort))
sock.settimeout(SOCK_TIMEOUT_SECONDS)
oRet = sock.sendall(seb) # noqa
i = 0
data = ''
while i < SOCK_TIMEOUT_TRIES:
i += 1
time.sleep(3)
lReady = select.select([sock.fileno()], [], [],
SOCK_TIMEOUT_SECONDS)
if not lReady[0]: continue
try:
flags=socket.MSG_WAITALL
data = sock.recv(MAX_INFO_RESPONSE_PACKET_LENGTH, flags)
except socket.timeout:
LOG.warn(f"4 The TOR proxy {(sHost, iPort)}" \
+" didnt reply in " + str(SOCK_TIMEOUT_SECONDS) + " sec."
+" #" +str(i))
except Exception as e:
LOG.error("4 The TOR proxy " \
+repr((sHost, iPort)) \
+" errored with " + str(e)
+" #" +str(i))
sock.close()
return ''
else:
if len(data) > 0: break
if len(data) == 0:
if i > SOCK_TIMEOUT_TRIES:
sLabel = "5 No reply #"
else:
sLabel = "5 No data #"
LOG.warn(f"sTorResolve: {sLabel} {i} on {sHost}:{iPort}")
sock.close()
return ''
assert len(data) >= 8
packet_sf = data[1]
if packet_sf == 90:
# , "%d" % packet_sf
assert f"{packet_sf}" == "90", f"packet_sf = {packet_sf}"
return f"{data[4]}.{data[5]}.{data[6]}.{data[7]}"
else:
# 91
LOG.warn(f"tor-resolve failed for {target} on {sHost}:{iPort}")
os.system(f"tor-resolve -4 {target} > /tmp/e 2>/dev/null")
# os.system("strace tor-resolve -4 "+target+" 2>&1|grep '^sen\|^rec'")
return ''
def getaddrinfo(sHost, sPort):
# do this the explicit way = Ive seen the compact connect fail
# >>> sHost, sPort = 'l27.0.0.1', 33446
# >>> sock.connect((sHost, sPort))
# socket.gaierror: [Errno -2] Name or service not known
try:
lElts = socket.getaddrinfo(sHost, int(sPort), socket.AF_INET)
lElts = list(filter(lambda elt: elt[1] == socket.SOCK_DGRAM, lElts))
assert len(lElts) == 1, repr(lElts)
lPair = lElts[0][-1]
assert len(lPair) == 2, repr(lPair)
assert type(lPair[1]) == int, repr(lPair)
except (socket.gaierror, OSError, BaseException) as e:
LOG.error(e)
return None
return lPair
def icheck_torrc(sFile, oArgs):
l = open(sFile, 'rt').readlines()
a = {}
for elt in l:
elt = elt.strip()
if not elt or ' ' not in elt: continue
(k, v,) = elt.split(' ', 1)
a[k] = v
keys = a
if 'HashedControlPassword' not in keys:
LOG.info('Add HashedControlPassword for security')
print('run: tor --hashcontrolpassword <TopSecretWord>')
if 'ExcludeExitNodes' in keys:
elt = 'BadNodes.ExcludeExitNodes.BadExit'
LOG.warn(f"Remove ExcludeNodes and move then to {oArgs.bad_nodes}")
print(f"move to the {elt} section as a list")
if 'GuardNodes' in keys:
elt = 'GoodNodes.GuardNodes'
LOG.warn(f"Remove GuardNodes and move then to {oArgs.good_nodes}")
print(f"move to the {elt} section as a list")
if 'ExcludeNodes' in keys:
elt = 'BadNodes.ExcludeNodes.BadExit'
LOG.warn(f"Remove ExcludeNodes and move then to {oArgs.bad_nodes}")
print(f"move to the {elt} section as a list")
if 'ControlSocket' not in keys and os.path.exists('/run/tor/control'):
LOG.info('Add ControlSocket /run/tor/control for us')
print('ControlSocket /run/tor/control GroupWritable RelaxDirModeCheck')
if 'UseMicrodescriptors' not in keys or keys['UseMicrodescriptors'] != '1':
LOG.info('Add UseMicrodescriptors 0 for us')
print('UseMicrodescriptors 0')
if 'AutomapHostsSuffixes' not in keys:
LOG.info('Add AutomapHostsSuffixes for onions')
print('AutomapHostsSuffixes .exit,.onion')
if 'AutoMapHostsOnResolve' not in keys:
LOG.info('Add AutoMapHostsOnResolve for onions')
print('AutoMapHostsOnResolve 1')
if 'VirtualAddrNetworkIPv4' not in keys:
LOG.info('Add VirtualAddrNetworkIPv4 for onions')
print('VirtualAddrNetworkIPv4 172.16.0.0/12')
return 0
def lExitExcluder(oArgs, iPort=9051, log_level=10):
"""
https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py
"""
if not stem:
LOG.warn('please install the stem Python package')
return ''
LOG.debug('lExcludeExitNodes')
try:
controller = oGetStemController(log_level=log_level)
# generator
relays = controller.get_server_descriptors()
except Exception as e:
LOG.error(f'Failed to get relay descriptors {e}')
return None
if controller.is_set('ExcludeExitNodes'):
LOG.info('ExcludeExitNodes is in use already.')
return None
exit_excludelist=[]
LOG.debug("Excluded exit relays:")
for relay in relays:
if relay.exit_policy.is_exiting_allowed() and not relay.contact:
if is_valid_fingerprint(relay.fingerprint):
exit_excludelist.append(relay.fingerprint)
LOG.debug("https://metrics.torproject.org/rs.html#details/%s" % relay.fingerprint)
else:
LOG.warn('Invalid Fingerprint: %s' % relay.fingerprint)
try:
controller.set_conf('ExcludeExitNodes', exit_excludelist)
LOG.info('Excluded a total of %s exit relays without ContactInfo from the exit position.' % len(exit_excludelist))
except Exception as e:
LOG.exception('ExcludeExitNodes ' +str(e))
return exit_excludelist
if __name__ == '__main__':
target = 'duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad'
controller = oGetStemController(log_level=10)
lIntroductionPoints(controller, [target], itimeout=120)

View File

@ -1,48 +0,0 @@
#!/usr/local/bin/python3.sh
# -*-mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*
import argparse
import os
import sys
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
def omain_argparser(_=None):
try:
from OpenSSL import SSL
lCAfs = SSL._CERTIFICATE_FILE_LOCATIONS
except:
lCAfs = []
CAfs = []
for elt in lCAfs:
if os.path.exists(elt):
CAfs.append(elt)
if not CAfs:
CAfs = ['']
parser = argparse.ArgumentParser(add_help=True,
epilog=__doc__)
parser.add_argument('--https_cafile', type=str,
help="Certificate Authority file (in PEM) (unused)",
default=CAfs[0])
parser.add_argument('--log_level', type=int, default=20,
help="10=debug 20=info 30=warn 40=error")
parser.add_argument('--js_input', type=str, default='',
help="Operate on the HTML file with javascript")
parser.add_argument('--html_output', type=str, default='',
help="Write loaded and javascripted result to a HTML file")
parser.add_argument('--pdf_output', type=str, default='',
help="Write loaded and javascripted result to a PDF file")
parser.add_argument('--show_gui', type=bool, default=False, store_action=True,
help="show a progress meter that doesn't work")
parser.add_argument('html_url', type=str, nargs='?',
required=True,
help='html file or url')
return parser

View File

@ -1,37 +0,0 @@
#!/usr/bin/python3 -u
## Copyright (C) 2012 - 2020 ENCRYPTED SUPPORT LP <adrelanos@riseup.net>
## See the file COPYING for copying conditions.
import sys
from stem.connection import connect
import re
controller = connect()
if not controller:
sys.exit(255)
bootstrap_status = controller.get_info("status/bootstrap-phase")
## Possible answer, if network cable has been removed:
## 250-status/bootstrap-phase=WARN BOOTSTRAP PROGRESS=80 TAG=conn_or SUMMARY="Connecting to the Tor network" WARNING="No route to host" REASON=NOROUTE COUNT=26 RECOMMENDATION=warn
## Possible answer:
## 250-status/bootstrap-phase=NOTICE BOOTSTRAP PROGRESS=85 TAG=handshake_or SUMMARY="Finishing handshake with first hop"
## Possible answer, when done:
## 250-status/bootstrap-phase=NOTICE BOOTSTRAP PROGRESS=100 TAG=done SUMMARY="Done"
## TODO: parse the messages above.
## 0
print(format(bootstrap_status))
progress_percent = re.match('.* PROGRESS=([0-9]+).*', bootstrap_status)
exit_code = int(progress_percent.group(1))
controller.close()
sys.exit(exit_code)

View File

@ -1,613 +0,0 @@
#!/usr/bin/env python3
"""
Tor Contact Info Parser - A tool/Python Class for parsing Tor ContactInfo Information Sharing v2 specification contacts
Written by Eran Sandler (https://twitter.com/erans) (C) 2018
Turned into a proper command-line tool with sub-commands and flags by @Someguy123 at Privex Inc. (C) 2021
(https://www.privex.io) (https://github.com/PrivexInc)
This is a parser for the Tor ContactInfo Information Sharing Specification v2 (https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/).
The parser can parse the ContactInfo field of Tor relays based on the specification.
Official Repo: https://github.com/erans/torcontactinfoparser
Privex Fork: https://github.com/Privex/torcontactinfoparser
Released under the MIT License.
"""
import argparse
import os
import re
import sys
import json
import requests
import textwrap
try:
from rich import print as rprint
HAS_RICH = True
except ImportError:
def rprint(value='', *args, **kwargs):
if value not in [None, False, True] and isinstance(value, (dict, list, set, tuple)):
value = json.dumps(value, indent=4)
return print(value, *args, **kwargs)
# rprint = print
HAS_RICH = False
global LOG
import logging
import warnings
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
class TorContactInfoParser(object):
email_regex = "^[a-zA-Z0-9.!#$%&*+/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\\.[a-zA-Z0-9-]+)*$"
def _parse_string_value(self, value, min_length, max_length, valid_chars, raise_exception=False, field_name=None, deobfuscate_email=False):
value_length = len(value)
if value_length < min_length:
if raise_exception:
raise ValueError("value of field '{0}' is too short".format(field_name))
return None
if value_length > max_length:
if raise_exception:
raise ValueError("value of field '{0}' is too long".format(field_name))
return None
if valid_chars != "*":
m = re.search(valid_chars, value)
if not m:
if raise_exception:
raise ValueError("value of field '{0}' doesn't match valid chars restrictions".format(field_name))
else:
return None
return value
def _parse_email_value(self, value, field_name, raise_exception, deobfuscate_email):
if value:
v = value.replace("[]", "@")
if re.search(self.email_regex, v):
if not deobfuscate_email:
return v.replace("@", "[]")
return v
return None
_supported_fields_parsers = {
"email" : {
"fn": _parse_email_value,
"args": {}
},
"url" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 4,
"max_length" : 399,
"valid_chars" : "[_%/:a-zA-Z0-9.-]+"
}
},
"proof" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 7,
"max_length" : 7,
"valid_chars" : "[adinrsu-]+"
}
},
"ciissversion" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[12]+"
}
},
"pgp" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 40,
"max_length" : 40,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"abuse" : {
"fn": _parse_email_value,
"args": {}
},
"keybase" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 50,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"twitter" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 15,
"valid_chars" : "[a-zA-Z0-9_]+"
}
},
"mastodon" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 254,
"valid_chars" : "*"
}
},
"matrix" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 254,
"valid_chars" : "*"
}
},
"xmpp" : {
"fn": _parse_email_value,
"args": {}
},
"otr3" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 40,
"max_length" : 40,
"valid_chars" : "[a-z0-9]+"
}
},
"hoster" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 254,
"valid_chars" : "[a-zA-Z0-9.-]+"
}
},
"cost" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 13,
"valid_chars" : "[A-Z0-9.]+"
}
},
"uplinkbw" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 7,
"valid_chars" : "[0-9]+"
}
},
"trafficacct" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 9,
"valid_chars" : "[unmetrd0-9]+"
}
},
"memory" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 10,
"valid_chars" : "[0-9]+"
}
},
"cpu" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 50,
"valid_chars" : "[a-zA-Z0-9_-]+"
}
},
"virtualization" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 15,
"valid_chars" : "[a-z-]+"
}
},
"donationurl" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 254,
"valid_chars" : "*"
}
},
"btc" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 26,
"max_length" : 99,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"zec" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 95,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"xmr" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 99,
"valid_chars" : "[a-zA-Z0-9]+"
}
},
"offlinemasterkey" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"signingkeylifetime" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 6,
"valid_chars" : "[0-9]+"
}
},
"sandbox" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 2,
"valid_chars" : "[yn]"
}
},
"os" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 20,
"valid_chars" : "[A-Za-z0-9/.]+"
}
},
"tls" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 0,
"max_length" : 14,
"valid_chars" : "[a-z]+"
}
},
"aesni" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"autoupdate" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"confmgmt" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 15,
"valid_chars" : "[a-zA-Z-]"
}
},
"dnslocation" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 5,
"max_length" : 100,
"valid_chars" : "[a-z,]"
}
},
"dnsqname" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"dnssec" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
},
"dnslocalrootzone" : {
"fn" : _parse_string_value,
"args" : {
"min_length" : 1,
"max_length" : 1,
"valid_chars" : "[yn]"
}
}
}
def __init__(self):
pass
def parse(self, value: str, raise_exception_on_invalid_value=False, deobfuscate_email=True) -> dict:
# the ciissversion field is mandatory
if not 'ciissversion:' in value:
return None
result = {}
parts = value.split(" ")
for p in parts:
field_parts = p.split(":", 1)
if len(field_parts) <= 1:
continue
name, data = field_parts
if name in self._supported_fields_parsers:
field_parser = self._supported_fields_parsers[name]
if field_parser is None:
result[name] = data
continue
if callable(field_parser):
value = field_parser(self, data)
else:
field_parser["args"]["field_name"] = name
field_parser["args"]["value"] = data
field_parser["args"]["raise_exception"] = raise_exception_on_invalid_value
field_parser["args"]["deobfuscate_email"] = deobfuscate_email
value = field_parser["fn"](self, **field_parser["args"])
if not result.get(name, None):
result[name] = value
return result
def cmd_parse(opts: argparse.Namespace):
"""
ArgParser function for parsing a single ContactInfo string, and outputting it as JSON (or python-style dict's)
"""
if opts.contact is None or len(opts.contact) == 0 or opts.contact[0] == '-':
contact = sys.stdin.read()
else:
contact = ' '.join(opts.contact).strip()
tparser = TorContactInfoParser()
res = tparser.parse(contact)
if not opts.pretty:
return print(json.dumps(res))
if opts.json:
res = json.dumps(res, indent=4) if opts.pretty else json.dumps(res)
# if not HAS_RICH: res = json.dumps(res, indent=4)
rprint(res)
def vsetup_logging(log_level, logfile='', stream=sys.stderr):
global LOG
add = True
try:
if 'COLOREDLOGS_LEVEL_STYLES' not in os.environ:
os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red'
# https://pypi.org/project/coloredlogs/
import coloredlogs
except ImportError:
coloredlogs = False
logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S')
logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S'
logging._defaultFormatter.default_msec_format = ''
kwargs = dict(level=log_level,
force=True,
format='%(levelname)s %(message)s')
if logfile:
add = logfile.startswith('+')
sub = logfile.startswith('-')
if add or sub:
logfile = logfile[1:]
kwargs['filename'] = logfile
if coloredlogs:
# https://pypi.org/project/coloredlogs/
aKw = dict(level=log_level,
logger=LOG,
stream=stream,
fmt='%(levelname)s %(message)s'
)
coloredlogs.install(**aKw)
if logfile:
oHandler = logging.FileHandler(logfile)
LOG.addHandler(oHandler)
LOG.info(f"CSetting log_level to {log_level} {stream}")
else:
logging.basicConfig(**kwargs)
if add and logfile:
oHandler = logging.StreamHandler(stream)
LOG.addHandler(oHandler)
LOG.info(f"SSetting log_level to {log_level!s}")
def cmd_scan(opts: argparse.Namespace, adata=None):
"""
ArgParser function for scanning all ContactInfo strings from ``https://onionoo.torproject.org/details`` ,
and outputting each one as a Python-style Dict, or JSON.
"""
parser = TorContactInfoParser()
surl = "https://onionoo.torproject.org/details"
if not adata:
LOG.info(f"Getting relays from {surl}")
jdata = requests.get(surl)
try:
adata = jdata.json()
except Exception as e:
# simplejson.errors.JSONDecodeError
LOG.exception(f"JSON error {e}")
return
elts = adata["relays"]
else:
elts = json.loads(adata)['relays']
if not elts:
LOG.warn(f"NO relays - are we connected?")
return
LOG.info(f"{len(elts)} relays")
for relay in elts:
if 'fingerprint' not in relay.keys():
LOG.warn(f"fingerprint not in relay for {relay}")
continue
fp = relay['fingerprint']
verified_host_names = relay.get('verified_host_names', [])
contact = relay.get("contact", None)
if not contact:
LOG.warn(f"No contact for {fp} {verified_host_names}")
continue
if 'ciissversion' not in contact:
LOG.debug(f"No ciissversion in contact in {fp}")
continue
LOG.debug(f"parsing {fp}")
result = parser.parse(contact, False)
if not result:
LOG.warn(f"No result for {contact} in {fp}")
continue
if len(result) > 0:
if opts.json: result = json.dumps(result, indent=4) if opts.pretty else json.dumps(result)
if opts.pretty:
rprint(result)
else:
print(result)
ETC_DIR = '/etc/tor/yaml'
def oparser():
cparser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent(f"""
Examples:
# 'scan' is the original behaviour of this script. It iterates over the data
# from https://onionoo.torproject.org/details , parses each contact, and prints it as Python dict-style JSON.
{sys.argv[0]} scan
# Same as previous. With no arguments, it's equivalent to running 'scan'.
{sys.argv[0]}
# If you pass '-p' after scan, it will enable pretty printing. For best pretty printing,
# make sure you have 'rich' installed from pypi.
{sys.argv[0]} scan -p
# If you need real JSON with double quotes, rather than Python dict-style JSON, you can
# use the '-j' flag to enable "real JSON" mode (you can combine with '-p' if you want pretty printed real json)
{sys.argv[0]} scan -j
# Using 'parse', you can parse an arbitrary ContactInfo string, and it will output the parsed result
# with pretty printing by default.
{sys.argv[0]} parse "contact Privex Inc. email:noc[]privex.io url:https://www.privex.io " \\
"proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc hoster:www.privex.io " \\
"uplinkbw:500 memory:4096 virtualization:kvm btc:bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2 " \\
"xmr:89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps"
{{
'email': 'noc@privex.io',
'url': 'https://www.privex.io',
'proof': 'uri-rsa',
'pgp': None,
'keybase': 'privexinc',
'twitter': 'PrivexInc',
'hoster': 'www.privex.io',
'uplinkbw': '500',
'memory': '4096',
'virtualization': 'kvm',
'btc': 'bc1qpst9uscvd8rpjjhzz9rau3trylh6e0wh76qrlhw3q9nj89ua728sn3t6a2',
'xmr': '89tukP3wfpH4FZAmC1D2GfArWwfPTz8Ap46NZc54Vyhy9YxEUYoFQ7HGQ74LrCMQTD3zxvwM1ewmGjH9WVmeffwR72m1Pps'
}}
# You can also pipe a contact string into 'parse', and it will work just the same.
echo "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc" | {sys.argv[0]} parse
{{'email': 'noc@privex.io', 'url': 'https://www.privex.io', 'proof': 'uri-rsa', 'pgp': None, 'keybase': 'privexinc', 'twitter': 'PrivexInc\n'}}
# If you need real JSON outputted, rather than Python dict-style output, you can pass -j to either 'parse' or 'scan'
{sys.argv[0]} parse -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
{{
"email": "noc@privex.io",
"url": "https://www.privex.io",
"proof": "uri-rsa",
"pgp": null,
"keybase": "privexinc",
"twitter": "PrivexInc"
}}
# You can use '-np' to disable pretty printing for 'parse' - you can combine it with '-j' to get flat, plain JSON.
{sys.argv[0]} parse -np -j "Privex Inc. email:noc[]privex.io url:https://www.privex.io proof:uri-rsa pgp:288DD1632F6E8951 keybase:privexinc twitter:PrivexInc"
{{"email": "noc@privex.io", "url": "https://www.privex.io", "proof": "uri-rsa", "pgp": null, "keybase": "privexinc", "twitter": "PrivexInc"}}
"""))
cparser.set_defaults(func=cmd_scan, json=False, pretty=False)
subparse = cparser.add_subparsers()
subparse.required = False
sp_parse = subparse.add_parser('parse',
help="Parse a single contact string, either as an argument, or piped into stdin")
sp_parse.add_argument('contact', nargs='*')
sp_parse.add_argument('-np', '--no-pretty',
action='store_false', default=False, dest='pretty',
help="Disable pretty printing JSON")
sp_parse.add_argument('--relays_output', type=str,
dest='relays_output',
default=os.path.join(ETC_DIR, 'relays.json'),
help="Write the download relays in json to a file")
sp_parse.add_argument('-j', '--json', action='store_true',
default=False, dest='json',
help="Output real JSON, not Python dict format.")
sp_parse.set_defaults(func=cmd_parse)
sp_scan = subparse.add_parser('scan', help="Parse all contacts from https://onionoo.torproject.org/details")
sp_scan.add_argument('-p', action='store_true', default=False, dest='pretty', help="Enable pretty printing JSON")
sp_scan.add_argument('-j', '--json', action='store_true', default=False, dest='json', help="Output real JSON, not Python dict format.")
# sp_scan.set_defaults(func=cmd_scan)
return cparser
if __name__ == "__main__":
if os.environ.get('DEBUG', ''):
log_level = 10
else:
log_level = 20
vsetup_logging(log_level)
try:
cparser = oparser()
opts = cparser.parse_args(sys.argv[1:])
data = None
if opts.relays_output and os.path.exists(opts.relays_output):
data = open(opts.relays_output, 'rt').read()
cmd_scan(opts, data)
except (requests.exceptions.ProxyError, Exception,) as e:
LOG.error(f"{e}")
i = 0
# else:
# args = cparser.parse_args(sys.argv[1:])
# i = args.func(args)
sys.exit(i)

View File

@ -1,627 +0,0 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -
# from https://github.com/nusenu/trustor-poc
# with minor refactoring to make the code more Pythonic.
import datetime
import os
import re
import sys
import ipaddress
import warnings
import urllib3.util
from urllib3.util import parse_url as urlparse
from stem.control import Controller
# from stem.util.tor_tools import *
try:
# unbound is not on pypi
from unbound import RR_CLASS_IN, RR_TYPE_TXT, ub_ctx
except:
ub_ctx = RR_TYPE_TXT = RR_CLASS_IN = None
global LOG
import logging
warnings.filterwarnings('ignore')
LOG = logging.getLogger()
logging.getLogger("urllib3").setLevel(logging.INFO)
# import urllib3.contrib.pyopenssl
# urllib3.contrib.pyopenssl.inject_into_urllib3()
# download this python library from
# https://github.com/erans/torcontactinfoparser
# sys.path.append('/home/....')
try:
from torcontactinfo import TorContactInfoParser
except:
TorContactInfoParser = None
class TrustorError(Exception): pass
# https://stackoverflow.com/questions/2532053/validate-a-hostname-string
# FIXME this check allows non-fqdn names
def is_valid_hostname(hostname):
if len(hostname) > 255:
return False
if hostname[-1] == ".":
hostname = hostname[:-1] # strip exactly one dot from the right, if present
allowed = re.compile("(?!-)[A-Z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
return all(allowed.match(x) for x in hostname.split("."))
def read_local_trust_config(trust_config):
'''
reads a local configuration file containing trusted domains
and returns them in an array
'''
result = []
# for now we support max_depth = 0 only
# this PoC version has no support for recursion
# https://github.com/nusenu/tor-relay-operator-ids-trust-information#trust-information-consumers
supported_max_depths = ['0']
if (os.path.isfile(trust_config)):
f = open(trust_config)
for line in f:
line = line.strip()
if line[0] == '#':
continue
try:
domain, max_depth = line.split(':')
except:
LOG.error('invalid trust config line detected: %s aborting!' % line)
sys.exit(8)
if max_depth in supported_max_depths:
if is_valid_hostname(domain) and domain not in result:
result.append(domain)
else:
LOG.error('invalid duplicate domain in trust config file: %s: %s aborting!' % (trust_config, domain))
sys.exit(9)
else:
LOG.error('unsupported max_depth value (%s) used, aborting!' % line)
sys.exit(10)
return result
else:
LOG.error("trust config file %s missing, aborting!" % trust_config)
sys.exit(11)
def read_local_validation_cache(validation_cache_file, trusted_domains=[]):
'''
reads the local validation cache and returns all fingerprints in the cache
for trusted domains
format of each entry in the cache:
domain:fingerprint:prooftype:date
'''
result = []
if trusted_domains == []:
return result
if os.path.isfile(validation_cache_file):
with open(validation_cache_file, 'rt') as f:
for line in f:
line = line.strip()
if line[0] == '#':
continue
try:
domain, fingerprint, prooftype, dt = line.split(':')
except:
LOG.error('invalid trust cache entry detected: %s aborting!' % line)
sys.exit(12)
if domain in trusted_domains:
result.append(fingerprint)
else:
LOG.warn('ignoring cached entry for untrusted domain %s' % domain)
else:
LOG.info("Validation cache file not present. It will be created.")
return result
def get_controller(address='127.0.0.1', port=9151, password=''):
'''
connects to a local tor client via the tor ControlPort
and returns a controller that allows us to easily set specific tor
configuration options or read tor relay ContactInfo strings for validation
'''
try:
# controller = Controller.from_socket_file(path=torsocketpath)
controller = Controller.from_port(address=address, port=port)
controller.authenticate(password=password)
except Exception as e:
LOG.error(f"Failed to connect to the tor process, {e}")
sys.exit(1)
if not controller.is_set('UseMicrodescriptors'):
LOG.error('"UseMicrodescriptors 0" is required in your torrc configuration. Exiting.')
sys.exit(2)
return controller
def find_validation_candidates(controller,
trusted_domains=[],
validation_cache=[],
CAfile='/etc/ssl/certs/ca-certificates.crt',
accept_all=False):
'''
connect to a tor client via controlport and return a dict of all
not yet validated fingerprints per trusted operators
format:
{ trusted_domain: { prooftype: [fingerprint, fingerprint, ...]} }
example content:
{ 'emeraldonion.org' : { 'uri-rsa': ['044600FD968728A6F220D5347AD897F421B757C0', '09DCA3360179C6C8A5A20DDDE1C54662965EF1BA']}}
'''
# https://github.com/nusenu/ContactInfo-Information-Sharing-Specification#proof
accepted_proof_types = ['uri-rsa', 'dns-rsa']
# https://github.com/nusenu/ContactInfo-Information-Sharing-Specification#ciissversion
accepted_ciissversions = ['2']
result = {}
try:
relays = controller.get_server_descriptors()
except:
LOG.error('Failed to get relay descriptors via tor\'s ControlPort. Exiting.')
sys.exit(3)
ci = TorContactInfoParser()
for relay in relays:
if relay.contact:
fingerprint = relay.fingerprint
# skip fingerprints we have already successfully validated in the past
# a future version would check the cache age as well
if fingerprint in validation_cache:
continue
contactstring = relay.contact.decode('utf-8')
parsed_ci = ci.parse(contactstring)
if len(parsed_ci) > 0:
if 'ciissversion' in parsed_ci and 'proof' in parsed_ci and 'url' in parsed_ci:
prooftype = parsed_ci['proof']
ciurl = parsed_ci['url']
if parsed_ci['ciissversion'] in accepted_ciissversions and prooftype in accepted_proof_types:
if ciurl.startswith('http://') or ciurl.startswith('https://'):
try:
domain = urlparse(ciurl).netloc
except:
LOG.warning('failed to parse domain %s' % ciurl)
domain = 'error'
continue
else:
domain = ciurl
if not is_valid_hostname(domain):
domain = 'error'
continue
# we can ignore relays that do not claim to be operated by a trusted operator
# if we do not accept all
if domain not in trusted_domains and not accept_all:
continue
if domain in result.keys():
if prooftype in result[domain].keys():
result[domain][prooftype].append(fingerprint)
else:
result[domain] = {prooftype: [fingerprint]}
# mixed proof types are not allowd as per spec but we are not strict here
LOG.warning('%s is using mixed prooftypes %s' % (domain, prooftype))
else:
result[domain] = {prooftype: [fingerprint]}
return result
def oDownloadUrlRequests(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050, content_type='text/plain', session=None):
import requests
# socks proxy used for outbound web requests (for validation of proofs)
proxy = {'https': "socks5h://{host}:{port}"}
# we use this UA string when connecting to webservers to fetch rsa-fingerprint.txt proof files
# https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}
LOG.debug("fetching %s...." % uri)
try:
# grr. fix urllib3
# urllib3.connection WARNING Certificate did not match expected hostname:
head = requests.head(uri, timeout=timeout, proxies=proxy, headers=headers)
except Exception as e:
LOG.exception(f"{e}")
raise TrustorError(f"HTTP HEAD request failed for {uri} {e}")
if head.status_code >= 300:
raise TrustorError(f"HTTP Errorcode {head.status_code}")
if not head.headers['Content-Type'].startswith('text/plain'):
raise TrustorError(f"HTTP Content-Type != text/plain")
if not os.path.exists(sCAfile):
raise TrustorError(f"File not found CAfile {sCAfile}")
if session is None: session = requests.sessions.Session()
try:
oReqResp = session.request(method="get", url=uri,
proxies=proxy,
timeout=timeout,
headers=headers,
allow_redirects=False,
verify=True
)
except:
LOG.warn("HTTP GET request failed for %s" % uri)
raise
if oReqResp.status_code != 200:
raise TrustorError(f"HTTP Errorcode {head.status_code}")
if not oReqResp.headers['Content-Type'].startswith('text/plain'):
raise TrustorError(f"HTTP Content-Type != text/plain")
# check for redirects (not allowed as per spec)
if oReqResp.url != uri:
LOG.error(f'Redirect detected {uri} vs %s (final)' % (oReqResp.url))
raise TrustorError(f'Redirect detected {uri} vs %s (final)' % (oReqResp.url))
return oReqResp
# There's no point in using asyncio because of duplicate urls in the tasks
async def oDownloadUrlHttpx(uri, sCAfile, timeout=30, host='127.0.0.1', port=9050, content_type='text/plain'):
import httpcore
import asyncio
import httpx
# socks proxy used for outbound web requests (for validation of proofs)
if host and port:
proxy = "socks5://{host}:{port}"
else:
proxy = ''
# we use this UA string when connecting to webservers to fetch rsa-fingerprint.txt proof files
# https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}
LOG.debug("fetching %s...." % uri)
async with httpx.AsyncClient(proxies=proxy) as client:
try:
# https://www.python-httpx.org/advanced/
head = await client.head(uri, timeout=timeout, headers=headers)
except Exception as e:
LOG.exception(f"{e}")
raise TrustorError(f"HTTP HEAD request failed for {uri} {e}")
if head.status_code >= 300:
raise TrustorError(f"HTTP Errorcode {head.status_code}")
if content_type and not head.headers['Content-Type'].startswith(content_type):
raise TrustorError(f"HTTP Content-Type != {content_type}" )
if not os.path.exists(sCAfile):
raise TrustorError(f"File not found CAfile {sCAfile}")
try:
oReqResp = await client.get(url=uri,
timeout=timeout,
headers=headers,
max_redirects=0,
verify=sCAfile,
)
except (asyncio.exceptions.CancelledError,
httpcore.PoolTimeout,
Exception,) as e:
LOG.warn(f"HTTP GET request failed for %s {e}" % uri)
raise
if oReqResp.status_code != 200:
LOG.warn(f"HTTP Errorcode {head.status_code}")
raise TrustorError(f"HTTP Errorcode {head.status_code}")
if not oReqResp.headers['Content-Type'].startswith('text/plain'):
LOG.warn(f"HTTP Content-Type != text/plain")
raise TrustorError(f"HTTP Content-Type != text/plain")
# check for redirects (not allowed as per spec)
if oReqResp.url != uri:
LOG.error(f'Redirect detected {uri} vs %s (final)' % (oReqResp.url))
raise TrustorError(f'Redirect detected {uri} vs %s (final)' % (oReqResp.url))
return oReqResp
def ballow_subdomain_matching(hostname, dnsnames):
for elt in dnsnames:
if len(hostname.split('.')) > len(elt.split('.')) and hostname.endswith(elt):
# parent
return True
return False
from urllib3.util.ssl_match_hostname import (CertificateError, _dnsname_match,
_ipaddress_match)
def my_match_hostname(cert, hostname):
"""Verify that *cert* (in decoded format as returned by
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125
rules are followed, but IP addresses are not accepted for *hostname*.
CertificateError is raised on failure. On success, the function
returns nothing.
"""
if not cert:
raise ValueError(
"empty or no certificate, match_hostname needs a "
"SSL socket or SSL context with either "
"CERT_OPTIONAL or CERT_REQUIRED"
)
try:
# Divergence from upstream: ipaddress can't handle byte str
host_ip = ipaddress.ip_address(hostname)
except (UnicodeError, ValueError):
# ValueError: Not an IP address (common case)
# UnicodeError: Divergence from upstream: Have to deal with ipaddress not taking
# byte strings. addresses should be all ascii, so we consider it not
# an ipaddress in this case
host_ip = None
except AttributeError:
# Divergence from upstream: Make ipaddress library optional
if ipaddress is None:
host_ip = None
else: # Defensive
raise
dnsnames = []
san = cert.get("subjectAltName", ())
for key, value in san:
if key == "DNS":
if host_ip is None and _dnsname_match(value, hostname):
return
dnsnames.append(value)
elif key == "IP Address":
if host_ip is not None and _ipaddress_match(value, host_ip):
return
dnsnames.append(value)
if not dnsnames:
# The subject is only checked when there is no dNSName entry
# in subjectAltName
for sub in cert.get("subject", ()):
for key, value in sub:
# XXX according to RFC 2818, the most specific Common Name
# must be used.
if key == "commonName":
if _dnsname_match(value, hostname):
return
dnsnames.append(value)
if len(dnsnames) > 1:
# soften this to allow subdomain matching
if ballow_subdomain_matching(hostname, dnsnames):
LOG.warn(f"Allowing {hostname} in {dnsnames}")
return
raise CertificateError(
"hostname %r "
"doesn't match any of %s" % (hostname, ", ".join(map(repr, dnsnames)))
)
elif len(dnsnames) == 1:
raise CertificateError("hostname %r doesn't match %r" % (hostname, dnsnames[0]))
else:
raise CertificateError(
"no appropriate commonName or subjectAltName fields were found"
)
urllib3.util.ssl_match_hostname.match_hostname = my_match_hostname
from urllib3.util.ssl_ import is_ipaddress
def _my_match_hostname(cert, asserted_hostname):
# Our upstream implementation of ssl.match_hostname()
# only applies this normalization to IP addresses so it doesn't
# match DNS SANs so we do the same thing!
stripped_hostname = asserted_hostname.strip("u[]")
if is_ipaddress(stripped_hostname):
asserted_hostname = stripped_hostname
try:
my_match_hostname(cert, asserted_hostname)
except CertificateError as e:
LOG.warning(
"Certificate did not match hostname: %s. Certificate: %s",
asserted_hostname,
cert,
)
# Add cert to exception and reraise so client code can inspect
# the cert when catching the exception, if they want to
e._peer_cert = cert
raise
urllib3.connection._match_hostname = _my_match_hostname
from urllib3.contrib.socks import SOCKSProxyManager
# from urllib3 import Retry
def oDownloadUrlUrllib3Socks(uri,
sCAfile,
timeout=30,
host='127.0.0.1',
port=9050,
session=None,
content_type='text/plain'):
"""Theres no need to use requests here and it
adds too many layers on the SSL to be able to get at things
"""
domain = urlparse(uri).netloc
# socks proxy used for outbound web requests (for validation of proofs)
proxy = SOCKSProxyManager(f'socks5h://{host}:{port}/',
num_pools=1,
timeout=timeout,
cert_reqs='CERT_REQUIRED',
assert_hostname=domain,
ca_certs=sCAfile)
# we use this UA string when connecting to webservers to fetch rsa-fingerprint.txt proof files
# https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#uri-rsa
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'}
LOG.debug("fetching %s...." % uri)
try:
# grr. fix urllib3
# Errors will be wrapped in :class:`~urllib3.exceptions.MaxRetryError` unless
# retries are disabled, in which case the causing exception will be raised.
head = proxy.request('HEAD', uri,
headers=headers,
redirect=False,
retries=False)
except Exception as e:
LOG.error(f"HTTP HEAD request failed for {uri} {e}")
raise
if head.status >= 300:
raise TrustorError(f"HTTP Errorcode {head.status}")
if content_type and not head.headers['Content-Type'].startswith(content_type):
raise TrustorError(f"HTTP Content-Type != {content_type}")
if not os.path.exists(sCAfile):
raise TrustorError(f"File not found CAfile {sCAfile}")
try:
oReqResp = proxy.request("GET", uri,
headers=headers,
redirect=False,
)
except Exception as e:
LOG.warn(f"HTTP GET request failed for {uri} {e}")
raise
if oReqResp.status != 200:
raise TrustorError(f"HTTP Errorcode {head.status}")
if content_type and not oReqResp.headers['Content-Type'].startswith(content_type):
raise TrustorError(f"HTTP Content-Type != {content_type}")
# check for redirects (not allowed as per spec)
if oReqResp.geturl() != uri:
LOG.error(f'Redirect detected %s vs %s (final)' % (uri, oReqResp.geturl()))
raise TrustorError(f'Redirect detected %s vs %s (final)' % (uri, oReqResp.geturl()))
oReqResp.decode_content = True
return oReqResp
import urllib3.connectionpool
from urllib3.connection import HTTPSConnection
urllib3.connectionpool.VerifiedHTTPSConnection = HTTPSConnection
def lDownloadUrlFps(domain, sCAfile, timeout=30, host='127.0.0.1', port=9050):
uri = f"https://{domain}/.well-known/tor-relay/rsa-fingerprint.txt"
o = oDownloadUrlRequests(uri, sCAfile, timeout=timeout, host=host, port=port)
well_known_content = o.text.upper().strip().split('\n')
well_known_content = [i for i in well_known_content if i and len(i) == 40]
return well_known_content
def validate_proofs(candidates, validation_cache_file, timeout=20, host='127.0.0.1', port=9050):
'''
This function takes the return value of find_validation_candidates()
and validated them according to their proof type (uri-rsa, dns-rsa)
and writes properly validated relay fingerprints to the local validation cache
'''
dt_utc = datetime.datetime.now(datetime.timezone.utc).date()
f = open(validation_cache_file, mode='a')
count = 0
for domain in candidates.keys():
for prooftype in candidates[domain].keys():
if prooftype == 'uri-rsa':
well_known_content = lDownloadUrlFps(domain, timeout=timeout, host=host, port=port)
for fingerprint in candidates[domain][prooftype]:
if fingerprint in well_known_content:
# write cache entry
count += 1
f.write('%s:%s:%s:%s\n' % (domain, fingerprint, prooftype, dt_utc))
else:
LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype))
elif prooftype == 'dns-rsa' and ub_ctx:
for fingerprint in candidates[domain][prooftype]:
fp_domain = fingerprint + '.' + domain
if idns_validate(fp_domain,
libunbound_resolv_file='resolv.conf',
dnssec_DS_file='dnssec-root-trust',
) == 0:
count += 1
f.write('%s:%s:%s:%s\n' % (domain, fingerprint, prooftype, dt_utc))
else:
LOG.error('%s:%s:%s' % (fingerprint, domain, prooftype))
f.close()
LOG.info('successfully validated %s new (not yet validated before) relays' % count)
def idns_validate(domain,
libunbound_resolv_file='resolv.conf',
dnssec_DS_file='dnssec-root-trust',
):
'''
performs DNS TXT lookups and verifies the reply
- is DNSSEC valid and
- contains only a single TXT record
- the DNS record contains a hardcoded string as per specification
https://nusenu.github.io/ContactInfo-Information-Sharing-Specification/#dns-rsa
'''
if not ub_ctx: return -1
# this is not the system wide /etc/resolv.conf
# use dnscrypt-proxy to encrypt your DNS and route it via tor's SOCKSPort
ctx = ub_ctx()
if (os.path.isfile(libunbound_resolv_file)):
ctx.resolvconf(libunbound_resolv_file)
else:
LOG.error('libunbound resolv config file: "%s" is missing, aborting!' % libunbound_resolv_file)
return 5
if (os.path.isfile(dnssec_DS_file)):
ctx.add_ta_file(dnssec_DS_file)
else:
LOG.error('DNSSEC trust anchor file "%s" is missing, aborting!' % dnssec_DS_file)
return 6
status, result = ctx.resolve(domain, RR_TYPE_TXT, RR_CLASS_IN)
if status == 0 and result.havedata:
if len(result.rawdata) == 1 and result.secure:
# ignore the first byte, it is the TXT length
if result.data.as_raw_data()[0][1:] == b'we-run-this-tor-relay':
return 0
return 1
def configure_tor(controller, trusted_fingerprints, exitonly=True):
'''
takes the list of trusted fingerprints and configures a tor client
to only use trusted relays in a certain position
for now we only set exits.
we refuse to set the configuration if there are less then 40 trusted relays
'''
relay_count = len(trusted_fingerprints)
if relay_count < 41:
LOG.error('Too few trusted relays (%s), aborting!' % relay_count)
sys.exit(15)
try:
controller.set_conf('ExitNodes', trusted_fingerprints)
LOG.error('limited exits to %s relays' % relay_count)
except Exception as e: # noqa
LOG.exception('Failed to set ExitNodes tor config to trusted relays')
sys.exit(20)
if __name__ == '__main__':
CAfile = '/etc/ssl/certs/ca-certificates.crt'
trust_config = 'trust_config'
assert os.path.exists(trust_config)
trusted_domains = read_local_trust_config(trust_config)
validation_cache_file = 'validation_cache'
trusted_fingerprints = read_local_validation_cache(validation_cache_file,
trusted_domains=trusted_domains)
# tor ControlPort password
controller_password = ''
# tor ControlPort IP
controller_address = '127.0.0.1'
timeout = 20
port = 9050
controller = get_controller(address=controller_address, password=controller_password)
r = find_validation_candidates(controller,
validation_cache=trusted_fingerprints,
trusted_domains=trusted_domains,
CAfile=CAfile)
validate_proofs(r, validation_cache_file,
timeout=timeout,
host=controller_address,
port=port)
# refresh list with newly validated fingerprints
trusted_fingerprints = read_local_validation_cache(validation_cache_file,
trusted_domains=trusted_domains)
configure_tor(controller, trusted_fingerprints)