This commit is contained in:
emdee 2023-12-12 05:20:05 +00:00
parent bc35421760
commit daee891825
6 changed files with 99 additions and 113 deletions

View File

@ -1,7 +1,7 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- # -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
from ctypes import * from ctypes import *
from datetime import datetime from datetime import datetime
from typing import Union, Callable from typing import Union, Callable, Union
try: try:
from wrapper.libtox import LibToxCore from wrapper.libtox import LibToxCore
@ -258,7 +258,7 @@ class Tox:
""" """
return int(Tox.libtoxcore.tox_get_savedata_size(self._tox_pointer)) return int(Tox.libtoxcore.tox_get_savedata_size(self._tox_pointer))
def get_savedata(self, savedata=None) -> str: def get_savedata(self, savedata=None) -> bytes:
""" """
Store all information associated with the tox instance to a byte array. Store all information associated with the tox instance to a byte array.
@ -348,6 +348,7 @@ class Tox:
'address, or the IP address passed was invalid.') 'address, or the IP address passed was invalid.')
if tox_err_bootstrap == TOX_ERR_BOOTSTRAP['BAD_PORT']: if tox_err_bootstrap == TOX_ERR_BOOTSTRAP['BAD_PORT']:
raise ArgumentError('The port passed was invalid. The valid port range is (1, 65535).') raise ArgumentError('The port passed was invalid. The valid port range is (1, 65535).')
raise ToxError('The function did not return OK')
def self_get_connection_status(self) -> int: def self_get_connection_status(self) -> int:
""" """
@ -420,10 +421,10 @@ class Tox:
# Internal client information (Tox address/id) # Internal client information (Tox address/id)
def self_get_toxid(self, address=None) -> str: def self_get_toxid(self, address: Union[bytes, None]=None) -> str:
return self.self_get_address(address) return self.self_get_address(address)
def self_get_address(self, address=None) -> str: def self_get_address(self, address: Union[bytes, None]=None) -> str:
""" """
Writes the Tox friend address of the client to a byte array. The address is not in human-readable format. If a Writes the Tox friend address of the client to a byte array. The address is not in human-readable format. If a
client wants to display the address, formatting is required. client wants to display the address, formatting is required.
@ -456,7 +457,7 @@ class Tox:
""" """
return int(Tox.libtoxcore.tox_self_get_nospam(self._tox_pointer)) return int(Tox.libtoxcore.tox_self_get_nospam(self._tox_pointer))
def self_get_public_key(self, public_key: str=None) -> str: def self_get_public_key(self, public_key: Union[bytes, None] = None) -> str:
""" """
Copy the Tox Public Key (long term) from the Tox object. Copy the Tox Public Key (long term) from the Tox object.
@ -470,7 +471,7 @@ class Tox:
Tox.libtoxcore.tox_self_get_public_key(self._tox_pointer, public_key) Tox.libtoxcore.tox_self_get_public_key(self._tox_pointer, public_key)
return bin_to_string(public_key, TOX_PUBLIC_KEY_SIZE) return bin_to_string(public_key, TOX_PUBLIC_KEY_SIZE)
def self_get_secret_key(self, secret_key=None) -> str: def self_get_secret_key(self, secret_key: Union[bytes, None]=None) -> str:
""" """
Copy the Tox Secret Key from the Tox object. Copy the Tox Secret Key from the Tox object.
@ -505,12 +506,12 @@ class Tox:
byref(tox_err_set_info)) byref(tox_err_set_info))
tox_err_set_info = tox_err_set_info.value tox_err_set_info = tox_err_set_info.value
if tox_err_set_info == TOX_ERR_SET_INFO['OK']: if tox_err_set_info == TOX_ERR_SET_INFO['OK']:
return True # was bool(result) return bool(result)
elif tox_err_set_info == TOX_ERR_SET_INFO['NULL']: elif tox_err_set_info == TOX_ERR_SET_INFO['NULL']:
raise ArgumentError('One of the arguments to the function was NULL when it was not expected.') raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
elif tox_err_set_info == TOX_ERR_SET_INFO['TOO_LONG']: elif tox_err_set_info == TOX_ERR_SET_INFO['TOO_LONG']:
raise ArgumentError('Information length exceeded maximum permissible size.') raise ArgumentError('Information length exceeded maximum permissible size.')
return False # was raise ToxError('The function did not return OK')
def self_get_name_size(self) -> int: def self_get_name_size(self) -> int:
""" """
@ -523,7 +524,7 @@ class Tox:
retval = Tox.libtoxcore.tox_self_get_name_size(self._tox_pointer) retval = Tox.libtoxcore.tox_self_get_name_size(self._tox_pointer)
return int(retval) return int(retval)
def self_get_name(self, name: str=None) -> str: def self_get_name(self, name: Union[bytes,None]=None) -> str:
""" """
Write the nickname set by tox_self_set_name to a byte array. Write the nickname set by tox_self_set_name to a byte array.
@ -580,7 +581,7 @@ class Tox:
""" """
return Tox.libtoxcore.tox_self_get_status_message_size(self._tox_pointer) return Tox.libtoxcore.tox_self_get_status_message_size(self._tox_pointer)
def self_get_status_message(self, status_message: str=None) -> str: def self_get_status_message(self, status_message: Union[bytes,None]=None) -> str:
""" """
Write the status message set by tox_self_set_status_message to a byte array. Write the status message set by tox_self_set_status_message to a byte array.
@ -1877,17 +1878,7 @@ class Tox:
nick = bytes(nick, 'utf-8') nick = bytes(nick, 'utf-8')
if type(group_name) != bytes: if type(group_name) != bytes:
group_name = bytes(group_name, 'utf-8') group_name = bytes(group_name, 'utf-8')
if False: # API change if True:
peer_info = self.group_self_peer_info_new()
peer_info.contents.nick = c_char_p(nick)
peer_info.contents.nick_length = len(nick)
peer_info.contents.user_status = status
result = Tox.libtoxcore.tox_group_new(self._tox_pointer,
privacy_state,
group_name,
c_size_t(len(group_name)),
peer_info, byref(error))
else:
cgroup_name = c_char_p(group_name) cgroup_name = c_char_p(group_name)
result = Tox.libtoxcore.tox_group_new(self._tox_pointer, result = Tox.libtoxcore.tox_group_new(self._tox_pointer,
privacy_state, privacy_state,
@ -2203,7 +2194,8 @@ class Tox:
error = c_int() error = c_int()
key = create_string_buffer(TOX_GROUP_PEER_PUBLIC_KEY_SIZE) key = create_string_buffer(TOX_GROUP_PEER_PUBLIC_KEY_SIZE)
LOG_DEBUG(f"tox.group_self_get_public_key") LOG_DEBUG(f"tox.group_self_get_public_key")
result = Tox.libtoxcore.tox_group_self_get_public_key(self._tox_pointer, c_uint32(group_number), result = Tox.libtoxcore.tox_group_self_get_public_key(self._tox_pointer,
c_uint32(group_number),
key, byref(error)) key, byref(error))
if error.value: if error.value:
LOG_ERROR(f"tox.group_self_get_public_key {TOX_ERR_FRIEND_GET_PUBLIC_KEY[error.value]}") LOG_ERROR(f"tox.group_self_get_public_key {TOX_ERR_FRIEND_GET_PUBLIC_KEY[error.value]}")
@ -2452,7 +2444,9 @@ class Tox:
size = self.group_get_topic_size(group_number) size = self.group_get_topic_size(group_number)
topic = create_string_buffer(size) topic = create_string_buffer(size)
LOG_DEBUG(f"tox.group_get_topic") LOG_DEBUG(f"tox.group_get_topic")
result = Tox.libtoxcore.tox_group_get_topic(self._tox_pointer, c_uint32(group_number), topic, byref(error)) result = Tox.libtoxcore.tox_group_get_topic(self._tox_pointer,
c_uint32(group_number),
topic, byref(error))
if error.value: if error.value:
LOG_ERROR(f" err={error.value}") LOG_ERROR(f" err={error.value}")
raise ToxError(f" err={error.value}") raise ToxError(f" err={error.value}")
@ -2488,7 +2482,8 @@ class Tox:
size = self.group_get_name_size(group_number) size = self.group_get_name_size(group_number)
name = create_string_buffer(size) name = create_string_buffer(size)
LOG_DEBUG(f"tox.group_get_name") LOG_DEBUG(f"tox.group_get_name")
result = Tox.libtoxcore.tox_group_get_name(self._tox_pointer, c_uint32(group_number), result = Tox.libtoxcore.tox_group_get_name(self._tox_pointer,
c_uint32(group_number),
name, byref(error)) name, byref(error))
if error.value: if error.value:
LOG_ERROR(f"group_get_name err={error.value}") LOG_ERROR(f"group_get_name err={error.value}")

View File

@ -59,8 +59,15 @@ TOX_ERR_SET_INFO = {
'OK': 0, 'OK': 0,
'NULL': 1, 'NULL': 1,
'TOO_LONG': 2, 'TOO_LONG': 2,
# The function returned successfully.
'TOX_ERR_SET_INFO_OK': 0,
# One of the arguments to the function was NULL when it was not expected.
'TOX_ERR_SET_INFO_NULL': 1,
# Information length exceeded maximum permissible size.
'TOX_ERR_SET_INFO_TOO_LONG': 2,
} }
TOX_ERR_FRIEND_ADD = { TOX_ERR_FRIEND_ADD = {
'OK': 0, 'OK': 0,
'NULL': 1, 'NULL': 1,

View File

@ -60,7 +60,7 @@ class ToxEncryptSave:
:return: output array :return: output array
""" """
out = create_string_buffer(len(data) - TOX_PASS_ENCRYPTION_EXTRA_LENGTH) out = create_string_buffer(len(data) - enum.TOX_PASS_ENCRYPTION_EXTRA_LENGTH)
tox_err_decryption = c_int() tox_err_decryption = c_int()
if type(password) != bytes: if type(password) != bytes:
password = bytes(password, 'utf-8') password = bytes(password, 'utf-8')

View File

@ -17,9 +17,7 @@ import threading
import random import random
from ctypes import * from ctypes import *
import argparse import argparse
import time
from time import sleep
from os.path import exists
# LOG=util.log # LOG=util.log
global LOG global LOG
@ -36,12 +34,11 @@ import wrapper
import wrapper.toxcore_enums_and_consts as enums import wrapper.toxcore_enums_and_consts as enums
from wrapper.tox import Tox, UINT32_MAX, ToxError from wrapper.tox import Tox, UINT32_MAX, ToxError
from wrapper.toxcore_enums_and_consts import TOX_CONNECTION, TOX_USER_STATUS, \ from wrapper.toxcore_enums_and_consts import TOX_CONNECTION, TOX_USER_STATUS, \
TOX_MESSAGE_TYPE, TOX_PUBLIC_KEY_SIZE, TOX_FILE_CONTROL TOX_MESSAGE_TYPE, TOX_PUBLIC_KEY_SIZE, TOX_FILE_CONTROL, TOX_FILE_KIND
import wrapper_tests.support_testing as ts import wrapper_tests.support_testing as ts
from wrapper_tests.support_testing import oMainArgparser from wrapper_tests.support_testing import oMainArgparser
import time
def sleep(fSec): def sleep(fSec):
if 'QtCore' in globals(): if 'QtCore' in globals():
if fSec > .000001: QtCore.QThread.msleep(fSec) if fSec > .000001: QtCore.QThread.msleep(fSec)
@ -79,26 +76,26 @@ else:
super(AV, self).__init__(core) super(AV, self).__init__(core)
self.core = self.get_tox() self.core = self.get_tox()
def on_call(self, fid, audio_enabled, video_enabled): def on_call(self, fid, audio_enabled, video_enabled) -> None:
LOG.info("Incoming %s call from %d:%s ..." % ( LOG.info("Incoming %s call from %d:%s ..." % (
"video" if video_enabled else "audio", fid, "video" if video_enabled else "audio", fid,
self.core.friend_get_name(fid))) self.core.friend_get_name(fid)))
bret = self.answer(fid, 48, 64) bret = self.answer(fid, 48, 64)
LOG.info(f"Answered, in call... {bret!s}") LOG.info(f"Answered, in call... {bret}")
def on_call_state(self, fid, state): def on_call_state(self, fid, state) -> None:
LOG.info('call state:fn=%d, state=%d' % (fid, state)) LOG.info('call state:fn=%d, state=%d' % (fid, state))
def on_audio_bit_rate(self, fid, audio_bit_rate): def on_audio_bit_rate(self, fid, audio_bit_rate) -> None:
LOG.info('audio bit rate status: fn=%d, abr=%d' % LOG.info('audio bit rate status: fn=%d, abr=%d' %
(fid, audio_bit_rate)) (fid, audio_bit_rate))
def on_video_bit_rate(self, fid, video_bit_rate): def on_video_bit_rate(self, fid, video_bit_rate) -> None:
LOG.info('video bit rate status: fn=%d, vbr=%d' % LOG.info('video bit rate status: fn=%d, vbr=%d' %
(fid, video_bit_rate)) (fid, video_bit_rate))
def on_audio_receive_frame(self, fid, pcm, sample_count, def on_audio_receive_frame(self, fid, pcm, sample_count,
channels, sampling_rate): channels, sampling_rate) -> None:
# LOG.info('audio frame: %d, %d, %d, %d' % # LOG.info('audio frame: %d, %d, %d, %d' %
# (fid, sample_count, channels, sampling_rate)) # (fid, sample_count, channels, sampling_rate))
# LOG.info('pcm len:%d, %s' % (len(pcm), str(type(pcm)))) # LOG.info('pcm len:%d, %s' % (len(pcm), str(type(pcm))))
@ -109,7 +106,7 @@ else:
if bret is False: if bret is False:
LOG.error('on_audio_receive_frame error.') LOG.error('on_audio_receive_frame error.')
def on_video_receive_frame(self, fid, width, height, frame, u, v): def on_video_receive_frame(self, fid, width, height, frame, u, v) -> None:
LOG.info('video frame: %d, %d, %d, ' % (fid, width, height)) LOG.info('video frame: %d, %d, %d, ' % (fid, width, height))
sys.stdout.write('*') sys.stdout.write('*')
sys.stdout.flush() sys.stdout.flush()
@ -117,7 +114,7 @@ else:
if bret is False: if bret is False:
LOG.error('on_video_receive_frame error.') LOG.error('on_video_receive_frame error.')
def witerate(self): def witerate(self) -> None:
self.iterate() self.iterate()
@ -140,7 +137,7 @@ class EchoBot():
self.av = None self.av = None
self.on_connection_status = None self.on_connection_status = None
def start(self): def start(self) -> None:
self.connect() self.connect()
if bHAVE_AV: if bHAVE_AV:
# RuntimeError: Attempted to create a second session for the same Tox instance. # RuntimeError: Attempted to create a second session for the same Tox instance.
@ -150,7 +147,7 @@ class EchoBot():
public_key, public_key,
message_data, message_data,
message_data_size, message_data_size,
*largs): *largs) -> None:
key = ''.join(chr(x) for x in public_key[:TOX_PUBLIC_KEY_SIZE]) key = ''.join(chr(x) for x in public_key[:TOX_PUBLIC_KEY_SIZE])
sPk = wrapper.tox.bin_to_string(key, TOX_PUBLIC_KEY_SIZE) sPk = wrapper.tox.bin_to_string(key, TOX_PUBLIC_KEY_SIZE)
sMd = str(message_data, 'UTF-8') sMd = str(message_data, 'UTF-8')
@ -164,14 +161,14 @@ class EchoBot():
iMessageType, iMessageType,
message_data, message_data,
message_data_size, message_data_size,
*largs): *largs) -> None:
sMd = str(message_data, 'UTF-8') sMd = str(message_data, 'UTF-8')
LOG_debug(f"on_friend_message {iFriendNum}" +' ' +sMd) LOG_debug(f"on_friend_message {iFriendNum}" +' ' +sMd)
self.on_friend_message(iFriendNum, iMessageType, sMd) self.on_friend_message(iFriendNum, iMessageType, sMd)
LOG.info('setting bobs_on_friend_message') LOG.info('setting bobs_on_friend_message')
self._tox.callback_friend_message(bobs_on_friend_message) self._tox.callback_friend_message(bobs_on_friend_message)
def bobs_on_file_chunk_request(iTox, fid, filenumber, position, length, *largs): def bobs_on_file_chunk_request(iTox, fid, filenumber, position, length, *largs) -> None:
if length == 0: if length == 0:
return return
@ -180,7 +177,7 @@ class EchoBot():
self._tox.callback_file_chunk_request(bobs_on_file_chunk_request) self._tox.callback_file_chunk_request(bobs_on_file_chunk_request)
def bobs_on_file_recv(iTox, fid, filenumber, kind, size, filename, *largs): def bobs_on_file_recv(iTox, fid, filenumber, kind, size, filename, *largs):
LOG_info(f"on_file_recv {fid!s} {filenumber!s} {kind!s} {size!s} {filename}") LOG_info(f"on_file_recv {fid} {filenumber} {kind} {size} {filename}")
if size == 0: if size == 0:
return return
self.files[(fid, filenumber)] = { self.files[(fid, filenumber)] = {
@ -191,7 +188,7 @@ class EchoBot():
self._tox.file_control(fid, filenumber, TOX_FILE_CONTROL['RESUME']) self._tox.file_control(fid, filenumber, TOX_FILE_CONTROL['RESUME'])
def connect(self): def connect(self) -> None:
if not self.on_connection_status: if not self.on_connection_status:
def on_connection_status(iTox, iCon, *largs): def on_connection_status(iTox, iCon, *largs):
LOG_info('ON_CONNECTION_STATUS - CONNECTED ' + repr(iCon)) LOG_info('ON_CONNECTION_STATUS - CONNECTED ' + repr(iCon))
@ -233,7 +230,7 @@ class EchoBot():
except Exception as e: except Exception as e:
LOG.warn('error relay to ' + lElt[0]) LOG.warn('error relay to ' + lElt[0])
def loop(self): def loop(self) -> None:
if not self.av: if not self.av:
self.start() self.start()
checked = False checked = False
@ -266,30 +263,30 @@ class EchoBot():
LOG.info('Ending loop.') LOG.info('Ending loop.')
def iterate(self, n=100): def iterate(self, n=100) -> None:
interval = self._tox.iteration_interval() interval = self._tox.iteration_interval()
for i in range(n): for i in range(n):
self._tox.iterate() self._tox.iterate()
sleep(interval / 1000.0) sleep(interval / 1000.0)
self._tox.iterate() self._tox.iterate()
def on_friend_request(self, pk, message): def on_friend_request(self, pk, message) -> None:
LOG.debug('Friend request from %s: %s' % (pk, message)) LOG.debug('Friend request from %s: %s' % (pk, message))
self._tox.friend_add_norequest(pk) self._tox.friend_add_norequest(pk)
LOG.info('on_friend_request Accepted.') LOG.info('on_friend_request Accepted.')
save_to_file(self._tox, sDATA_FILE) save_to_file(self._tox, sDATA_FILE)
def on_friend_message(self, friendId, type, message): def on_friend_message(self, friendId, type, message) -> None:
name = self._tox.friend_get_name(friendId) name = self._tox.friend_get_name(friendId)
LOG.debug('%s: %s' % (name, message)) LOG.debug('%s: %s' % (name, message))
yMessage = bytes(message, 'UTF-8') yMessage = bytes(message, 'UTF-8')
self._tox.friend_send_message(friendId, TOX_MESSAGE_TYPE['NORMAL'], yMessage) self._tox.friend_send_message(friendId, TOX_MESSAGE_TYPE['NORMAL'], yMessage)
LOG.info('EchoBot sent: %s' % message) LOG.info('EchoBot sent: %s' % message)
def on_file_recv_chunk(self, fid, filenumber, position, data): def on_file_recv_chunk(self, fid, filenumber, position, data) -> None:
filename = self.files[(fid, filenumber)]['filename'] filename = self.files[(fid, filenumber)]['filename']
size = self.files[(fid, filenumber)]['size'] size = self.files[(fid, filenumber)]['size']
LOG.debug(f"on_file_recv_chunk {fid!s} {filenumber!s} {filename} {position/float(size)*100!s}") LOG.debug(f"on_file_recv_chunk {fid} {filenumber} {filename} {position/float(size)*100}")
if data is None: if data is None:
msg = "I got '{}', sending it back right away!".format(filename) msg = "I got '{}', sending it back right away!".format(filename)
@ -298,7 +295,7 @@ class EchoBot():
self.files[(fid, 0)] = self.files[(fid, filenumber)] self.files[(fid, 0)] = self.files[(fid, filenumber)]
length = self.files[(fid, filenumber)]['size'] length = self.files[(fid, filenumber)]['size']
self.file_send(fid, 0, length, filename, filename) self._tox.file_send(fid, TOX_FILE_KIND['DATA'], length, filename)
del self.files[(fid, filenumber)] del self.files[(fid, filenumber)]
return return
@ -330,7 +327,7 @@ class BaseThread(threading.Thread):
self._stop_thread = False self._stop_thread = False
self.name = name self.name = name
def stop_thread(self, timeout=-1): def stop_thread(self, timeout=-1) -> None:
self._stop_thread = True self._stop_thread = True
if timeout < 0: if timeout < 0:
timeout = ts.iTHREAD_TIMEOUT timeout = ts.iTHREAD_TIMEOUT
@ -348,7 +345,7 @@ class ToxIterateThread(BaseThread):
super().__init__(name='ToxIterateThread') super().__init__(name='ToxIterateThread')
self._tox = tox self._tox = tox
def run(self): def run(self) -> None:
while not self._stop_thread: while not self._stop_thread:
self._tox.iterate() self._tox.iterate()
sleep(self._tox.iteration_interval() / 1000) sleep(self._tox.iteration_interval() / 1000)
@ -377,7 +374,7 @@ def oArgparse(lArgv):
return oArgs return oArgs
def iMain(oArgs): def iMain(oArgs) -> int:
global sDATA_FILE global sDATA_FILE
# oTOX_OPTIONS = ToxOptions() # oTOX_OPTIONS = ToxOptions()
global oTOX_OPTIONS global oTOX_OPTIONS
@ -429,38 +426,14 @@ def iMain(oArgs):
iRet = 1 iRet = 1
return iRet return iRet
def main(largs=None): def main(lArgs=None) -> int:
if largs is None: largs = [] global oTOX_OARGS
oArgs = oArgparse(largs) global bIS_LOCAL
global oTOX_OARGS
oTOX_OARGS = oArgs
print(oArgs)
if coloredlogs:
logger = logging.getLogger()
# https://pypi.org/project/coloredlogs/
coloredlogs.install(level=oArgs.loglevel,
logger=logger,
# %(asctime)s,%(msecs)03d %(hostname)s [%(process)d]
fmt='%(name)s %(levelname)s %(message)s'
)
else:
logging.basicConfig(level=oArgs.loglevel) # logging.INFO
return iMain(oArgs)
def main(lArgs=None):
global oTOX_OARGS
if lArgs is None: lArgs = [] if lArgs is None: lArgs = []
oArgs = oArgparse(lArgs) oArgs = oArgparse(lArgs)
global bIS_LOCAL
bIS_LOCAL = oArgs.network in ['newlocal', 'localnew', 'local'] bIS_LOCAL = oArgs.network in ['newlocal', 'localnew', 'local']
oTOX_OARGS = oArgs oTOX_OARGS = oArgs
setattr(oTOX_OARGS, 'bIS_LOCAL', bIS_LOCAL) setattr(oTOX_OARGS, 'bIS_LOCAL', bIS_LOCAL)
bIS_LOCAL = True
setattr(oTOX_OARGS, 'bIS_LOCAL', bIS_LOCAL)
# oTOX_OPTIONS = ToxOptions()
global oTOX_OPTIONS global oTOX_OPTIONS
oTOX_OPTIONS = ts.oToxygenToxOptions(oArgs) oTOX_OPTIONS = ts.oToxygenToxOptions(oArgs)
if coloredlogs: if coloredlogs:

View File

@ -340,7 +340,7 @@ def oMainArgparser(_=None, iMode=0):
if not os.path.exists(sNodesJson): sNodesJson = '' if not os.path.exists(sNodesJson): sNodesJson = ''
logfile = os.path.join(os.environ.get('TMPDIR', '/tmp'), 'toxygen.log') logfile = os.path.join(os.environ.get('TMPDIR', '/tmp'), 'toxygen.log')
if not os.path.exists(sNodesJson): logfile = '' if not os.path.exists(logfile): logfile = ''
parser = argparse.ArgumentParser(add_help=True) parser = argparse.ArgumentParser(add_help=True)
parser.add_argument('--proxy_host', '--proxy-host', type=str, parser.add_argument('--proxy_host', '--proxy-host', type=str,
@ -388,6 +388,10 @@ def oMainArgparser(_=None, iMode=0):
parser.add_argument('--dht_announcements_enabled',type=str, parser.add_argument('--dht_announcements_enabled',type=str,
default='True', choices=['True','False'], default='True', choices=['True','False'],
help='En/Disable DHT announcements') help='En/Disable DHT announcements')
# argparse.ArgumentError: argument --save_history: conflicting option string: --save_history
# parser.add_argument('--save_history', type=str, default='True',
# choices=['True', 'False'],
# help='En/Disable saving history')
return parser return parser
def vSetupLogging(oArgs): def vSetupLogging(oArgs):
@ -494,6 +498,7 @@ def clean_settings(self):
# overrides # overrides
self['mirror_mode'] = False self['mirror_mode'] = False
self['save_history'] = True
# REQUIRED!! # REQUIRED!!
if not os.path.exists('/proc/sys/net/ipv6'): if not os.path.exists('/proc/sys/net/ipv6'):
LOG.warn('Disabling IPV6 because /proc/sys/net/ipv6 does not exist') LOG.warn('Disabling IPV6 because /proc/sys/net/ipv6 does not exist')

View File

@ -43,17 +43,17 @@ from ctypes import *
faulthandler.enable() faulthandler.enable()
import warnings import warnings
warnings.filterwarnings('ignore') warnings.filterwarnings('ignore')
try: try:
from io import BytesIO from io import BytesIO
import certifi import certifi
import pycurl import pycurl
except ImportError: except ImportError:
pycurl = None pycurl = None
from pyannotate_runtime import collect_types
try: try:
import coloredlogs import coloredlogs
os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red' os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red'
@ -96,22 +96,22 @@ sleep = time.sleep
global LOG global LOG
LOG = logging.getLogger('TestS') LOG = logging.getLogger('TestS')
if False: if False:
def LOG_ERROR(l): LOG.error('+ '+l) def LOG_ERROR(l: str) -> None: LOG.error('+ '+l)
def LOG_WARN(l): LOG.warn('+ '+l) def LOG_WARN(l: str) -> None: LOG.warn('+ '+l)
def LOG_INFO(l): LOG.info('+ '+l) def LOG_INFO(l: str) -> None: LOG.info('+ '+l)
def LOG_DEBUG(l): LOG.debug('+ '+l) def LOG_DEBUG(l: str) -> None: LOG.debug('+ '+l)
def LOG_TRACE(l): pass # print('+ '+l) def LOG_TRACE(l: str) -> None: pass # print('+ '+l)
else: else:
# just print to stdout so there is NO complications from logging. # just print to stdout so there is NO complications from logging.
def LOG_ERROR(l): print('EROR+ '+l) def LOG_ERROR(l: str) -> None: print('EROR+ '+l)
def LOG_WARN(l): print('WARN+ '+l) def LOG_WARN(l: str) -> None: print('WARN+ '+l)
def LOG_INFO(l): print('INFO+ '+l) def LOG_INFO(l: str) -> None: print('INFO+ '+l)
def LOG_DEBUG(l): print('DEBUG+ '+l) def LOG_DEBUG(l: str) -> None: print('DEBUG+ '+l)
def LOG_TRACE(l): pass # print('TRAC+ '+l) def LOG_TRACE(l: str) -> None: pass # print('TRAC+ '+l)
ADDR_SIZE = 38 * 2 ADDR_SIZE = 38 * 2
CLIENT_ID_SIZE = 32 * 2 CLIENT_ID_SIZE = 32 * 2
THRESHOLD = 30 # >25 THRESHOLD = 35 # >25
iN = 6 iN = 6
global oTOX_OPTIONS global oTOX_OPTIONS
@ -124,7 +124,7 @@ def expectedFailure(test_item):
test_item.__unittest_expecting_failure__ = True test_item.__unittest_expecting_failure__ = True
return test_item return test_item
def expectedFail(reason): def expectedFail(reason: str):
""" """
expectedFailure with a reason expectedFailure with a reason
""" """
@ -185,7 +185,7 @@ class BaseThread(threading.Thread):
self._stop_thread = False self._stop_thread = False
self.name = name self.name = name
def stop_thread(self, timeout=-1): def stop_thread(self, timeout=-1) -> None:
self._stop_thread = True self._stop_thread = True
if timeout < 0: if timeout < 0:
timeout = ts.iTHREAD_TIMEOUT timeout = ts.iTHREAD_TIMEOUT
@ -203,7 +203,7 @@ class ToxIterateThread(BaseThread):
super().__init__(name='ToxIterateThread') super().__init__(name='ToxIterateThread')
self._tox = tox self._tox = tox
def run(self): def run(self) -> None:
while not self._stop_thread: while not self._stop_thread:
self._tox.iterate() self._tox.iterate()
sleep(self._tox.iteration_interval() / 1000) sleep(self._tox.iteration_interval() / 1000)
@ -275,7 +275,7 @@ class ToxSuite(unittest.TestCase):
failureException = AssertionError failureException = AssertionError
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls) -> None:
global oTOX_OARGS global oTOX_OARGS
assert oTOX_OPTIONS assert oTOX_OPTIONS
assert oTOX_OARGS assert oTOX_OARGS
@ -292,15 +292,17 @@ class ToxSuite(unittest.TestCase):
ipv='ipv4', ipv='ipv4',
udp_not_tcp=False) udp_not_tcp=False)
def tearDown(self): def tearDown(self) -> None:
""" """
""" """
if hasattr(self, 'bob') and self.bob.self_get_friend_list_size() >= 1: if hasattr(self, 'bob') and self.bob.self_get_friend_list_size() >= 1:
LOG.warn(f"tearDown BOBS STILL HAS A FRIEND LIST {self.bob.self_get_friend_list()}") LOG.warn(f"tearDown BOBS STILL HAS A FRIEND LIST {self.bob.self_get_friend_list()}")
for elt in self.bob.self_get_friend_list(): self.bob.friend_delete(elt) for elt in self.bob.self_get_friend_list():
self.bob.friend_delete(elt)
if hasattr(self, 'alice') and self.alice.self_get_friend_list_size() >= 1: if hasattr(self, 'alice') and self.alice.self_get_friend_list_size() >= 1:
LOG.warn(f"tearDown ALICE STILL HAS A FRIEND LIST {self.alice.self_get_friend_list()}") LOG.warn(f"tearDown ALICE STILL HAS A FRIEND LIST {self.alice.self_get_friend_list()}")
for elt in self.alice.self_get_friend_list(): self.alice.friend_delete(elt) for elt in self.alice.self_get_friend_list():
self.alice.friend_delete(elt)
LOG.debug(f"tearDown threads={threading.active_count()}") LOG.debug(f"tearDown threads={threading.active_count()}")
if hasattr(self, 'bob'): if hasattr(self, 'bob'):
@ -319,7 +321,7 @@ class ToxSuite(unittest.TestCase):
del self.alice del self.alice
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls) -> None:
if hasattr(cls, 'bob'): if hasattr(cls, 'bob'):
cls.bob._main_loop.stop_thread() cls.bob._main_loop.stop_thread()
cls.bob.kill() cls.bob.kill()
@ -329,7 +331,7 @@ class ToxSuite(unittest.TestCase):
cls.alice.kill() cls.alice.kill()
del cls.alice del cls.alice
def bBobNeedAlice(self): def bBobNeedAlice(self) -> bool:
""" """
""" """
if hasattr(self, 'baid') and self.baid >= 0 and \ if hasattr(self, 'baid') and self.baid >= 0 and \
@ -341,7 +343,7 @@ class ToxSuite(unittest.TestCase):
return False return False
return True return True
def bAliceNeedAddBob (self): def bAliceNeedAddBob (self) -> bool:
if hasattr(self, 'abid') and self.abid >= 0 and \ if hasattr(self, 'abid') and self.abid >= 0 and \
self.abid in self.alice.self_get_friend_list(): self.abid in self.alice.self_get_friend_list():
LOG.warn(f"setUp BOB IS ALREADY IN ALICES FRIEND LIST") LOG.warn(f"setUp BOB IS ALREADY IN ALICES FRIEND LIST")
@ -666,9 +668,9 @@ class ToxSuite(unittest.TestCase):
LOG_DEBUG(f"alices_on_friend_request: " +repr(message_data)) LOG_DEBUG(f"alices_on_friend_request: " +repr(message_data))
try: try:
assert str(message_data, 'UTF-8') == MSG assert str(message_data, 'UTF-8') == MSG
LOG_INFO(f"alices_on_friend_request: friend_added = True ") LOG_INFO(f"alices_on_friend_request: {sSlot} = True ")
except Exception as e: except Exception as e:
LOG_WARN(f"alices_on_friend_request: Exception {e}") LOG_WARN(f"alices_on_friend_request: EXCEPTION {e}")
# return # return
setattr(self.bob, sSlot, True) setattr(self.bob, sSlot, True)
@ -676,10 +678,10 @@ class ToxSuite(unittest.TestCase):
inum = -1 inum = -1
try: try:
inum = self.bob.friend_add(self.alice._address, bytes(MSG, 'UTF-8')) inum = self.bob.friend_add(self.alice._address, bytes(MSG, 'UTF-8'))
assert inum >= 0, f"bob.friend_add !>= 0 {inum}" assert inum >= 0, f"bob_add_alice_as_friend !>= 0 {inum}"
self.alice.callback_friend_request(alices_on_friend_request) self.alice.callback_friend_request(alices_on_friend_request)
if not self.wait_otox_attrs(self.bob, [sSlot]): if not self.wait_otox_attrs(self.bob, [sSlot]):
LOG_WARN(f"bob.friend_add NO {sSlot}") LOG_WARN(f"bob_add_alice_as_friend NO {sSlot}")
# return False # return False
self.baid = self.bob.friend_by_public_key(self.alice._address) self.baid = self.bob.friend_by_public_key(self.alice._address)
assert self.baid >= 0, self.baid assert self.baid >= 0, self.baid
@ -688,7 +690,7 @@ class ToxSuite(unittest.TestCase):
assert self.bob.self_get_friend_list_size() >= 1 assert self.bob.self_get_friend_list_size() >= 1
assert self.baid in self.bob.self_get_friend_list() assert self.baid in self.bob.self_get_friend_list()
except Exception as e: except Exception as e:
LOG.error(f"bob.friend_add EXCEPTION {e}") LOG.error(f"bob_add_alice_as_friend EXCEPTION {e}")
return False return False
finally: finally:
self.bob.callback_friend_message(None) self.bob.callback_friend_message(None)
@ -713,12 +715,12 @@ class ToxSuite(unittest.TestCase):
LOG_DEBUG(f"bobs_on_friend_request: " +repr(message_data)) LOG_DEBUG(f"bobs_on_friend_request: " +repr(message_data))
try: try:
assert str(message_data, 'UTF-8') == MSG assert str(message_data, 'UTF-8') == MSG
LOG_INFO(f"bobs_on_friend_request: friend_added = True ")
except Exception as e: except Exception as e:
LOG_WARN(f"bobs_on_friend_request: Exception {e}") LOG_WARN(f"bobs_on_friend_request: Exception {e}")
# return # return
setattr(self.alice, sSlot, True) setattr(self.alice, sSlot, True)
LOG_INFO(f"bobs_on_friend_request: {sSlot} = True ")
setattr(self.alice, sSlot, None) setattr(self.alice, sSlot, None)
inum = -1 inum = -1
try: try:
@ -2140,8 +2142,9 @@ def vOargsToxPreamble(oArgs, Tox, ToxTest):
### ###
def iMain(oArgs): def iMain(oArgs, failfast=True):
failfast=True
# collect_types.init_types_collection()
vOargsToxPreamble(oArgs, Tox, ToxSuite) vOargsToxPreamble(oArgs, Tox, ToxSuite)
# https://stackoverflow.com/questions/35930811/how-to-sort-unittest-testcases-properly/35930812#35930812 # https://stackoverflow.com/questions/35930811/how-to-sort-unittest-testcases-properly/35930812#35930812
@ -2150,7 +2153,10 @@ def iMain(oArgs):
runner = color_runner.runner.TextTestRunner(verbosity=2, failfast=failfast) runner = color_runner.runner.TextTestRunner(verbosity=2, failfast=failfast)
else: else:
runner = unittest.TextTestRunner(verbosity=2, failfast=failfast, warnings='ignore') runner = unittest.TextTestRunner(verbosity=2, failfast=failfast, warnings='ignore')
# with collect_types.collect():
runner.run(cases) runner.run(cases)
# collect_types.dump_stats('tests_wrapper.out')
def oToxygenToxOptions(oArgs): def oToxygenToxOptions(oArgs):
data = None data = None