diff --git a/tox_wrapper/tests/__init__.py b/tox_wrapper/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tox_wrapper/tests/socks.py b/tox_wrapper/tests/socks.py deleted file mode 100644 index fa1b25e..0000000 --- a/tox_wrapper/tests/socks.py +++ /dev/null @@ -1,391 +0,0 @@ -"""SocksiPy - Python SOCKS module. -Version 1.00 - -Copyright 2006 Dan-Haim. All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. -3. Neither the name of Dan Haim nor the names of his contributors may be used - to endorse or promote products derived from this software without specific - prior written permission. - -THIS SOFTWARE IS PROVIDED BY DAN HAIM "AS IS" AND ANY EXPRESS OR IMPLIED -WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO -EVENT SHALL DAN HAIM OR HIS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA -OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMANGE. - - -This module provides a standard socket-like interface for Python -for tunneling connections through SOCKS proxies. - -""" - -""" - -Minor modifications made by Christopher Gilbert (http://motomastyle.com/) -for use in PyLoris (http://pyloris.sourceforge.net/) - -Minor modifications made by Mario Vilas (http://breakingcode.wordpress.com/) -mainly to merge bug fixes found in Sourceforge - -Minor modifications made by Eugene Dementiev (http://www.dementiev.eu/) - -""" - -import socket -import struct -import sys - -PROXY_TYPE_SOCKS4 = 1 -PROXY_TYPE_SOCKS5 = 2 -PROXY_TYPE_HTTP = 3 - -_defaultproxy = None -_orgsocket = socket.socket - -class ProxyError(Exception): pass -class GeneralProxyError(ProxyError): pass -class Socks5AuthError(ProxyError): pass -class Socks5Error(ProxyError): pass -class Socks4Error(ProxyError): pass -class HTTPError(ProxyError): pass - -_generalerrors = ("success", - "invalid data", - "not connected", - "not available", - "bad proxy type", - "bad input") - -_socks5errors = ("succeeded", - "general SOCKS server failure", - "connection not allowed by ruleset", - "Network unreachable", - "Host unreachable", - "Connection refused", - "TTL expired", - "Command not supported", - "Address type not supported", - "Unknown error") - -_socks5autherrors = ("succeeded", - "authentication is required", - "all offered authentication methods were rejected", - "unknown username or invalid password", - "unknown error") - -_socks4errors = ("request granted", - "request rejected or failed", - "request rejected because SOCKS server cannot connect to identd on the client", - "request rejected because the client program and identd report different user-ids", - "unknown error") - -def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=None, password=None) -> None: - """setdefaultproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) - Sets a default proxy which all further socksocket objects will use, - unless explicitly changed. - """ - global _defaultproxy - _defaultproxy = (proxytype, addr, port, rdns, username, password) - -def wrapmodule(module) -> None: - """wrapmodule(module) - Attempts to replace a module's socket library with a SOCKS socket. Must set - a default proxy using setdefaultproxy(...) first. - This will only work on modules that import socket directly into the namespace; - most of the Python Standard Library falls into this category. - """ - if _defaultproxy != None: - module.socket.socket = socksocket - else: - raise GeneralProxyError((4, "no proxy specified")) - -class socksocket(socket.socket): - """socksocket([family[, type[, proto]]]) -> socket object - Open a SOCKS enabled socket. The parameters are the same as - those of the standard socket init. In order for SOCKS to work, - you must specify family=AF_INET, type=SOCK_STREAM and proto=0. - """ - - def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None): - _orgsocket.__init__(self, family, type, proto, _sock) - if _defaultproxy != None: - self.__proxy = _defaultproxy - else: - self.__proxy = (None, None, None, None, None, None) - self.__proxysockname = None - self.__proxypeername = None - - def __recvall(self, count): - """__recvall(count) -> data - Receive EXACTLY the number of bytes requested from the socket. - Blocks until the required number of bytes have been received. - """ - data = self.recv(count) - while len(data) < count: - d = self.recv(count-len(data)) - if not d: raise GeneralProxyError((0, "connection closed unexpectedly")) - data = data + d - return data - - def setproxy(self, proxytype=None, addr=None, port=None, rdns=True, username=None, password=None): - """setproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) - Sets the proxy to be used. - proxytype - The type of the proxy to be used. Three types - are supported: PROXY_TYPE_SOCKS4 (including socks4a), - PROXY_TYPE_SOCKS5 and PROXY_TYPE_HTTP - addr - The address of the server (IP or DNS). - port - The port of the server. Defaults to 1080 for SOCKS - servers and 8080 for HTTP proxy servers. - rdns - Should DNS queries be preformed on the remote side - (rather than the local side). The default is True. - Note: This has no effect with SOCKS4 servers. - username - Username to authenticate with to the server. - The default is no authentication. - password - Password to authenticate with to the server. - Only relevant when username is also provided. - """ - self.__proxy = (proxytype, addr, port, rdns, username, password) - - def __negotiatesocks5(self, destaddr, destport): - """__negotiatesocks5(self,destaddr,destport) - Negotiates a connection through a SOCKS5 server. - """ - # First we'll send the authentication packages we support. - if (self.__proxy[4]!=None) and (self.__proxy[5]!=None): - # The username/password details were supplied to the - # setproxy method so we support the USERNAME/PASSWORD - # authentication (in addition to the standard none). - self.sendall(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02)) - else: - # No username/password were entered, therefore we - # only support connections with no authentication. - self.sendall(struct.pack('BBB', 0x05, 0x01, 0x00)) - # We'll receive the server's response to determine which - # method was selected - chosenauth = self.__recvall(2) - if chosenauth[0:1] != chr(0x05).encode(): - self.close() - raise GeneralProxyError((1, _generalerrors[1])) - # Check the chosen authentication method - if chosenauth[1:2] == chr(0x00).encode(): - # No authentication is required - pass - elif chosenauth[1:2] == chr(0x02).encode(): - # Okay, we need to perform a basic username/password - # authentication. - self.sendall(chr(0x01).encode() + chr(len(self.__proxy[4])) + self.__proxy[4] + chr(len(self.__proxy[5])) + self.__proxy[5]) - authstat = self.__recvall(2) - if authstat[0:1] != chr(0x01).encode(): - # Bad response - self.close() - raise GeneralProxyError((1, _generalerrors[1])) - if authstat[1:2] != chr(0x00).encode(): - # Authentication failed - self.close() - raise Socks5AuthError((3, _socks5autherrors[3])) - # Authentication succeeded - else: - # Reaching here is always bad - self.close() - if chosenauth[1] == chr(0xFF).encode(): - raise Socks5AuthError((2, _socks5autherrors[2])) - else: - raise GeneralProxyError((1, _generalerrors[1])) - # Now we can request the actual connection - req = struct.pack('BBB', 0x05, 0x01, 0x00) - # If the given destination address is an IP address, we'll - # use the IPv4 address request even if remote resolving was specified. - try: - ipaddr = socket.inet_aton(destaddr) - req = req + chr(0x01).encode() + ipaddr - except socket.error: - # Well it's not an IP number, so it's probably a DNS name. - if self.__proxy[3]: - # Resolve remotely - ipaddr = None - if type(destaddr) != type(b''): # python3 - destaddr_bytes = destaddr.encode(encoding='idna') - else: - destaddr_bytes = destaddr - req = req + chr(0x03).encode() + chr(len(destaddr_bytes)).encode() + destaddr_bytes - else: - # Resolve locally - ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) - req = req + chr(0x01).encode() + ipaddr - req = req + struct.pack(">H", destport) - self.sendall(req) - # Get the response - resp = self.__recvall(4) - if resp[0:1] != chr(0x05).encode(): - self.close() - raise GeneralProxyError((1, _generalerrors[1])) - elif resp[1:2] != chr(0x00).encode(): - # Connection failed - self.close() - if ord(resp[1:2])<=8: - raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])])) - else: - raise Socks5Error((9, _socks5errors[9])) - # Get the bound address/port - elif resp[3:4] == chr(0x01).encode(): - boundaddr = self.__recvall(4) - elif resp[3:4] == chr(0x03).encode(): - resp = resp + self.recv(1) - boundaddr = self.__recvall(ord(resp[4:5])) - else: - self.close() - raise GeneralProxyError((1,_generalerrors[1])) - boundport = struct.unpack(">H", self.__recvall(2))[0] - self.__proxysockname = (boundaddr, boundport) - if ipaddr != None: - self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) - else: - self.__proxypeername = (destaddr, destport) - - def getproxysockname(self): - """getsockname() -> address info - Returns the bound IP address and port number at the proxy. - """ - return self.__proxysockname - - def getproxypeername(self): - """getproxypeername() -> address info - Returns the IP and port number of the proxy. - """ - return _orgsocket.getpeername(self) - - def getpeername(self): - """getpeername() -> address info - Returns the IP address and port number of the destination - machine (note: getproxypeername returns the proxy) - """ - return self.__proxypeername - - def __negotiatesocks4(self,destaddr,destport) -> None: - """__negotiatesocks4(self,destaddr,destport) - Negotiates a connection through a SOCKS4 server. - """ - # Check if the destination address provided is an IP address - rmtrslv = False - try: - ipaddr = socket.inet_aton(destaddr) - except socket.error: - # It's a DNS name. Check where it should be resolved. - if self.__proxy[3]: - ipaddr = struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01) - rmtrslv = True - else: - ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) - # Construct the request packet - req = struct.pack(">BBH", 0x04, 0x01, destport) + ipaddr - # The username parameter is considered userid for SOCKS4 - if self.__proxy[4] != None: - req = req + self.__proxy[4] - req = req + chr(0x00).encode() - # DNS name if remote resolving is required - # NOTE: This is actually an extension to the SOCKS4 protocol - # called SOCKS4A and may not be supported in all cases. - if rmtrslv: - req = req + destaddr + chr(0x00).encode() - self.sendall(req) - # Get the response from the server - resp = self.__recvall(8) - if resp[0:1] != chr(0x00).encode(): - # Bad data - self.close() - raise GeneralProxyError((1,_generalerrors[1])) - if resp[1:2] != chr(0x5A).encode(): - # Server returned an error - self.close() - if ord(resp[1:2]) in (91, 92, 93): - self.close() - raise Socks4Error((ord(resp[1:2]), _socks4errors[ord(resp[1:2]) - 90])) - else: - raise Socks4Error((94, _socks4errors[4])) - # Get the bound address/port - self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0]) - if rmtrslv != None: - self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) - else: - self.__proxypeername = (destaddr, destport) - - def __negotiatehttp(self, destaddr, destport) -> None: - """__negotiatehttp(self,destaddr,destport) - Negotiates a connection through an HTTP server. - """ - # If we need to resolve locally, we do this now - if not self.__proxy[3]: - addr = socket.gethostbyname(destaddr) - else: - addr = destaddr - self.sendall(("CONNECT " + addr + ":" + str(destport) + " HTTP/1.1\r\n" + "Host: " + destaddr + "\r\n\r\n").encode()) - # We read the response until we get the string "\r\n\r\n" - resp = self.recv(1) - while resp.find("\r\n\r\n".encode()) == -1: - recv = self.recv(1) - if not recv: - raise GeneralProxyError((1, _generalerrors[1])) - resp = resp + recv - # We just need the first line to check if the connection - # was successful - statusline = resp.splitlines()[0].split(" ".encode(), 2) - if statusline[0] not in ("HTTP/1.0".encode(), "HTTP/1.1".encode()): - self.close() - raise GeneralProxyError((1, _generalerrors[1])) - try: - statuscode = int(statusline[1]) - except ValueError: - self.close() - raise GeneralProxyError((1, _generalerrors[1])) - if statuscode != 200: - self.close() - raise HTTPError((statuscode, statusline[2])) - self.__proxysockname = ("0.0.0.0", 0) - self.__proxypeername = (addr, destport) - - def connect(self, destpair) -> None: - """connect(self, despair) - Connects to the specified destination through a proxy. - destpar - A tuple of the IP/DNS address and the port number. - (identical to socket's connect). - To select the proxy server use setproxy(). - """ - # Do a minimal input check first - if (not type(destpair) in (list,tuple)) or (len(destpair) < 2) or (type(destpair[0]) != type('')) or (type(destpair[1]) != int): - raise GeneralProxyError((5, _generalerrors[5])) - if self.__proxy[0] == PROXY_TYPE_SOCKS5: - if self.__proxy[2] != None: - portnum = int(self.__proxy[2]) - else: - portnum = 1080 - _orgsocket.connect(self, (self.__proxy[1], portnum)) - self.__negotiatesocks5(destpair[0], destpair[1]) - elif self.__proxy[0] == PROXY_TYPE_SOCKS4: - if self.__proxy[2] != None: - portnum = self.__proxy[2] - else: - portnum = 1080 - _orgsocket.connect(self,(self.__proxy[1], portnum)) - self.__negotiatesocks4(destpair[0], destpair[1]) - elif self.__proxy[0] == PROXY_TYPE_HTTP: - if self.__proxy[2] != None: - portnum = self.__proxy[2] - else: - portnum = 8080 - _orgsocket.connect(self,(self.__proxy[1], portnum)) - self.__negotiatehttp(destpair[0], destpair[1]) - elif self.__proxy[0] == None: - _orgsocket.connect(self, (destpair[0], destpair[1])) - else: - raise GeneralProxyError((4, _generalerrors[4])) diff --git a/tox_wrapper/tests/support_http.py b/tox_wrapper/tests/support_http.py deleted file mode 100644 index f3bc975..0000000 --- a/tox_wrapper/tests/support_http.py +++ /dev/null @@ -1,163 +0,0 @@ -# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- - -import os -import sys -import logging -from io import BytesIO -import urllib -import traceback - -global LOG -LOG = logging.getLogger('app.'+'ts') - -try: - import pycurl -except ImportError: - pycurl = None -try: - import requests -except ImportError: - requests = None - -lNO_PROXY = ['localhost', '127.0.0.1'] -CONNECT_TIMEOUT = 20.0 - -def bAreWeConnected() -> bool: - # FixMe: Linux only - sFile = f"/proc/{os.getpid()}/net/route" - if not os.path.isfile(sFile): return None - i = 0 - for elt in open(sFile, "r").readlines(): - if elt.startswith('Iface'): continue - if elt.startswith('lo'): continue - i += 1 - return i > 0 - -def pick_up_proxy_from_environ() -> dict: - retval = dict() - if os.environ.get('socks_proxy', ''): - # socks_proxy takes precedence over https/http - proxy = os.environ.get('socks_proxy', '') - i = proxy.find('//') - if i >= 0: proxy = proxy[i+2:] - retval['proxy_host'] = proxy.split(':')[0] - retval['proxy_port'] = proxy.split(':')[-1] - retval['proxy_type'] = 2 - retval['udp_enabled'] = False - elif os.environ.get('https_proxy', ''): - # https takes precedence over http - proxy = os.environ.get('https_proxy', '') - i = proxy.find('//') - if i >= 0: proxy = proxy[i+2:] - retval['proxy_host'] = proxy.split(':')[0] - retval['proxy_port'] = proxy.split(':')[-1] - retval['proxy_type'] = 1 - retval['udp_enabled'] = False - elif os.environ.get('http_proxy', ''): - proxy = os.environ.get('http_proxy', '') - i = proxy.find('//') - if i >= 0: proxy = proxy[i+2:] - retval['proxy_host'] = proxy.split(':')[0] - retval['proxy_port'] = proxy.split(':')[-1] - retval['proxy_type'] = 1 - retval['udp_enabled'] = False - else: - retval['proxy_host'] = '' - retval['proxy_port'] = '' - retval['proxy_type'] = 0 - retval['udp_enabled'] = True - return retval - -def download_url(url:str, settings:str = None) -> None: - if not bAreWeConnected(): return '' - - if settings is None: - settings = pick_up_proxy_from_environ() - - if pycurl: - LOG.debug('Downloading with pycurl: ' + str(url)) - buffer = BytesIO() - c = pycurl.Curl() - c.setopt(c.URL, url) - c.setopt(c.WRITEDATA, buffer) - # Follow redirect. - c.setopt(c.FOLLOWLOCATION, True) - - # cookie jar - cjar = os.path.join(os.environ['HOME'], '.local', 'jar.cookie') - if os.path.isfile(cjar): - c.setopt(c.COOKIEFILE, cjar) - # LARGS+=( --cookie-jar --junk-session-cookies ) - - #? c.setopt(c.ALTSVC_CTRL, 16) - - c.setopt(c.NOPROXY, ','.join(lNO_PROXY)) - #? c.setopt(c.CAINFO, certifi.where()) - if settings['proxy_type'] == 2 and settings['proxy_host']: - socks_proxy = 'socks5h://'+settings['proxy_host']+':'+str(settings['proxy_port']) - settings['udp_enabled'] = False - c.setopt(c.PROXY, socks_proxy) - c.setopt(c.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME) - elif settings['proxy_type'] == 1 and settings['proxy_host']: - https_proxy = 'https://'+settings['proxy_host']+':'+str(settings['proxy_port']) - c.setopt(c.PROXY, https_proxy) - elif settings['proxy_type'] == 1 and settings['proxy_host']: - http_proxy = 'http://'+settings['proxy_host']+':'+str(settings['proxy_port']) - c.setopt(c.PROXY, http_proxy) - c.setopt(c.PROTOCOLS, c.PROTO_HTTPS) - try: - c.perform() - c.close() - #? assert c.getinfo(c.RESPONSE_CODE) < 300 - result = buffer.getvalue() - # Body is a byte string. - LOG.info('nodes loaded with pycurl: ' + str(url)) - return result - except Exception as ex: - LOG.error('TOX Downloading error with pycurl: ' + str(ex)) - LOG.error('\n' + traceback.format_exc()) - # drop through - - if requests: - LOG.debug('Downloading with requests: ' + str(url)) - try: - headers = dict() - headers['Content-Type'] = 'application/json' - proxies = dict() - if settings['proxy_type'] == 2 and settings['proxy_host']: - socks_proxy = 'socks5://'+settings['proxy_host']+':'+str(settings['proxy_port']) - settings['udp_enabled'] = False - proxies['https'] = socks_proxy - elif settings['proxy_type'] == 1 and settings['proxy_host']: - https_proxy = 'https://'+settings['proxy_host']+':'+str(settings['proxy_port']) - proxies['https'] = https_proxy - elif settings['proxy_type'] == 1 and settings['proxy_host']: - http_proxy = 'http://'+settings['proxy_host']+':'+str(settings['proxy_port']) - proxies['http'] = http_proxy - req = requests.get(url, - headers=headers, - proxies=proxies, - timeout=CONNECT_TIMEOUT) - # max_retries=3 - assert req.status_code < 300 - result = req.content - LOG.info('nodes loaded with requests: ' + str(url)) - return result - except Exception as ex: - LOG.error('TOX Downloading error with requests: ' + str(ex)) - # drop through - - if not settings['proxy_type']: # no proxy - LOG.debug('Downloading with urllib no proxy: ' + str(url)) - try: - req = urllib.request.Request(url) - req.add_header('Content-Type', 'application/json') - response = urllib.request.urlopen(req) - result = response.read() - LOG.info('nodes loaded with no proxy: ' + str(url)) - return result - except Exception as ex: - LOG.error('TOX Downloading ' + str(ex)) - return '' - - return '' diff --git a/tox_wrapper/tests/support_onions.py b/tox_wrapper/tests/support_onions.py deleted file mode 100644 index 324c889..0000000 --- a/tox_wrapper/tests/support_onions.py +++ /dev/null @@ -1,573 +0,0 @@ -# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- - -import getpass -import os -import re -import select -import shutil -import socket -import sys -import time -from typing import Union, Callable, Union - -if False: - import cepa as stem - from cepa.connection import MissingPassword - from cepa.control import Controller - from cepa.util.tor_tools import is_valid_fingerprint -else: - import stem - from stem.connection import MissingPassword - from stem.control import Controller - from stem.util.tor_tools import is_valid_fingerprint - -global LOG -import logging -import warnings - -warnings.filterwarnings('ignore') -LOG = logging.getLogger() - -bHAVE_TORR = shutil.which('tor-resolve') - -yKNOWN_ONIONS = """ - - facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd # facebook - - duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad # ddg - - zkaan2xfbuxia2wpf7ofnkbz6r5zdbbvxbunvp5g2iebopbfc4iqmbad # hks -""" -# grep -B 1 '
  • bool: - # FixMe: Linux only - sFile = f"/proc/{os.getpid()}/net/route" - if not os.path.isfile(sFile): return None - i = 0 - for elt in open(sFile, "r").readlines(): - if elt.startswith('Iface'): continue - if elt.startswith('lo'): continue - i += 1 - return i > 0 - -def sMapaddressResolv(target:str, iPort:int = 9051, log_level:int = 10) -> str: - if not stem: - LOG.warn('please install the stem Python package') - return '' - - try: - controller = oGetStemController(log_level=log_level) - - map_dict = {"0.0.0.0": target} - map_ret = controller.map_address(map_dict) - - return map_ret - except Exception as e: - LOG.exception(e) - return '' - -def vwait_for_controller(controller, wait_boot:int = 10) -> None: - if bAreWeConnected() is False: - raise SystemExit("we are not connected") - percent = i = 0 - # You can call this while boostrapping - while percent < 100 and i < wait_boot: - bootstrap_status = controller.get_info("status/bootstrap-phase") - progress_percent = re.match('.* PROGRESS=([0-9]+).*', bootstrap_status) - percent = int(progress_percent.group(1)) - LOG.info(f"Bootstrapping {percent}%") - time.sleep(5) - i += 5 - -def bin_to_hex(raw_id:int, length: Union[int, None] = None) -> str: - if length is None: length = len(raw_id) - res = ''.join('{:02x}'.format(raw_id[i]) for i in range(length)) - return res.upper() - -def lIntroductionPoints(controller=None, lOnions:list = [], itimeout:int = 120, log_level:int = 10): - """now working !!! stem 1.8.x timeout must be huge >120 - 'Provides the descriptor for a hidden service. The **address** is the - '.onion' address of the hidden service ' - What about Services? - """ - try: - from cryptography.utils import int_from_bytes - except ImportError: - import cryptography.utils - - # guessing - not in the current cryptography but stem expects it - def int_from_bytes(**args): return int.to_bytes(*args) - cryptography.utils.int_from_bytes = int_from_bytes - # this will fai if the trick above didnt work - from stem.prereq import is_crypto_available - is_crypto_available(ed25519=True) - - from queue import Empty - - from stem import Timeout - from stem.client.datatype import LinkByFingerprint - from stem.descriptor.hidden_service import HiddenServiceDescriptorV3 - - if type(lOnions) not in [set, tuple, list]: - lOnions = list(lOnions) - if controller is None: - controller = oGetStemController(log_level=log_level) - l = [] - for elt in lOnions: - LOG.info(f"controller.get_hidden_service_descriptor {elt}") - try: - desc = controller.get_hidden_service_descriptor(elt, - await_result=True, - timeout=itimeout) - # LOG.log(40, f"{dir(desc)} get_hidden_service_descriptor") - # timeouts 20 sec - # mistakenly a HSv2 descriptor - hs_address = HiddenServiceDescriptorV3.from_str(str(desc)) # reparse as HSv3 - oInnerLayer = hs_address.decrypt(elt) - # LOG.log(40, f"{dir(oInnerLayer)}") - - # IntroductionPointV3 - n = oInnerLayer.introduction_points - if not n: - LOG.warn(f"NO introduction points for {elt}") - continue - LOG.info(f"{elt} {len(n)} introduction points") - lp = [] - for introduction_point in n: - for linkspecifier in introduction_point.link_specifiers: - if isinstance(linkspecifier, LinkByFingerprint): - # LOG.log(40, f"Getting fingerprint for {linkspecifier}") - if hasattr(linkspecifier, 'fingerprint'): - assert len(linkspecifier.value) == 20 - lp += [bin_to_hex(linkspecifier.value)] - LOG.info(f"{len(lp)} introduction points for {elt}") - l += lp - except (Empty, Timeout,) as e: # noqa - LOG.warn(f"Timed out getting introduction points for {elt}") - except stem.DescriptorUnavailable as e: - LOG.error(e) - except Exception as e: - LOG.exception(e) - return l - -def zResolveDomain(domain:str) -> int: - try: - ip = sTorResolve(domain) - except Exception as e: # noqa - ip = '' - if ip == '': - try: - lpair = getaddrinfo(domain, 443) - except Exception as e: - LOG.warn(f"{e}") - lpair = None - if lpair is None: - LOG.warn(f"TorResolv and getaddrinfo failed for {domain}") - return '' - ip = lpair[0] - return ip - -def sTorResolve(target:str, - verbose:bool = False, - sHost:str = '127.0.0.1', - iPort:int = 9050, - SOCK_TIMEOUT_SECONDS:float = 10.0, - SOCK_TIMEOUT_TRIES:int = 3, - ) -> str: - MAX_INFO_RESPONSE_PACKET_LENGTH = 8 - if '@' in target: - LOG.warn(f"sTorResolve failed invalid hostname {target}") - return '' - target = target.strip('/') - seb = b"\x04\xf0\x00\x00\x00\x00\x00\x01\x00" - seb += bytes(target, 'US-ASCII') + b"\x00" - assert len(seb) == 10 + len(target), str(len(seb)) + repr(seb) - -# LOG.debug(f"0 Sending {len(seb)} to The TOR proxy {seb}") - - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((sHost, iPort)) - - sock.settimeout(SOCK_TIMEOUT_SECONDS) - oRet = sock.sendall(seb) # noqa - - i = 0 - data = '' - while i < SOCK_TIMEOUT_TRIES: - i += 1 - time.sleep(3) - lReady = select.select([sock.fileno()], [], [], - SOCK_TIMEOUT_SECONDS) - if not lReady[0]: continue - try: - flags=socket.MSG_WAITALL - data = sock.recv(MAX_INFO_RESPONSE_PACKET_LENGTH, flags) - except socket.timeout: - LOG.warn(f"4 The TOR proxy {(sHost, iPort)}" \ - +" didnt reply in " + str(SOCK_TIMEOUT_SECONDS) + " sec." - +" #" +str(i)) - except Exception as e: - LOG.error("4 The TOR proxy " \ - +repr((sHost, iPort)) \ - +" errored with " + str(e) - +" #" +str(i)) - sock.close() - return '' - else: - if len(data) > 0: break - - if len(data) == 0: - if i > SOCK_TIMEOUT_TRIES: - sLabel = "5 No reply #" - else: - sLabel = "5 No data #" - LOG.warn(f"sTorResolve: {sLabel} {i} on {sHost}:{iPort}") - sock.close() - return '' - - assert len(data) >= 8 - packet_sf = data[1] - if packet_sf == 90: - # , "%d" % packet_sf - assert f"{packet_sf}" == "90", f"packet_sf = {packet_sf}" - return f"{data[4]}.{data[5]}.{data[6]}.{data[7]}" - else: - # 91 - LOG.warn(f"tor-resolve failed for {target} on {sHost}:{iPort}") - - os.system(f"tor-resolve -4 {target} > /tmp/e 2>/dev/null") -# os.system("strace tor-resolve -4 "+target+" 2>&1|grep '^sen\|^rec'") - - return '' - -def getaddrinfo(sHost:str, sPort:str) -> list: - # do this the explicit way = Ive seen the compact connect fail - # >>> sHost, sPort = 'l27.0.0.1', 33446 - # >>> sock.connect((sHost, sPort)) - # socket.gaierror: [Errno -2] Name or service not known - try: - lElts = socket.getaddrinfo(sHost, int(sPort), socket.AF_INET) - lElts = list(filter(lambda elt: elt[1] == socket.SOCK_DGRAM, lElts)) - assert len(lElts) == 1, repr(lElts) - lPair = lElts[0][-1] - assert len(lPair) == 2, repr(lPair) - assert type(lPair[1]) == int, repr(lPair) - except (socket.gaierror, OSError, BaseException) as e: - LOG.error(e) - return None - return lPair - -def icheck_torrc(sFile:str, oArgs) -> int: - l = open(sFile, 'rt').readlines() - a = {} - for elt in l: - elt = elt.strip() - if not elt or ' ' not in elt: continue - (k, v,) = elt.split(' ', 1) - a[k] = v - keys = a - - if 'HashedControlPassword' not in keys: - LOG.info('Add HashedControlPassword for security') - print('run: tor --hashcontrolpassword ') - if 'ExcludeExitNodes' in keys: - elt = 'BadNodes.ExcludeExitNodes.BadExit' - LOG.warn(f"Remove ExcludeNodes and move then to {oArgs.bad_nodes}") - print(f"move to the {elt} section as a list") - if 'GuardNodes' in keys: - elt = 'GoodNodes.GuardNodes' - LOG.warn(f"Remove GuardNodes and move then to {oArgs.good_nodes}") - print(f"move to the {elt} section as a list") - if 'ExcludeNodes' in keys: - elt = 'BadNodes.ExcludeNodes.BadExit' - LOG.warn(f"Remove ExcludeNodes and move then to {oArgs.bad_nodes}") - print(f"move to the {elt} section as a list") - if 'ControlSocket' not in keys and os.path.exists('/run/tor/control'): - LOG.info('Add ControlSocket /run/tor/control for us') - print('ControlSocket /run/tor/control GroupWritable RelaxDirModeCheck') - if 'UseMicrodescriptors' not in keys or keys['UseMicrodescriptors'] != '1': - LOG.info('Add UseMicrodescriptors 0 for us') - print('UseMicrodescriptors 0') - if 'AutomapHostsSuffixes' not in keys: - LOG.info('Add AutomapHostsSuffixes for onions') - print('AutomapHostsSuffixes .exit,.onion') - if 'AutoMapHostsOnResolve' not in keys: - LOG.info('Add AutoMapHostsOnResolve for onions') - print('AutoMapHostsOnResolve 1') - if 'VirtualAddrNetworkIPv4' not in keys: - LOG.info('Add VirtualAddrNetworkIPv4 for onions') - print('VirtualAddrNetworkIPv4 172.16.0.0/12') - return 0 - -def lExitExcluder(oArgs, iPort:int = 9051, log_level:int = 10) -> list: - """ - https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py - """ - if not stem: - LOG.warn('please install the stem Python package') - return '' - LOG.debug('lExcludeExitNodes') - - try: - controller = oGetStemController(log_level=log_level) - # generator - relays = controller.get_server_descriptors() - except Exception as e: - LOG.error(f'Failed to get relay descriptors {e}') - return None - - if controller.is_set('ExcludeExitNodes'): - LOG.info('ExcludeExitNodes is in use already.') - return None - - exit_excludelist=[] - LOG.debug("Excluded exit relays:") - for relay in relays: - if relay.exit_policy.is_exiting_allowed() and not relay.contact: - if is_valid_fingerprint(relay.fingerprint): - exit_excludelist.append(relay.fingerprint) - LOG.debug("https://metrics.torproject.org/rs.html#details/%s" % relay.fingerprint) - else: - LOG.warn('Invalid Fingerprint: %s' % relay.fingerprint) - - try: - controller.set_conf('ExcludeExitNodes', exit_excludelist) - LOG.info('Excluded a total of %s exit relays without ContactInfo from the exit position.' % len(exit_excludelist)) - except Exception as e: - LOG.exception('ExcludeExitNodes ' +str(e)) - return exit_excludelist - -if __name__ == '__main__': - target = 'duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad' - controller = oGetStemController(log_level=10) - lIntroductionPoints(controller, [target], itimeout=120) diff --git a/tox_wrapper/tests/support_testing.py b/tox_wrapper/tests/support_testing.py deleted file mode 100644 index 3d88741..0000000 --- a/tox_wrapper/tests/support_testing.py +++ /dev/null @@ -1,970 +0,0 @@ -# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- - -import argparse -import contextlib -import inspect -import json -import logging -import os -import re -import select -import shutil -import socket -import sys -import time -import traceback -import unittest -from ctypes import * -from random import Random -import functools -from typing import Union, Callable, Union - -random = Random() - -try: - import coloredlogs - if 'COLOREDLOGS_LEVEL_STYLES' not in os.environ: - os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red' - # https://pypi.org/project/coloredlogs/ -except ImportError as e: - coloredlogs = False -try: - import stem -except ImportError as e: - stem = False -try: - import nmap -except ImportError as e: - nmap = False - -import tox_wrapper -import tox_wrapper.toxcore_enums_and_consts as enums - -from tox_wrapper.tests.support_http import bAreWeConnected -from tox_wrapper.tests.support_onions import (is_valid_fingerprint, - lIntroductionPoints, - oGetStemController, - sMapaddressResolv, sTorResolve) - -try: - from user_data.settings import get_user_config_path -except ImportError: - get_user_config_path = None - -# LOG=util.log -global LOG -LOG = logging.getLogger() - -def LOG_ERROR(l:str) -> None: print('ERRORc: '+l) -def LOG_WARN(l:str) -> None: print('WARNc: ' +l) -def LOG_INFO(l:str) -> None: print('INFOc: ' +l) -def LOG_DEBUG(l:str) -> None: print('DEBUGc: '+l) -def LOG_TRACE(l:str) -> None: pass # print('TRACE+ '+l) - -try: - from trepan.api import debug - from trepan.interfaces import server as Mserver -except: -# print('trepan3 TCP server NOT available.') - pass -else: -# print('trepan3 TCP server available.') - def trepan_handler(num=None, f=None): - connection_opts={'IO': 'TCP', 'PORT': 6666} - intf = Mserver.ServerInterface(connection_opts=connection_opts) - dbg_opts = {'interface': intf } - print(f'Starting TCP server listening on port 6666.') - debug(dbg_opts=dbg_opts) - return - -# self._audio_thread.isAlive -iTHREAD_TIMEOUT = 1 -iTHREAD_SLEEP = 1 -iTHREAD_JOINS = 8 -iNODES = 6 - -lToxSamplerates = [8000, 12000, 16000, 24000, 48000] -lToxSampleratesK = [8, 12, 16, 24, 48] -lBOOLEANS = [ - 'local_discovery_enabled', - 'udp_enabled', - 'ipv6_enabled', - 'trace_enabled', - 'compact_mode', - 'allow_inline', - 'notifications', - 'sound_notifications', - 'calls_sound', - 'hole_punching_enabled', - 'dht_announcements_enabled', - 'save_history', - 'download_nodes_list' - 'core_logging', - ] - -sDIR = os.environ.get('TMPDIR', '/tmp') -sTOX_VERSION = "1000002018" -bHAVE_NMAP = shutil.which('nmap') -bHAVE_JQ = shutil.which('jq') -bHAVE_BASH = shutil.which('bash') -bHAVE_TORR = shutil.which('tor-resolve') - -lDEAD_BS = [ - # Failed to resolve "tox3.plastiras.org" - "tox3.plastiras.org", - 'tox.kolka.tech', - # here and gone - '122-116-39-151.hinet-ip.hinet.net', - # IPs that do not reverse resolve - '49.12.229.145', - "46.101.197.175", - '114.35.245.150', - '172.93.52.70', - '195.123.208.139', - '205.185.115.131', - # IPs that do not rreverse resolve - 'yggnode.cf', '188.225.9.167', - '85-143-221-42.simplecloud.ru', '85.143.221.42', - # IPs that do not ping - '104.244.74.69', 'tox.plastiras.org', - '195.123.208.139', - 'gt.sot-te.ch', '32.226.5.82', - # suspicious IPs - 'tox.abilinski.com', '172.103.164.250', '172.103.164.250.tpia.cipherkey.com', - ] - -def assert_main_thread() -> None: - from PyQt5 import QtCore, QtWidgets - from qtpy.QtWidgets import QApplication - - # this "instance" method is very useful! - app_thread = QtWidgets.QApplication.instance().thread() - curr_thread = QtCore.QThread.currentThread() - if app_thread != curr_thread: - raise RuntimeError('attempt to call MainWindow.append_message from non-app thread') - -@contextlib.contextmanager -def ignoreStdout() -> None: - devnull = os.open(os.devnull, os.O_WRONLY) - old_stdout = os.dup(1) - sys.stdout.flush() - os.dup2(devnull, 1) - os.close(devnull) - try: - yield - finally: - os.dup2(old_stdout, 1) - os.close(old_stdout) - -@contextlib.contextmanager -def ignoreStderr() -> None: - devnull = os.open(os.devnull, os.O_WRONLY) - old_stderr = os.dup(2) - sys.stderr.flush() - os.dup2(devnull, 2) - os.close(devnull) - try: - yield - finally: - os.dup2(old_stderr, 2) - os.close(old_stderr) - -def clean_booleans(oArgs) -> None: - for key in lBOOLEANS: - if not hasattr(oArgs, key): continue - val = getattr(oArgs, key) - if type(val) == bool: continue - if val in ['False', 'false', '0']: - setattr(oArgs, key, False) - else: - setattr(oArgs, key, True) - -def on_log(iTox, level, filename, line, func, message, *data) -> None: - # LOG.debug(repr((level, filename, line, func, message,))) - tox_log_cb(level, filename, line, func, message) - -def tox_log_cb(level, filename, line, func, message, *args) -> None: - """ - * @param level The severity of the log message. - * @param filename The source file from which the message originated. - * @param line The source line from which the message originated. - * @param func The function from which the message originated. - * @param message The log message. - * @param user_data The user data pointer passed to tox_new in options. - """ - if type(func) == bytes: - func = str(func, 'utf-8') - message = str(message, 'UTF-8') - filename = str(filename, 'UTF-8') - - if filename == 'network.c': - if line == 660: return - # root WARNING 3network.c#944:b'send_packet'attempted to send message with network family 10 (probably IPv6) on IPv4 socket - if line == 944: return - i = message.find('07 = GET_NODES') - if i > 0: - return - if filename == 'TCP_common.c': return - - i = message.find(' | ') - if i > 0: - message = message[:i] - # message = filename +'#' +str(line) +':'+func +' '+message - - name = 'core' - # old level is meaningless - level = 10 # LOG.level - - # LOG._log(LOG.level, f"{level}: {message}", list()) - - i = message.find('(0: OK)') - if i > 0: - level = 10 # LOG.debug - else: - i = message.find('(1: ') - if i > 0: - level = 30 # LOG.warn - else: - level = 20 # LOG.info - - o = LOG.makeRecord(filename, level, func, line, message, list(), None) - # LOG.handle(o) - LOG_TRACE(f"{level}: {func}{line} {message}") - return - - elif level == 1: - LOG.critical(f"{level}: {message}") - elif level == 2: - LOG.error(f"{level}: {message}") - elif level == 3: - LOG.warn(f"{level}: {message}") - elif level == 4: - LOG.info(f"{level}: {message}") - elif level == 5: - LOG.debug(f"{level}: {message}") - else: - LOG_TRACE(f"{level}: {message}") - -def vAddLoggerCallback(tox_options, callback=None) -> None: - if callback is None: - tox_wrapper.tox.Tox.libtoxcore.tox_options_set_log_callback( - tox_options._options_pointer, - POINTER(None)()) - tox_options.self_logger_cb = None - return - - c_callback = CFUNCTYPE(None, c_void_p, c_int, c_char_p, c_int, c_char_p, c_char_p, c_void_p) - tox_options.self_logger_cb = c_callback(callback) - tox_wrapper.tox.Tox.libtoxcore.tox_options_set_log_callback( - tox_options._options_pointer, - tox_options.self_logger_cb) - -def get_video_indexes() -> list: - # Linux - return [str(l[5:]) for l in os.listdir('/dev/') if l.startswith('video')] - -def get_audio(): - with ignoreStderr(): - import pyaudio - oPyA = pyaudio.PyAudio() - - input_devices = output_devices = 0 - for i in range(oPyA.get_device_count()): - device = oPyA.get_device_info_by_index(i) - if device["maxInputChannels"]: - input_devices += 1 - if device["maxOutputChannels"]: - output_devices += 1 - # {'index': 21, 'structVersion': 2, 'name': 'default', 'hostApi': 0, 'maxInputChannels': 64, 'maxOutputChannels': 64, 'defaultLowInputLatency': 0.008707482993197279, 'defaultLowOutputLatency': 0.008707482993197279, 'defaultHighInputLatency': 0.034829931972789115, 'defaultHighOutputLatency': 0.034829931972789115, 'defaultSampleRate': 44100.0} - audio = {'input': oPyA.get_default_input_device_info()['index'] if input_devices else -1, - 'output': oPyA.get_default_output_device_info()['index'] if output_devices else -1, - 'enabled': input_devices and output_devices} - return audio - -def oToxygenToxOptions(oArgs): - data = None - tox_options = tox_wrapper.tox.Tox.options_new() - if oArgs.proxy_type: - tox_options.contents.proxy_type = int(oArgs.proxy_type) - tox_options.contents.proxy_host = bytes(oArgs.proxy_host, 'UTF-8') - tox_options.contents.proxy_port = int(oArgs.proxy_port) - tox_options.contents.udp_enabled = False - else: - tox_options.contents.udp_enabled = oArgs.udp_enabled - if not os.path.exists('/proc/sys/net/ipv6'): - oArgs.ipv6_enabled = False - else: - tox_options.contents.ipv6_enabled = oArgs.ipv6_enabled - - tox_options.contents.tcp_port = int(oArgs.tcp_port) - tox_options.contents.dht_announcements_enabled = oArgs.dht_announcements_enabled - tox_options.contents.hole_punching_enabled = oArgs.hole_punching_enabled - - # overrides - tox_options.contents.local_discovery_enabled = False - tox_options.contents.experimental_thread_safety = False - # REQUIRED!! - if oArgs.ipv6_enabled and not os.path.exists('/proc/sys/net/ipv6'): - LOG.warning('Disabling IPV6 because /proc/sys/net/ipv6 does not exist' + repr(oArgs.ipv6_enabled)) - tox_options.contents.ipv6_enabled = False - else: - tox_options.contents.ipv6_enabled = bool(oArgs.ipv6_enabled) - - if data: # load existing profile - tox_options.contents.savedata_type = enums.TOX_SAVEDATA_TYPE['TOX_SAVE'] - tox_options.contents.savedata_data = c_char_p(data) - tox_options.contents.savedata_length = len(data) - else: # create new profile - tox_options.contents.savedata_type = enums.TOX_SAVEDATA_TYPE['NONE'] - tox_options.contents.savedata_data = None - tox_options.contents.savedata_length = 0 - - #? tox_options.contents.log_callback = LOG - if tox_options._options_pointer: - # LOG.debug("Adding logging to tox_options._options_pointer ") - vAddLoggerCallback(tox_options, on_log) - else: - LOG.warning("No tox_options._options_pointer " +repr(tox_options._options_pointer)) - - return tox_options - -def oMainArgparser(_=None, iMode=0): - # 'Mode: 0=chat 1=chat+audio 2=chat+audio+video default: 0' - if not os.path.exists('/proc/sys/net/ipv6'): - bIpV6 = 'False' - else: - bIpV6 = 'True' - lIpV6Choices=[bIpV6, 'False'] - - sNodesJson = os.path.join(os.environ['HOME'], '.config', 'tox', 'DHTnodes.json') - if not os.path.exists(sNodesJson): sNodesJson = '' - - logfile = os.path.join(os.environ.get('TMPDIR', '/tmp'), 'toxygen.log') - if not os.path.exists(logfile): logfile = '' - - parser = argparse.ArgumentParser(add_help=True) - parser.add_argument('--proxy_host', '--proxy-host', type=str, - # oddball - we want to use '' as a setting - default='0.0.0.0', - help='proxy host') - parser.add_argument('--proxy_port', '--proxy-port', default=0, type=int, - help='proxy port') - parser.add_argument('--proxy_type', '--proxy-type', default=0, type=int, - choices=[0,1,2], - help='proxy type 1=http, 2=socks') - parser.add_argument('--tcp_port', '--tcp-port', default=0, type=int, - help='tcp port') - parser.add_argument('--udp_enabled', type=str, default='True', - choices=['True', 'False'], - help='En/Disable udp') - parser.add_argument('--ipv6_enabled', type=str, default=bIpV6, - choices=lIpV6Choices, - help=f"En/Disable ipv6 - default {bIpV6}") - parser.add_argument('--trace_enabled',type=str, - default='True' if os.environ.get('DEBUG') else 'False', - choices=['True','False'], - help='Debugging from toxcore logger_trace or env DEBUG=1') - parser.add_argument('--download_nodes_list', type=str, default='False', - choices=['True', 'False'], - help='Download nodes list') - parser.add_argument('--nodes_json', type=str, - default=sNodesJson) - parser.add_argument('--network', type=str, - choices=['main', 'local'], - default='main') - parser.add_argument('--download_nodes_url', type=str, - default='https://nodes.tox.chat/json') - parser.add_argument('--logfile', default=logfile, - help='Filename for logging - start with + for stdout too') - parser.add_argument('--loglevel', default=logging.INFO, type=int, - # choices=[logging.info,logging.trace,logging.debug,logging.error] - help='Threshold for logging (lower is more) default: 20') - parser.add_argument('--mode', type=int, default=iMode, - choices=[0,1,2], - help='Mode: 0=chat 1=chat+audio 2=chat+audio+video default: 0') - parser.add_argument('--hole_punching_enabled',type=str, - default='False', choices=['True','False'], - help='En/Enable hole punching') - parser.add_argument('--dht_announcements_enabled',type=str, - default='True', choices=['True','False'], - help='En/Disable DHT announcements') -# argparse.ArgumentError: argument --save_history: conflicting option string: --save_history -# parser.add_argument('--save_history', type=str, default='True', -# choices=['True', 'False'], -# help='En/Disable saving history') - return parser - -def vSetupLogging(oArgs) -> None: - global LOG - logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S') - logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S' - logging._defaultFormatter.default_msec_format = '' - - add = None - kwargs = dict(level=oArgs.loglevel, - format='%(levelname)-8s %(message)s') - if oArgs.logfile: - add = oArgs.logfile.startswith('+') - sub = oArgs.logfile.startswith('-') - if add or sub: - oArgs.logfile = oArgs.logfile[1:] - kwargs['filename'] = oArgs.logfile - - if coloredlogs: - # https://pypi.org/project/coloredlogs/ - aKw = dict(level=oArgs.loglevel, - logger=LOG, - stream=sys.stdout, - fmt='%(name)s %(levelname)s %(message)s' - ) - coloredlogs.install(**aKw) - if oArgs.logfile: - oHandler = logging.FileHandler(oArgs.logfile) - LOG.addHandler(oHandler) - else: - logging.basicConfig(**kwargs) - if add: - oHandler = logging.StreamHandler(sys.stdout) - LOG.addHandler(oHandler) - - LOG.info(f"Setting loglevel to {oArgs.loglevel!s}") - - -def setup_logging(oArgs) -> None: - global LOG - if coloredlogs: - aKw = dict(level=oArgs.loglevel, - logger=LOG, - fmt='%(name)s %(levelname)s %(message)s') - if oArgs.logfile: - oFd = open(oArgs.logfile, 'wt') - setattr(oArgs, 'log_oFd', oFd) - aKw['stream'] = oFd - coloredlogs.install(**aKw) - if oArgs.logfile: - oHandler = logging.StreamHandler(stream=sys.stdout) - LOG.addHandler(oHandler) - else: - aKw = dict(level=oArgs.loglevel, - format='%(name)s %(levelname)-4s %(message)s') - if oArgs.logfile: - aKw['filename'] = oArgs.logfile - logging.basicConfig(**aKw) - - logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S') - logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S' - logging._defaultFormatter.default_msec_format = '' - - LOG.setLevel(oArgs.loglevel) -# LOG.trace = lambda l: LOG.log(0, repr(l)) - LOG.info(f"Setting loglevel to {oArgs.loglevel!s}") - -def signal_handler(num, f) -> None: - from trepan.api import debug - from trepan.interfaces import server as Mserver - connection_opts={'IO': 'TCP', 'PORT': 6666} - intf = Mserver.ServerInterface(connection_opts=connection_opts) - dbg_opts = {'interface': intf} - LOG.info('Starting TCP server listening on port 6666.') - debug(dbg_opts=dbg_opts) - return - -def merge_args_into_settings(args:list, settings:dict) -> None: - if args: - if not hasattr(args, 'audio'): - LOG.warn('No audio ' +repr(args)) - settings['audio'] = getattr(args, 'audio') - if not hasattr(args, 'video'): - LOG.warn('No video ' +repr(args)) - settings['video'] = getattr(args, 'video') - for key in settings.keys(): - # proxy_type proxy_port proxy_host - not_key = 'not_' +key - if hasattr(args, key): - val = getattr(args, key) - if type(val) == bytes: - # proxy_host - ascii? - # filenames - ascii? - val = str(val, 'UTF-8') - settings[key] = val - elif hasattr(args, not_key): - val = not getattr(args, not_key) - settings[key] = val - clean_settings(settings) - return - -def clean_settings(self:dict) -> None: - # failsafe to ensure C tox is bytes and Py settings is str - - # overrides - self['mirror_mode'] = False - self['save_history'] = True - # REQUIRED!! - if not os.path.exists('/proc/sys/net/ipv6'): - LOG.warn('Disabling IPV6 because /proc/sys/net/ipv6 does not exist') - self['ipv6_enabled'] = False - - if 'proxy_type' in self and self['proxy_type'] == 0: - self['proxy_host'] = '' - self['proxy_port'] = 0 - - if 'proxy_type' in self and self['proxy_type'] != 0 and \ - 'proxy_host' in self and self['proxy_host'] != '' and \ - 'proxy_port' in self and self['proxy_port'] != 0: - if 'udp_enabled' in self and self['udp_enabled']: - # We don't currently support UDP over proxy. - LOG.info("UDP enabled and proxy set: disabling UDP") - self['udp_enabled'] = False - if 'local_discovery_enabled' in self and self['local_discovery_enabled']: - LOG.info("local_discovery_enabled enabled and proxy set: disabling local_discovery_enabled") - self['local_discovery_enabled'] = False - if 'dht_announcements_enabled' in self and self['dht_announcements_enabled']: - LOG.info("dht_announcements_enabled enabled and proxy set: disabling dht_announcements_enabled") - self['dht_announcements_enabled'] = False - - if 'auto_accept_path' in self and \ - type(self['auto_accept_path']) == bytes: - self['auto_accept_path'] = str(self['auto_accept_path'], 'UTF-8') - - LOG.debug("Cleaned settings") - -def lSdSamplerates(iDev:int) -> list: - try: - import sounddevice as sd - except ImportError: - return [] - samplerates = (32000, 44100, 48000, 96000, ) - device = iDev - supported_samplerates = [] - for fs in samplerates: - try: - sd.check_output_settings(device=device, samplerate=fs) - except Exception as e: - # LOG.debug(f"Sample rate not supported {fs}" +' '+str(e)) - pass - else: - supported_samplerates.append(fs) - return supported_samplerates - -def _get_nodes_path(oArgs:str): - if oArgs and hasattr(oArgs, 'nodes_json') and \ - oArgs.nodes_json and os.path.isfile(oArgs.nodes_json): - default = oArgs.nodes_json - elif get_user_config_path: - default = os.path.join(get_user_config_path(), 'toxygen_nodes.json') - else: - # Windwoes - default = os.path.join(os.getenv('HOME'), '.config', 'tox', 'toxygen_nodes.json') - LOG.debug("_get_nodes_path: " +default) - return default - -DEFAULT_NODES_COUNT = 8 - -global aNODES -aNODES = {} - -# @functools.lru_cache(maxsize=12) TypeError: unhashable type: 'Namespace' -def generate_nodes(oArgs=None, - nodes_count:int = DEFAULT_NODES_COUNT, - ipv:str = 'ipv4', - udp_not_tcp=True) -> dict: - global aNODES - sKey = ipv - sKey += ',0' if udp_not_tcp else ',1' - if sKey in aNODES and aNODES[sKey]: - return aNODES[sKey] - sFile = _get_nodes_path(oArgs) - assert os.path.exists(sFile), sFile - lNodes = generate_nodes_from_file(sFile, - nodes_count=nodes_count, - ipv=ipv, - udp_not_tcp=udp_not_tcp) - assert lNodes - aNODES[sKey] = lNodes - return aNODES[sKey] - -aNODES_CACHE = {} -def generate_nodes_from_file(sFile:str, - nodes_count:int = DEFAULT_NODES_COUNT, - ipv:str = 'ipv4', - udp_not_tcp:bool = True, - ) -> dict: - """https://github.com/TokTok/c-toxcore/issues/469 -I had a conversation with @irungentoo on IRC about whether we really need to call tox_bootstrap() when having UDP disabled and why. The answer is yes, because in addition to TCP relays (tox_add_tcp_relay()), toxcore also needs to know addresses of UDP onion nodes in order to work correctly. The DHT, however, is not used when UDP is disabled. tox_bootstrap() function resolves the address passed to it as argument and calls onion_add_bs_node_path() and DHT_bootstrap() functions. Although calling DHT_bootstrap() is not really necessary as DHT is not used, we still need to resolve the address of the DHT node in order to populate the onion routes with onion_add_bs_node_path() call. -""" - global aNODES_CACHE - - key = ipv - key += ',0' if udp_not_tcp else ',1' - if key in aNODES_CACHE: - sorted_nodes = aNODES_CACHE[key] - else: - if not os.path.exists(sFile): - LOG.error("generate_nodes_from_file file not found " +sFile) - return [] - try: - with open(sFile, 'rt') as fl: - json_nodes = json.loads(fl.read())['nodes'] - except Exception as e: - LOG.error(f"generate_nodes_from_file error {sFile}\n{e}") - return [] - else: - LOG.debug("generate_nodes_from_file " +sFile) - - if udp_not_tcp: - nodes = [(node[ipv], node['port'], node['public_key'],) for - node in json_nodes if node[ipv] != 'NONE' \ - and node["status_udp"] in [True, "true"] - ] - else: - nodes = [] - elts = [(node[ipv], node['tcp_ports'], node['public_key'],) \ - for node in json_nodes if node[ipv] != 'NONE' \ - and node["status_tcp"] in [True, "true"] - ] - for (ipv, ports, public_key,) in elts: - for port in ports: - nodes += [(ipv, port, public_key)] - if not nodes: - LOG.warn(f'empty generate_nodes from {sFile} {json_nodes!r}') - return [] - sorted_nodes = nodes - aNODES_CACHE[key] = sorted_nodes - - random.shuffle(sorted_nodes) - if nodes_count is not None and len(sorted_nodes) > nodes_count: - sorted_nodes = sorted_nodes[-nodes_count:] - LOG.debug(f"generate_nodes_from_file {sFile} len={len(sorted_nodes)}") - return sorted_nodes - -def tox_bootstrapd_port() -> int: - port = 33446 - sFile = '/etc/tox-bootstrapd.conf' - if os.path.exists(sFile): - with open(sFile, 'rt') as oFd: - for line in oFd.readlines(): - if line.startswith('port = '): - port = int(line[7:]) - return port - -def bootstrap_local(elts:list, lToxes:list, oArgs=None): - if os.path.exists('/run/tox-bootstrapd/tox-bootstrapd.pid'): - LOG.debug('/run/tox-bootstrapd/tox-bootstrapd.pid') - iRet = True - else: - iRet = os.system("netstat -nle4|grep -q :33") - if iRet > 0: - LOG.warn(f'bootstraping local No local DHT running') - LOG.info(f'bootstraping local') - return bootstrap_udp(elts, lToxes, oArgs) - -def lDNSClean(l:list) -> list: - global lDEAD_BS - # list(set(l).difference(set(lDEAD_BS))) - return [elt for elt in l if elt not in lDEAD_BS] - -def lExitExcluder(oArgs, iPort:int =9051) -> list: - """ - https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py - """ - if not stem: - LOG.warn('please install the stem Python package') - return '' - LOG.debug('lExcludeExitNodes') - - try: - controller = oGetStemController(log_level=10) - # generator - relays = controller.get_server_descriptors() - except Exception as e: - LOG.error(f'Failed to get relay descriptors {e}') - return None - - if controller.is_set('ExcludeExitNodes'): - LOG.info('ExcludeExitNodes is in use already.') - return None - - exit_excludelist=[] - LOG.debug("Excluded exit relays:") - for relay in relays: - if relay.exit_policy.is_exiting_allowed() and not relay.contact: - if is_valid_fingerprint(relay.fingerprint): - exit_excludelist.append(relay.fingerprint) - LOG.debug("https://metrics.torproject.org/rs.html#details/%s" % relay.fingerprint) - else: - LOG.warn('Invalid Fingerprint: %s' % relay.fingerprint) - - try: - controller.set_conf('ExcludeExitNodes', exit_excludelist) - LOG.info('Excluded a total of %s exit relays without ContactInfo from the exit position.' % len(exit_excludelist)) - except Exception as e: - LOG.exception('ExcludeExitNodes ' +str(e)) - return exit_excludelist - -aHOSTS = {} -@functools.lru_cache(maxsize=20) -def sDNSLookup(host:str) -> str: - global aHOSTS - ipv = 0 - if host in lDEAD_BS: -# LOG.warn(f"address skipped because in lDEAD_BS {host}") - return '' - if host in aHOSTS: - return aHOSTS[host] - - try: - s = host.replace('.','') - int(s) - ipv = 4 - except: - try: - s = host.replace(':','') - int(s) - ipv = 6 - except: pass - - if ipv > 0: -# LOG.debug(f"v={ipv} IP address {host}") - return host - - LOG.debug(f"sDNSLookup {host}") - ip = '' - if host.endswith('.tox') or host.endswith('.onion'): - if False and stem: - ip = sMapaddressResolv(host) - if ip: return ip - - ip = sTorResolve(host) - if ip: return ip - - if not bHAVE_TORR: - LOG.warn(f"onion address skipped because no tor-resolve {host}") - return '' - try: - sout = f"/tmp/TR{os.getpid()}.log" - i = os.system(f"tor-resolve -4 {host} > {sout}") - if not i: - LOG.warn(f"onion address skipped because tor-resolve on {host}") - return '' - ip = open(sout, 'rt').read() - if ip.endswith('failed.'): - LOG.warn(f"onion address skipped because tor-resolve failed on {host}") - return '' - LOG.debug(f"onion address tor-resolve {ip} on {host}") - return ip - except: - pass - else: - try: - ip = socket.gethostbyname(host) - LOG.debug(f"host={host} gethostbyname IP address {ip}") - if ip: - aHOSTS[host] = ip - return ip - # drop through - except: - # drop through - pass - - if ip == '': - try: - sout = f"/tmp/TR{os.getpid()}.log" - i = os.system(f"dig {host} +timeout=15|grep ^{host}|sed -e 's/.* //'> {sout}") - if not i: - LOG.warn(f"address skipped because dig failed on {host}") - return '' - ip = open(sout, 'rt').read().strip() - LOG.debug(f"address dig {ip} on {host}") - aHOSTS[host] = ip - return ip - except: - ip = host - LOG.debug(f'sDNSLookup {host} -> {ip}') - if ip and ip != host: - aHOSTS[host] = ip - return ip - -def bootstrap_udp(lelts:list, lToxes:list, oArgs=None) -> None: - lelts = lDNSClean(lelts) - socket.setdefaulttimeout(15.0) - for oTox in lToxes: - random.shuffle(lelts) - if hasattr(oTox, 'oArgs'): - oArgs = oTox.oArgs - if hasattr(oArgs, 'contents') and oArgs.contents.proxy_type != 0: - lelts = lelts[:1] - -# LOG.debug(f'bootstrap_udp DHT bootstraping {oTox.name} {len(lelts)}') - for largs in lelts: - assert len(largs) == 3 - host, port, key = largs - assert host; assert port; assert key - if host in lDEAD_BS: continue - ip = sDNSLookup(host) - if not ip: - LOG.warn(f'bootstrap_udp to host={host} port={port} did not resolve ip={ip}') - continue - - if type(port) == str: - port = int(port) - try: - assert len(key) == 64, key - # NOT ip - oRet = oTox.bootstrap(host, - port, - key) - except Exception as e: - if oArgs is None or ( - hasattr(oArgs, 'contents') and oArgs.contents.proxy_type == 0): - pass - # LOG.error(f'bootstrap_udp failed to host={host} port={port} {e}') - continue - if not oRet: - LOG.warn(f'bootstrap_udp failed to {host} : {oRet}') - elif oTox.self_get_connection_status() != enums.TOX_CONNECTION['NONE']: - LOG.info(f'bootstrap_udp to {host} connected') - break - else: -# LOG.debug(f'bootstrap_udp to {host} not connected') - pass - -def bootstrap_tcp(lelts:list, lToxes:list, oArgs=None) -> None: - lelts = lDNSClean(lelts) - for oTox in lToxes: - if hasattr(oTox, 'oArgs'): oArgs = oTox.oArgs - random.shuffle(lelts) -# LOG.debug(f'bootstrap_tcp bootstapping {oTox.name} {len(lelts)}') - for (host, port, key,) in lelts: - assert host; assert port;assert key - if host in lDEAD_BS: continue - ip = sDNSLookup(host) - if not ip: - LOG.warn(f'bootstrap_tcp to {host} did not resolve ip={ip}') -# continue - ip = host - if host.endswith('.onion') and stem: - l = lIntroductionPoints(host) - if not l: - LOG.warn(f'bootstrap_tcp to {host} has no introduction points') - continue - if type(port) == str: - port = int(port) - try: - assert len(key) == 64, key - oRet = oTox.add_tcp_relay(ip, - port, - key) - except Exception as e: - # The address could not be resolved to an IP address, or the IP address passed was invalid. - LOG.warn(f'bootstrap_tcp to {host} : ' +str(e)) - continue - if not oRet: - LOG.warn(f'bootstrap_tcp failed to {host} : {oRet}') - elif hasattr(oTox, 'mycon_time') and oTox.mycon_time == 1: - LOG.debug(f'bootstrap_tcp to {host} not yet connected') - elif hasattr(oTox, 'mycon_status') and oTox.mycon_status is False: - LOG.debug(f'bootstrap_tcp to {host} not True') - elif oTox.self_get_connection_status() != enums.TOX_CONNECTION['NONE']: - LOG.info(f'bootstrap_tcp to {host} connected') - break - else: -# LOG.debug(f'bootstrap_tcp to {host} but not connected' -# +f" last={int(oTox.mycon_time)}" ) - pass - -def iNmapInfoNmap(sProt:str, sHost:str, sPort:str, key=None, environ=None, cmd:str = '') -> int: - if sHost in ['-', 'NONE']: return 0 - if not nmap: return 0 - nmps = nmap.PortScanner - if sProt in ['socks', 'socks5', 'tcp4']: - prot = 'tcp' - cmd = f" -Pn -n -sT -p T:{sPort}" - else: - prot = 'udp' - cmd = f" -Pn -n -sU -p U:{sPort}" - LOG.debug(f"iNmapInfoNmap cmd={cmd}") - sys.stdout.flush() - o = nmps().scan(hosts=sHost, arguments=cmd) - aScan = o['scan'] - ip = list(aScan.keys())[0] - state = aScan[ip][prot][sPort]['state'] - LOG.info(f"iNmapInfoNmap: to {sHost} {state}") - return 0 - -def iNmapInfo(sProt:str, sHost:str, sPort:str, key=None, environ=None, cmd:str = 'nmap'): - if sHost in ['-', 'NONE']: return 0 - sFile = os.path.join("/tmp", f"{sHost}.{os.getpid()}.nmap") - if sProt in ['socks', 'socks5', 'tcp4']: - cmd += f" -Pn -n -sT -p T:{sPort} {sHost} | grep /tcp " - else: - cmd += f" -Pn -n -sU -p U:{sPort} {sHost} | grep /udp " - LOG.debug(f"iNmapInfo cmd={cmd}") - sys.stdout.flush() - iRet = os.system(cmd +f" >{sFile} 2>&1 ") - LOG.debug(f"iNmapInfo cmd={cmd} iRet={iRet}") - if iRet != 0: - return iRet - assert os.path.exists(sFile), sFile - with open(sFile, 'rt') as oFd: - l = oFd.readlines() - assert len(l) - l = [line for line in l if line and not line.startswith('WARNING:')] - s = '\n'.join([s.strip() for s in l]) - LOG.info(f"iNmapInfo: to {sHost}\n{s}") - return 0 - - -# bootstrap_iNmapInfo(lElts, self._args, sProt) -def bootstrap_iNmapInfo(lElts:list, oArgs, protocol:str = "tcp4", bIS_LOCAL:bool = False, iNODES:int = iNODES, cmd:str = 'nmap') -> bool: - if not bIS_LOCAL and not bAreWeConnected(): - LOG.warn(f"bootstrap_iNmapInfo not local and NOT CONNECTED") - return True - if os.environ['USER'] != 'root': - LOG.warn(f"bootstrap_iNmapInfo not ROOT USER={os.environ['USER']}") - cmd = 'sudo ' +cmd - - lRetval = [] - LOG.info(f"bootstrap_iNmapInfo testing nmap={nmap} len={len(lElts[:iNODES])}") - for elts in lElts[:iNODES]: - host, port, key = elts - ip = sDNSLookup(host) - if not ip: - LOG.info(f"bootstrap_iNmapInfo to {host} did not resolve ip={ip}") - continue - if type(port) == str: - port = int(port) - iRet = -1 - try: - if not nmap: - iRet = iNmapInfo(protocol, ip, port, key, cmd=cmd) - else: - iRet = iNmapInfoNmap(protocol, ip, port, key) - if iRet != 0: - LOG.warn('iNmapInfo to ' +repr(host) +' retval=' +str(iRet)) - lRetval += [False] - else: - LOG.info('iNmapInfo to ' +repr(host) +' retval=' +str(iRet)) - lRetval += [True] - except Exception as e: - LOG.exception('iNmapInfo to {host} : ' +str(e) - ) - lRetval += [False] - return any(lRetval) - -def caseFactory(cases:list) -> list: - """We want the tests run in order.""" - if len(cases) > 1: - ordered_cases = sorted(cases, key=lambda f: inspect.findsource(f)[1]) - else: - ordered_cases = cases - return ordered_cases - -def suiteFactory(*testcases): - """We want the tests run in order.""" - linen = lambda f: getattr(tc, f).__code__.co_firstlineno - lncmp = lambda a, b: linen(a) - linen(b) - - test_suite = unittest.TestSuite() - for tc in testcases: - test_suite.addTest(unittest.makeSuite(tc, sortUsing=lncmp)) - return test_suite diff --git a/tox_wrapper/tests/tests_wrapper.py b/tox_wrapper/tests/tests_wrapper.py deleted file mode 100644 index 6069584..0000000 --- a/tox_wrapper/tests/tests_wrapper.py +++ /dev/null @@ -1,2271 +0,0 @@ -# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- -# -# @file tests.py -# @author Wei-Ning Huang (AZ) -# -# Copyright (C) 2013 - 2014 Wei-Ning Huang (AZ) -# All Rights reserved. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software Foundation, -# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# - -"""Originaly from https://github.com/oxij/PyTox c-toxcore-02 branch -which itself was forked from https://github.com/aitjcize/PyTox/ - -Modified to work with -""" - -import ctypes -import faulthandler -import hashlib -import logging -import os -import random -import re -import sys -import threading -import traceback -import unittest -from ctypes import * -from typing import Union, Callable, Union - -faulthandler.enable() - -import warnings -warnings.filterwarnings('ignore') - -try: - from io import BytesIO - import certifi - import pycurl -except ImportError: - pycurl = None - -from pyannotate_runtime import collect_types - -try: - import coloredlogs - os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red' -except ImportError as e: - logging.log(logging.DEBUG, f"coloredlogs not available: {e}") - coloredlogs = None - -try: - import color_runner -except ImportError as e: - logging.log(logging.DEBUG, f"color_runner not available: {e}") - color_runner = None - -import tox_wrapper -import tox_wrapper.toxcore_enums_and_consts as enums -from tox_wrapper.tox import Tox, UINT32_MAX, ToxError - -from tox_wrapper.toxcore_enums_and_consts import (TOX_ADDRESS_SIZE, TOX_CONNECTION, - TOX_FILE_CONTROL, - TOX_MESSAGE_TYPE, - TOX_SECRET_KEY_SIZE, - TOX_USER_STATUS) - -try: - import support_testing as ts -except ImportError: - import tox_wrapper.tests.support_testing as ts - -try: - from tests.toxygen_tests import test_sound_notification - bIS_NOT_TOXYGEN = False -except ImportError: - bIS_NOT_TOXYGEN = True - -# from PyQt5 import QtCore -import time - -sleep = time.sleep - -global LOG -LOG = logging.getLogger('TestS') -if False: - def LOG_ERROR(l: str) -> None: LOG.error('+ '+l) - def LOG_WARN(l: str) -> None: LOG.warn('+ '+l) - def LOG_INFO(l: str) -> None: LOG.info('+ '+l) - def LOG_DEBUG(l: str) -> None: LOG.debug('+ '+l) - def LOG_TRACE(l: str) -> None: pass # print('+ '+l) -else: - # just print to stdout so there is NO complications from logging. - def LOG_ERROR(l: str) -> None: print('EROR+ '+l) - def LOG_WARN(l: str) -> None: print('WARN+ '+l) - def LOG_INFO(l: str) -> None: print('INFO+ '+l) - def LOG_DEBUG(l: str) -> None: print('DEBUG+ '+l) - def LOG_TRACE(l: str) -> None: pass # print('TRAC+ '+l) - -ADDR_SIZE = 38 * 2 -CLIENT_ID_SIZE = 32 * 2 -THRESHOLD = 35 # >25 -iN = 6 - -global oTOX_OPTIONS -oTOX_OPTIONS = {} - -bIS_LOCAL = 'new' in sys.argv or 'local' in sys.argv or 'newlocal' in sys.argv -bUSE_NOREQUEST = None - -def expectedFailure(test_item): - test_item.__unittest_expecting_failure__ = True - return test_item - -def expectedFail(reason: str): - """ - expectedFailure with a reason - """ - def decorator(test_item): - test_item.__unittest_expecting_failure__ = True - return test_item - return decorator - -class ToxOptions(): - def __init__(self): - self.ipv6_enabled = True - self.udp_enabled = True - self.proxy_type = 0 - self.proxy_host = '' - self.proxy_port = 0 - self.start_port = 0 - self.end_port = 0 - self.tcp_port = 0 - self.savedata_type = 0 # 1=toxsave, 2=secretkey - self.savedata_data = b'' - self.savedata_length = 0 - self.local_discovery_enabled = False - self.dht_announcements_enabled = True - self.hole_punching_enabled = False - self.experimental_thread_safety = False - -class App(): - def __init__(self): - self.mode = 0 -oAPP = App() - -class AliceTox(Tox): - - def __init__(self, opts, app=None): - - super(AliceTox, self).__init__(opts, app=app) - self._address = self.self_get_address() - self.name = 'alice' - self._opts = opts - self._app = app - -class BobTox(Tox): - - def __init__(self, opts, app=None): - super(BobTox, self).__init__(opts, app=app) - self._address = self.self_get_address() - self.name = 'bob' - self._opts = opts - self._app = app - -class BaseThread(threading.Thread): - - def __init__(self, name=None, target=None): - if name: - super().__init__(name=name, target=target) - else: - super().__init__(target=target) - self._stop_thread = False - self.name = name - - def stop_thread(self, timeout: int = -1) -> None: - self._stop_thread = True - if timeout < 0: - timeout = ts.iTHREAD_TIMEOUT - i = 0 - while i < ts.iTHREAD_JOINS: - self.join(timeout) - if not self.is_alive(): break - i = i + 1 - else: - LOG.warning(f"{self.name} BLOCKED") - -class ToxIterateThread(BaseThread): - - def __init__(self, tox): - super().__init__(name='ToxIterateThread') - self._tox = tox - - def run(self) -> None: - while not self._stop_thread: - self._tox.iterate() - sleep(self._tox.iteration_interval() / 1000) - -global bob, alice -bob = alice = None - -def prepare(self): - global bob, alice - def bobs_on_self_connection_status(iTox, connection_state, *args) -> None: - status = connection_state - self.bob.dht_connected = status - self.bob.mycon_time = time.time() - try: - if status != TOX_CONNECTION['NONE']: - LOG_DEBUG(f"bobs_on_self_connection_status TRUE {status}" \ - +f" last={int(self.bob.mycon_time)}" ) - self.bob.mycon_status = True - else: - LOG_DEBUG(f"bobs_on_self_connection_status FALSE {status}" \ - +f" last={int(self.bob.mycon_time)}" ) - self.bob.mycon_status = False - except Exception as e: - LOG_ERROR(f"bobs_on_self_connection_status {e}") - else: - if self.bob.self_get_connection_status() != status: - LOG_WARN(f"bobs_on_self_connection_status DISAGREE {status}") - - def alices_on_self_connection_status(iTox, connection_state: int, *args) -> None: - #FixMe connection_num - status = connection_state - self.alice.dht_connected = status - self.alice.mycon_time = time.time() - try: - if status != TOX_CONNECTION['NONE']: - LOG_DEBUG(f"alices_on_self_connection_status TRUE {status}" \ - +f" last={int(self.alice.mycon_time)}" ) - self.alice.mycon_status = True - else: - LOG_WARN(f"alices_on_self_connection_status FALSE {status}" \ - +f" last={int(self.alice.mycon_time)}" ) - self.alice.mycon_status = False - except Exception as e: - LOG_ERROR(f"alices_on_self_connection_status error={e}") - self.alice.dht_connected = status - - opts = oToxygenToxOptions(oTOX_OARGS) - global bUSE_NOREQUEST - bUSE_NOREQUEST = oTOX_OARGS.norequest == 'True' - - alice = AliceTox(opts, app=oAPP) - alice.oArgs = opts - alice.dht_connected = -1 - alice.mycon_status = False - alice.mycon_time = 1 - alice.callback_self_connection_status(alices_on_self_connection_status) - - bob = BobTox(opts, app=oAPP) - bob.oArgs = opts - bob.dht_connected = -1 - bob.mycon_status = False - bob.mycon_time = 1 - bob.callback_self_connection_status(bobs_on_self_connection_status) - if not bIS_LOCAL and not ts.bAreWeConnected(): - LOG.warning(f"doOnce not local and NOT CONNECTED") - return [bob, alice] - -class ToxSuite(unittest.TestCase): - failureException = AssertionError - - @classmethod - def setUpClass(cls) -> None: - global oTOX_OARGS - assert oTOX_OPTIONS - assert oTOX_OARGS - - cls.lUdp = ts.generate_nodes( - oArgs=oTOX_OARGS, - nodes_count=2*ts.iNODES, - ipv='ipv4', - udp_not_tcp=True) - - cls.lTcp = ts.generate_nodes( - oArgs=oTOX_OARGS, - nodes_count=2*ts.iNODES, - ipv='ipv4', - udp_not_tcp=False) - - def tearDown(self) -> None: - """ - """ - if hasattr(self, 'bob') and self.bob.self_get_friend_list_size() >= 1: - LOG.warn(f"tearDown BOBS STILL HAS A FRIEND LIST {self.bob.self_get_friend_list()}") - for elt in self.bob.self_get_friend_list(): - self.bob.friend_delete(elt) - if hasattr(self, 'alice') and self.alice.self_get_friend_list_size() >= 1: - LOG.warn(f"tearDown ALICE STILL HAS A FRIEND LIST {self.alice.self_get_friend_list()}") - for elt in self.alice.self_get_friend_list(): - self.alice.friend_delete(elt) - - LOG.debug(f"tearDown threads={threading.active_count()}") - if hasattr(self, 'bob'): - bob.callback_self_connection_status(None) - if hasattr(self.bob, 'main_loop'): - self.bob._main_loop.stop_thread() - del self.bob._main_loop -# self.bob.kill() - del self.bob - if hasattr(self, 'alice'): - alice.callback_self_connection_status(None) - if hasattr(self.alice, 'main_loop'): - self.alice._main_loop.stop_thread() - del self.alice._main_loop -# self.alice.kill() - del self.alice - - @classmethod - def tearDownClass(cls) -> None: - if hasattr(cls, 'bob'): - cls.bob._main_loop.stop_thread() - cls.bob.kill() - del cls.bob - if hasattr(cls, 'alice'): - cls.alice._main_loop.stop_thread() - cls.alice.kill() - del cls.alice - - def bBobNeedAlice(self) -> bool: - """ - """ - if hasattr(self, 'baid') and self.baid >= 0 and \ - self.baid in self.bob.self_get_friend_list(): - LOG.warn(f"setUp ALICE IS ALREADY IN BOBS FRIEND LIST") - return False - elif self.bob.self_get_friend_list_size() >= 1: - LOG.warn(f"setUp BOB STILL HAS A FRIEND LIST") - return False - return True - - def bAliceNeedAddBob (self) -> bool: - if hasattr(self, 'abid') and self.abid >= 0 and \ - self.abid in self.alice.self_get_friend_list(): - LOG.warn(f"setUp BOB IS ALREADY IN ALICES FRIEND LIST") - return False - elif self.alice.self_get_friend_list_size() >= 1: - LOG.warn(f"setUp ALICE STILL HAS A FRIEND LIST") - return False - return True - - def setUp(self): - cls = self - if not hasattr(cls, 'alice') and not hasattr(cls, 'bob'): - l = prepare(cls) - assert l - cls.bob, cls.alice = l - if not hasattr(cls.bob, '_main_loop'): -#? cls.bob._main_loop = ToxIterateThread(cls.bob) -#? cls.bob._main_loop.start() - LOG.debug(f"cls.bob._main_loop: ") # {threading.enumerate()} - if not hasattr(cls.alice, '_main_loop'): -#? cls.alice._main_loop = ToxIterateThread(cls.alice) -#? cls.alice._main_loop.start() - LOG.debug(f"cls.alice._main_loop: ") # {threading.enumerate()} - - self.bBobNeedAlice() - self.bAliceNeedAddBob() - - def run(self, result=None) -> None: - """ Stop after first error """ - if not result.errors: - super(ToxSuite, self).run(result) - - def get_connection_status(self) -> None: - if self.bob.mycon_time <= 1 or self.alice.mycon_time <= 1: - pass - # drop through - elif self.bob.dht_connected == TOX_CONNECTION['NONE']: - return False - elif self.alice.dht_connected == TOX_CONNECTION['NONE']: - return False - - # if not self.connected - if self.bob.self_get_connection_status() == TOX_CONNECTION['NONE']: - return False - if self.alice.self_get_connection_status() == TOX_CONNECTION['NONE']: - return False - return True - - def loop(self, n) -> None: - """ - t:iterate - t:iteration_interval - """ - interval = self.bob.iteration_interval() - for i in range(n): - self.alice.iterate() - self.bob.iterate() - sleep(interval / 1000.0) - - def call_bootstrap(self, num: Union[int, None] = None, lToxes:list[int] =None, i:int =0) -> None: - if num == None: num=ts.iNODES - if lToxes is None: - lToxes = [self.alice, self.bob] -# LOG.debug(f"call_bootstrap network={oTOX_OARGS.network}") - if oTOX_OARGS.network in ['new', 'newlocal', 'localnew']: - ts.bootstrap_local(self.lUdp, lToxes) - elif not ts.bAreWeConnected(): - LOG.warning('we are NOT CONNECTED') - else: - random.shuffle(self.lUdp) - if oTOX_OARGS.proxy_port > 0: - lElts = self.lUdp[:1] - else: - lElts = self.lUdp[:num+i] - LOG.debug(f"call_bootstrap ts.bootstrap_udp {len(lElts)}") - ts.bootstrap_udp(lElts, lToxes) - random.shuffle(self.lTcp) - lElts = self.lTcp[:num+i] - LOG.debug(f"call_bootstrap ts.bootstrap_tcp {len(lElts)}") - ts.bootstrap_tcp(lElts, lToxes) - - def group_until_connected(self, otox, group_number:int, num: Union[int, None] = None, iMax:int = THRESHOLD) -> None: - """ - """ - i = 0 - bRet = None - while i <= iMax : - iRet = otox.group_is_connected(group_number) - if iRet == True or iRet == 0: - bRet = True - break - if i % 5 == 0: - j = i//5 - self.call_bootstrap(num, lToxes=None, i=j) - s = '' - if i == 0: s = '\n' - LOG.info(s+"group_until_connected " \ - +" #" + str(i) \ - +" iRet=" +repr(iRet) \ - +f" BOBS={otox.mycon_status}" \ - +f" last={int(otox.mycon_time)}" ) - i += 1 - self.loop(100) - else: - bRet = False - - if bRet: - LOG.info(f"group_until_connected True i={i}" \ - +f" iMax={iMax}" \ - +f" BOB={otox.self_get_connection_status()}" \ - +f" last={int(otox.mycon_time)}" ) - return True - else: - LOG.warning(f"group_until_connected False i={i}" \ - +f" iMax={iMax}" \ - +f" BOB={otox.self_get_connection_status()}" \ - +f" last={int(otox.mycon_time)}" ) - return False - - def loop_until_connected(self, num: Union[int, None] = None) -> None: - """ - t:on_self_connection_status - t:self_get_connection_status - """ - global THRESHOLD - i = 0 - bRet = None - while i <= THRESHOLD : - if (self.alice.mycon_status and self.bob.mycon_status): - bRet = True - break - if i % 5 == 0: - j = i//5 - self.call_bootstrap(num, lToxes=None, i=j) - s = '' - if i == 0: s = '\n' - LOG.info(s+"loop_until_connected " \ - +" #" + str(i) \ - +" BOB=" +repr(self.bob.self_get_connection_status()) \ - +" ALICE=" +repr(self.alice.self_get_connection_status()) - +f" BOBS={self.bob.mycon_status}" \ - +f" ALICES={self.alice.mycon_status}" \ - +f" last={int(self.bob.mycon_time)}" ) - if (self.alice.mycon_status and self.bob.mycon_status): - bRet = True - break - if (self.alice.self_get_connection_status() and - self.bob.self_get_connection_status()): - LOG_WARN(f"loop_until_connected disagree status() DISAGREE" \ - +f' self.bob.mycon_status={self.bob.mycon_status}' \ - +f' alice.mycon_status={self.alice.mycon_status}' \ - +f" last={int(self.bob.mycon_time)}" ) - bRet = True - break - i += 1 - self.loop(100) - else: - bRet = False - - if bRet or \ - ( self.bob.self_get_connection_status() != TOX_CONNECTION['NONE'] and \ - self.alice.self_get_connection_status() != TOX_CONNECTION['NONE'] ): - LOG.info(f"loop_until_connected returning True {i}" \ - +f" BOB={self.bob.self_get_connection_status()}" \ - +f" ALICE={self.alice.self_get_connection_status()}" \ - +f" last={int(self.bob.mycon_time)}" ) - return True - else: - THRESHOLD += 5 - LOG.warning(f"loop_until_connected returning False {i}" \ - +f" BOB={self.bob.self_get_connection_status()}" \ - +f" ALICE={self.alice.self_get_connection_status()}" \ - +f" last={int(self.bob.mycon_time)}" ) - return False - - def wait_objs_attr(self, objs: list, attr: str) -> bool: - global THRESHOLD - i = 0 - while i <= THRESHOLD: - if i % 5 == 0: - num = None - j = 0 - j = i//5 - self.call_bootstrap(num, objs, i=j) - LOG.debug(f"wait_objs_attr {objs} for {attr} {i}") - if all([getattr(obj, attr) for obj in objs]): - return True - self.loop(100) - i += 1 - else: - THRESHOLD += 1 - LOG.warn(f"wait_objs_attr for {attr} i >= {THRESHOLD}") - - return all([getattr(obj, attr) is not None for obj in objs]) - - def wait_otox_attrs(self, obj, attrs: list[str]) -> bool: - assert all(attrs), f"wait_otox_attrs {attrs}" - i = 0 - while i <= THRESHOLD: - if i % 5 == 0: - num = None - j = 0 - if obj.mycon_time == 1: - num = 4 - j = i//5 - if obj.self_get_connection_status() == TOX_CONNECTION['NONE']: - self.call_bootstrap(num, [obj], i=j) - LOG.debug(f"wait_otox_attrs {obj.name} for {attrs} {i}" \ - +f" last={int(obj.mycon_time)}") - if all([getattr(obj, attr) is not None for attr in attrs]): - return True - self.loop(100) - i += 1 - else: - LOG.warning(f"wait_otox_attrs i >= {THRESHOLD} results={[getattr(obj, attr) for attr in attrs]}") - - return all([getattr(obj, attr) for attr in attrs]) - - def wait_ensure_exec(self, method, args:list) -> bool: - i = 0 - oRet = None - while i <= THRESHOLD: - if i % 5 == 0: - j = i//5 - self.call_bootstrap(num=None, lToxes=None, i=j) - LOG.debug("wait_ensure_exec " \ - +" " +str(method) - +" " +str(i)) - try: - oRet = method(*args) - if oRet: - LOG.info(f"wait_ensure_exec oRet {oRet!r}") - return True - except ArgumentError as e: - # ArgumentError('This client is currently NOT CONNECTED to the friend.') - # dunno - LOG.warning(f"wait_ensure_exec ArgumentError {e}") - return False - except Exception as e: - LOG.warning(f"wait_ensure_exec EXCEPTION {e}") - return False - sleep(3) - i += 1 - else: - LOG.error(f"wait_ensure_exec i >= {1*THRESHOLD}") - return False - - return oRet - - def bob_add_alice_as_friend_norequest(self) -> bool: - if not self.bBobNeedAlice(): return True - - MSG = 'Hi, this is Bob.' - iRet = self.bob.friend_add_norequest(self.alice._address) - if iRet < 0: - return False - self.baid = self.bob.friend_by_public_key(self.alice._address) - assert self.baid >= 0, self.baid - assert self.bob.friend_exists(self.baid), "bob.friend_exists" - assert not self.bob.friend_exists(self.baid + 1) - assert self.baid in self.bob.self_get_friend_list() - assert self.bob.self_get_friend_list_size() >= 1 - return True - - def alice_add_bob_as_friend_norequest(self) -> bool: - if not self.bAliceNeedAddBob(): return True - - iRet = self.alice.friend_add_norequest(self.bob._address) - if iRet < 0: - return False - self.abid = self.alice.friend_by_public_key(self.bob._address) - assert self.abid >= 0, self.abid - assert self.abid in self.alice.self_get_friend_list() - assert self.alice.friend_exists(self.abid), "alice.friend_exists" - assert not self.alice.friend_exists(self.abid + 1) - assert self.alice.self_get_friend_list_size() >= 1 - return True - - def both_add_as_friend(self) -> bool: - if bUSE_NOREQUEST: - assert self.bob_add_alice_as_friend() - assert self.alice_add_bob_as_friend_norequest() - else: - assert self.bob_add_alice_as_friend_norequest() - assert self.alice_add_bob_as_friend_norequest() - if not hasattr(self, 'baid') or self.baid < 0: - LOG.warn("both_add_as_friend no bob, baid") - if not hasattr(self, 'abid') or self.abid < 0: - LOG.warn("both_add_as_friend no alice, abid") - return True - - def both_add_as_friend_norequest(self) -> bool: - if self.bBobNeedAlice(): - assert self.bob_add_alice_as_friend_norequest() - if self.bAliceNeedAddBob(): - assert self.alice_add_bob_as_friend_norequest() - if not hasattr(self, 'baid') or self.baid < 0: - LOG.warn("both_add_as_friend_norequest no bob, baid") - if not hasattr(self, 'abid') or self.abid < 0: - LOG.warn("both_add_as_friend_norequest no alice, abid") - - #: Test last online -#? assert self.alice.friend_get_last_online(self.abid) is not None -#? assert self.bob.friend_get_last_online(self.baid) is not None - return True - - def bob_add_alice_as_friend(self) -> bool: - """ - t:friend_add - t:on_friend_request - t:friend_by_public_key - """ - MSG = 'Alice, this is Bob.' - sSlot = 'friend_request' - if not self.bBobNeedAlice(): return True - - def alices_on_friend_request(iTox, - public_key, - message_data, - message_data_size, - *largs) -> None: - LOG_DEBUG(f"alices_on_friend_request: " +repr(message_data)) - try: - assert str(message_data, 'UTF-8') == MSG - LOG_INFO(f"alices_on_friend_request: {sSlot} = True ") - except Exception as e: - LOG_WARN(f"alices_on_friend_request: EXCEPTION {e}") - # return - setattr(self.bob, sSlot, True) - - setattr(self.bob, sSlot, None) - inum = -1 - try: - inum = self.bob.friend_add(self.alice._address, bytes(MSG, 'UTF-8')) - assert inum >= 0, f"bob_add_alice_as_friend !>= 0 {inum}" - self.alice.callback_friend_request(alices_on_friend_request) - if not self.wait_otox_attrs(self.bob, [sSlot]): - LOG_WARN(f"bob_add_alice_as_friend NO {sSlot}") - # return False - self.baid = self.bob.friend_by_public_key(self.alice._address) - assert self.baid >= 0, self.baid - assert self.bob.friend_exists(self.baid) - assert not self.bob.friend_exists(self.baid + 1) - assert self.bob.self_get_friend_list_size() >= 1 - assert self.baid in self.bob.self_get_friend_list() - except Exception as e: - LOG.error(f"bob_add_alice_as_friend EXCEPTION {e}") - return False - finally: - self.bob.callback_friend_message(None) - - return True - - def alice_add_bob_as_friend(self) -> bool: - """ - t:friend_add - t:on_friend_request - t:friend_by_public_key - """ - MSG = 'Bob, this is Alice.' - sSlot = 'friend_request' - if not self.bAliceNeedAddBob(): return True - - def bobs_on_friend_request(iTox, - public_key, - message_data, - message_data_size, - *largs) -> None: - LOG_DEBUG(f"bobs_on_friend_request: " +repr(message_data)) - try: - assert str(message_data, 'UTF-8') == MSG - except Exception as e: - LOG_WARN(f"bobs_on_friend_request: Exception {e}") - # return - setattr(self.alice, sSlot, True) - - LOG_INFO(f"bobs_on_friend_request: {sSlot} = True ") - setattr(self.alice, sSlot, None) - inum = -1 - try: - inum = self.alice.friend_add(self.bob._address, bytes(MSG, 'UTF-8')) - assert inum >= 0, f"alice.friend_add !>= 0 {inum}" - self.bob.callback_friend_request(bobs_on_friend_request) - if not self.wait_otox_attrs(self.alice, [sSlot]): - LOG_WARN(f"alice.friend_add NO wait {sSlot}") - #? return False - self.abid = self.alice.friend_by_public_key(self.bob._address) - assert self.abid >= 0, self.abid - assert self.alice.friend_exists(self.abid), "not exists" - assert not self.alice.friend_exists(self.abid + 1), "exists +1" - assert self.abid in self.alice.self_get_friend_list(), "not in list" - assert self.alice.self_get_friend_list_size() >= 1, "list size" - except Exception as e: - LOG.error(f"alice.friend_add EXCEPTION {e}") - return False - finally: - self.bob.callback_friend_message(None) - return True - - def bob_add_alice_as_friend_and_status(self) -> bool: - if bUSE_NOREQUEST: - assert self.bob_add_alice_as_friend_norequest() - else: - assert self.bob_add_alice_as_friend() - - #: Wait until both are online - sSlot = 'friend_conn_status' - setattr(self.bob, sSlot, False) - def bobs_on_friend_connection_status(iTox, friend_id, iStatus, *largs) -> None: - LOG_INFO(f"bobs_on_friend_connection_status {friend_id} ?>=0" +repr(iStatus)) - setattr(self.bob, sSlot, False) - - sSlot = 'friend_status' - setattr(self.bob, sSlot, None) - def bobs_on_friend_status(iTox, friend_id, iStatus, *largs) -> None: - LOG_INFO(f"bobs_on_friend_status {friend_id} ?>=0" +repr(iStatus)) - setattr(self.bob, sSlot, False) - - sSlot = 'friend_conn_status' - setattr(self.alice, sSlot, None) - def alices_on_friend_connection_status(iTox, friend_id, iStatus, *largs) -> None: - LOG_INFO(f"alices_on_friend_connection_status {friend_id} ?>=0 " +repr(iStatus)) - setattr(self.alice, sSlot, False) - - sSlot = 'friend_status' - setattr(self.alice, sSlot, None) - def alices_on_friend_status(iTox, friend_id, iStatus, *largs) -> None: - LOG_INFO(f"alices_on_friend_status {friend_id} ?>=0 " +repr(iStatus)) - setattr(self.alice, sSlot, False) - - try: - LOG.info("bob_add_alice_as_friend_and_status waiting for alice connections") - if not self.wait_otox_attrs(self.alice, - ['friend_conn_status', - 'friend_status']): - return False - - self.bob.callback_friend_connection_status(bobs_on_friend_connection_status) - self.bob.callback_friend_status(bobs_on_friend_status) - self.alice.callback_friend_connection_status(alices_on_friend_connection_status) - self.alice.callback_friend_status(alices_on_friend_status) - - LOG.info("bob_add_alice_as_friend_and_status waiting for bob connections") - if not self.wait_otox_attrs(self.bob, - ['friend_conn_status', - 'friend_status']): - LOG_WARN('bob_add_alice_as_friend_and_status NO') - # return False - except Exception as e: - LOG.error(f"bob_add_alice_as_friend_and_status ERROR {e}") - return False - finally: - self.alice.callback_friend_connection_status(None) - self.bob.callback_friend_connection_status(None) - self.alice.callback_friend_status(None) - self.bob.callback_friend_status(None) - return True - - def bob_to_alice_connected(self) -> bool: - assert hasattr(self, 'baid') - iRet = self.bob.friend_get_connection_status(self.baid) - if iRet == TOX_CONNECTION['NONE']: - LOG.warn("bob.friend_get_connection_status") - return False - return True - - def alice_to_bob_connected(self) -> bool: - assert hasattr(self, 'abid') - iRet = self.alice.friend_get_connection_status(self.abid) - if iRet == TOX_CONNECTION['NONE']: - LOG.error("alice.friend_get_connection_status") - return False - return True - - def otox_test_groups_create(self, - otox, - group_name='test_group', - nick='test_nick', - topic='Test Topic', # str - ) -> int: - privacy_state = enums.TOX_GROUP_PRIVACY_STATE['PUBLIC'] - - iGrp = otox.group_new(privacy_state, group_name, nick) - assert iGrp >= 0 - LOG.info(f"group iGrp={iGrp}") - - otox.group_set_topic(iGrp, topic) - assert otox.group_get_topic(iGrp) == topic - assert otox.group_get_topic_size(iGrp) == len(topic) - - name = otox.group_get_name(iGrp) - if type(name) == bytes: - name = str(name, 'utf-8') - assert name == group_name, name - assert otox.group_get_name_size(iGrp) == len(group_name) - - sPk = otox.group_self_get_public_key(iGrp) - assert otox.group_get_password_size(iGrp) >= 0 - sP = otox.group_get_password(iGrp) - assert otox.group_get_privacy_state(iGrp) == privacy_state - - assert otox.group_get_number_groups() > 0, "numg={otox.group_get_number_groups()}" - LOG.info(f"group pK={sPk} iGrp={iGrp} numg={otox.group_get_number_groups()}") - return iGrp - - def otox_verify_group(self, otox, iGrp) -> int: - """ - group_self_get_name - group_self_get_peer_id - group_self_get_public_key - group_self_get_role - group_self_get_status - group_self_set_name - """ - - group_number = iGrp - try: - assert type(iGrp) == int, "otox_test_groups_join iGrp not an int" - assert iGrp < UINT32_MAX, "otox_test_groups_join iGrp failure UINT32_MAX" - assert iGrp >= 0, f"otox_test_groups_join iGrp={iGrp} < 0" - sGrp = otox.group_get_chat_id(iGrp) - assert len(sGrp) == enums.TOX_GROUP_CHAT_ID_SIZE * 2, \ - f"group sGrp={sGrp} {len(sGrp)} != {enums.TOX_GROUP_CHAT_ID_SIZE * 2}" - sPk = otox.group_self_get_public_key(iGrp) - LOG.info(f"otox_verify_group sPk={sPk} iGrp={iGrp} n={otox.group_get_number_groups()}") - - sName = otox.group_self_get_name(iGrp) - iStat = otox.group_self_get_status(iGrp) - iId = otox.group_self_get_peer_id(iGrp) - iRole = otox.group_self_get_role(iGrp) - iStat = otox.group_self_get_status(iGrp) - LOG.info(f"otox_verify_group sName={sName} iStat={iStat} iId={iId} iRole={iRole} iStat={iStat}") - - assert otox.group_self_set_name(iGrp, "NewName") - - bRet = otox.group_is_connected(iGrp) - except Exception as e: - LOG.warn(f"group_is_connected EXCEPTION {e}") - return -1 - # chat->connection_state == CS_CONNECTED || chat->connection_state == CS_CONNECTING; - if not bRet: - LOG.warn(f"group_is_connected WARN not connected iGrp={iGrp} n={otox.group_get_number_groups()}") - else: - LOG.info(f"group_is_connected SUCCESS connected iGrp={iGrp} n={otox.group_get_number_groups()}") - try: - bRet = self.group_until_connected(otox, iGrp, iMax=2*THRESHOLD) - except Exception as e: - LOG.error(f"group_until_connected EXCEPTION {e}") - return -1 - # chat->connection_state == CS_CONNECTED || chat->connection_state == CS_CONNECTING; - if bRet: - LOG.warn(f"group_until_connected WARN not connected iGrp={iGrp} n={otox.group_get_number_groups()}") - else: - LOG.info(f"group_until_connected SUCCESS connected iGrp={iGrp} n={otox.group_get_number_groups()}") - - message = bytes('hello', 'utf-8') - bRet = otox.group_send_message(iGrp, TOX_MESSAGE_TYPE['NORMAL'], message) - if not bRet: - LOG.warn(f"group_send_message {bRet}") - else: - LOG.debug(f"group_send_message {bRet}") - - # 360497DA684BCE2A500C1AF9B3A5CE949BBB9F6FB1F91589806FB04CA039E313 - # 75D2163C19FEFFE51508046398202DDC321E6F9B6654E99BAE45FFEC134F05DE - def otox_test_groups_join(self, otox, - chat_id="75d2163c19feffe51508046398202ddc321e6f9b6654e99bae45ffec134f05de", - nick='nick', - topic='Test Topic', # str - ): - status = '' - password = '' - LOG.debug(f"group_join nick={nick} chat_id={chat_id}") - try: - iGrp = otox.group_join(chat_id, password, nick, status) - LOG.info(f"otox_test_groups_join SUCCESS iGrp={iGrp} chat_id={chat_id}") - self.otox_verify_group(otox, iGrp) - - except Exception as e: - # gui - LOG.error(f"otox_test_groups_join EXCEPTION {e}") - raise - - return iGrp - - def otox_test_groups(self, - otox, - group_name='test_group', - nick='test_nick', - topic='Test Topic', # str - ) -> int: - - try: - iGrp = self.otox_test_groups_create(otox, group_name, nick, topic) - self.otox_verify_group(otox, iGrp) - except Exception as e: - LOG.error(f"otox_test_groups ERROR {e}") - raise - - # unfinished - # tox_group_peer_exit_cb - # tox_callback_group_peer_join - # tox.callback_group_peer_status - # tox.callback_group_peer_name - # tox.callback_group_peer_exit - # tox.callback_group_peer_join - return iGrp - - def wait_friend_get_connection_status(self, otox, fid:int, n:int = iN) -> int: - i = 0 - while i < n: - iRet = otox.friend_get_connection_status(fid) - if iRet == TOX_CONNECTION['NONE']: -# LOG.debug(f"wait_friend_get_connection_status NOT CONNECTED i={i} {iRet}") - self.loop_until_connected() - else: - LOG.info("wait_friend_get_connection_status {iRet}") - return True - i += 1 - else: - LOG.error(f"wait_friend_get_connection_status n={n}") - return False - - def warn_if_no_cb(self, alice, sSlot:str) -> None: - if not hasattr(alice, sSlot+'_cb') or \ - not getattr(alice, sSlot+'_cb'): - LOG.warning(f"self.bob.{sSlot}_cb NOT EXIST") - - def warn_if_cb(self, alice, sSlot:str) -> None: - if hasattr(self.bob, sSlot+'_cb') and \ - getattr(self.bob, sSlot+'_cb'): - LOG.warning(f"self.bob.{sSlot}_cb EXIST") - - # tests are executed in order - def test_notice_log(self) -> None: # works - notice = '/var/lib/tor/.SelekTOR/3xx/cache/9050/notice.log' - if os.path.exists(notice): - iRet = os.system(f"sudo sed -e '1,/.notice. Bootstrapped 100%/d' {notice}" + \ - "| grep 'Tried for 120 seconds to get a connection to :0.'") - if iRet == 0: - raise SystemExit("seconds to get a connection to :0") - else: - LOG.debug(f"checked {notice}") - - def test_tests_logging(self): # works - with self.assertLogs('foo', level='INFO') as cm: - logging.getLogger('foo').info('first message') - logging.getLogger('foo.bar').error('second message') - logging.getLogger('foo.bar.baz').debug('third message') - self.assertEqual(cm.output, ['INFO:foo:first message', - 'ERROR:foo.bar:second message']) - - def test_hash(self): # works - otox = self.bob - string = 'abcdef' - name = otox.hash(bytes(string, 'utf-8')) - assert name - string = b'abcdef' - name = otox.hash(string) - assert name - LOG.info(f"test_hash: {string} -> {name} ") - - def test_tests_start(self) -> None: # works - """ - t:hash - t:kill - t:libtoxcore - t:options_default - t:options_free - t:options_new - t:self_get_toxid - """ - LOG.info("test_tests_start " ) - port = ts.tox_bootstrapd_port() - - assert len(self.bob._address) == 2*TOX_ADDRESS_SIZE, len(self.bob._address) - assert len(self.alice._address) == 2*TOX_ADDRESS_SIZE, \ - len(self.alice._address) - - assert self.bob.self_get_address() == self.bob._address - assert self.alice.self_get_address() == self.alice._address - - def test_bootstrap_local_netstat(self) -> None: # works - """ - t:callback_file_chunk_request - t:callback_file_recv - t:callback_file_recv_chunk - t:callback_file_recv_control - t:callback_friend_connection_status - t:callback_friend_lossless_packet - t:callback_friend_lossy_packet - t:callback_friend_message - t:callback_friend_name - t:callback_friend_read_receipt - t:callback_friend_request - t:callback_friend_status - t:callback_friend_status_message - t:callback_friend_typing - t:callback_group_custom_packet - t:callback_group_invite - """ - if oTOX_OARGS.network not in ['new', 'newlocal', 'local']: - return - - port = ts.tox_bootstrapd_port() - if not port: - return - iStatus = os.system(f"""netstat -nle4 | grep :{port}""") - if iStatus == 0: - LOG.info(f"bootstrap_local_netstat port {port} iStatus={iStatus}") - else: - LOG.warning(f"bootstrap_local_netstat NOT {port} iStatus={iStatus}") - - def test_bootstrap_local(self) -> None: # works - """ - t:call_bootstrap - t:add_tcp_relay - t:self_get_dht_id - """ - # get port from /etc/tox-bootstrapd.conf 33445 - self.call_bootstrap() - # ts.bootstrap_local(self, self.lUdp) - i = 0 - iStatus = -1 - while i < 10: - i = i + 1 - iStatus = self.bob.self_get_connection_status() - if iStatus != TOX_CONNECTION['NONE']: - break - sleep(3) - else: - pass - - o1 = self.alice.self_get_dht_id() - assert len(o1) == 64 - o2 = self.bob.self_get_dht_id() - assert len(o2) == 64 - -# if o1 != o2: LOG.warning(f"bootstrap_local DHT NOT same {o1} {o2} iStatus={iStatus}") - - iStatus = self.bob.self_get_connection_status() - if iStatus != TOX_CONNECTION['NONE']: - LOG.info(f"bootstrap_local connected iStatus={iStatus}") - return True - iStatus = self.alice.self_get_connection_status() - if iStatus != TOX_CONNECTION['NONE']: - LOG.info(f"bootstrap_local connected iStatus={iStatus}") - return True - LOG.warning(f"bootstrap_local NOT CONNECTED iStatus={iStatus}") - return False - - def test_bootstrap_iNmapInfo(self) -> None: # works -# if os.environ['USER'] != 'root': -# return - iStatus = self.bob.self_get_connection_status() - LOG.info(f"test_bootstrap_iNmapInfo connected bob iStatus={iStatus}") - if oTOX_OARGS.network in ['new', 'newlocal', 'localnew']: - lElts = self.lUdp - elif oTOX_OARGS.proxy_port > 0: - lElts = self.lTcp - else: - lElts = self.lUdp - lRetval = [] - random.shuffle(lElts) - # assert - ts.bootstrap_iNmapInfo(lElts, oTOX_OARGS, bIS_LOCAL, iNODES=8) - - def test_self_get_secret_key(self) -> None: # works - """ - t:self_get_secret_key - """ - # test_self_get_secret_key - CRYPTO_SECRET_KEY_SIZE = 32 - secret_key = create_string_buffer(CRYPTO_SECRET_KEY_SIZE) - oRet0 = self.alice.self_get_secret_key(secret_key) - assert oRet0, repr(oRet0) - LOG.info('test_self_get_secret_key ' +repr(oRet0)) - assert len(str(oRet0)) - del secret_key - - def test_self_get_public_keys(self) -> None: # works - """ - t:self_get_secret_key - t:self_get_public_key - """ - - LOG.info('test_self_get_public_keys self.alice.self_get_secret_key') - oRet0 = self.alice.self_get_secret_key() - assert len(oRet0) - LOG.info('test_self_get_public_keys ' +repr(oRet0)) - oRet1 = self.alice.self_get_public_key() - assert len(oRet1) - LOG.info('test_self_get_public_keys ' +repr(oRet1)) - assert oRet0 != oRet1, repr(oRet0) +' != ' +repr(oRet1) - - def test_self_name(self) -> None: # works - """ - t:self_set_name - t:self_get_name - t:self_get_name_size - """ - self.alice.self_set_name('Alice') - assert self.alice.self_get_name() == 'Alice' - assert self.alice.self_get_name_size() == len('Alice') - self.bob.self_set_name('Bob') - assert self.bob.self_get_name() == 'Bob' - assert self.bob.self_get_name_size() == len('Bob') - - @unittest.skip('loud') - @unittest.skipIf(bIS_NOT_TOXYGEN or oTOX_OARGS.mode == 0, 'not testing in toxygen') - def test_sound_notification(self) -> None: # works - """ - Plays sound notification - :param type of notification - """ - from tests.toxygen_tests import test_sound_notification - test_sound_notification(self) - - def test_address(self) -> None: # works - """ - t:self_get_address - t:self_get_nospam - t:self_set_nospam - t:self_get_keys - """ - assert len(self.alice.self_get_address()) == ADDR_SIZE - assert len(self.bob.self_get_address()) == ADDR_SIZE - - self.alice.self_set_nospam(0x12345678) - assert self.alice.self_get_nospam() == 0x12345678 - self.loop(50) - - if hasattr(self.alice, 'self_get_keys'): - pk, sk = self.alice.self_get_keys() - assert pk == self.alice.self_get_address()[:CLIENT_ID_SIZE] - - def test_status_message(self) -> None: # works - """ - t:self_get_status_message - t:self_get_status_message_size - """ - MSG = 'Happy' - self.alice.self_set_status_message(MSG) - self.loop(100) - assert self.alice.self_get_status_message() == MSG, \ - self.alice.self_get_status_message() +' is not ' +MSG - assert self.alice.self_get_status_message_size() == len(MSG) - - def test_self_get_udp_port(self) -> None: # works - """ - t:self_get_udp_port - """ - if hasattr(oTOX_OPTIONS, 'udp_port') and oTOX_OPTIONS.udp_port: - o = self.alice.self_get_udp_port() - LOG.info('self_get_udp_port alice ' +repr(o)) - assert o > 0 - o = self.bob.self_get_udp_port() - LOG.info('self_get_udp_port bob ' +repr(o)) - assert o > 0 - - def test_self_get_tcp_port(self) -> None: # works - """ - t:self_get_tcp_port - """ - if hasattr(oTOX_OPTIONS, 'tcp_port') and oTOX_OPTIONS.tcp_port: - # errors if tcp_port <= 0 - o = self.alice.self_get_tcp_port() - LOG.info('self_get_tcp_port ' +repr(o)) - o = self.bob.self_get_tcp_port() - LOG.info('self_get_tcp_port ' +repr(o)) - - def test_get_dht_id(self) -> None: # works - """ - t:self_get_dht_id - """ - o1 = self.alice.self_get_dht_id() - assert len(o1) == 64 - o2 = self.bob.self_get_dht_id() - assert len(o2) == 64 - - def test_bob_add_alice_as_friend_norequest(self) -> None: # works - """ - t:friend_delete - t:friend_exists - t:friend_add_norequest - t:friend_get_public_key - t:self_get_friend_list - t:self_get_friend_list_size - """ - i = len(self.bob.self_get_friend_list()) - assert self.bob_add_alice_as_friend_norequest() - assert len(self.bob.self_get_friend_list()) == i + 1 - #: Test last online - assert self.bob.friend_get_last_online(self.baid) is not None - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - - - def test_alice_add_bob_as_friend_norequest(self) -> None: # works - intermittent failures - """ - t:friend_delete - t:friend_exists - t:friend_get_public_key - t:self_get_friend_list - t:self_get_friend_list_size - """ - i = len(self.alice.self_get_friend_list()) - assert self.alice_add_bob_as_friend_norequest() - assert len(self.alice.self_get_friend_list()) == i + 1 - #: Test last online - assert self.alice.friend_get_last_online(self.abid) is not None - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - def test_both_add_as_friend_norequest(self) -> None: # works - """ - t:friend_delete - t:friend_exists - t:friend_get_public_key - t:self_get_friend_list - t:self_get_friend_list_size - """ - try: - self.both_add_as_friend_norequest() - assert len(self.bob.self_get_friend_list()) > 0 - assert len(self.alice.self_get_friend_list()) > 0 - except AssertionError as e: - LOG.error(f"Failed test {e}") - raise - except Exception as e: - LOG.error(f"Failed test {e}") - raise - finally: - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - assert len(self.bob.self_get_friend_list()) == 0 - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - assert len(self.alice.self_get_friend_list()) == 0 - - def test_bob_add_alice_as_friend_and_status(self) -> None: - """ - t:friend_delete - t:friend_exists - t:friend_get_public_key - t:self_get_friend_list - t:self_get_friend_list_size - """ - self.bob_add_alice_as_friend_and_status() - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - - @unittest.skip('unfinished') - def test_alice_add_bob_as_friend_and_status(self) -> None: - assert self.alice_add_bob_as_friend_and_status() - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - def test_loop_until_connected(self) -> None: # works - assert self.loop_until_connected() - - def test_bob_assert_connection_status(self) -> None: # works - if self.bob.self_get_connection_status() == TOX_CONNECTION['NONE']: - AssertionError("ERROR: NOT CONNECTED " \ - +repr(self.bob.self_get_connection_status())) - - def test_alice_assert_connection_status(self) -> None: # works - if self.alice.self_get_connection_status() == TOX_CONNECTION['NONE']: - AssertionError("ERROR: NOT CONNECTED " \ - +repr(self.alice.self_get_connection_status())) - - def test_bob_assert_mycon_status(self) -> None: # works - if self.bob.mycon_status == False: - AssertionError("ERROR: NOT CONNECTED " \ - +repr(self.bob.mycon_status)) - - def test_alice_assert_mycon_status(self) -> None: # works - if self.alice.mycon_status == False: - AssertionError("ERROR: NOT CONNECTED " \ - +repr(self.alice.mycon_status)) - - def test_bob_add_alice_as_friend(self) -> None: # works? - try: - if bUSE_NOREQUEST: - assert self.bob_add_alice_as_friend_norequest() - else: - assert self.bob_add_alice_as_friend() - #: Test last online - assert self.bob.friend_get_last_online(self.baid) is not None - except AssertionError as e: - LOG.error(f"Failed test {e}") - raise - except Exception as e: - LOG.error(f"Failed test {e}") - raise - finally: - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if len(self.bob.self_get_friend_list()) > 0: - LOG.warn(f"WTF bob.self_get_friend_list() {bob.self_get_friend_list()}") - - def test_alice_add_bob_as_friend(self) -> None: # works! - try: - if bUSE_NOREQUEST: - assert self.alice_add_bob_as_friend_norequest() - else: - assert self.alice_add_bob_as_friend() - #: Test last online - assert self.alice.friend_get_last_online(self.abid) is not None - except AssertionError as e: - #WTF? - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - LOG.error(f"Failed test {e}") - raise - except Exception as e: - #WTF? - LOG.error(f"test_alice_add_bob_as_friend EXCEPTION {e}") - raise - finally: - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - if len(self.alice.self_get_friend_list()) > 0: - LOG.warn(f"WTF alice.self_get_friend_list() {alice.self_get_friend_list()}") - - def test_both_add_as_friend(self) -> None: # works - try: - if bUSE_NOREQUEST: - assert self.both_add_as_friend_norequest() - else: - assert self.both_add_as_friend() - except AssertionError as e: - LOG.warn(f"Failed test {e}") - raise - except Exception as e: - LOG.error(f"test_both_add_as_friend EXCEPTION {e}") - raise - finally: - if hasattr(self,'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if hasattr(self,'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - def test_groups_join(self) -> None: - """ - t:group_join - t:group_disconnect - t:group_leave - t:group_self_set_name - """ - if not self.get_connection_status(): - LOG.warning(f"test_groups_join NOT CONNECTED") - self.loop_until_connected() - - iGrp = self.otox_test_groups_join(self.bob) - LOG.info(f"test_groups_join iGrp={iGrp}") - assert iGrp >= 0, f"test_groups_join iGrp={iGrp}" - try: - self.bob.group_disconnect(iGrp) - except Exception as e: - LOG.error(f"bob.group_disconnect EXCEPTION {e}") - raise - try: - self.bob.group_leave(iGrp, None) - except Exception as e: - LOG.error(f"bob.group_leave EXCEPTION {e}") - raise - - def test_groups(self) -> None: - """ - t:group_new - t:group_disconnect - t:group_get_name - t:group_get_name_size - t:group_get_topic - t:group_get_topic_size - t:group_get_privacy_state - t:group_self_set_name - t:group_get_number_groups - - t:group_founder_set_password - t:group_founder_set_peer_limit - t:group_founder_set_privacy_state - t:group_get_chat_id - t:group_get_password - t:group_get_password_size - t:group_get_peer_limit - t:group_invite_accept - t:group_invite_friend - t:group_is_connected - t:group_leave - t:group_mod_set_role - """ - iGrp = self.otox_test_groups(self.bob) - LOG.info(f"test_groups iGrp={iGrp}") - if iGrp >= 0: - try: - self.bob.group_disconnect(iGrp) - except Exception as e: - LOG.error(f"bob.group_disconnect EXCEPTION {e}") - raise - try: - self.bob.group_leave(iGrp, None) - except Exception as e: - LOG.error(f"bob.group_leave EXCEPTION {e}") - raise - -#? @unittest.skip("double free or corruption (fasttop)") - @expectedFail('fails') # assertion fails on == MSG - def test_on_friend_status_message(self) -> None: # fails - """ - t:self_set_status_message - t:self_get_status_message - t:self_get_status_message_size - t:friend_set_status_message - t:friend_get_status_message - t:friend_get_status_message_size - t:on_friend_status_message - """ - MSG = 'Happy' - sSlot = 'friend_status_message' - - def bob_on_friend_status_message(iTox, friend_id, new_status_message, new_status_size, *largs) -> None: - LOG_INFO(f"BOB_ON_friend_status_message friend_id={friend_id} " \ - +f"new_status_message={new_status_message}") - try: - assert str(new_status_message, 'UTF-8') == MSG - assert friend_id == self.baid - except Exception as e: - LOG_ERROR(f"BOB_ON_friend_status_message EXCEPTION {e}") - setattr(self.bob, sSlot, True) - - setattr(self.bob, sSlot, None) - try: - if bUSE_NOREQUEST: - assert self.bob_add_alice_as_friend_norequest() - assert self.alice_add_bob_as_friend_norequest() - else: - # no not connected error - assert self.bob_add_alice_as_friend() - assert self.alice_add_bob_as_friend_norequest() - - self.bob.callback_friend_status_message(bob_on_friend_status_message) - self.warn_if_no_cb(self.bob, sSlot) - status_message = bytes(MSG, 'utf-8') - self.alice.self_set_status_message(status_message) - if not self.wait_otox_attrs(self.bob, [sSlot]): - LOG_WARN(f"on_friend_status_message NO {sSlot}") - - assert self.bob.friend_get_status_message(self.baid) == MSG, \ - f"message={self.bob.friend_get_status_message(self.baid)}" - assert self.bob.friend_get_status_message_size(self.baid) == len(MSG), \ - f"message_len={self.bob.friend_get_status_message_size(self.baid)}" - - except AssertionError as e: - LOG.error(f"test_on_friend_status_message FAILED {e}") - raise - except Exception as e: - LOG.error(f"test_on_friend_status_message EXCEPTION {e}") - raise - finally: - self.bob.callback_friend_status(None) - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - def test_friend(self) -> None: # works! sometimes - """ - t:friend_get_name - t:friend_get_name_size - t:on_friend_name - """ - - try: - #: Test friend request - if bUSE_NOREQUEST: - assert self.bob_add_alice_as_friend_norequest() - assert self.alice_add_bob_as_friend_norequest() - else: - # no not connected error - assert self.bob_add_alice_as_friend() - assert self.alice_add_bob_as_friend_norequest() - - a = self.alice.self_get_address()[:CLIENT_ID_SIZE] - assert self.bob.friend_get_public_key(self.baid) == a, \ - LOG.error(f"test_friend BAID {a}") - del a - - #: Test friend_get_public_key - b = self.bob.self_get_address()[:CLIENT_ID_SIZE] - assert self.alice.friend_get_public_key(self.abid) == b, \ - LOG.error(f"test_friend ABID {b}") - del b - except AssertionError as e: - LOG.error(f"Failed test {e}") - raise - except Exception as e: - LOG.error(f"test_friend EXCEPTION {e}") - raise - finally: - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - @expectedFail('fails') # assert self.bob.friend_get_status(self.baid) == TOX_USER_STATUS['BUSY'] - def test_user_status(self) -> None: # fails - """ - t:self_get_status - t:self_set_status - t:friend_get_status - t:friend_get_status - t:on_friend_status - """ - sSlot = 'friend_status' - - setattr(self.bob, sSlot, None) - def bobs_on_friend_set_status(iTox, friend_id, new_status, *largs) -> None: - LOG_INFO(f"bobs_on_friend_set_status {friend_id} {new_status}") - try: - assert friend_id == self.baid - assert new_status in [TOX_USER_STATUS['BUSY'], TOX_USER_STATUS['AWAY']] - except Exception as e: - LOG_WARN(f"bobs_on_friend_set_status EXCEPTION {e}") - setattr(self.bob, sSlot, True) - - try: - if bUSE_NOREQUEST: - assert self.bob_add_alice_as_friend_norequest() - else: - assert self.bob_add_alice_as_friend() - if not self.get_connection_status(): - LOG.warning(f"test_user_status NOT CONNECTED self.get_connection_status") - self.loop_until_connected() - - self.bob.callback_friend_status(bobs_on_friend_set_status) - self.warn_if_no_cb(self.bob, sSlot) - sSTATUS = TOX_USER_STATUS['BUSY'] - self.alice.self_set_status(sSTATUS) - sSlot = 'friend_status' - if not self.wait_otox_attrs(self.bob, [sSlot]): - # malloc(): unaligned tcache chunk detected - LOG_WARN(f'test_user_status NO {sSlot}') - - assert self.bob.friend_get_status(self.baid) == TOX_USER_STATUS['BUSY'], \ - f"friend_get_status {self.bob.friend_get_status(self.baid)} != {TOX_USER_STATUS['BUSY']}" - - except AssertionError as e: - LOG.error(f"test_user_status FAILED {e}") - raise - except Exception as e: - LOG.error(f"test_user_status EXCEPTION {e}") - raise - finally: - self.bob.callback_friend_status(None) - self.warn_if_cb(self.bob, sSlot) - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - - @unittest.skip('crashes') - def test_kill_remake(self) -> None: - """ - t:friend_get_kill_remake - t:on_friend_connection_status - """ - sSlot = 'friend_connection_status' - setattr(self.bob, sSlot, None) - def bobs_on_friend_connection_status(iTox, friend_id, iStatus, *largs): - LOG_INFO(f"bobs_on_friend_connection_status " +repr(iStatus)) - try: - assert friend_id == self.baid - except Exception as e: - LOG_ERROR(f"bobs_on_friend_connection_status ERROR {e}") - setattr(self.bob, sSlot, True) - - opts = oToxygenToxOptions(oTOX_OARGS) - setattr(self.bob, sSlot, True) - try: - if bUSE_NOREQUEST: - assert self.bob_add_alice_as_friend_norequest() - else: - assert self.bob_add_alice_as_friend() - - self.bob.callback_friend_connection_status(bobs_on_friend_connection_status) - - LOG.info("test_kill_remake killing alice") - self.alice.kill() #! bang - LOG.info("test_kill_remake making alice") - self.alice = Tox(opts, app=oAPP) - LOG.info("test_kill_remake maked alice") - - if not self.wait_otox_attrs(self.bob, [sSlot]): - LOG_WARN(f'test_kill_remake NO {sSlot}') - except AssertionError as e: - LOG.error(f"test_kill_remake Failed test {e}") - raise - except Exception as e: - LOG.error(f"bobs_on_friend_connection_status {e}") - raise - finally: - self.bob.callback_friend_connection_status(None) - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - - @expectedFail('fails') # new name is empty - def test_friend_name(self) -> None: # works! - """ - t:self_set_name - t:friend_get_name - t:friend_get_name_size - t:on_friend_name - """ - - sSlot= 'friend_name' - #: Test friend request - - #: Test friend name - NEWNAME = 'Jenny' - - def bobs_on_friend_name(iTox, fid:int, newname, iNameSize, *largs) -> None: - LOG_INFO(f"bobs_on_friend_name {sSlot} {fid}") - try: - assert fid == self.baid - assert str(newname, 'UTF-8') == NEWNAME - except Exception as e: - LOG_ERROR(f"bobs_on_friend_name EXCEPTION {e}") - setattr(self.bob, sSlot, True) - - setattr(self.bob, sSlot, None) - try: - LOG.info("test_friend_name") - if bUSE_NOREQUEST: - assert self.bob_add_alice_as_friend_norequest() - else: - assert self.bob_add_alice_as_friend() - - self.bob.callback_friend_name(bobs_on_friend_name) - self.warn_if_no_cb(self.bob, sSlot) - self.alice.self_set_name(NEWNAME) - if not self.wait_otox_attrs(self.bob, [sSlot]): - LOG_WARN(f"bobs_on_friend_name NO {sSlot}") - - # name=None - assert self.bob.friend_get_name(self.baid) == NEWNAME, \ - f"{self.bob.friend_get_name(self.baid)} != {NEWNAME}" - assert self.bob.friend_get_name_size(self.baid) == len(NEWNAME), \ - f"{self.bob.friend_get_name_size(self.baid)} != {len(NEWNAME)}" - - except AssertionError as e: - LOG.error(f"test_friend_name Failed test {e}") - raise - except Exception as e: - LOG.error(f"test_friend EXCEPTION {e}") - raise - finally: - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - self.bob.callback_friend_name(None) - self.warn_if_cb(self.bob, sSlot) - - - @expectedFail('fails') # This client is currently not connected to the friend. - def test_friend_message(self) -> None: # fails - """ - t:on_friend_action - t:on_friend_message - t:friend_send_message - """ - - #: Test message - MSG = 'Hi, Bob!' - sSlot = 'friend_message' - - def alices_on_friend_message(iTox, fid:int, msg_type, message, iSize, *largs) -> None: - LOG_DEBUG(f"alices_on_friend_message {fid} {message}") - try: - assert fid == self.alice.abid - assert msg_type == TOX_MESSAGE_TYPE['NORMAL'] - assert str(message, 'UTF-8') == MSG - except Exception as e: - LOG_ERROR(f"alices_on_friend_message EXCEPTION {e}") - else: - LOG_INFO(f"alices_on_friend_message {message}") - setattr(self.alice, sSlot, True) - - setattr(self.alice, sSlot, None) - self.alice.callback_friend_message(None) - try: - if bUSE_NOREQUEST: - assert self.both_add_as_friend_norequest() - else: - assert self.both_add_as_friend() - assert hasattr(self, 'baid'), \ - "both_add_as_friend_norequest no bob, baid" - assert hasattr(self, 'abid'), \ - "both_add_as_friend_norequest no alice, abid" - if not self.wait_friend_get_connection_status(self.bob, self.baid, n=2*iN): - LOG.warn('baid not connected') - if not self.wait_friend_get_connection_status(self.alice, self.abid, n=2*iN): - LOG.warn('abid not connected') - self.alice.callback_friend_message(alices_on_friend_message) - self.warn_if_no_cb(self.alice, sSlot) - - # dunno - both This client is currently NOT CONNECTED to the friend. - iMesId = self.bob.friend_send_message(self.baid, - TOX_MESSAGE_TYPE['NORMAL'], - bytes(MSG, 'UTF-8')) - assert iMesId >= 0, "iMesId >= 0" - if not self.wait_otox_attrs(self.alice, [sSlot]): - LOG_WARN(f"alices_on_friend_message NO {sSlot}") - except ArgumentError as e: - # ArgumentError('This client is currently NOT CONNECTED to the friend.') - # dunno - LOG.error(f"test_friend_message ArgumentError {e}") - raise - except AssertionError as e: - LOG.error(f"test_friend_message AssertionError {e}") - raise - except Exception as e: - LOG.error(f"test_friend_message EXCEPTION {e}") - raise - finally: - self.alice.callback_friend_message(None) - self.warn_if_cb(self.alice, sSlot) - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - # This client is currently not connected to the friend. - def test_friend_action(self) -> None: # works! sometimes? - """ - t:on_friend_action - t:on_friend_message - t:friend_send_message - """ - - #: Test action - ACTION = 'Kick' - sSlot = 'friend_read_action' - setattr(self.bob, sSlot, None) - def UNUSEDtheir_on_friend_action(iTox, fid:int, msg_type, action, *largs): - LOG_DEBUG(f"their_on_friend_action {fid} {msg_type} {sSlot} {action}") - try: - assert msg_type == TOX_MESSAGE_TYPE['ACTION'] - assert action == ACTION - except Exception as e: - LOG_ERROR(f"their_on_friend_action EXCEPTION {sSlot} {e}") - else: - LOG_INFO(f"their_on_friend_action {sSlot} {action}") - setattr(self.bob, sSlot, True) - - sSlot = 'friend_read_receipt' - setattr(self.alice, sSlot, None) - def their_on_read_reciept(iTox, fid:int, msg_id, *largs) -> None: - LOG_DEBUG(f"their_on_read_reciept {fid} {msg_id}") - sSlot = 'friend_read_receipt' - try: - # should be the receivers id - assert fid == bob.baid or fid == alice.abid - assert msg_id >= 0 - except Exception as e: - LOG_ERROR(f"their_on_read_reciept {sSlot} {e}") - else: - LOG_INFO(f"their_on_read_reciept {sSlot} fid={fid}") - setattr(self.alice, sSlot, True) - - try: - if bUSE_NOREQUEST: - assert self.both_add_as_friend_norequest() - else: - assert self.both_add_as_friend() - - if not self.wait_friend_get_connection_status(self.bob, self.baid, n=iN): - LOG.warn('baid not connected') - if not self.wait_friend_get_connection_status(self.alice, self.abid, n=iN): - LOG.warn('abid not connected') - - self.bob.callback_friend_read_receipt(their_on_read_reciept) #was their_on_friend_action - self.alice.callback_friend_read_receipt(their_on_read_reciept) #was their_on_friend_action - self.warn_if_no_cb(self.bob, 'friend_read_receipt') - self.warn_if_no_cb(self.alice, 'friend_read_receipt') - if True: - iMsg = self.bob.friend_send_message(self.baid, - TOX_MESSAGE_TYPE['ACTION'], - bytes(ACTION, 'UTF-8')) - assert iMsg >= 0 - else: - assert self.wait_ensure_exec(self.bob.friend_send_message, - [self.baid, - TOX_MESSAGE_TYPE['ACTION'], - bytes(ACTION, 'UTF-8')]) - if not self.wait_otox_attrs(self.alice, [sSlot]): - LOG_WARN(f"alice test_friend_action NO {sSlot}") - except AssertionError as e: - LOG.error(f"Failed test {e}") - raise - except ArgumentError as e: - # ArgumentError('This client is currently NOT CONNECTED to the friend.') - # dunno - LOG.warning(f"test_friend_action {e}") - except Exception as e: - LOG.error(f"test_friend_action {e}") - raise - finally: - self.alice.callback_friend_read_receipt(None) - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - def test_alice_typing_status(self) -> None: # works - """ - t:on_friend_read_receipt - t:on_friend_typing - t:self_set_typing - t:friend_get_typing - t:friend_get_last_online - """ - - sSlot = 'friend_typing' - LOG.info("test_typing_status bob adding alice") - #: Test typing status - def bob_on_friend_typing(iTox, fid:int, is_typing, *largs) -> None: - LOG_INFO(f"BOB_ON_friend_typing is_typing={is_typing} fid={fid}") - try: - assert fid == self.baid - if is_typing is True: - assert self.bob.friend_get_typing(fid) is True - except Exception as e: - LOG_ERROR(f"BOB_ON_friend_typing {e}") - setattr(self.bob, sSlot, True) - - setattr(self.bob, sSlot, None) - try: - if bUSE_NOREQUEST: - assert self.both_add_as_friend_norequest() - else: - assert self.both_add_as_friend() - - if not self.get_connection_status(): - LOG.warning(f"test_friend_typing NOT CONNECTED") - self.loop_until_connected() - - self.bob.callback_friend_typing(bob_on_friend_typing) - self.warn_if_no_cb(self.bob, sSlot) - self.alice.self_set_typing(self.abid, False) - if not self.wait_otox_attrs(self.bob, [sSlot]): - LOG_WARN(f"bobs_on_friend_typing NO {sSlot}") - except AssertionError as e: - LOG.error(f"Failed test {e}") - raise - except Exception as e: - LOG.error(f"test_alice_typing_status error={e}") - raise - finally: - self.bob.callback_friend_typing(None) - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - @expectedFail('fails') # @unittest.skip('unfinished') - def test_file_transfer(self) -> None: # unfinished - """ - t:file_send - t:file_send_chunk - t:file_control - t:file_seek - t:file_get_file_id - t:on_file_recv - t:on_file_recv_control - t:on_file_recv_chunk - t:on_file_chunk_request - """ - - if bUSE_NOREQUEST: - assert self.both_add_as_friend_norequest() - else: - assert self.both_add_as_friend() - - FRIEND_NUMBER = self.baid - FILE_NUMBER = 1 - FILE = os.urandom(1024 * 1024) - FILE_NAME = b"/tmp/test.bin" - if not os.path.exists(FILE_NAME): - with open(FILE_NAME, 'wb') as oFd: - oFd.write(FILE) - FILE_SIZE = len(FILE) - OFFSET = 567 - # was FILE_ID = FILE_NAME - FILE_ID = 32*'1' # - - m = hashlib.md5() - m.update(FILE[OFFSET:]) - FILE_DIGEST = m.hexdigest() - - CONTEXT = { 'FILE': bytes(), 'RECEIVED': 0, 'START': False, 'SENT': 0 } - - def alice_on_file_recv(iTox, fid:int, file_number:int, kind, size, filename) -> None: - LOG_DEBUG(f"ALICE_ON_file_recv fid={fid} {file_number}") - try: - assert size == FILE_SIZE - assert filename == FILE_NAME - retv = self.alice.file_seek(fid, file_number, OFFSET) - assert retv is True - self.alice.file_control(fid, file_number, TOX_FILE_CONTROL['RESUME']) - except Exception as e: - LOG_ERROR(f"ALICE_ON_file_recv {e}") - else: - LOG_INFO(f"ALICE_ON_file_recv " + str(fid)) - - def alice_on_file_recv_control(iTox, fid:int, file_number, control, *largs) -> None: - # TOX_FILE_CONTROL = { 'RESUME': 0, 'PAUSE': 1, 'CANCEL': 2,} - LOG_DEBUG(f"ALICE_ON_file_recv_control fid={fid} {file_number} {control}") - try: - assert FILE_NUMBER == file_number - # FixMe _FINISHED? - if False and control == TOX_FILE_CONTROL['RESUME']: - # assert CONTEXT['RECEIVED'] == FILE_SIZE - # m = hashlib.md5() - # m.update(CONTEXT['FILE']) - # assert m.hexdigest() == FILE_DIGEST - self.alice.completed = True - except Exception as e: - LOG_ERROR(f"ALICE_ON_file_recv {e}") - else: - LOG_INFO(f"ALICE_ON_file_recv " + str(fid)) - - self.alice.completed = False - def alice_on_file_recv_chunk(iTox, fid:int, file_number:int, position:int, iNumBytes, *largs) -> bool: - LOG_DEBUG(f"ALICE_ON_file_recv_chunk {fid} {file_number}") - # FixMe - use file_number and iNumBytes to get data? - data = '' - LOG_INFO(f"ALICE_ON_file_recv_chunk {fid}") - try: - if data is None: - assert CONTEXT['RECEIVED'] == (FILE_SIZE - OFFSET) - m = hashlib.md5() - m.update(CONTEXT['FILE']) - assert m.hexdigest() == FILE_DIGEST - self.alice.completed = True - self.alice.file_control(fid, file_number, TOX_FILE_CONTROL['CANCEL']) - return True - - CONTEXT['FILE'] += data - CONTEXT['RECEIVED'] += len(data) - # if CONTEXT['RECEIVED'] < FILE_SIZE: - # assert self.file_data_remaining( - # fid, file_number, 1) == FILE_SIZE - CONTEXT['RECEIVED'] - except Exception as e: - LOG_ERROR(f"ALICE_ON_file_recv_chunk {e}") - return False - return True - - # AliceTox.on_file_send_request = on_file_send_request - # AliceTox.on_file_control = on_file_control - # AliceTox.on_file_data = on_file_data - - try: - # required - assert self.wait_friend_get_connection_status(self.bob, self.baid, n=2*iN) - assert self.wait_friend_get_connection_status(self.alice, self.abid, n=2*iN) - - self.alice.callback_file_recv(alice_on_file_recv) - self.alice.callback_file_recv_control(alice_on_file_recv_control) - self.alice.callback_file_recv_chunk(alice_on_file_recv_chunk) - - self.bob.completed = False - def bob_on_file_recv_control2(iTox, fid:int, file_number:int, control) -> None: - LOG_DEBUG(f"BOB_ON_file_recv_control2 {fid} {file_number} control={control}") - if control == TOX_FILE_CONTROL['RESUME']: - CONTEXT['START'] = True - elif control == TOX_FILE_CONTROL['CANCEL']: - self.bob.completed = True - pass - - def bob_on_file_chunk_request(iTox, fid:int, file_number:int, position:int, length, *largs) -> None: - LOG_DEBUG(f"BOB_ON_file_chunk_request {fid} {file_number}") - if length == 0: - return - data = FILE[position:(position + length)] - self.bob.file_send_chunk(fid, file_number, position, data) - - sSlot = 'file_recv_control' - self.bob.callback_file_recv_control(bob_on_file_recv_control2) - self.bob.callback_file_chunk_request(bob_on_file_chunk_request) - - i = 0 - iKind = 0 - while i < 2: - i += 1 - try: - FN = self.bob.file_send(self.baid, iKind, FILE_SIZE, FILE_ID, FILE_NAME) - LOG.info(f"test_file_transfer bob.file_send {FN}") - except ArgumentError as e: - LOG.debug(f"test_file_transfer bob.file_send {e} {i}") - # ctypes.ArgumentError: This client is currently not connected to the friend - raise - else: - break - self.loop(100) - sleep(1) - else: - LOG.error(f"test_file_transfer bob.file_send 2") - raise AssertionError(f"test_file_transfer bob.file_send {THRESHOLD // 2}") - - # UINT32_MAX - try: - FID = self.bob.file_get_file_id(self.baid, FN) - hexFID = "".join([hex(ord(c))[2:].zfill(2) for c in FILE_NAME]) - assert FID.startswith(hexFID.upper()) - except Exception as e: - LOG.warn(f"test_file_transfer:: {FILE_NAME} {hexFID} {e}") - LOG.debug('\n' + traceback.format_exc()) - - if not self.wait_otox_attrs(self.bob, ['completed']): - LOG_WARN(f"test_file_transfer Bob NO completed") - return False - if not self.wait_otox_attrs(self.alice, ['completed']): - LOG_WARN(f"test_file_transfer Alice NO completed") - return False - return True - - except (ArgumentError, ValueError,) as e: - # ValueError: non-hexadecimal number found in fromhex() arg at position 0 - LOG.error(f"test_file_transfer: {e}") - raise - - except Exception as e: - LOG.error(f"test_file_transfer:: {e}") - LOG.debug('\n' + traceback.format_exc()) - raise - - finally: - self.alice.callback_file_recv(None) - self.alice.callback_file_recv_control(None) - self.alice.callback_file_recv_chunk(None) - self.bob.callback_file_recv_control(None) - self.bob.callback_file_chunk_request(None) - if hasattr(self, 'baid') and self.baid >= 0: - self.bob.friend_delete(self.baid) - if hasattr(self, 'abid') and self.abid >= 0: - self.alice.friend_delete(self.abid) - - LOG_INFO(f"test_file_transfer:: self.wait_objs_attr completed") - - @unittest.skip('crashes') - def test_tox_savedata(self) -> None: # - """ - t:get_savedata_size - t:get_savedata - """ - # Fatal Python error: Aborted - # "/var/local/src/toxygen_wrapper/wrapper/tox.py", line 180 in kill - - assert self.alice.get_savedata_size() > 0 - data = self.alice.get_savedata() - assert data is not None - addr = self.alice.self_get_address() - # self._address - - try: - LOG.info("test_tox_savedata alice.kill") - # crashes - self.alice.kill() - del self.alice - except: - pass - - oArgs = oTOX_OARGS - opts = oToxygenToxOptions(oArgs) - opts.savedata_data = data - opts.savedata_length = len(data) - - self.alice = Tox(tox_options=opts) - if addr != self.alice.self_get_address(): - LOG.warning("test_tox_savedata " + - f"{addr} != {self.alice.self_get_address()}") - else: - LOG.info("passed test_tox_savedata") - - def test_kill(self) -> None: # - import threading - LOG.info(f"THE END {threading.active_count()}") - self.tearDown() - LOG.info(f"THE END {threading.enumerate()}") - - -def vOargsToxPreamble(oArgs, Tox, ToxTest) -> None: - - ts.vSetupLogging(oArgs) - - methods = set([x for x in dir(Tox) if not x[0].isupper() - and not x[0] == '_']) - docs = "".join([getattr(ToxTest, x).__doc__ for x in dir(ToxTest) - if getattr(ToxTest, x).__doc__ is not None]) - - tested = set(re.findall(r't:(.*?)\n', docs)) - not_tested = methods.difference(tested) - - logging.info('Test Coverage: %.2f%%' % (len(tested) * 100.0 / len(methods))) - if len(not_tested): - logging.info('Not tested:\n %s' % "\n ".join(sorted(list(not_tested)))) - - -### - -def iMain(oArgs, failfast=True) -> int: - -# collect_types.init_types_collection() - - vOargsToxPreamble(oArgs, Tox, ToxSuite) - # https://stackoverflow.com/questions/35930811/how-to-sort-unittest-testcases-properly/35930812#35930812 - cases = ts.suiteFactory(*ts.caseFactory([ToxSuite])) - if color_runner: - runner = color_runner.runner.TextTestRunner(verbosity=2, failfast=failfast) - else: - runner = unittest.TextTestRunner(verbosity=2, failfast=failfast, warnings='ignore') - -# with collect_types.collect(): - runner.run(cases) - # collect_types.dump_stats('tests_wrapper.out') - -def oToxygenToxOptions(oArgs): - data = None - tox_options = tox_wrapper.tox.Tox.options_new() - if oArgs.proxy_type: - tox_options.contents.proxy_type = int(oArgs.proxy_type) - tox_options.contents.proxy_host = bytes(oArgs.proxy_host, 'UTF-8') - tox_options.contents.proxy_port = int(oArgs.proxy_port) - tox_options.contents.udp_enabled = False - else: - tox_options.contents.udp_enabled = oArgs.udp_enabled - if not os.path.exists('/proc/sys/net/ipv6'): - oArgs.ipv6_enabled = False - else: - tox_options.contents.ipv6_enabled = oArgs.ipv6_enabled - - tox_options.contents.tcp_port = int(oArgs.tcp_port) - tox_options.contents.dht_announcements_enabled = oArgs.dht_announcements_enabled - tox_options.contents.hole_punching_enabled = oArgs.hole_punching_enabled - - # overrides - tox_options.contents.local_discovery_enabled = False - tox_options.contents.experimental_thread_safety = False - # REQUIRED!! - if oArgs.ipv6_enabled and not os.path.exists('/proc/sys/net/ipv6'): - LOG.warning('Disabling IPV6 because /proc/sys/net/ipv6 does not exist' + repr(oArgs.ipv6_enabled)) - tox_options.contents.ipv6_enabled = False - else: - tox_options.contents.ipv6_enabled = bool(oArgs.ipv6_enabled) - - if data: # load existing profile - tox_options.contents.savedata_type = enums.TOX_SAVEDATA_TYPE['TOX_SAVE'] - tox_options.contents.savedata_data = c_char_p(data) - tox_options.contents.savedata_length = len(data) - else: # create new profile - tox_options.contents.savedata_type = enums.TOX_SAVEDATA_TYPE['NONE'] - tox_options.contents.savedata_data = None - tox_options.contents.savedata_length = 0 - - #? tox_options.contents.log_callback = LOG - if tox_options._options_pointer: - # LOG.debug("Adding logging to tox_options._options_pointer ") - ts.vAddLoggerCallback(tox_options, ts.on_log) - else: - LOG.warning("No tox_options._options_pointer " +repr(tox_options._options_pointer)) - - return tox_options - -def oArgparse(lArgv): - parser = ts.oMainArgparser() - parser.add_argument('--norequest',type=str, default='False', - choices=['True','False'], - help='Use _norequest') - parser.add_argument('profile', type=str, nargs='?', default=None, - help='Path to Tox profile') - oArgs = parser.parse_args(lArgv) - - for key in ts.lBOOLEANS: - if key not in oArgs: continue - val = getattr(oArgs, key) - setattr(oArgs, key, bool(val)) - - if hasattr(oArgs, 'sleep'): - if oArgs.sleep == 'qt': - pass # broken or gevent.sleep(idle_period) - elif oArgs.sleep == 'gevent': - pass # broken or gevent.sleep(idle_period) - else: - oArgs.sleep = 'time' - - return oArgs - -def main(lArgs=None) -> int: - global oTOX_OARGS - if lArgs is None: lArgs = [] - oArgs = oArgparse(lArgs) - global bIS_LOCAL - bIS_LOCAL = oArgs.network in ['newlocal', 'localnew', 'local'] - oTOX_OARGS = oArgs - setattr(oTOX_OARGS, 'bIS_LOCAL', bIS_LOCAL) - bIS_LOCAL = True - setattr(oTOX_OARGS, 'bIS_LOCAL', bIS_LOCAL) - # oTOX_OPTIONS = ToxOptions() - global oTOX_OPTIONS - oTOX_OPTIONS = oToxygenToxOptions(oArgs) - if coloredlogs: - # https://pypi.org/project/coloredlogs/ - coloredlogs.install(level=oArgs.loglevel, - logger=LOG, - # %(asctime)s,%(msecs)03d %(hostname)s [%(process)d] - fmt='%(name)s %(levelname)s %(message)s' - ) - else: - logging.basicConfig(level=oArgs.loglevel) # logging.INFO - - return iMain(oArgs) - -if __name__ == '__main__': - sys.exit(main(sys.argv[1:]))