# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-

# Module prelude: imports, optional third-party dependencies, the print-based
# LOG_* helpers, the optional trepan remote-debugger hook and the thread /
# sample-rate constants.

import argparse
import contextlib
import inspect
import json
import logging
import os
import re
import select
import shutil
import socket
import sys
import time
import traceback
import unittest
from ctypes import *
from random import Random

# Module-private random generator; code in this file calls random.shuffle().
random = Random()

# Optional dependencies: each name is bound to False when the package is
# missing so that callers can test it with a plain ``if name:``.
try:
    import coloredlogs
    if 'COLOREDLOGS_LEVEL_STYLES' not in os.environ:
        os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red'
    # https://pypi.org/project/coloredlogs/
except ImportError:
    coloredlogs = False

try:
    import stem
except ImportError:
    stem = False

try:
    import nmap
except ImportError:
    nmap = False

import wrapper
from wrapper.toxcore_enums_and_consts import TOX_CONNECTION, TOX_USER_STATUS
from wrapper_tests.support_http import bAreWeConnected
from wrapper_tests.support_onions import (is_valid_fingerprint,
                                          lIntroductionPoints,
                                          oGetStemController,
                                          sMapaddressResolv, sTorResolve)

try:
    from user_data.settings import get_user_config_path
except ImportError:
    get_user_config_path = None

# LOG=util.log
LOG = logging.getLogger()


# Print-based logging helpers that do not go through the logging module.
def LOG_ERROR(l): print('ERRORc: '+l)
def LOG_WARN(l): print('WARNc: ' +l)
def LOG_INFO(l): print('INFOc: ' +l)
def LOG_DEBUG(l): print('DEBUGc: '+l)
def LOG_TRACE(l): pass  # print('TRACE+ '+l)


try:
    from trepan.api import debug
    from trepan.interfaces import server as Mserver
except Exception:
    # Best effort: any failure importing trepan just leaves the debugger hook
    # undefined.  KeyboardInterrupt/SystemExit are deliberately not swallowed.
    # print('trepan3 TCP server NOT available.')
    pass
else:
    # print('trepan3 TCP server available.')
    def trepan_handler(num=None, f=None):
        """Start a trepan debugger session served over TCP port 6666.

        The (num, f) parameters match the signature of a signal handler;
        neither is used.
        """
        connection_opts = {'IO': 'TCP', 'PORT': 6666}
        intf = Mserver.ServerInterface(connection_opts=connection_opts)
        dbg_opts = {'interface': intf}
        print('Starting TCP server listening on port 6666.')
        debug(dbg_opts=dbg_opts)
        return

# self._audio_thread.isAlive
iTHREAD_TIMEOUT = 1
iTHREAD_SLEEP = 1
iTHREAD_JOINS = 8
iNODES = 6

lToxSamplerates = [8000, 12000, 16000, 24000, 48000]
lToxSampleratesK = [8, 12, 16, 24, 48] lBOOLEANS = [ 'local_discovery_enabled', 'udp_enabled', 'ipv6_enabled', 'trace_enabled', 'compact_mode', 'allow_inline', 'notifications', 'sound_notifications', 'calls_sound', 'hole_punching_enabled', 'dht_announcements_enabled', 'save_history', 'download_nodes_list' 'core_logging', ] sDIR = os.environ.get('TMPDIR', '/tmp') sTOX_VERSION = "1000002018" bHAVE_NMAP = shutil.which('nmap') bHAVE_JQ = shutil.which('jq') bHAVE_BASH = shutil.which('bash') bHAVE_TORR = shutil.which('tor-resolve') lDEAD_BS = [ # [notice] Have tried resolving or connecting to address # at 3 different places. Giving up. '104.244.74.69', '172.93.52.70', 'tox.abilinski.com', 'tox.novg.net', # Failed to resolve "tox3.plastiras.org". "tox3.plastiras.org", ] def assert_main_thread(): from PyQt5 import QtCore, QtWidgets from qtpy.QtWidgets import QApplication # this "instance" method is very useful! app_thread = QtWidgets.QApplication.instance().thread() curr_thread = QtCore.QThread.currentThread() if app_thread != curr_thread: raise RuntimeError('attempt to call MainWindow.append_message from non-app thread') @contextlib.contextmanager def ignoreStdout(): devnull = os.open(os.devnull, os.O_WRONLY) old_stdout = os.dup(1) sys.stdout.flush() os.dup2(devnull, 1) os.close(devnull) try: yield finally: os.dup2(old_stdout, 1) os.close(old_stdout) @contextlib.contextmanager def ignoreStderr(): devnull = os.open(os.devnull, os.O_WRONLY) old_stderr = os.dup(2) sys.stderr.flush() os.dup2(devnull, 2) os.close(devnull) try: yield finally: os.dup2(old_stderr, 2) os.close(old_stderr) def clean_booleans(oArgs): for key in lBOOLEANS: if not hasattr(oArgs, key): continue val = getattr(oArgs, key) if type(val) == bool: continue if val in ['False', 'false', '0']: setattr(oArgs, key, False) else: setattr(oArgs, key, True) def on_log(iTox, level, filename, line, func, message, *data): # LOG.debug(repr((level, filename, line, func, message,))) tox_log_cb(level, filename, line, func, 
message) def tox_log_cb(level, filename, line, func, message, *args): """ * @param level The severity of the log message. * @param filename The source file from which the message originated. * @param line The source line from which the message originated. * @param func The function from which the message originated. * @param message The log message. * @param user_data The user data pointer passed to tox_new in options. """ if type(func) == bytes: func = str(func, 'utf-8') message = str(message, 'UTF-8') filename = str(filename, 'UTF-8') if filename == 'network.c': if line == 660: return # root WARNING 3network.c#944:b'send_packet'attempted to send message with network family 10 (probably IPv6) on IPv4 socket if line == 944: return i = message.find('07 = GET_NODES') if i > 0: return if filename == 'TCP_common.c': return i = message.find(' | ') if i > 0: message = message[:i] # message = filename +'#' +str(line) +':'+func +' '+message name = 'core' # old level is meaningless level = 10 # LOG.level # LOG._log(LOG.level, f"{level}: {message}", list()) i = message.find('(0: OK)') if i > 0: level = 10 # LOG.debug else: i = message.find('(1: ') if i > 0: level = 30 # LOG.warn else: level = 20 # LOG.info o = LOG.makeRecord(filename, level, func, line, message, list(), None) # LOG.handle(o) LOG_TRACE(f"{level}: {func}{line} {message}") return elif level == 1: LOG.critical(f"{level}: {message}") elif level == 2: LOG.error(f"{level}: {message}") elif level == 3: LOG.warn(f"{level}: {message}") elif level == 4: LOG.info(f"{level}: {message}") elif level == 5: LOG.debug(f"{level}: {message}") else: LOG_TRACE(f"{level}: {message}") def vAddLoggerCallback(tox_options, callback=None): if callback is None: wrapper.tox.Tox.libtoxcore.tox_options_set_log_callback( tox_options._options_pointer, POINTER(None)()) tox_options.self_logger_cb = None return c_callback = CFUNCTYPE(None, c_void_p, c_int, c_char_p, c_int, c_char_p, c_char_p, c_void_p) tox_options.self_logger_cb = 
c_callback(callback) wrapper.tox.Tox.libtoxcore.tox_options_set_log_callback( tox_options._options_pointer, tox_options.self_logger_cb) def get_video_indexes(): # Linux return [str(l[5:]) for l in os.listdir('/dev/') if l.startswith('video')] def get_audio(): with ignoreStderr(): import pyaudio oPyA = pyaudio.PyAudio() input_devices = output_devices = 0 for i in range(oPyA.get_device_count()): device = oPyA.get_device_info_by_index(i) if device["maxInputChannels"]: input_devices += 1 if device["maxOutputChannels"]: output_devices += 1 # {'index': 21, 'structVersion': 2, 'name': 'default', 'hostApi': 0, 'maxInputChannels': 64, 'maxOutputChannels': 64, 'defaultLowInputLatency': 0.008707482993197279, 'defaultLowOutputLatency': 0.008707482993197279, 'defaultHighInputLatency': 0.034829931972789115, 'defaultHighOutputLatency': 0.034829931972789115, 'defaultSampleRate': 44100.0} audio = {'input': oPyA.get_default_input_device_info()['index'] if input_devices else -1, 'output': oPyA.get_default_output_device_info()['index'] if output_devices else -1, 'enabled': input_devices and output_devices} return audio def oMainArgparser(_=None, iMode=2): # 'Mode: 0=chat 1=chat+audio 2=chat+audio+video default: 0' if not os.path.exists('/proc/sys/net/ipv6'): bIpV6 = 'False' else: bIpV6 = 'True' lIpV6Choices=[bIpV6, 'False'] parser = argparse.ArgumentParser(add_help=True) parser.add_argument('--proxy_host', '--proxy-host', type=str, default='', help='proxy host') parser.add_argument('--proxy_port', '--proxy-port', default=0, type=int, help='proxy port') parser.add_argument('--proxy_type', '--proxy-type', default=0, type=int, choices=[0,1,2], help='proxy type 1=http, 2=socks') parser.add_argument('--udp_enabled', type=str, default='True', choices=['True', 'False'], help='En/Disable udp') parser.add_argument('--ipv6_enabled', type=str, default=bIpV6, choices=lIpV6Choices, help=f"En/Disable ipv6 - default {bIpV6}") parser.add_argument('--trace_enabled',type=str, default='True' if 
os.environ.get('DEBUG') else 'False', choices=['True','False'], help='Debugging from toxcore logger_trace or env DEBUG=1') parser.add_argument('--download_nodes_list', type=str, default='False', choices=['True', 'False'], help='Download nodes list') parser.add_argument('--nodes_json', type=str, default='') parser.add_argument('--network', type=str, choices=['old', 'main', 'local'], default='main') parser.add_argument('--download_nodes_url', type=str, default='https://nodes.tox.chat/json') parser.add_argument('--logfile', default='', help='Filename for logging - start with + for stdout too') parser.add_argument('--loglevel', default=logging.INFO, type=int, # choices=[logging.info,logging.trace,logging.debug,logging.error] help='Threshold for logging (lower is more) default: 20') parser.add_argument('--tcp_port', '--tcp-port', default=0, type=int, help='tcp port') parser.add_argument('--mode', type=int, default=iMode, choices=[0,1,2], help='Mode: 0=chat 1=chat+audio 2=chat+audio+video default: 0') parser.add_argument('--sleep', type=str, default='time', # could expand this to tk, gtk, gevent... 
choices=['qt','gevent','time'], help='Sleep method - one of qt, gevent , time') return parser def vSetupLogging(oArgs): global LOG logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S') logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S' logging._defaultFormatter.default_msec_format = '' add = None kwargs = dict(level=oArgs.loglevel, format='%(levelname)-8s %(message)s') if oArgs.logfile: add = oArgs.logfile.startswith('+') sub = oArgs.logfile.startswith('-') if add or sub: oArgs.logfile = oArgs.logfile[1:] kwargs['filename'] = oArgs.logfile if coloredlogs: # https://pypi.org/project/coloredlogs/ aKw = dict(level=oArgs.loglevel, logger=LOG, stream=sys.stdout, fmt='%(name)s %(levelname)s %(message)s' ) coloredlogs.install(**aKw) if oArgs.logfile: oHandler = logging.FileHandler(oArgs.logfile) LOG.addHandler(oHandler) else: logging.basicConfig(**kwargs) if add: oHandler = logging.StreamHandler(sys.stdout) LOG.addHandler(oHandler) LOG.info(f"Setting loglevel to {oArgs.loglevel!s}") def setup_logging(oArgs): global LOG if coloredlogs: aKw = dict(level=oArgs.loglevel, logger=LOG, fmt='%(name)s %(levelname)s %(message)s') if oArgs.logfile: oFd = open(oArgs.logfile, 'wt') setattr(oArgs, 'log_oFd', oFd) aKw['stream'] = oFd coloredlogs.install(**aKw) if oArgs.logfile: oHandler = logging.StreamHandler(stream=sys.stdout) LOG.addHandler(oHandler) else: aKw = dict(level=oArgs.loglevel, format='%(name)s %(levelname)-4s %(message)s') if oArgs.logfile: aKw['filename'] = oArgs.logfile logging.basicConfig(**aKw) logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S') logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S' logging._defaultFormatter.default_msec_format = '' LOG.setLevel(oArgs.loglevel) # LOG.trace = lambda l: LOG.log(0, repr(l)) LOG.info(f"Setting loglevel to {oArgs.loglevel!s}") def signal_handler(num, f): from trepan.api import debug from trepan.interfaces import server as Mserver connection_opts={'IO': 'TCP', 
'PORT': 6666} intf = Mserver.ServerInterface(connection_opts=connection_opts) dbg_opts = {'interface': intf} LOG.info('Starting TCP server listening on port 6666.') debug(dbg_opts=dbg_opts) return def merge_args_into_settings(args, settings): if args: if not hasattr(args, 'audio'): LOG.warn('No audio ' +repr(args)) settings['audio'] = getattr(args, 'audio') if not hasattr(args, 'video'): LOG.warn('No video ' +repr(args)) settings['video'] = getattr(args, 'video') for key in settings.keys(): # proxy_type proxy_port proxy_host not_key = 'not_' +key if hasattr(args, key): val = getattr(args, key) if type(val) == bytes: # proxy_host - ascii? # filenames - ascii? val = str(val, 'UTF-8') settings[key] = val elif hasattr(args, not_key): val = not getattr(args, not_key) settings[key] = val clean_settings(settings) return def clean_settings(self): # failsafe to ensure C tox is bytes and Py settings is str # overrides self['mirror_mode'] = False # REQUIRED!! if not os.path.exists('/proc/sys/net/ipv6'): LOG.warn('Disabling IPV6 because /proc/sys/net/ipv6 does not exist') self['ipv6_enabled'] = False if 'proxy_type' in self and self['proxy_type'] == 0: self['proxy_host'] = '' self['proxy_port'] = 0 if 'proxy_type' in self and self['proxy_type'] != 0 and \ 'proxy_host' in self and self['proxy_host'] != '' and \ 'proxy_port' in self and self['proxy_port'] != 0: if 'udp_enabled' in self and self['udp_enabled']: # We don't currently support UDP over proxy. 
LOG.info("UDP enabled and proxy set: disabling UDP") self['udp_enabled'] = False if 'local_discovery_enabled' in self and self['local_discovery_enabled']: LOG.info("local_discovery_enabled enabled and proxy set: disabling local_discovery_enabled") self['local_discovery_enabled'] = False if 'dht_announcements_enabled' in self and self['dht_announcements_enabled']: LOG.info("dht_announcements_enabled enabled and proxy set: disabling dht_announcements_enabled") self['dht_announcements_enabled'] = False if 'auto_accept_path' in self and \ type(self['auto_accept_path']) == bytes: self['auto_accept_path'] = str(self['auto_accept_path'], 'UTF-8') LOG.debug("Cleaned settings") def lSdSamplerates(iDev): try: import sounddevice as sd except ImportError: return [] samplerates = (32000, 44100, 48000, 96000, ) device = iDev supported_samplerates = [] for fs in samplerates: try: sd.check_output_settings(device=device, samplerate=fs) except Exception as e: # LOG.debug(f"Sample rate not supported {fs}" +' '+str(e)) pass else: supported_samplerates.append(fs) return supported_samplerates def _get_nodes_path(oArgs=None): if oArgs and hasattr(oArgs, 'nodes_json') and oArgs.nodes_json: LOG.debug("_get_nodes_path: " +oArgs.nodes_json) default = oArgs.nodes_json elif get_user_config_path: default = os.path.join(get_user_config_path(), 'toxygen_nodes.json') else: # Windwoes default = os.path.join(os.getenv('HOME'), '.config', 'tox', 'toxygen_nodes.json') LOG.debug("_get_nodes_path: " +default) return default DEFAULT_NODES_COUNT = 8 global aNODES aNODES = {} import functools # @functools.lru_cache(maxsize=12) def generate_nodes(oArgs=None, nodes_count=DEFAULT_NODES_COUNT, ipv='ipv4', udp_not_tcp=True): global aNODES sKey = ipv sKey += ',0' if udp_not_tcp else ',1' if sKey in aNODES and aNODES[sKey]: return aNODES[sKey] sFile = _get_nodes_path(oArgs=oArgs) assert os.path.exists(sFile), sFile lNodes = generate_nodes_from_file(sFile, nodes_count=nodes_count, ipv=ipv, udp_not_tcp=udp_not_tcp) 
assert lNodes aNODES[sKey] = lNodes return aNODES[sKey] aNODES_CACHE = {} def generate_nodes_from_file(sFile, nodes_count=DEFAULT_NODES_COUNT, ipv='ipv4', udp_not_tcp=True, ): """https://github.com/TokTok/c-toxcore/issues/469 I had a conversation with @irungentoo on IRC about whether we really need to call tox_bootstrap() when having UDP disabled and why. The answer is yes, because in addition to TCP relays (tox_add_tcp_relay()), toxcore also needs to know addresses of UDP onion nodes in order to work correctly. The DHT, however, is not used when UDP is disabled. tox_bootstrap() function resolves the address passed to it as argument and calls onion_add_bs_node_path() and DHT_bootstrap() functions. Although calling DHT_bootstrap() is not really necessary as DHT is not used, we still need to resolve the address of the DHT node in order to populate the onion routes with onion_add_bs_node_path() call. """ global aNODES_CACHE key = ipv key += ',0' if udp_not_tcp else ',1' if key in aNODES_CACHE: sorted_nodes = aNODES_CACHE[key] else: if not os.path.exists(sFile): LOG.error("generate_nodes_from_file file not found " +sFile) return [] try: with open(sFile, 'rt') as fl: json_nodes = json.loads(fl.read())['nodes'] except Exception as e: LOG.error(f"generate_nodes_from_file error {sFile}\n{e}") return [] else: LOG.debug("generate_nodes_from_file " +sFile) if udp_not_tcp: nodes = [(node[ipv], node['port'], node['public_key'],) for node in json_nodes if node[ipv] != 'NONE' \ and node["status_udp"] in [True, "true"] ] else: nodes = [] elts = [(node[ipv], node['tcp_ports'], node['public_key'],) \ for node in json_nodes if node[ipv] != 'NONE' \ and node["status_tcp"] in [True, "true"] ] for (ipv, ports, public_key,) in elts: for port in ports: nodes += [(ipv, port, public_key)] if not nodes: LOG.warn(f'empty generate_nodes from {sFile} {json_nodes!r}') return [] sorted_nodes = nodes aNODES_CACHE[key] = sorted_nodes random.shuffle(sorted_nodes) if nodes_count is not None and 
len(sorted_nodes) > nodes_count: sorted_nodes = sorted_nodes[-nodes_count:] LOG.debug(f"generate_nodes_from_file {sFile} len={len(sorted_nodes)}") return sorted_nodes def tox_bootstrapd_port(): port = 33446 sFile = '/etc/tox-bootstrapd.conf' if os.path.exists(sFile): with open(sFile, 'rt') as oFd: for line in oFd.readlines(): if line.startswith('port = '): port = int(line[7:]) return port def bootstrap_local(self, elts, lToxes): if os.path.exists('/run/tox-bootstrapd/tox-bootstrapd.pid'): LOG.debug('/run/tox-bootstrapd/tox-bootstrapd.pid') iRet = True else: iRet = os.system("netstat -nle4|grep -q :33") if iRet > 0: LOG.warn(f'bootstraping local No local DHT running') LOG.info(f'bootstraping local') return bootstrap_udp(self, elts, lToxes) def lDNSClean(l): # [elt for elt in l if elt not in lDEAD_BS] return list(set(l).difference(lDEAD_BS)) def lExitExcluder(oArgs, iPort=9051): """ https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py """ if not stem: LOG.warn('please install the stem Python package') return '' LOG.debug('lExcludeExitNodes') try: controller = oGetStemController(log_level=10) # generator relays = controller.get_server_descriptors() except Exception as e: LOG.error(f'Failed to get relay descriptors {e}') return None if controller.is_set('ExcludeExitNodes'): LOG.info('ExcludeExitNodes is in use already.') return None exit_excludelist=[] LOG.debug("Excluded exit relays:") for relay in relays: if relay.exit_policy.is_exiting_allowed() and not relay.contact: if is_valid_fingerprint(relay.fingerprint): exit_excludelist.append(relay.fingerprint) LOG.debug("https://metrics.torproject.org/rs.html#details/%s" % relay.fingerprint) else: LOG.warn('Invalid Fingerprint: %s' % relay.fingerprint) try: controller.set_conf('ExcludeExitNodes', exit_excludelist) LOG.info('Excluded a total of %s exit relays without ContactInfo from the exit position.' 
% len(exit_excludelist)) except Exception as e: LOG.exception('ExcludeExitNodes ' +str(e)) return exit_excludelist def sDNSLookup(host): ipv = 0 if host in lDEAD_BS: LOG.warn(f"address skipped because in lDEAD_BS {host}") return '' # return host try: s = host.replace('.','') int(s) except: try: s = host.replace(':','') int(s) except: pass else: ipv = 6 else: ipv = 4 if ipv > 0: # LOG.debug(f"{ipv} IP address {host}") return host ip = '' if host.endswith('.tox') or host.endswith('.onion'): if False and stem: ip = sMapaddressResolv(host) if ip: return ip ip = sTorResolve(host) if ip: return ip if not bHAVE_TORR: LOG.warn(f"onion address skipped because no tor-resolve {host}") return '' try: sout = f"/tmp/TR{os.getpid()}.log" i = os.system(f"tor-resolve -4 {host} > {sout}") if not i: LOG.warn(f"onion address skipped because tor-resolve on {host}") return '' ip = open(sout, 'rt').read() if ip.endswith('failed.'): LOG.warn(f"onion address skipped because tor-resolve failed on {host}") return '' LOG.debug(f"onion address tor-resolve {ip} on {host}") return ip except: pass else: try: ip = socket.gethostbyname(host) return ip except: # drop through pass if ip == '': try: sout = f"/tmp/TR{os.getpid()}.log" i = os.system(f"dig {host}|grep ^{host}|sed -e 's/.* //'> {sout}") if not i: LOG.warn(f"address skipped because dig failed on {host}") return '' ip = open(sout, 'rt').read().strip() LOG.debug(f"address dig {ip} on {host}") return ip except: ip = host LOG.debug(f'sDNSLookup {host} -> {ip}') return ip def bootstrap_good(lelts, lToxes): return bootstrap_udp(lelts, lToxes) def bootstrap_udp(lelts, lToxes): lelts = lDNSClean(lelts) LOG.debug(f'DHT bootstraping {len(lelts)}') for elt in lToxes: random.shuffle(lelts) for largs in lelts: host, port, key = largs ip = sDNSLookup(host) if not ip: LOG.warn(f'bootstrap_udp to {host} did not resolve') continue if type(port) == str: port = int(port) try: assert len(key) == 64, key oRet = elt.bootstrap(ip, port, key) except Exception as 
e: LOG.error(f'bootstrap to {host}:' +str(largs[1]) \ +' ' +str(e)) continue if not oRet: LOG.warn(f'bootstrap failed to {host} : ' +str(oRet)) elif elt.self_get_connection_status() != TOX_CONNECTION['NONE']: LOG.info(f'bootstrap to {host} connected') break else: LOG.debug(f'bootstrap to {host} not connected') pass def bootstrap_tcp(lelts, lToxes): lelts = lDNSClean(lelts) for oTox in lToxes: random.shuffle(lelts) LOG.info(f'bootstrap_tcp bootstapping {[l[0] for l in lelts]}') for (host, port, key,) in lelts: ip = sDNSLookup(host) if not ip: LOG.warn(f'bootstrap_tcp to {host} did not resolve {ip}') # continue ip = host if host.endswith('.onion') and stem: l = lIntroductionPoints(host) if not l: LOG.warn(f'bootstrap_tcp to {host} has no introduction points') continue if type(port) == str: port = int(port) try: assert len(key) == 64, key oRet = oTox.add_tcp_relay(ip, port, key) except Exception as e: LOG.error(f'bootstrap_tcp to {host} : ' +str(e)) continue if not oRet: LOG.warn(f'bootstrap_tcp failed to {host} : ' +str(oRet)) elif oTox.self_get_connection_status() != TOX_CONNECTION['NONE']: LOG.info(f'bootstrap_tcp to {host} connected') break else: LOG.debug(f'bootstrap_tcp to {host} but not connected') pass def iNmapInfoNmap(sProt, sHost, sPort, key=None, environ=None, cmd=''): if sHost in ['-', 'NONE']: return 0 if not nmap: return 0 nmps = nmap.PortScanner if sProt in ['socks', 'socks5', 'tcp4']: prot = 'tcp' cmd = f" -Pn -n -sT -p T:{sPort}" else: prot = 'udp' cmd = f" -Pn -n -sU -p U:{sPort}" LOG.debug(f"iNmapInfoNmap cmd={cmd}") sys.stdout.flush() o = nmps().scan(hosts=sHost, arguments=cmd) aScan = o['scan'] ip = list(aScan.keys())[0] state = aScan[ip][prot][sPort]['state'] LOG.info(f"iNmapInfoNmap: to {sHost} {state}") return 0 def iNmapInfo(sProt, sHost, sPort, key=None, environ=None, cmd='nmap'): if sHost in ['-', 'NONE']: return 0 sFile = os.path.join("/tmp", f"{sHost}.{os.getpid()}.nmap") if sProt in ['socks', 'socks5', 'tcp4']: cmd += f" -Pn -n -sT -p 
T:{sPort} {sHost} | grep /tcp " else: cmd += f" -Pn -n -sU -p U:{sPort} {sHost} | grep /udp " LOG.debug(f"iNmapInfo cmd={cmd}") sys.stdout.flush() iRet = os.system('sudo ' +cmd +f" >{sFile} 2>&1 ") LOG.debug(f"iNmapInfo cmd={cmd} iRet={iRet}") if iRet != 0: return iRet assert os.path.exists(sFile), sFile with open(sFile, 'rt') as oFd: l = oFd.readlines() assert len(l) l = [line for line in l if line and not line.startswith('WARNING:')] s = '\n'.join([s.strip() for s in l]) LOG.info(f"iNmapInfo: to {sHost}\n{s}") return 0 def bootstrap_iNmapInfo(lElts, oArgs, protocol="tcp4", bIS_LOCAL=False, iNODES=iNODES, cmd='nmap'): if not bIS_LOCAL and not bAreWeConnected(): LOG.warn(f"bootstrap_iNmapInfo not local and NOT CONNECTED") return True lRetval = [] for elts in lElts[:iNODES]: host, port, key = elts ip = sDNSLookup(host) if not ip: LOG.info('bootstrap_iNmapInfo to {host} did not resolve') continue if type(port) == str: port = int(port) iRet = -1 try: if not nmap: iRet = iNmapInfo(protocol, ip, port, key, cmd=cmd) else: iRet = iNmapInfoNmap(protocol, ip, port, key) if iRet != 0: LOG.warn('iNmapInfo to ' +repr(host) +' retval=' +str(iRet)) lRetval += [False] else: LOG.debug('iNmapInfo to ' +repr(host) +' retval=' +str(iRet)) lRetval += [True] except Exception as e: LOG.exception('iNmapInfo to {host} : ' +str(e) ) lRetval += [False] return any(lRetval) def caseFactory(cases): """We want the tests run in order.""" if len(cases) > 1: ordered_cases = sorted(cases, key=lambda f: inspect.findsource(f)[1]) else: ordered_cases = cases return ordered_cases def suiteFactory(*testcases): """We want the tests run in order.""" linen = lambda f: getattr(tc, f).__code__.co_firstlineno lncmp = lambda a, b: linen(a) - linen(b) test_suite = unittest.TestSuite() for tc in testcases: test_suite.addTest(unittest.makeSuite(tc, sortUsing=lncmp)) return test_suite