From 7764404b9ad56c22e55b432f4884ed9c7f436c9b Mon Sep 17 00:00:00 2001 From: emdee Date: Thu, 17 Nov 2022 14:18:51 +0000 Subject: [PATCH] pylint --- wrapper/tox.py | 8 +- wrapper/toxav.py | 6 +- wrapper/toxencryptsave.py | 15 +--- wrapper_tests/support_onions.py | 145 ++++++++++++++++++------------- wrapper_tests/support_testing.py | 93 +++++++++++++++----- wrapper_tests/tests_wrapper.py | 73 ++++++---------- 6 files changed, 191 insertions(+), 149 deletions(-) diff --git a/wrapper/tox.py b/wrapper/tox.py index 9475317..360d957 100644 --- a/wrapper/tox.py +++ b/wrapper/tox.py @@ -3,13 +3,13 @@ from ctypes import * from datetime import datetime try: - from wrapper.toxcore_enums_and_consts import * - from wrapper.toxav import ToxAV from wrapper.libtox import LibToxCore + from wrapper.toxav import ToxAV + from wrapper.toxcore_enums_and_consts import * except: - from toxcore_enums_and_consts import * - from toxav import ToxAV from libtox import LibToxCore + from toxav import ToxAV + from toxcore_enums_and_consts import * # callbacks can be called in any thread so were being careful # tox.py can be called by callbacks diff --git a/wrapper/toxav.py b/wrapper/toxav.py index 86a426f..5480244 100644 --- a/wrapper/toxav.py +++ b/wrapper/toxav.py @@ -1,11 +1,13 @@ # -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- -from ctypes import c_int, POINTER, c_void_p, byref, ArgumentError, c_uint32, CFUNCTYPE, c_size_t, c_uint8, c_uint16 -from ctypes import c_char_p, c_int32, c_bool, cast +from ctypes import (CFUNCTYPE, POINTER, ArgumentError, byref, c_bool, c_char_p, + c_int, c_int32, c_size_t, c_uint8, c_uint16, c_uint32, + c_void_p, cast) from wrapper.libtox import LibToxAV from wrapper.toxav_enums import * + def LOG_ERROR(a): print('EROR> '+a) def LOG_WARN(a): print('WARN> '+a) def LOG_INFO(a): print('INFO> '+a) diff --git a/wrapper/toxencryptsave.py b/wrapper/toxencryptsave.py index d436ba9..58e2a0f 100644 --- a/wrapper/toxencryptsave.py +++ b/wrapper/toxencryptsave.py @@ -7,19 +7,10 @@ except: import libtox from toxencryptsave_enums_and_consts import * -try: - from ctypes import c_size_t - from ctypes import create_string_buffer - from ctypes import byref - from ctypes import c_int - from ctypes import ArgumentError - from ctypes import c_char_p - from ctypes import c_bool -except: - print("failed to import ctypes") - raise +from ctypes import (ArgumentError, byref, c_bool, c_char_p, c_int, c_size_t, + create_string_buffer) + - class ToxEncryptSave: def __init__(self): diff --git a/wrapper_tests/support_onions.py b/wrapper_tests/support_onions.py index 4be62e0..0c463a1 100644 --- a/wrapper_tests/support_onions.py +++ b/wrapper_tests/support_onions.py @@ -1,35 +1,36 @@ # -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- +import getpass import os -import sys import re -import traceback +import select import shutil import socket -import select +import sys import time -import getpass if False: import cepa as stem - from cepa.control import Controller from cepa.connection import MissingPassword + from cepa.control import Controller + from cepa.util.tor_tools import is_valid_fingerprint else: import stem - from stem.control import Controller from stem.connection import MissingPassword + from stem.control import Controller + from stem.util.tor_tools import is_valid_fingerprint global LOG import logging import warnings + warnings.filterwarnings('ignore') LOG = logging.getLogger() bHAVE_TORR = shutil.which('tor-resolve') -# maybe we should check these each time but we -# got them by sorting bad relays in the wild -# we'll keep a copy here +# we check these each time but we got them by sorting bad relays +# in the wild we'll keep a copy here so we can avoid restesting yKNOWN_NODNS = """ --- - 0x0.is @@ -50,6 +51,7 @@ yKNOWN_NODNS = """ - or.wowplanet.de - ormycloud.org - plied-privacy.net + - rivacysvcs.net - redacted.org - rification-for-nusenu.net - rofl.cat @@ -67,13 +69,24 @@ yKNOWN_NODNS = """ - w.cccs.de """ +def oMakeController(sSock='', port=9051): + import getpass + if sSock and os.path.exists(sSock): + controller = Controller.from_socket_file(path=sSock) + else: + controller = Controller.from_port(port=port) + sys.stdout.flush() + p = getpass.unix_getpass(prompt='Controller Password: ', stream=sys.stderr) + controller.authenticate(p) + return controller + oSTEM_CONTROLER = None def oGetStemController(log_level=10, sock_or_pair='/run/tor/control'): global oSTEM_CONTROLER if oSTEM_CONTROLER: return oSTEM_CONTROLER - from stem.util.log import Runlevel - Runlevel = log_level + import stem.util.log + stem.util.log.Runlevel = log_level if os.path.exists(sock_or_pair): LOG.info(f"controller from socket {sock_or_pair}") @@ -96,7 +109,7 @@ def oGetStemController(log_level=10, sock_or_pair='/run/tor/control'): controller.authenticate(p) oSTEM_CONTROLER = controller LOG.debug(f"{controller}") - return oSTEM_CONTROLER + return oSTEM_CONTROLER def bAreWeConnected(): # FixMe: Linux only @@ -144,68 +157,80 @@ def bin_to_hex(raw_id, length=None): return res.upper() def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10): - """now working !!! stem 1.8.x timeout must be huge >120""" + """now working !!! stem 1.8.x timeout must be huge >120 + 'Provides the descriptor for a hidden service. The **address** is the + '.onion' address of the hidden service ' + What about Services? + """ try: from cryptography.utils import int_from_bytes except ImportError: + import cryptography.utils + # guessing - not in the current cryptography but stem expects it def int_from_bytes(**args): return int.to_bytes(*args) cryptography.utils.int_from_bytes = int_from_bytes # this will fai if the trick above didnt work from stem.prereq import is_crypto_available - is_crypto_available(ed25519 = True) - - from stem.descriptor.hidden_service import HiddenServiceDescriptorV3 + is_crypto_available(ed25519=True) + + from queue import Empty + + from stem import Timeout from stem.client.datatype import LinkByFingerprint + from stem.descriptor.hidden_service import HiddenServiceDescriptorV3 if type(lOnions) not in [set, tuple, list]: lOnions = list(lOnions) if controller is None: controller = oGetStemController(log_level=log_level) l = [] - try: - for elt in lOnions: + for elt in lOnions: LOG.info(f"controller.get_hidden_service_descriptor {elt}") - desc = controller.get_hidden_service_descriptor(elt, - await_result=True, - timeout=itimeout) -# LOG.log(40, f"{dir(desc)} get_hidden_service_descriptor") - # timeouts 20 sec - # mistakenly a HSv2 descriptor - hs_address = HiddenServiceDescriptorV3.from_str(str(desc)) # reparse as HSv3 - oInnerLayer = hs_address.decrypt(elt) -# LOG.log(40, f"{dir(oInnerLayer)}") + try: + desc = controller.get_hidden_service_descriptor(elt, + await_result=True, + timeout=itimeout) + # LOG.log(40, f"{dir(desc)} get_hidden_service_descriptor") + # timeouts 20 sec + # mistakenly a HSv2 descriptor + hs_address = HiddenServiceDescriptorV3.from_str(str(desc)) # reparse as HSv3 + oInnerLayer = hs_address.decrypt(elt) + # LOG.log(40, f"{dir(oInnerLayer)}") - # IntroductionPointV3 - n = oInnerLayer.introduction_points - if not n: - LOG.warn(f"NO introduction points for {elt}") + # IntroductionPointV3 + n = oInnerLayer.introduction_points + if not n: + LOG.warn(f"NO introduction points for {elt}") + continue + LOG.info(f"{elt} {len(n)} introduction points") + lp = [] + for introduction_point in n: + for linkspecifier in introduction_point.link_specifiers: + if isinstance(linkspecifier, LinkByFingerprint): + # LOG.log(40, f"Getting fingerprint for {linkspecifier}") + if hasattr(linkspecifier, 'fingerprint'): + assert len(linkspecifier.value) == 20 + lp += [bin_to_hex(linkspecifier.value)] + LOG.info(f"{len(lp)} introduction points for {elt}") + l += lp + except (Empty, Timeout,) as e: # noqa + LOG.warn(f"Timed out getting introduction points for {elt}") continue - LOG.info(f"{elt} {len(n)} introduction points") - lp = [] - for introduction_point in n: - for linkspecifier in introduction_point.link_specifiers: - if isinstance(linkspecifier, LinkByFingerprint): -# LOG.log(40, f"Getting fingerprint for {linkspecifier}") - if hasattr(linkspecifier, 'fingerprint'): - assert len(linkspecifier.value) == 20 - lp += [bin_to_hex(linkspecifier.value)] - LOG.info(f"{len(lp)} introduction points for {elt}") - l += lp - except Exception as e: - LOG.exception(e) + except Exception as e: + LOG.exception(e) return l def zResolveDomain(domain): try: ip = sTorResolve(domain) - except Exception as e: + except Exception as e: # noqa ip = '' if ip == '': try: lpair = getaddrinfo(domain, 443) except Exception as e: - LOG.warn("{e}") + LOG.warn(f"{e}") lpair = None if lpair is None: LOG.warn(f"TorResolv and getaddrinfo failed for {domain}") @@ -222,13 +247,12 @@ def sTorResolve(target, ): MAX_INFO_RESPONSE_PACKET_LENGTH = 8 if '@' in target: - LOG.warn(f"sTorResolve failed invalid hostname {target}" ) + LOG.warn(f"sTorResolve failed invalid hostname {target}") return '' target = target.strip('/') - seb = b"\o004\o360\o000\o000\o000\o000\o000\o001\o000" seb = b"\x04\xf0\x00\x00\x00\x00\x00\x01\x00" seb += bytes(target, 'US-ASCII') + b"\x00" - assert len(seb) == 10+len(target), str(len(seb))+repr(seb) + assert len(seb) == 10 + len(target), str(len(seb)) + repr(seb) # LOG.debug(f"0 Sending {len(seb)} to The TOR proxy {seb}") @@ -236,7 +260,7 @@ def sTorResolve(target, sock.connect((sHost, iPort)) sock.settimeout(SOCK_TIMEOUT_SECONDS) - oRet = sock.sendall(seb) + oRet = sock.sendall(seb) # noqa i = 0 data = '' @@ -250,8 +274,7 @@ def sTorResolve(target, flags=socket.MSG_WAITALL data = sock.recv(MAX_INFO_RESPONSE_PACKET_LENGTH, flags) except socket.timeout: - LOG.warn("4 The TOR proxy " \ - +repr((sHost, iPort)) \ + LOG.warn(f"4 The TOR proxy {(sHost, iPort)}" \ +" didnt reply in " + str(SOCK_TIMEOUT_SECONDS) + " sec." +" #" +str(i)) except Exception as e: @@ -260,7 +283,7 @@ def sTorResolve(target, +" errored with " + str(e) +" #" +str(i)) sock.close() - raise SystemExit(4) + return '' else: if len(data) > 0: break @@ -269,10 +292,10 @@ def sTorResolve(target, sLabel = "5 No reply #" else: sLabel = "5 No data #" - LOG.info(sLabel +f"{i} on {sHost}:{iPort}" ) + LOG.warn(f"sTorResolve: {sLabel} {i} on {sHost}:{iPort}") sock.close() - raise SystemExit(5) - + return '' + assert len(data) >= 8 packet_sf = data[1] if packet_sf == 90: @@ -281,13 +304,13 @@ def sTorResolve(target, return f"{data[4]}.{data[5]}.{data[6]}.{data[7]}" else: # 91 - LOG.warn(f"tor-resolve failed for {target} on {sHost}:{iPort}" ) + LOG.warn(f"tor-resolve failed for {target} on {sHost}:{iPort}") os.system(f"tor-resolve -4 {target} > /tmp/e 2>/dev/null") # os.system("strace tor-resolve -4 "+target+" 2>&1|grep '^sen\|^rec'") return '' - + def getaddrinfo(sHost, sPort): # do this the explicit way = Ive seen the compact connect fail # >>> sHost, sPort = 'l27.0.0.1', 33446 @@ -310,8 +333,8 @@ def icheck_torrc(sFile, oArgs): a = {} for elt in l: elt = elt.strip() - if not elt or not ' ' in elt: continue - k,v = elt.split(' ', 1) + if not elt or ' ' not in elt: continue + (k, v,) = elt.split(' ', 1) a[k] = v keys = a @@ -388,4 +411,4 @@ def lExitExcluder(oArgs, iPort=9051, log_level=10): if __name__ == '__main__': target = 'duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad' controller = oGetStemController(log_level=10) - lIntroductionPoints(controller, [target], itimeout=120) + lIntroductionPoints(controller, [target], itimeout=120) diff --git a/wrapper_tests/support_testing.py b/wrapper_tests/support_testing.py index 259e60b..3e90bca 100644 --- a/wrapper_tests/support_testing.py +++ b/wrapper_tests/support_testing.py @@ -1,19 +1,21 @@ # -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- -import os -import sys import argparse -import re -import traceback -import logging -import shutil +import contextlib import json -import socket +import logging +import os +import re import select -from ctypes import * -import time, contextlib +import shutil +import socket +import sys +import time +import traceback import unittest +from ctypes import * from random import Random + random = Random() try: @@ -34,14 +36,18 @@ except ImportError as e: import wrapper from wrapper.toxcore_enums_and_consts import TOX_CONNECTION, TOX_USER_STATUS + +from wrapper_tests.support_http import bAreWeConnected +from wrapper_tests.support_onions import (is_valid_fingerprint, + lIntroductionPoints, + oGetStemController, + sMapaddressResolv, sTorResolve) + try: from user_data.settings import get_user_config_path except ImportError: get_user_config_path = None -from wrapper_tests.support_http import bAreWeConnected -from wrapper_tests.support_onions import sTorResolve - # LOG=util.log global LOG LOG = logging.getLogger() @@ -53,8 +59,8 @@ def LOG_DEBUG(l): print('DEBUGc: '+l) def LOG_TRACE(l): pass # print('TRACE+ '+l) try: - from trepan.interfaces import server as Mserver from trepan.api import debug + from trepan.interfaces import server as Mserver except: # print('trepan3 TCP server NOT available.') pass @@ -72,7 +78,7 @@ else: iTHREAD_TIMEOUT = 1 iTHREAD_SLEEP = 1 iTHREAD_JOINS = 8 -iNODES=6 +iNODES = 6 lToxSamplerates = [8000, 12000, 16000, 24000, 48000] lToxSampleratesK = [8, 12, 16, 24, 48] @@ -378,8 +384,8 @@ def setup_logging(oArgs): LOG.info(f"Setting loglevel to {oArgs.loglevel!s}") def signal_handler(num, f): - from trepan.interfaces import server as Mserver from trepan.api import debug + from trepan.interfaces import server as Mserver connection_opts={'IO': 'TCP', 'PORT': 6666} intf = Mserver.ServerInterface(connection_opts=connection_opts) dbg_opts = {'interface': intf} @@ -388,8 +394,13 @@ def signal_handler(num, f): return def merge_args_into_settings(args, settings): - from user_data.settings import clean_settings if args: + if not hasattr(args, 'audio'): + LOG.warn('No audio ' +repr(args)) + settings['audio'] = getattr(args, 'audio') + if not hasattr(args, 'video'): + LOG.warn('No video ' +repr(args)) + settings['video'] = getattr(args, 'video') for key in settings.keys(): # proxy_type proxy_port proxy_host not_key = 'not_' +key @@ -406,6 +417,40 @@ def merge_args_into_settings(args, settings): clean_settings(settings) return +def clean_settings(self): + # failsafe to ensure C tox is bytes and Py settings is str + + # overrides + self['mirror_mode'] = False + # REQUIRED!! + if not os.path.exists('/proc/sys/net/ipv6'): + LOG.warn('Disabling IPV6 because /proc/sys/net/ipv6 does not exist') + self['ipv6_enabled'] = False + + if 'proxy_type' in self and self['proxy_type'] == 0: + self['proxy_host'] = '' + self['proxy_port'] = 0 + + if 'proxy_type' in self and self['proxy_type'] != 0 and \ + 'proxy_host' in self and self['proxy_host'] != '' and \ + 'proxy_port' in self and self['proxy_port'] != 0: + if 'udp_enabled' in self and self['udp_enabled']: + # We don't currently support UDP over proxy. + LOG.info("UDP enabled and proxy set: disabling UDP") + self['udp_enabled'] = False + if 'local_discovery_enabled' in self and self['local_discovery_enabled']: + LOG.info("local_discovery_enabled enabled and proxy set: disabling local_discovery_enabled") + self['local_discovery_enabled'] = False + if 'dht_announcements_enabled' in self and self['dht_announcements_enabled']: + LOG.info("dht_announcements_enabled enabled and proxy set: disabling dht_announcements_enabled") + self['dht_announcements_enabled'] = False + + if 'auto_accept_path' in self and \ + type(self['auto_accept_path']) == bytes: + self['auto_accept_path'] = str(self['auto_accept_path'], 'UTF-8') + + LOG.debug("Cleaned settings") + def lSdSamplerates(iDev): try: import sounddevice as sd @@ -441,6 +486,8 @@ DEFAULT_NODES_COUNT = 8 global aNODES aNODES = {} import functools + + # @functools.lru_cache(maxsize=12) def generate_nodes(oArgs=None, nodes_count=DEFAULT_NODES_COUNT, @@ -613,12 +660,12 @@ def sDNSLookup(host): LOG.warn(f"onion address skipped because no tor-resolve {host}") return '' try: - sOut = f"/tmp/TR{os.getpid()}.log" - i = os.system(f"tor-resolve -4 {host} > {sOUT}") + sout = f"/tmp/TR{os.getpid()}.log" + i = os.system(f"tor-resolve -4 {host} > {sout}") if not i: LOG.warn(f"onion address skipped because tor-resolve on {host}") return '' - ip = open(sOut, 'rt').read() + ip = open(sout, 'rt').read() if ip.endswith('failed.'): LOG.warn(f"onion address skipped because tor-resolve failed on {host}") return '' @@ -636,12 +683,12 @@ def sDNSLookup(host): if ip == '': try: - sOut = f"/tmp/TR{os.getpid()}.log" - i = os.system(f"dig {host}|grep ^{host}|sed -e 's/.* //'> {sOUT}") + sout = f"/tmp/TR{os.getpid()}.log" + i = os.system(f"dig {host}|grep ^{host}|sed -e 's/.* //'> {sout}") if not i: LOG.warn(f"address skipped because dig failed on {host}") return '' - ip = open(sOut, 'rt').read().strip() + ip = open(sout, 'rt').read().strip() LOG.debug(f"address dig {ip} on {host}") return ip except: @@ -734,7 +781,7 @@ def iNmapInfoNmap(sProt, sHost, sPort, key=None, environ=None, cmd=''): o = nmps().scan(hosts=sHost, arguments=cmd) aScan = o['scan'] ip = list(aScan.keys())[0] - state = aScam[ip][prot][iPort]['state'] + state = aScan[ip][prot][sPort]['state'] LOG.info(f"iNmapInfoNmap: to {sHost} {state}") return 0 diff --git a/wrapper_tests/tests_wrapper.py b/wrapper_tests/tests_wrapper.py index 2370e5a..d1f0785 100644 --- a/wrapper_tests/tests_wrapper.py +++ b/wrapper_tests/tests_wrapper.py @@ -27,28 +27,30 @@ which itself was forked from https://github.com/aitjcize/PyTox/ Modified to work with """ +import ctypes +import faulthandler import hashlib +import logging import os +import random import re import sys -import unittest -import traceback -import logging -import random import threading -import ctypes +import traceback +import unittest from ctypes import * -import faulthandler faulthandler.enable() import warnings + warnings.filterwarnings('ignore') try: - import pycurl - import certifi from io import BytesIO + + import certifi + import pycurl except ImportError: pycurl = None @@ -65,10 +67,13 @@ except ImportError as e: color_runner = None import wrapper -from wrapper.tox import Tox import wrapper.toxcore_enums_and_consts as enums -from wrapper.toxcore_enums_and_consts import TOX_CONNECTION, TOX_USER_STATUS, \ - TOX_MESSAGE_TYPE, TOX_SECRET_KEY_SIZE, TOX_FILE_CONTROL, TOX_ADDRESS_SIZE +from wrapper.tox import Tox +from wrapper.toxcore_enums_and_consts import (TOX_ADDRESS_SIZE, TOX_CONNECTION, + TOX_FILE_CONTROL, + TOX_MESSAGE_TYPE, + TOX_SECRET_KEY_SIZE, + TOX_USER_STATUS) try: import support_testing as ts @@ -82,16 +87,9 @@ except ImportError: bIS_NOT_TOXYGEN = True # from PyQt5 import QtCore -if 'QtCore' in globals(): - def qt_sleep(fSec): - if fSec > .000001: QtCore.QThread.sleep(fSec) - QtCore.QCoreApplication.processEvents() - sleep = qt_sleep -elif 'gevent' in globals(): - sleep = gevent.sleep -else: - import time - sleep = time.sleep +import time + +sleep = time.sleep global LOG LOG = logging.getLogger('TestS') @@ -287,18 +285,18 @@ class ToxSuite(unittest.TestCase): def call_bootstrap(self): LOG.debug(f"call_bootstrap") if oTOX_OARGS.network in ['new', 'newlocal', 'localnew']: - ts.bootstrap_local(self.lUdp, [alice, bob]) + ts.bootstrap_local(self.lUdp, [self.alice, self.bob]) elif self.get_connection_status() is True: LOG.debug(f"call_bootstrap {self.get_connection_status()}") elif not ts.bAreWeConnected(): - LOG.warn('we are NOT CONNECTED ') + LOG.warn('we are NOT CONNECTED') elif oTOX_OARGS.proxy_port > 0: random.shuffle(self.lUdp) # LOG.debug(f"call_bootstrap ts.bootstrap_udp {self.lUdp[:2]}") - ts.bootstrap_udp(self.lUdp[:iNODES], [self.alice, self.bob]) + ts.bootstrap_udp(self.lUdp[:ts.iNODES], [self.alice, self.bob]) random.shuffle(self.lTcp) # LOG.debug(f"call_bootstrap ts.bootstrap_tcp {self.lTcp[:8]}") - ts.bootstrap_tcp(self.lTcp[:iNODES], [self.alice, self.bob]) + ts.bootstrap_tcp(self.lTcp[:ts.iNODES], [self.alice, self.bob]) else: random.shuffle(self.lUdp) # LOG.debug(f"call_bootstrap ts.bootstrap_udp {self.lUdp[:8]}") @@ -696,24 +694,6 @@ class ToxSuite(unittest.TestCase): else: LOG.warn(f"bootstrap_local_netstat NOT {port} iStatus={iStatus}") - def test_bootstrap_local_bash(self): # works - """ - t:bootstrap - """ - if oTOX_OARGS.network not in ['new', 'test', 'newlocal', 'local']: - return - - o = oTOX_OARGS.network - sFile = bootstrap_node_info.__file__ - assert os.path.exists(sFile) - port = ts.tox_bootstrapd_port() - sExe = sys.executable - iStatus = os.system(sExe +f""" {sFile} --test ipv4 localhost {port}""") - if iStatus == 0: - LOG.info(f"bootstrap_local_bash connected {o} iStatus={iStatus}") - else: - LOG.warn(f"bootstrap_local_bash NOT CONNECTED {o} iStatus={iStatus}") - @unittest.skipIf(not bIS_LOCAL, "local test") def test_bootstrap_local(self): # works """ @@ -929,7 +909,7 @@ class ToxSuite(unittest.TestCase): self.bob.friend_delete(self.baid) self.alice.friend_delete(self.abid) - @unittest.skip('unfinished?') + @unittest.skip('unfinished') def test_bob_add_alice_as_friend_and_status(self): assert self.bob_add_alice_as_friend_and_status() self.bob.friend_delete(self.baid) @@ -1438,7 +1418,8 @@ class ToxSuite(unittest.TestCase): try: assert fid == BID assert FILE_NUMBER == file_number - if control == Tox.FILE_CONTROL_FINISHED: + # FixMe _FINISHED? + if False and control == TOX_FILE_CONTROL['RESUME']: # assert CONTEXT['RECEIVED'] == FILE_SIZE # m = hashlib.md5() # m.update(CONTEXT['FILE']) @@ -1575,8 +1556,6 @@ class ToxSuite(unittest.TestCase): # self._address try: - self.alice._kill_toxav() - self.alice._kill_tox() self.alice.kill() except: pass