more typing

This commit is contained in:
emdee 2023-12-13 00:57:28 +00:00
parent daee891825
commit b934928fe3
9 changed files with 288 additions and 264 deletions

View File

@ -1,4 +1,11 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
# ctypes wrapping of libtoxcore
# WIP: - all functions are being changed to accept strings or byres for variables
# the library will use as bytes, and return sstrings not bytes for things
# you will use as strings. YMMV.
from ctypes import *
from datetime import datetime
from typing import Union, Callable, Union
@ -7,12 +14,12 @@ try:
from wrapper.libtox import LibToxCore
from wrapper.toxav import ToxAV
from wrapper.toxcore_enums_and_consts import *
import wrapper.toxcore_enums_and_consts as enums
import wrapper.toxcore_enums_and_consts as enum
except:
from libtox import LibToxCore
from toxav import ToxAV
from toxcore_enums_and_consts import *
import toxcore_enums_and_consts as enums
import toxcore_enums_and_consts as enum
# callbacks can be called in any thread so were being careful
# tox.py can be called by callbacks
@ -667,7 +674,7 @@ class Tox:
' returned from tox_friend_add_norequest.')
if tox_err_friend_add == TOX_ERR_FRIEND_ADD['OWN_KEY']:
raise ArgumentError('The friend address belongs to the sending client.')
elif tox_err_friend_add == TOX_ERR_FRIEND_ADD['ALREADY_SENT']:
if tox_err_friend_add == TOX_ERR_FRIEND_ADD['ALREADY_SENT']:
raise ArgumentError('A friend request has already been sent, or the address belongs to a friend that is'
' already on the friend list.')
if tox_err_friend_add == TOX_ERR_FRIEND_ADD['BAD_CHECKSUM']:
@ -805,7 +812,7 @@ class Tox:
Tox.libtoxcore.tox_self_get_friend_list(self._tox_pointer, friend_list)
return friend_list[0:friend_list_size]
def friend_get_public_key(self, friend_number: int, public_key: str=None) -> str:
def friend_get_public_key(self, friend_number: int, public_key: Union[bytes,None]=None) -> str:
"""
Copies the Public Key associated with a given friend number to a byte array.
@ -826,6 +833,7 @@ class Tox:
return bin_to_string(public_key, TOX_PUBLIC_KEY_SIZE)
elif tox_err_friend_get_public_key == TOX_ERR_FRIEND_GET_PUBLIC_KEY['FRIEND_NOT_FOUND']:
raise ArgumentError('No friend with the given number exists on the friend list.')
raise ToxError('The function did not return OK')
def friend_get_last_online(self, friend_number: int) -> int:
"""
@ -938,12 +946,13 @@ class Tox:
tox_err_friend_query = tox_err_friend_query.value
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']:
return int(result)
elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']:
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']:
raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike'
' the `_self_` variants of these functions, which have no effect when a parameter is'
' NULL, these functions return an error in that case.')
elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']:
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number did not designate a valid friend.')
raise ToxError('The function did not return OK')
def friend_get_status_message(self, friend_number: int, status_message=None) -> str:
"""
@ -1020,11 +1029,11 @@ class Tox:
tox_err_friend_query = tox_err_friend_query.value
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']:
return int(result)
elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']:
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']:
raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike'
' the `_self_` variants of these functions, which have no effect when a parameter is'
' NULL, these functions return an error in that case.')
elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']:
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number did not designate a valid friend.')
raise ToxError('The function did not return OK.')
@ -1069,11 +1078,11 @@ class Tox:
tox_err_friend_query = tox_err_friend_query.value
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']:
return int(result)
elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']:
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']:
raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike'
' the `_self_` variants of these functions, which have no effect when a parameter is'
' NULL, these functions return an error in that case.')
elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']:
if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number did not designate a valid friend.')
raise ToxError('The function did not return OK for friend get connection status.')
@ -1200,23 +1209,25 @@ class Tox:
message = bytes(message, 'utf-8')
tox_err_friend_send_message = c_int()
LOG_DEBUG(f"tox.friend_send_message")
result = Tox.libtoxcore.tox_friend_send_message(self._tox_pointer, c_uint32(friend_number),
c_int(message_type), c_char_p(message), c_size_t(len(message)),
result = Tox.libtoxcore.tox_friend_send_message(self._tox_pointer,
c_uint32(friend_number),
c_int(message_type),
c_char_p(message), c_size_t(len(message)),
byref(tox_err_friend_send_message))
tox_err_friend_send_message = tox_err_friend_send_message.value
if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['OK']:
return int(result)
elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['NULL']:
if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['NULL']:
raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['FRIEND_NOT_FOUND']:
if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend number did not designate a valid friend.')
elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['FRIEND_NOT_CONNECTED']:
if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['FRIEND_NOT_CONNECTED']:
raise ArgumentError('This client is currently not connected to the friend.')
elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['SENDQ']:
if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['SENDQ']:
raise MemoryError('An allocation error occurred while increasing the send queue size.')
elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['TOO_LONG']:
if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['TOO_LONG']:
raise ArgumentError('Message length exceeded TOX_MAX_MESSAGE_LENGTH.')
elif tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['EMPTY']:
if tox_err_friend_send_message == TOX_ERR_FRIEND_SEND_MESSAGE['EMPTY']:
raise ArgumentError('Attempted to send a zero-length message.')
raise ToxError('The function did not return OK for friend send message.')
@ -1444,8 +1455,9 @@ class Tox:
if error.value == TOX_ERR_FILE_GET['OK']:
return bin_to_string(file_id, TOX_FILE_ID_LENGTH)
s = sGetError(error.value, TOX_ERR_FILE_GET)
LOG_ERROR(f"group_new {error.value} {s}")
raise ArgumentError(f"group_new {error.value} {s}")
LOG_ERROR(f"group_new err={error.value} {s}")
# have seen ArgumentError: group_new 3 NOT_FOUND
raise ArgumentError(f"group_new err={error.value} {s}")
# File transmission: sending
@ -1702,20 +1714,20 @@ class Tox:
tox_err_friend_custom_packet = tox_err_friend_custom_packet.value
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['OK']:
return bool(result)
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['NULL']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['NULL']:
raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_FOUND']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend number did not designate a valid friend.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_CONNECTED']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_CONNECTED']:
raise ArgumentError('This client is currently not connected to the friend.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['INVALID']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['INVALID']:
raise ArgumentError('The first byte of data was not in the specified range for the packet type.'
'This range is 200-254 for lossy, and 160-191 for lossless packets.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['EMPTY']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['EMPTY']:
raise ArgumentError('Attempted to send an empty packet.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['TOO_LONG']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['TOO_LONG']:
raise ArgumentError('Packet data length exceeded TOX_MAX_CUSTOM_PACKET_SIZE.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['SENDQ']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['SENDQ']:
raise ToxError('Packet queue is full.')
raise ToxError('The function did not return OK')
@ -1740,21 +1752,22 @@ class Tox:
tox_err_friend_custom_packet = tox_err_friend_custom_packet.value
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['OK']:
return bool(result)
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['NULL']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['NULL']:
raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_FOUND']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend number did not designate a valid friend.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_CONNECTED']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['FRIEND_NOT_CONNECTED']:
raise ArgumentError('This client is currently not connected to the friend.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['INVALID']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['INVALID']:
raise ArgumentError('The first byte of data was not in the specified range for the packet type.'
'This range is 200-254 for lossy, and 160-191 for lossless packets.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['EMPTY']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['EMPTY']:
raise ArgumentError('Attempted to send an empty packet.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['TOO_LONG']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['TOO_LONG']:
raise ArgumentError('Packet data length exceeded TOX_MAX_CUSTOM_PACKET_SIZE.')
elif tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['SENDQ']:
if tox_err_friend_custom_packet == TOX_ERR_FRIEND_CUSTOM_PACKET['SENDQ']:
raise ToxError('Packet queue is full.')
raise ToxError('The function did not return OK')
def callback_friend_lossy_packet(self, callback: Callable) -> None:
"""
@ -1890,7 +1903,7 @@ class Tox:
if error.value:
s = sGetError(error.value, TOX_ERR_GROUP_NEW)
LOG_ERROR(f"group_new {error.value} {s}")
LOG_ERROR(f"group_new err={error.value} {s}")
raise ToxError(f"group_new {s} err={error.value}")
# TypeError: '<' not supported between instances of 'c_uint' and 'int'
@ -1971,7 +1984,7 @@ class Tox:
byref(error))
if error.value:
s = sGetError(error.value, TOX_ERR_GROUP_RECONNECT)
LOG_ERROR(f"group_new {error.value} {s}")
LOG_ERROR(f"group_new err={error.value} {s}")
raise ToxError(f"group_new {s} err={error.value}")
return bool(result)
@ -1997,7 +2010,7 @@ class Tox:
result = Tox.libtoxcore.tox_group_disconnect(self._tox_pointer, c_uint32(group_number), byref(error))
if error.value:
s = sGetError(error.value, TOX_ERR_GROUP_DISCONNECT)
LOG_ERROR(f"group_disconnect {error.value} {s}")
LOG_ERROR(f"group_disconnect err={error.value} {s}")
raise ToxError(f"group_disconnect {s} err={error.value}")
return bool(result)
@ -2051,9 +2064,12 @@ class Tox:
error = c_int()
if type(name) != bytes:
topic = bytes(name, 'utf-8')
name = bytes(name, 'utf-8')
LOG_DEBUG(f"tox.group_self_set_name")
result = Tox.libtoxcore.tox_group_self_set_name(self._tox_pointer, c_uint32(group_number), name, c_size_t(len(name)), byref(error))
result = Tox.libtoxcore.tox_group_self_set_name(self._tox_pointer,
c_uint32(group_number),
c_char_p(name), c_size_t(len(name)),
byref(error))
if error.value:
LOG_ERROR(f"group_self_set_name err={error.value}")
raise ToxError("group_self_set_name err={error.value}")
@ -2346,7 +2362,7 @@ class Tox:
Tox.libtoxcore.tox_callback_group_peer_name(self._tox_pointer, self.group_peer_name_cb)
except Exception as e: # AttributeError
LOG_ERROR(f"tox.callback_conference_peer_name")
return None
return
def callback_group_peer_status(self, callback: Callable, user_data) -> int:
"""
@ -2369,7 +2385,7 @@ class Tox:
Tox.libtoxcore.tox_callback_group_peer_status(self._tox_pointer, self.group_peer_status_cb)
except Exception as e:
LOG_WARN(f"callback_group_peer_status Exception {e}")
return None
return
# Group chat state queries and events.
@ -2712,7 +2728,7 @@ class Tox:
# Group message sending
def group_send_custom_packet(self, group_number: int, lossless: bool, data) -> bool:
def group_send_custom_packet(self, group_number: int, lossless: bool, data: bytes) -> bool:
"""Send a custom packet to the group.
If lossless is true the packet will be lossless. Lossless
@ -2786,8 +2802,8 @@ class Tox:
byref(error))
if error.value:
s = sGetError(error.value, TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE)
LOG_ERROR(f"group_send_private_message {error.value} {s}")
raise ToxError(f"group_send_private_message {error.value} {s}")
LOG_ERROR(f"group_send_private_message err={error.value} {s}")
raise ToxError(f"group_send_private_message err={error.value} {s}")
return bool(result)
@ -2829,8 +2845,8 @@ class Tox:
if error.value:
s = sGetError(error.value, TOX_ERR_GROUP_SEND_MESSAGE)
LOG_ERROR(f"group_send_message {error.value} {s}")
raise ToxError(f"group_send_message {error.value} {s}")
LOG_ERROR(f"group_send_message err={error.value} {s}")
raise ToxError(f"group_send_message err={error.value} {s}")
return bool(result)
@ -2915,8 +2931,8 @@ class Tox:
result = Tox.libtoxcore.tox_group_invite_friend(self._tox_pointer, c_uint(group_number), c_uint32(friend_number), byref(error))
if error.value:
s = sGetError(error.value, TOX_ERR_GROUP_INVITE_FRIEND)
LOG_ERROR(f"group_invite_friend {error.value} {s}")
raise ToxError(f"group_invite_friend {error.value} {s}")
LOG_ERROR(f"group_invite_friend err={error.value} {s}")
raise ToxError(f"group_invite_friend err={error.value} {s}")
return bool(result)
# API change - this no longer exists
@ -2976,7 +2992,7 @@ class Tox:
raise ToxError(f"group_invite_accept ERROR {e}")
if error.value:
s = sGetError(error.value, TOX_ERR_GROUP_INVITE_ACCEPT)
LOG_ERROR(f"group_invite_friend {error.value} {s}")
LOG_ERROR(f"group_invite_friend err={error.value} {s}")
raise ToxError(f"group_invite_accept {s} err={error.value}")
return result
@ -3136,7 +3152,7 @@ class Tox:
byref(error))
if error.value:
s = sGetError(error.value, TOX_ERR_GROUP_FOUNDER_SET_PASSWORD)
LOG_ERROR(f"group_founder_set_password {error.value} {s}")
LOG_ERROR(f"group_founder_set_password err={error.value} {s}")
raise ToxError(f"group_founder_set_password {s} err={error.value}")
return bool(result)

View File

@ -42,10 +42,10 @@ class ToxAV:
toxav_err_new = toxav_err_new.value
if toxav_err_new == enum.TOXAV_ERR_NEW['NULL']:
raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
elif toxav_err_new == enum.TOXAV_ERR_NEW['MALLOC']:
if toxav_err_new == enum.TOXAV_ERR_NEW['MALLOC']:
raise MemoryError('Memory allocation failure while trying to allocate structures required for the A/V '
'session.')
elif toxav_err_new == enum.TOXAV_ERR_NEW['MULTIPLE']:
if toxav_err_new == enum.TOXAV_ERR_NEW['MULTIPLE']:
raise RuntimeError('Attempted to create a second session for the same Tox instance.')
self.call_state_cb = None
@ -124,6 +124,7 @@ class ToxAV:
raise ArgumentError('Attempted to call a friend while already in an audio or video call with them.')
elif toxav_err_call == enum.TOXAV_ERR_CALL['INVALID_BIT_RATE']:
raise ArgumentError('Audio or video bit rate is invalid.')
raise ArgumentError('The function did not return OK')
def callback_call(self, callback: Callable, user_data) -> None:
"""
@ -169,18 +170,19 @@ class ToxAV:
toxav_err_answer = toxav_err_answer.value
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['OK']:
return bool(result)
elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['SYNC']:
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['SYNC']:
raise RuntimeError('Synchronization error occurred.')
elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['CODEC_INITIALIZATION']:
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['CODEC_INITIALIZATION']:
raise RuntimeError('Failed to initialize codecs for call session. Note that codec initiation will fail if '
'there is no receive callback registered for either audio or video.')
elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_FOUND']:
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend number did not designate a valid friend.')
elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_CALLING']:
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_CALLING']:
raise ArgumentError('The friend was valid, but they are not currently trying to initiate a call. This is '
'also returned if this client is already in a call with the friend.')
elif toxav_err_answer == enum.TOXAV_ERR_ANSWER['INVALID_BIT_RATE']:
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['INVALID_BIT_RATE']:
raise ArgumentError('Audio or video bit rate is invalid.')
raise ToxError('The function did not return OK')
# Call state graph
@ -224,16 +226,17 @@ class ToxAV:
toxav_err_call_control = toxav_err_call_control.value
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['OK']:
return bool(result)
elif toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['SYNC']:
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['SYNC']:
raise RuntimeError('Synchronization error occurred.')
elif toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_FOUND']:
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number passed did not designate a valid friend.')
elif toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_IN_CALL']:
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_IN_CALL']:
raise RuntimeError('This client is currently not in a call with the friend. Before the call is answered, '
'only CANCEL is a valid control.')
elif toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['INVALID_TRANSITION']:
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['INVALID_TRANSITION']:
raise RuntimeError('Happens if user tried to pause an already paused call or if trying to resume a call '
'that is not paused.')
raise ToxError('The function did not return OK.')
# TODO Controlling bit rates
@ -267,22 +270,23 @@ class ToxAV:
toxav_err_send_frame = toxav_err_send_frame.value
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['OK']:
return bool(result)
elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['NULL']:
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['NULL']:
raise ArgumentError('The samples data pointer was NULL.')
elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_FOUND']:
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number passed did not designate a valid friend.')
elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_IN_CALL']:
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_IN_CALL']:
raise RuntimeError('This client is currently not in a call with the friend.')
elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['SYNC']:
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['SYNC']:
raise RuntimeError('Synchronization error occurred.')
elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['INVALID']:
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['INVALID']:
raise ArgumentError('One of the frame parameters was invalid. E.g. the resolution may be too small or too '
'large, or the audio sampling rate may be unsupported.')
elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['PAYLOAD_TYPE_DISABLED']:
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['PAYLOAD_TYPE_DISABLED']:
raise RuntimeError('Either friend turned off audio or video receiving or we turned off sending for the said'
'payload.')
elif toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['RTP_FAILED']:
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['RTP_FAILED']:
RuntimeError('Failed to push frame through rtp interface.')
raise ToxError('The function did not return OK.')
def video_send_frame(self, friend_number: int, width: int, height: int, y, u, v) -> None:
"""

View File

@ -46,13 +46,14 @@ class ToxEncryptSave:
tox_err_encryption = tox_err_encryption.value
if tox_err_encryption == enum.TOX_ERR_ENCRYPTION['OK']:
return out[:]
elif tox_err_encryption == enum.TOX_ERR_ENCRYPTION['NULL']:
if tox_err_encryption == enum.TOX_ERR_ENCRYPTION['NULL']:
raise ArgumentError('Some input data, or maybe the output pointer, was null.')
elif tox_err_encryption == enum.TOX_ERR_ENCRYPTION['KEY_DERIVATION_FAILED']:
if tox_err_encryption == enum.TOX_ERR_ENCRYPTION['KEY_DERIVATION_FAILED']:
raise RuntimeError('The crypto lib was unable to derive a key from the given passphrase, which is usually a'
' lack of memory issue. The functions accepting keys do not produce this error.')
elif tox_err_encryption == enum.TOX_ERR_ENCRYPTION['FAILED']:
if tox_err_encryption == enum.TOX_ERR_ENCRYPTION['FAILED']:
raise RuntimeError('The encryption itself failed.')
raise ToxError('The function did not return OK.')
def pass_decrypt(self, data: bytes, password: str) -> bytes:
"""

View File

@ -16,7 +16,6 @@ import traceback
import threading
import random
from ctypes import *
import argparse
import time
# LOG=util.log
@ -32,7 +31,7 @@ def LOG_trace(a): pass # print('TRAC_ '+a)
import wrapper
import wrapper.toxcore_enums_and_consts as enums
from wrapper.tox import Tox, UINT32_MAX, ToxError
from wrapper.tox import Tox, UINT32_MAX
from wrapper.toxcore_enums_and_consts import TOX_CONNECTION, TOX_USER_STATUS, \
TOX_MESSAGE_TYPE, TOX_PUBLIC_KEY_SIZE, TOX_FILE_CONTROL, TOX_FILE_KIND
@ -54,7 +53,6 @@ except ImportError as e:
# logging.log(logging.DEBUG, f"coloredlogs not available: {e}")
coloredlogs = None
import wrapper_tests.support_testing as ts
if 'USER' in os.environ:
sDATA_FILE = '/tmp/logging_toxygen_' +os.environ['USER'] +'.tox'
elif 'USERNAME' in os.environ:
@ -276,9 +274,9 @@ class EchoBot():
LOG.info('on_friend_request Accepted.')
save_to_file(self._tox, sDATA_FILE)
def on_friend_message(self, friendId, type, message) -> None:
def on_friend_message(self, friendId, message_type , message) -> None:
name = self._tox.friend_get_name(friendId)
LOG.debug('%s: %s' % (name, message))
LOG.debug(f"{name}, {message}, {message_type}")
yMessage = bytes(message, 'UTF-8')
self._tox.friend_send_message(friendId, TOX_MESSAGE_TYPE['NORMAL'], yMessage)
LOG.info('EchoBot sent: %s' % message)
@ -311,7 +309,7 @@ class EchobotTox(Tox):
def __init__(self, opts, app=None):
super(EchobotTox, self).__init__(opts, app=app)
super().__init__(opts, app=app)
self._address = self.self_get_address()
self.name = 'pyechobot'
self._opts = opts

View File

@ -90,7 +90,7 @@ _socks4errors = ("request granted",
"request rejected because the client program and identd report different user-ids",
"unknown error")
def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=None, password=None):
def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=None, password=None) -> None:
"""setdefaultproxy(proxytype, addr[, port[, rdns[, username[, password]]]])
Sets a default proxy which all further socksocket objects will use,
unless explicitly changed.
@ -98,7 +98,7 @@ def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=No
global _defaultproxy
_defaultproxy = (proxytype, addr, port, rdns, username, password)
def wrapmodule(module):
def wrapmodule(module) -> None:
"""wrapmodule(module)
Attempts to replace a module's socket library with a SOCKS socket. Must set
a default proxy using setdefaultproxy(...) first.
@ -272,7 +272,7 @@ class socksocket(socket.socket):
"""
return self.__proxypeername
def __negotiatesocks4(self,destaddr,destport):
def __negotiatesocks4(self,destaddr,destport) -> None:
"""__negotiatesocks4(self,destaddr,destport)
Negotiates a connection through a SOCKS4 server.
"""
@ -320,7 +320,7 @@ class socksocket(socket.socket):
else:
self.__proxypeername = (destaddr, destport)
def __negotiatehttp(self, destaddr, destport):
def __negotiatehttp(self, destaddr, destport) -> None:
"""__negotiatehttp(self,destaddr,destport)
Negotiates a connection through an HTTP server.
"""
@ -354,7 +354,7 @@ class socksocket(socket.socket):
self.__proxysockname = ("0.0.0.0", 0)
self.__proxypeername = (addr, destport)
def connect(self, destpair):
def connect(self, destpair) -> None:
"""connect(self, despair)
Connects to the specified destination through a proxy.
destpar - A tuple of the IP/DNS address and the port number.

View File

@ -22,7 +22,7 @@ except ImportError:
lNO_PROXY = ['localhost', '127.0.0.1']
CONNECT_TIMEOUT = 20.0
def bAreWeConnected():
def bAreWeConnected() -> bool:
# FixMe: Linux only
sFile = f"/proc/{os.getpid()}/net/route"
if not os.path.isfile(sFile): return None
@ -33,7 +33,7 @@ def bAreWeConnected():
i += 1
return i > 0
def pick_up_proxy_from_environ():
def pick_up_proxy_from_environ() -> dict:
retval = dict()
if os.environ.get('socks_proxy', ''):
# socks_proxy takes precedence over https/http
@ -68,7 +68,7 @@ def pick_up_proxy_from_environ():
retval['udp_enabled'] = True
return retval
def download_url(url, settings=None):
def download_url(url:str, settings:str = None) -> None:
if not bAreWeConnected(): return ''
if settings is None:

View File

@ -8,6 +8,7 @@ import shutil
import socket
import sys
import time
from typing import Union, Callable, Union
if False:
import cepa as stem
@ -224,7 +225,7 @@ yKNOWN_NODNS = """
# - w.digidow.eu
# - w.cccs.de
def oMakeController(sSock='', port=9051):
def oMakeController(sSock:str = '', port:int = 9051):
import getpass
if sSock and os.path.exists(sSock):
controller = Controller.from_socket_file(path=sSock)
@ -236,7 +237,7 @@ def oMakeController(sSock='', port=9051):
return controller
oSTEM_CONTROLER = None
def oGetStemController(log_level=10, sock_or_pair='/run/tor/control'):
def oGetStemController(log_level:int = 10, sock_or_pair:str = '/run/tor/control'):
global oSTEM_CONTROLER
if oSTEM_CONTROLER: return oSTEM_CONTROLER
@ -268,7 +269,7 @@ def oGetStemController(log_level=10, sock_or_pair='/run/tor/control'):
LOG.debug(f"{controller}")
return oSTEM_CONTROLER
def bAreWeConnected():
def bAreWeConnected() -> bool:
# FixMe: Linux only
sFile = f"/proc/{os.getpid()}/net/route"
if not os.path.isfile(sFile): return None
@ -279,7 +280,7 @@ def bAreWeConnected():
i += 1
return i > 0
def sMapaddressResolv(target, iPort=9051, log_level=10):
def sMapaddressResolv(target:str, iPort:int = 9051, log_leve:int = 10) -> str:
if not stem:
LOG.warn('please install the stem Python package')
return ''
@ -295,7 +296,7 @@ def sMapaddressResolv(target, iPort=9051, log_level=10):
LOG.exception(e)
return ''
def vwait_for_controller(controller, wait_boot=10):
def vwait_for_controller(controller, wait_boot:int = 10) -> None:
if bAreWeConnected() is False:
raise SystemExit("we are not connected")
percent = i = 0
@ -308,12 +309,12 @@ def vwait_for_controller(controller, wait_boot=10):
time.sleep(5)
i += 5
def bin_to_hex(raw_id, length=None):
def bin_to_hex(raw_id:int, length: Union[int, None] = None) -> str:
if length is None: length = len(raw_id)
res = ''.join('{:02x}'.format(raw_id[i]) for i in range(length))
return res.upper()
def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10):
def lIntroductionPoints(controller=None, lOnions:list = [], itimeout:int = 120, log_level:int = 10):
"""now working !!! stem 1.8.x timeout must be huge >120
'Provides the descriptor for a hidden service. The **address** is the
'.onion' address of the hidden service '
@ -379,7 +380,7 @@ def lIntroductionPoints(controller=None, lOnions=[], itimeout=120, log_level=10)
LOG.exception(e)
return l
def zResolveDomain(domain):
def zResolveDomain(domain:str) -> int:
try:
ip = sTorResolve(domain)
except Exception as e: # noqa
@ -396,13 +397,13 @@ def zResolveDomain(domain):
ip = lpair[0]
return ip
def sTorResolve(target,
verbose=False,
sHost='127.0.0.1',
iPort=9050,
SOCK_TIMEOUT_SECONDS=10.0,
SOCK_TIMEOUT_TRIES=3,
):
def sTorResolve(target:str,
verbose:bool = False,
sHost:str = '127.0.0.1',
iPort:int = 9050,
SOCK_TIMEOUT_SECONDS:float = 10.0,
SOCK_TIMEOUT_TRIES:int = 3,
) -> str:
MAX_INFO_RESPONSE_PACKET_LENGTH = 8
if '@' in target:
LOG.warn(f"sTorResolve failed invalid hostname {target}")
@ -469,7 +470,7 @@ def sTorResolve(target,
return ''
def getaddrinfo(sHost, sPort):
def getaddrinfo(sHost:str, sPort:str) -> list:
# do this the explicit way = Ive seen the compact connect fail
# >>> sHost, sPort = 'l27.0.0.1', 33446
# >>> sock.connect((sHost, sPort))
@ -486,7 +487,7 @@ def getaddrinfo(sHost, sPort):
return None
return lPair
def icheck_torrc(sFile, oArgs):
def icheck_torrc(sFile:str, oArgs) -> int:
l = open(sFile, 'rt').readlines()
a = {}
for elt in l:
@ -528,7 +529,7 @@ def icheck_torrc(sFile, oArgs):
print('VirtualAddrNetworkIPv4 172.16.0.0/12')
return 0
def lExitExcluder(oArgs, iPort=9051, log_level=10):
def lExitExcluder(oArgs, iPort:int = 9051, log_level:int = 10) -> list:
"""
https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py
"""

View File

@ -17,6 +17,7 @@ import unittest
from ctypes import *
from random import Random
import functools
from typing import Union, Callable, Union
random = Random()
@ -54,11 +55,11 @@ except ImportError:
global LOG
LOG = logging.getLogger()
def LOG_ERROR(l): print('ERRORc: '+l)
def LOG_WARN(l): print('WARNc: ' +l)
def LOG_INFO(l): print('INFOc: ' +l)
def LOG_DEBUG(l): print('DEBUGc: '+l)
def LOG_TRACE(l): pass # print('TRACE+ '+l)
def LOG_ERROR(l:str) -> None: print('ERRORc: '+l)
def LOG_WARN(l:str) -> None: print('WARNc: ' +l)
def LOG_INFO(l:str) -> None: print('INFOc: ' +l)
def LOG_DEBUG(l:str) -> None: print('DEBUGc: '+l)
def LOG_TRACE(l:str) -> None: pass # print('TRACE+ '+l)
try:
from trepan.api import debug
@ -132,8 +133,7 @@ lDEAD_BS = [
'tox.abilinski.com', '172.103.164.250', '172.103.164.250.tpia.cipherkey.com',
]
def assert_main_thread():
def assert_main_thread() -> None:
from PyQt5 import QtCore, QtWidgets
from qtpy.QtWidgets import QApplication
@ -144,7 +144,7 @@ def assert_main_thread():
raise RuntimeError('attempt to call MainWindow.append_message from non-app thread')
@contextlib.contextmanager
def ignoreStdout():
def ignoreStdout() -> None:
devnull = os.open(os.devnull, os.O_WRONLY)
old_stdout = os.dup(1)
sys.stdout.flush()
@ -157,7 +157,7 @@ def ignoreStdout():
os.close(old_stdout)
@contextlib.contextmanager
def ignoreStderr():
def ignoreStderr() -> None:
devnull = os.open(os.devnull, os.O_WRONLY)
old_stderr = os.dup(2)
sys.stderr.flush()
@ -169,7 +169,7 @@ def ignoreStderr():
os.dup2(old_stderr, 2)
os.close(old_stderr)
def clean_booleans(oArgs):
def clean_booleans(oArgs) -> None:
for key in lBOOLEANS:
if not hasattr(oArgs, key): continue
val = getattr(oArgs, key)
@ -179,11 +179,11 @@ def clean_booleans(oArgs):
else:
setattr(oArgs, key, True)
def on_log(iTox, level, filename, line, func, message, *data):
def on_log(iTox, level, filename, line, func, message, *data) -> None:
# LOG.debug(repr((level, filename, line, func, message,)))
tox_log_cb(level, filename, line, func, message)
def tox_log_cb(level, filename, line, func, message, *args):
def tox_log_cb(level, filename, line, func, message, *args) -> None:
"""
* @param level The severity of the log message.
* @param filename The source file from which the message originated.
@ -245,7 +245,7 @@ def tox_log_cb(level, filename, line, func, message, *args):
else:
LOG_TRACE(f"{level}: {message}")
def vAddLoggerCallback(tox_options, callback=None):
def vAddLoggerCallback(tox_options, callback=None) -> None:
if callback is None:
wrapper.tox.Tox.libtoxcore.tox_options_set_log_callback(
tox_options._options_pointer,
@ -259,7 +259,7 @@ def vAddLoggerCallback(tox_options, callback=None):
tox_options._options_pointer,
tox_options.self_logger_cb)
def get_video_indexes():
def get_video_indexes() -> list:
# Linux
return [str(l[5:]) for l in os.listdir('/dev/') if l.startswith('video')]
@ -394,7 +394,7 @@ def oMainArgparser(_=None, iMode=0):
# help='En/Disable saving history')
return parser
def vSetupLogging(oArgs):
def vSetupLogging(oArgs) -> None:
global LOG
logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S')
logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S'
@ -430,7 +430,7 @@ def vSetupLogging(oArgs):
LOG.info(f"Setting loglevel to {oArgs.loglevel!s}")
def setup_logging(oArgs):
def setup_logging(oArgs) -> None:
global LOG
if coloredlogs:
aKw = dict(level=oArgs.loglevel,
@ -459,7 +459,7 @@ def setup_logging(oArgs):
# LOG.trace = lambda l: LOG.log(0, repr(l))
LOG.info(f"Setting loglevel to {oArgs.loglevel!s}")
def signal_handler(num, f):
def signal_handler(num, f) -> None:
from trepan.api import debug
from trepan.interfaces import server as Mserver
connection_opts={'IO': 'TCP', 'PORT': 6666}
@ -469,7 +469,7 @@ def signal_handler(num, f):
debug(dbg_opts=dbg_opts)
return
def merge_args_into_settings(args, settings):
def merge_args_into_settings(args:list, settings:dict) -> None:
if args:
if not hasattr(args, 'audio'):
LOG.warn('No audio ' +repr(args))
@ -493,7 +493,7 @@ def merge_args_into_settings(args, settings):
clean_settings(settings)
return
def clean_settings(self):
def clean_settings(self:dict) -> None:
# failsafe to ensure C tox is bytes and Py settings is str
# overrides
@ -528,7 +528,7 @@ def clean_settings(self):
LOG.debug("Cleaned settings")
def lSdSamplerates(iDev):
def lSdSamplerates(iDev:int) -> list:
try:
import sounddevice as sd
except ImportError:
@ -546,7 +546,7 @@ def lSdSamplerates(iDev):
supported_samplerates.append(fs)
return supported_samplerates
def _get_nodes_path(oArgs):
def _get_nodes_path(oArgs:str):
if oArgs and hasattr(oArgs, 'nodes_json') and \
oArgs.nodes_json and os.path.isfile(oArgs.nodes_json):
default = oArgs.nodes_json
@ -565,9 +565,9 @@ aNODES = {}
# @functools.lru_cache(maxsize=12) TypeError: unhashable type: 'Namespace'
def generate_nodes(oArgs=None,
nodes_count=DEFAULT_NODES_COUNT,
ipv='ipv4',
udp_not_tcp=True):
nodes_count:int = DEFAULT_NODES_COUNT,
ipv:str = 'ipv4',
udp_not_tcp=True) -> dict:
global aNODES
sKey = ipv
sKey += ',0' if udp_not_tcp else ',1'
@ -584,11 +584,11 @@ def generate_nodes(oArgs=None,
return aNODES[sKey]
aNODES_CACHE = {}
def generate_nodes_from_file(sFile,
nodes_count=DEFAULT_NODES_COUNT,
ipv='ipv4',
udp_not_tcp=True,
):
def generate_nodes_from_file(sFile:str,
nodes_count:int = DEFAULT_NODES_COUNT,
ipv:str = 'ipv4',
udp_not_tcp:bool = True,
) -> dict:
"""https://github.com/TokTok/c-toxcore/issues/469
I had a conversation with @irungentoo on IRC about whether we really need to call tox_bootstrap() when having UDP disabled and why. The answer is yes, because in addition to TCP relays (tox_add_tcp_relay()), toxcore also needs to know addresses of UDP onion nodes in order to work correctly. The DHT, however, is not used when UDP is disabled. tox_bootstrap() function resolves the address passed to it as argument and calls onion_add_bs_node_path() and DHT_bootstrap() functions. Although calling DHT_bootstrap() is not really necessary as DHT is not used, we still need to resolve the address of the DHT node in order to populate the onion routes with onion_add_bs_node_path() call.
"""
@ -637,7 +637,7 @@ I had a conversation with @irungentoo on IRC about whether we really need to cal
LOG.debug(f"generate_nodes_from_file {sFile} len={len(sorted_nodes)}")
return sorted_nodes
def tox_bootstrapd_port():
def tox_bootstrapd_port() -> int:
port = 33446
sFile = '/etc/tox-bootstrapd.conf'
if os.path.exists(sFile):
@ -647,7 +647,7 @@ def tox_bootstrapd_port():
port = int(line[7:])
return port
def bootstrap_local(elts, lToxes, oArgs=None):
def bootstrap_local(elts:list, lToxes:list, oArgs=None):
if os.path.exists('/run/tox-bootstrapd/tox-bootstrapd.pid'):
LOG.debug('/run/tox-bootstrapd/tox-bootstrapd.pid')
iRet = True
@ -658,12 +658,12 @@ def bootstrap_local(elts, lToxes, oArgs=None):
LOG.info(f'bootstraping local')
return bootstrap_udp(elts, lToxes, oArgs)
def lDNSClean(l):
def lDNSClean(l:list) -> list:
global lDEAD_BS
# list(set(l).difference(set(lDEAD_BS)))
return [elt for elt in l if elt not in lDEAD_BS]
def lExitExcluder(oArgs, iPort=9051):
def lExitExcluder(oArgs, iPort:int =9051) -> list:
"""
https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py
"""
@ -703,7 +703,7 @@ def lExitExcluder(oArgs, iPort=9051):
aHOSTS = {}
@functools.lru_cache(maxsize=20)
def sDNSLookup(host):
def sDNSLookup(host:str) -> str:
global aHOSTS
ipv = 0
if host in lDEAD_BS:
@ -784,7 +784,7 @@ def sDNSLookup(host):
aHOSTS[host] = ip
return ip
def bootstrap_udp(lelts, lToxes, oArgs=None):
def bootstrap_udp(lelts:list, lToxes:list, oArgs=None) -> None:
lelts = lDNSClean(lelts)
socket.setdefaulttimeout(15.0)
for oTox in lToxes:
@ -828,7 +828,7 @@ def bootstrap_udp(lelts, lToxes, oArgs=None):
# LOG.debug(f'bootstrap_udp to {host} not connected')
pass
def bootstrap_tcp(lelts, lToxes, oArgs=None):
def bootstrap_tcp(lelts:list, lToxes:list, oArgs=None) -> None:
lelts = lDNSClean(lelts)
for oTox in lToxes:
if hasattr(oTox, 'oArgs'): oArgs = oTox.oArgs
@ -874,7 +874,7 @@ def bootstrap_tcp(lelts, lToxes, oArgs=None):
# +f" last={int(oTox.mycon_time)}" )
pass
def iNmapInfoNmap(sProt, sHost, sPort, key=None, environ=None, cmd=''):
def iNmapInfoNmap(sProt:str, sHost:str, sPort:str, key=None, environ=None, cmd:str = '') -> int:
if sHost in ['-', 'NONE']: return 0
if not nmap: return 0
nmps = nmap.PortScanner
@ -893,7 +893,7 @@ def iNmapInfoNmap(sProt, sHost, sPort, key=None, environ=None, cmd=''):
LOG.info(f"iNmapInfoNmap: to {sHost} {state}")
return 0
def iNmapInfo(sProt, sHost, sPort, key=None, environ=None, cmd='nmap'):
def iNmapInfo(sProt:str, sHost:str, sPort:str, key=None, environ=None, cmd:str = 'nmap'):
if sHost in ['-', 'NONE']: return 0
sFile = os.path.join("/tmp", f"{sHost}.{os.getpid()}.nmap")
if sProt in ['socks', 'socks5', 'tcp4']:
@ -917,7 +917,7 @@ def iNmapInfo(sProt, sHost, sPort, key=None, environ=None, cmd='nmap'):
# bootstrap_iNmapInfo(lElts, self._args, sProt)
def bootstrap_iNmapInfo(lElts, oArgs, protocol="tcp4", bIS_LOCAL=False, iNODES=iNODES, cmd='nmap'):
def bootstrap_iNmapInfo(lElts:list, oArgs, protocol:str = "tcp4", bIS_LOCAL:bool = False, iNODES:int = iNODES, cmd:str = 'nmap') -> bool:
if not bIS_LOCAL and not bAreWeConnected():
LOG.warn(f"bootstrap_iNmapInfo not local and NOT CONNECTED")
return True
@ -953,7 +953,7 @@ def bootstrap_iNmapInfo(lElts, oArgs, protocol="tcp4", bIS_LOCAL=False, iNODES=i
lRetval += [False]
return any(lRetval)
def caseFactory(cases):
def caseFactory(cases:list) -> list:
"""We want the tests run in order."""
if len(cases) > 1:
ordered_cases = sorted(cases, key=lambda f: inspect.findsource(f)[1])

View File

@ -39,6 +39,7 @@ import threading
import traceback
import unittest
from ctypes import *
from typing import Union, Callable, Union
faulthandler.enable()
@ -185,7 +186,7 @@ class BaseThread(threading.Thread):
self._stop_thread = False
self.name = name
def stop_thread(self, timeout=-1) -> None:
def stop_thread(self, timeout: int = -1) -> None:
self._stop_thread = True
if timeout < 0:
timeout = ts.iTHREAD_TIMEOUT
@ -213,7 +214,7 @@ bob = alice = None
def prepare(self):
global bob, alice
def bobs_on_self_connection_status(iTox, connection_state, *args):
def bobs_on_self_connection_status(iTox, connection_state, *args) -> None:
status = connection_state
self.bob.dht_connected = status
self.bob.mycon_time = time.time()
@ -232,7 +233,7 @@ def prepare(self):
if self.bob.self_get_connection_status() != status:
LOG_WARN(f"bobs_on_self_connection_status DISAGREE {status}")
def alices_on_self_connection_status(iTox, connection_state, *args):
def alices_on_self_connection_status(iTox, connection_state: int, *args) -> None:
#FixMe connection_num
status = connection_state
self.alice.dht_connected = status
@ -371,12 +372,12 @@ class ToxSuite(unittest.TestCase):
self.bBobNeedAlice()
self.bAliceNeedAddBob()
def run(self, result=None):
def run(self, result=None) -> None:
""" Stop after first error """
if not result.errors:
super(ToxSuite, self).run(result)
def get_connection_status(self):
def get_connection_status(self) -> None:
if self.bob.mycon_time <= 1 or self.alice.mycon_time <= 1:
pass
# drop through
@ -392,7 +393,7 @@ class ToxSuite(unittest.TestCase):
return False
return True
def loop(self, n):
def loop(self, n) -> None:
"""
t:iterate
t:iteration_interval
@ -403,7 +404,7 @@ class ToxSuite(unittest.TestCase):
self.bob.iterate()
sleep(interval / 1000.0)
def call_bootstrap(self, num=None, lToxes=None, i=0):
def call_bootstrap(self, num: Union[int, None] = None, lToxes:list[int] =None, i:int =0) -> None:
if num == None: num=ts.iNODES
if lToxes is None:
lToxes = [self.alice, self.bob]
@ -425,7 +426,7 @@ class ToxSuite(unittest.TestCase):
LOG.debug(f"call_bootstrap ts.bootstrap_tcp {len(lElts)}")
ts.bootstrap_tcp(lElts, lToxes)
def group_until_connected(self, otox, group_number, num=None, iMax=THRESHOLD):
def group_until_connected(self, otox, group_number:int, num: Union[int, None] = None, iMax:int = THRESHOLD) -> None:
"""
"""
i = 0
@ -463,7 +464,7 @@ class ToxSuite(unittest.TestCase):
+f" last={int(otox.mycon_time)}" )
return False
def loop_until_connected(self, num=None):
def loop_until_connected(self, num: Union[int, None] = None) -> None:
"""
t:on_self_connection_status
t:self_get_connection_status
@ -519,7 +520,7 @@ class ToxSuite(unittest.TestCase):
+f" last={int(self.bob.mycon_time)}" )
return False
def wait_objs_attr(self, objs, attr):
def wait_objs_attr(self, objs: list, attr: str) -> bool:
global THRESHOLD
i = 0
while i <= THRESHOLD:
@ -539,7 +540,7 @@ class ToxSuite(unittest.TestCase):
return all([getattr(obj, attr) is not None for obj in objs])
def wait_otox_attrs(self, obj, attrs):
def wait_otox_attrs(self, obj: list, attrs: list[str]) -> bool:
assert all(attrs), f"wait_otox_attrs {attrs}"
i = 0
while i <= THRESHOLD:
@ -562,7 +563,7 @@ class ToxSuite(unittest.TestCase):
return all([getattr(obj, attr) for attr in attrs])
def wait_ensure_exec(self, method, args):
def wait_ensure_exec(self, method, args:list) -> bool:
i = 0
oRet = None
while i <= THRESHOLD:
@ -593,7 +594,7 @@ class ToxSuite(unittest.TestCase):
return oRet
def bob_add_alice_as_friend_norequest(self):
def bob_add_alice_as_friend_norequest(self) -> bool:
if not self.bBobNeedAlice(): return True
MSG = 'Hi, this is Bob.'
@ -608,7 +609,7 @@ class ToxSuite(unittest.TestCase):
assert self.bob.self_get_friend_list_size() >= 1
return True
def alice_add_bob_as_friend_norequest(self):
def alice_add_bob_as_friend_norequest(self) -> bool:
if not self.bAliceNeedAddBob(): return True
iRet = self.alice.friend_add_norequest(self.bob._address)
@ -622,7 +623,7 @@ class ToxSuite(unittest.TestCase):
assert self.alice.self_get_friend_list_size() >= 1
return True
def both_add_as_friend(self):
def both_add_as_friend(self) -> bool:
if bUSE_NOREQUEST:
assert self.bob_add_alice_as_friend()
assert self.alice_add_bob_as_friend_norequest()
@ -635,7 +636,7 @@ class ToxSuite(unittest.TestCase):
LOG.warn("both_add_as_friend no alice, abid")
return True
def both_add_as_friend_norequest(self):
def both_add_as_friend_norequest(self) -> bool:
if self.bBobNeedAlice():
assert self.bob_add_alice_as_friend_norequest()
if self.bAliceNeedAddBob():
@ -650,7 +651,7 @@ class ToxSuite(unittest.TestCase):
#? assert self.bob.friend_get_last_online(self.baid) is not None
return True
def bob_add_alice_as_friend(self):
def bob_add_alice_as_friend(self) -> bool:
"""
t:friend_add
t:on_friend_request
@ -664,7 +665,7 @@ class ToxSuite(unittest.TestCase):
public_key,
message_data,
message_data_size,
*largs):
*largs) -> None:
LOG_DEBUG(f"alices_on_friend_request: " +repr(message_data))
try:
assert str(message_data, 'UTF-8') == MSG
@ -697,7 +698,7 @@ class ToxSuite(unittest.TestCase):
return True
def alice_add_bob_as_friend(self):
def alice_add_bob_as_friend(self) -> bool:
"""
t:friend_add
t:on_friend_request
@ -711,7 +712,7 @@ class ToxSuite(unittest.TestCase):
public_key,
message_data,
message_data_size,
*largs):
*largs) -> None:
LOG_DEBUG(f"bobs_on_friend_request: " +repr(message_data))
try:
assert str(message_data, 'UTF-8') == MSG
@ -743,7 +744,7 @@ class ToxSuite(unittest.TestCase):
self.bob.callback_friend_message(None)
return True
def bob_add_alice_as_friend_and_status(self):
def bob_add_alice_as_friend_and_status(self) -> bool:
if bUSE_NOREQUEST:
assert self.bob_add_alice_as_friend_norequest()
else:
@ -752,31 +753,28 @@ class ToxSuite(unittest.TestCase):
#: Wait until both are online
sSlot = 'friend_conn_status'
setattr(self.bob, sSlot, False)
def bobs_on_friend_connection_status(iTox, friend_id, iStatus, *largs):
def bobs_on_friend_connection_status(iTox, friend_id, iStatus, *largs) -> None:
LOG_INFO(f"bobs_on_friend_connection_status {friend_id} ?>=0" +repr(iStatus))
if iStatus > 0:
self.bob.friend_conn_status = True
setattr(self.bob, sSlot, False)
self.bob.friend_status = None
def bobs_on_friend_status(iTox, friend_id, iStatus, *largs):
sSlot = 'friend_status'
setattr(self.bob, sSlot, None)
def bobs_on_friend_status(iTox, friend_id, iStatus, *largs) -> None:
LOG_INFO(f"bobs_on_friend_status {friend_id} ?>=0" +repr(iStatus))
if iStatus > 0:
self.bob.friend_status = True
setattr(self.bob, sSlot, False)
self.alice.friend_conn_status = None
def alices_on_friend_connection_status(iTox, friend_id, iStatus, *largs):
sSlot = 'friend_conn_status'
setattr(self.alice, sSlot, None)
def alices_on_friend_connection_status(iTox, friend_id, iStatus, *largs) -> None:
LOG_INFO(f"alices_on_friend_connection_status {friend_id} ?>=0 " +repr(iStatus))
if iStatus > 0:
self.alice.friend_conn_status = True
setattr(self.alice, sSlot, False)
self.alice.friend_status = False
def alices_on_friend_status(iTox, friend_id, iStatus, *largs):
sSlot = 'friend_status'
setattr(self.alice, sSlot, None)
def alices_on_friend_status(iTox, friend_id, iStatus, *largs) -> None:
LOG_INFO(f"alices_on_friend_status {friend_id} ?>=0 " +repr(iStatus))
if iStatus > 0:
self.alice.friend_status = True
setattr(self.alice, sSlot, False)
self.alice.callback_friend_connection_status(alices_on_friend_connection_status)
self.alice.callback_friend_status(alices_on_friend_status)
try:
LOG.info("bob_add_alice_as_friend_and_status waiting for alice connections")
if not self.wait_otox_attrs(self.alice,
@ -786,6 +784,8 @@ class ToxSuite(unittest.TestCase):
self.bob.callback_friend_connection_status(bobs_on_friend_connection_status)
self.bob.callback_friend_status(bobs_on_friend_status)
self.alice.callback_friend_connection_status(alices_on_friend_connection_status)
self.alice.callback_friend_status(alices_on_friend_status)
LOG.info("bob_add_alice_as_friend_and_status waiting for bob connections")
if not self.wait_otox_attrs(self.bob,
@ -803,7 +803,7 @@ class ToxSuite(unittest.TestCase):
self.bob.callback_friend_status(None)
return True
def bob_to_alice_connected(self):
def bob_to_alice_connected(self) -> bool:
assert hasattr(self, 'baid')
iRet = self.bob.friend_get_connection_status(self.baid)
if iRet == TOX_CONNECTION['NONE']:
@ -811,7 +811,7 @@ class ToxSuite(unittest.TestCase):
return False
return True
def alice_to_bob_connected(self):
def alice_to_bob_connected(self) -> bool:
assert hasattr(self, 'abid')
iRet = self.alice.friend_get_connection_status(self.abid)
if iRet == TOX_CONNECTION['NONE']:
@ -824,7 +824,7 @@ class ToxSuite(unittest.TestCase):
group_name='test_group',
nick='test_nick',
topic='Test Topic', # str
):
) -> int:
privacy_state = enums.TOX_GROUP_PRIVACY_STATE['PUBLIC']
iGrp = otox.group_new(privacy_state, group_name, nick)
@ -850,16 +850,18 @@ class ToxSuite(unittest.TestCase):
LOG.info(f"group pK={sPk} iGrp={iGrp} n={otox.group_get_number_groups()}")
return iGrp
def otox_verify_group(self, otox, iGrp):
def otox_verify_group(self, otox, iGrp) -> int:
"""
group_self_get_name
group_self_get_peer_id
group_self_get_public_key
group_self_get_role
group_self_get_status
group_self_set_name
"""
group_number = iGrp
try:
assert type(iGrp) == int, "otox_test_groups_join iGrp not an int"
assert iGrp < UINT32_MAX, "otox_test_groups_join iGrp failure UINT32_MAX"
assert iGrp >= 0, f"otox_test_groups_join iGrp={iGrp} < 0"
@ -876,7 +878,8 @@ class ToxSuite(unittest.TestCase):
iStat = otox.group_self_get_status(iGrp)
LOG.info(f"otox_verify_group sName={sName} iStat={iStat} iId={iId} iRole={iRole} iStat={iStat}")
try:
assert otox.group_self_set_name(iGrp, "NewName")
bRet = otox.group_is_connected(iGrp)
except Exception as e:
LOG.warn(f"group_is_connected EXCEPTION {e}")
@ -931,7 +934,7 @@ class ToxSuite(unittest.TestCase):
group_name='test_group',
nick='test_nick',
topic='Test Topic', # str
):
) -> int:
try:
iGrp = self.otox_test_groups_create(otox, group_name, nick, topic)
@ -949,7 +952,7 @@ class ToxSuite(unittest.TestCase):
# tox.callback_group_peer_join
return iGrp
def wait_friend_get_connection_status(self, otox, fid, n=iN):
def wait_friend_get_connection_status(self, otox, fid:int, n:int = iN) -> int:
i = 0
while i < n:
iRet = otox.friend_get_connection_status(fid)
@ -964,18 +967,18 @@ class ToxSuite(unittest.TestCase):
LOG.error(f"wait_friend_get_connection_status n={n}")
return False
def warn_if_no_cb(self, alice, sSlot):
def warn_if_no_cb(self, alice, sSlot:str) -> None:
if not hasattr(alice, sSlot+'_cb') or \
not getattr(alice, sSlot+'_cb'):
LOG.warning(f"self.bob.{sSlot}_cb NOT EXIST")
def warn_if_cb(self, alice, sSlot):
def warn_if_cb(self, alice, sSlot:str) -> None:
if hasattr(self.bob, sSlot+'_cb') and \
getattr(self.bob, sSlot+'_cb'):
LOG.warning(f"self.bob.{sSlot}_cb EXIST")
# tests are executed in order
def test_notice_log(self): # works
def test_notice_log(self) -> None: # works
notice = '/var/lib/tor/.SelekTOR/3xx/cache/9050/notice.log'
if os.path.exists(notice):
iRet = os.system(f"sudo sed -e '1,/.notice. Bootstrapped 100%/d' {notice}" + \
@ -993,7 +996,7 @@ class ToxSuite(unittest.TestCase):
self.assertEqual(cm.output, ['INFO:foo:first message',
'ERROR:foo.bar:second message'])
def test_tests_start(self): # works
def test_tests_start(self) -> None: # works
"""
t:hash
t:kill
@ -1012,7 +1015,7 @@ class ToxSuite(unittest.TestCase):
assert self.bob.self_get_address() == self.bob._address
def test_bootstrap_local_netstat(self): # works
def test_bootstrap_local_netstat(self) -> None: # works
"""
t:callback_file_chunk_request
t:callback_file_recv
@ -1043,7 +1046,7 @@ class ToxSuite(unittest.TestCase):
else:
LOG.warning(f"bootstrap_local_netstat NOT {port} iStatus={iStatus}")
def test_bootstrap_local(self): # works
def test_bootstrap_local(self) -> None: # works
"""
t:call_bootstrap
t:add_tcp_relay
@ -1081,7 +1084,7 @@ class ToxSuite(unittest.TestCase):
LOG.warning(f"bootstrap_local NOT CONNECTED iStatus={iStatus}")
return False
def test_bootstrap_iNmapInfo(self): # works
def test_bootstrap_iNmapInfo(self) -> None: # works
# if os.environ['USER'] != 'root':
# return
iStatus = self.bob.self_get_connection_status()
@ -1097,7 +1100,7 @@ class ToxSuite(unittest.TestCase):
# assert
ts.bootstrap_iNmapInfo(lElts, oTOX_OARGS, bIS_LOCAL, iNODES=8)
def test_self_get_secret_key(self): # works
def test_self_get_secret_key(self) -> None: # works
"""
t:self_get_secret_key
"""
@ -1110,7 +1113,7 @@ class ToxSuite(unittest.TestCase):
assert len(str(oRet0))
del secret_key
def test_self_get_public_keys(self): # works
def test_self_get_public_keys(self) -> None: # works
"""
t:self_get_secret_key
t:self_get_public_key
@ -1125,7 +1128,7 @@ class ToxSuite(unittest.TestCase):
LOG.info('test_self_get_public_keys ' +repr(oRet1))
assert oRet0 != oRet1, repr(oRet0) +' != ' +repr(oRet1)
def test_self_name(self): # works
def test_self_name(self) -> None: # works
"""
t:self_set_name
t:self_get_name
@ -1140,7 +1143,7 @@ class ToxSuite(unittest.TestCase):
@unittest.skip('loud')
@unittest.skipIf(bIS_NOT_TOXYGEN or oTOX_OARGS.mode == 0, 'not testing in toxygen')
def test_sound_notification(self): # works
def test_sound_notification(self) -> None: # works
"""
Plays sound notification
:param type of notification
@ -1148,7 +1151,7 @@ class ToxSuite(unittest.TestCase):
from tests.toxygen_tests import test_sound_notification
test_sound_notification(self)
def test_address(self): # works
def test_address(self) -> None: # works
"""
t:self_get_address
t:self_get_nospam
@ -1166,7 +1169,7 @@ class ToxSuite(unittest.TestCase):
pk, sk = self.alice.self_get_keys()
assert pk == self.alice.self_get_address()[:CLIENT_ID_SIZE]
def test_status_message(self): # works
def test_status_message(self) -> None: # works
"""
t:self_get_status_message
t:self_get_status_message_size
@ -1178,7 +1181,7 @@ class ToxSuite(unittest.TestCase):
self.alice.self_get_status_message() +' is not ' +MSG
assert self.alice.self_get_status_message_size() == len(MSG)
def test_self_get_udp_port(self): # works
def test_self_get_udp_port(self) -> None: # works
"""
t:self_get_udp_port
"""
@ -1190,7 +1193,7 @@ class ToxSuite(unittest.TestCase):
LOG.info('self_get_udp_port bob ' +repr(o))
assert o > 0
def test_self_get_tcp_port(self): # works
def test_self_get_tcp_port(self) -> None: # works
"""
t:self_get_tcp_port
"""
@ -1201,7 +1204,7 @@ class ToxSuite(unittest.TestCase):
o = self.bob.self_get_tcp_port()
LOG.info('self_get_tcp_port ' +repr(o))
def test_get_dht_id(self): # works
def test_get_dht_id(self) -> None: # works
"""
t:self_get_dht_id
"""
@ -1210,7 +1213,7 @@ class ToxSuite(unittest.TestCase):
o2 = self.bob.self_get_dht_id()
assert len(o2) == 64
def test_bob_add_alice_as_friend_norequest(self): # works
def test_bob_add_alice_as_friend_norequest(self) -> None: # works
"""
t:friend_delete
t:friend_exists
@ -1228,7 +1231,7 @@ class ToxSuite(unittest.TestCase):
self.bob.friend_delete(self.baid)
def test_alice_add_bob_as_friend_norequest(self): # works - intermittent failures
def test_alice_add_bob_as_friend_norequest(self) -> None: # works - intermittent failures
"""
t:friend_delete
t:friend_exists
@ -1244,7 +1247,7 @@ class ToxSuite(unittest.TestCase):
if hasattr(self, 'abid') and self.abid >= 0:
self.alice.friend_delete(self.abid)
def test_both_add_as_friend_norequest(self): # works
def test_both_add_as_friend_norequest(self) -> None: # works
"""
t:friend_delete
t:friend_exists
@ -1270,7 +1273,7 @@ class ToxSuite(unittest.TestCase):
self.alice.friend_delete(self.abid)
assert len(self.alice.self_get_friend_list()) == 0
def test_bob_add_alice_as_friend_and_status(self):
def test_bob_add_alice_as_friend_and_status(self) -> None:
"""
t:friend_delete
t:friend_exists
@ -1283,35 +1286,35 @@ class ToxSuite(unittest.TestCase):
self.bob.friend_delete(self.baid)
@unittest.skip('unfinished')
def test_alice_add_bob_as_friend_and_status(self):
def test_alice_add_bob_as_friend_and_status(self) -> None:
assert self.alice_add_bob_as_friend_and_status()
if hasattr(self, 'abid') and self.abid >= 0:
self.alice.friend_delete(self.abid)
def test_loop_until_connected(self): # works
def test_loop_until_connected(self) -> None: # works
assert self.loop_until_connected()
def test_bob_assert_connection_status(self): # works
def test_bob_assert_connection_status(self) -> None: # works
if self.bob.self_get_connection_status() == TOX_CONNECTION['NONE']:
AssertionError("ERROR: NOT CONNECTED " \
+repr(self.bob.self_get_connection_status()))
def test_alice_assert_connection_status(self): # works
def test_alice_assert_connection_status(self) -> None: # works
if self.alice.self_get_connection_status() == TOX_CONNECTION['NONE']:
AssertionError("ERROR: NOT CONNECTED " \
+repr(self.alice.self_get_connection_status()))
def test_bob_assert_mycon_status(self): # works
def test_bob_assert_mycon_status(self) -> None: # works
if self.bob.mycon_status == False:
AssertionError("ERROR: NOT CONNECTED " \
+repr(self.bob.mycon_status))
def test_alice_assert_mycon_status(self): # works
def test_alice_assert_mycon_status(self) -> None: # works
if self.alice.mycon_status == False:
AssertionError("ERROR: NOT CONNECTED " \
+repr(self.alice.mycon_status))
def test_bob_add_alice_as_friend(self): # works?
def test_bob_add_alice_as_friend(self) -> None: # works?
try:
if bUSE_NOREQUEST:
assert self.bob_add_alice_as_friend_norequest()
@ -1331,7 +1334,7 @@ class ToxSuite(unittest.TestCase):
if len(self.bob.self_get_friend_list()) > 0:
LOG.warn(f"WTF bob.self_get_friend_list() {bob.self_get_friend_list()}")
def test_alice_add_bob_as_friend(self): # works!
def test_alice_add_bob_as_friend(self) -> None: # works!
try:
if bUSE_NOREQUEST:
assert self.alice_add_bob_as_friend_norequest()
@ -1357,7 +1360,7 @@ class ToxSuite(unittest.TestCase):
if len(self.alice.self_get_friend_list()) > 0:
LOG.warn(f"WTF alice.self_get_friend_list() {alice.self_get_friend_list()}")
def test_both_add_as_friend(self): # works
def test_both_add_as_friend(self) -> None: # works
try:
if bUSE_NOREQUEST:
assert self.both_add_as_friend_norequest()
@ -1375,11 +1378,12 @@ class ToxSuite(unittest.TestCase):
if hasattr(self,'abid') and self.abid >= 0:
self.alice.friend_delete(self.abid)
def test_groups_join(self):
def test_groups_join(self) -> None:
"""
t:group_join
t:group_disconnect
t:group_leave
t:group_self_set_name
"""
if not self.get_connection_status():
LOG.warning(f"test_groups_join NOT CONNECTED")
@ -1399,7 +1403,7 @@ class ToxSuite(unittest.TestCase):
LOG.error(f"bob.group_leave EXCEPTION {e}")
raise
def test_groups(self):
def test_groups(self) -> None:
"""
t:group_new
t:group_disconnect
@ -1408,12 +1412,13 @@ class ToxSuite(unittest.TestCase):
t:group_get_topic
t:group_get_topic_size
t:group_get_privacy_state
t:group_self_set_name
t:group_get_number_groups
t:group_founder_set_password
t:group_founder_set_peer_limit
t:group_founder_set_privacy_state
t:group_get_chat_id
t:group_get_number_groups
t:group_get_password
t:group_get_password_size
t:group_get_peer_limit
@ -1439,7 +1444,7 @@ class ToxSuite(unittest.TestCase):
#? @unittest.skip("double free or corruption (fasttop)")
@expectedFail('fails') # assertion fails on == MSG
def test_on_friend_status_message(self): # fails
def test_on_friend_status_message(self) -> None: # fails
"""
t:self_set_status_message
t:self_get_status_message
@ -1452,7 +1457,7 @@ class ToxSuite(unittest.TestCase):
MSG = 'Happy'
sSlot = 'friend_status_message'
def bob_on_friend_status_message(iTox, friend_id, new_status_message, new_status_size, *largs):
def bob_on_friend_status_message(iTox, friend_id, new_status_message, new_status_size, *largs) -> None:
LOG_INFO(f"BOB_ON_friend_status_message friend_id={friend_id} " \
+f"new_status_message={new_status_message}")
try:
@ -1497,7 +1502,7 @@ class ToxSuite(unittest.TestCase):
if hasattr(self, 'abid') and self.abid >= 0:
self.alice.friend_delete(self.abid)
def test_friend(self): # works! sometimes
def test_friend(self) -> None: # works! sometimes
"""
t:friend_get_name
t:friend_get_name_size
@ -1537,7 +1542,7 @@ class ToxSuite(unittest.TestCase):
self.alice.friend_delete(self.abid)
@expectedFail('fails') # assert self.bob.friend_get_status(self.baid) == TOX_USER_STATUS['BUSY']
def test_user_status(self): # fails
def test_user_status(self) -> None: # fails
"""
t:self_get_status
t:self_set_status
@ -1548,7 +1553,7 @@ class ToxSuite(unittest.TestCase):
sSlot = 'friend_status'
setattr(self.bob, sSlot, None)
def bobs_on_friend_set_status(iTox, friend_id, new_status, *largs):
def bobs_on_friend_set_status(iTox, friend_id, new_status, *largs) -> None:
LOG_INFO(f"bobs_on_friend_set_status {friend_id} {new_status}")
try:
assert friend_id == self.baid
@ -1591,7 +1596,7 @@ class ToxSuite(unittest.TestCase):
self.bob.friend_delete(self.baid)
@unittest.skip('crashes')
def test_kill_remake(self):
def test_kill_remake(self) -> None:
"""
t:friend_get_kill_remake
t:on_friend_connection_status
@ -1636,7 +1641,7 @@ class ToxSuite(unittest.TestCase):
self.bob.friend_delete(self.baid)
@expectedFail('fails') # new name is empty
def test_friend_name(self): # works!
def test_friend_name(self) -> None: # works!
"""
t:self_set_name
t:friend_get_name
@ -1650,7 +1655,7 @@ class ToxSuite(unittest.TestCase):
#: Test friend name
NEWNAME = 'Jenny'
def bobs_on_friend_name(iTox, fid, newname, iNameSize, *largs):
def bobs_on_friend_name(iTox, fid:int, newname, iNameSize, *largs) -> None:
LOG_INFO(f"bobs_on_friend_name {sSlot} {fid}")
try:
assert fid == self.baid
@ -1693,7 +1698,7 @@ class ToxSuite(unittest.TestCase):
@expectedFail('fails') # This client is currently not connected to the friend.
def test_friend_message(self): # fails
def test_friend_message(self) -> None: # fails
"""
t:on_friend_action
t:on_friend_message
@ -1704,7 +1709,7 @@ class ToxSuite(unittest.TestCase):
MSG = 'Hi, Bob!'
sSlot = 'friend_message'
def alices_on_friend_message(iTox, fid, msg_type, message, iSize, *largs):
def alices_on_friend_message(iTox, fid:int, msg_type, message, iSize, *largs) -> None:
LOG_DEBUG(f"alices_on_friend_message {fid} {message}")
try:
assert fid == self.alice.abid
@ -1761,7 +1766,7 @@ class ToxSuite(unittest.TestCase):
self.alice.friend_delete(self.abid)
# This client is currently not connected to the friend.
def test_friend_action(self): # works! sometimes?
def test_friend_action(self) -> None: # works! sometimes?
"""
t:on_friend_action
t:on_friend_message
@ -1772,7 +1777,7 @@ class ToxSuite(unittest.TestCase):
ACTION = 'Kick'
sSlot = 'friend_read_action'
setattr(self.bob, sSlot, None)
def UNUSEDtheir_on_friend_action(iTox, fid, msg_type, action, *largs):
def UNUSEDtheir_on_friend_action(iTox, fid:int, msg_type, action, *largs):
LOG_DEBUG(f"their_on_friend_action {fid} {msg_type} {action}")
try:
assert msg_type == TOX_MESSAGE_TYPE['ACTION']
@ -1785,7 +1790,7 @@ class ToxSuite(unittest.TestCase):
sSlot = 'friend_read_receipt'
setattr(self.alice, sSlot, None)
def their_on_read_reciept(iTox, fid, msg_id, *largs):
def their_on_read_reciept(iTox, fid:int, msg_id, *largs) -> None:
LOG_DEBUG(f"their_on_read_reciept {fid} {msg_id}")
sSlot = 'friend_read_receipt'
try:
@ -1842,7 +1847,7 @@ class ToxSuite(unittest.TestCase):
if hasattr(self, 'abid') and self.abid >= 0:
self.alice.friend_delete(self.abid)
def test_alice_typing_status(self): # works
def test_alice_typing_status(self) -> None: # works
"""
t:on_friend_read_receipt
t:on_friend_typing
@ -1854,7 +1859,7 @@ class ToxSuite(unittest.TestCase):
sSlot = 'friend_typing'
LOG.info("test_typing_status bob adding alice")
#: Test typing status
def bob_on_friend_typing(iTox, fid, is_typing, *largs):
def bob_on_friend_typing(iTox, fid:int, is_typing, *largs) -> None:
LOG_INFO(f"BOB_ON_friend_typing is_typing={is_typing} fid={fid}")
try:
assert fid == self.baid
@ -1896,8 +1901,8 @@ class ToxSuite(unittest.TestCase):
if hasattr(self, 'abid') and self.abid >= 0:
self.alice.friend_delete(self.abid)
@unittest.skip('unfinished')
def test_file_transfer(self): # unfinished
@expectedFail('fails') # @unittest.skip('unfinished')
def test_file_transfer(self) -> None: # unfinished
"""
t:file_send
t:file_send_chunk
@ -1931,7 +1936,7 @@ class ToxSuite(unittest.TestCase):
CONTEXT = { 'FILE': bytes(), 'RECEIVED': 0, 'START': False, 'SENT': 0 }
def alice_on_file_recv(iTox, fid, file_number, kind, size, filename):
def alice_on_file_recv(iTox, fid:int, file_number:int, kind, size, filename) -> None:
LOG_DEBUG(f"ALICE_ON_file_recv fid={fid} {file_number}")
try:
assert size == FILE_SIZE
@ -1944,7 +1949,7 @@ class ToxSuite(unittest.TestCase):
else:
LOG_INFO(f"ALICE_ON_file_recv " + str(fid))
def alice_on_file_recv_control(iTox, fid, file_number, control, *largs):
def alice_on_file_recv_control(iTox, fid:int, file_number, control, *largs) -> None:
# TOX_FILE_CONTROL = { 'RESUME': 0, 'PAUSE': 1, 'CANCEL': 2,}
LOG_DEBUG(f"ALICE_ON_file_recv_control fid={fid} {file_number} {control}")
try:
@ -1962,7 +1967,7 @@ class ToxSuite(unittest.TestCase):
LOG_INFO(f"ALICE_ON_file_recv " + str(fid))
self.alice.completed = False
def alice_on_file_recv_chunk(iTox, fid, file_number, position, iNumBytes, *largs):
def alice_on_file_recv_chunk(iTox, fid:int, file_number:int, position:int, iNumBytes, *largs) -> None:
LOG_DEBUG(f"ALICE_ON_file_recv_chunk {fid} {file_number}")
# FixMe - use file_number and iNumBytes to get data?
data = ''
@ -1995,7 +2000,7 @@ class ToxSuite(unittest.TestCase):
self.alice.callback_file_recv_chunk(alice_on_file_recv_chunk)
self.bob.completed = False
def bob_on_file_recv_control2(iTox, fid, file_number, control):
def bob_on_file_recv_control2(iTox, fid:int, file_number:int, control) -> None:
LOG_DEBUG(f"BOB_ON_file_recv_control2 {fid} {file_number} control={control}")
if control == TOX_FILE_CONTROL['RESUME']:
CONTEXT['START'] = True
@ -2003,7 +2008,7 @@ class ToxSuite(unittest.TestCase):
self.bob.completed = True
pass
def bob_on_file_chunk_request(iTox, fid, file_number, position, length, *largs):
def bob_on_file_chunk_request(iTox, fid:int, file_number:int, position:int, length, *largs) -> None:
LOG_DEBUG(f"BOB_ON_file_chunk_request {fid} {file_number}")
if length == 0:
return
@ -2081,14 +2086,13 @@ class ToxSuite(unittest.TestCase):
LOG_INFO(f"test_file_transfer:: self.wait_objs_attr completed")
#? @unittest.skip('crashes')
def test_tox_savedata(self): # works sorta
def test_tox_savedata(self) -> None: # works sorta
"""
t:get_savedata_size
t:get_savedata
"""
# Fatal Python error: Aborted
# "/var/local/src/toxygen_wrapper/wrapper/tox.py", line 180 in kill
return
assert self.alice.get_savedata_size() > 0
data = self.alice.get_savedata()
@ -2116,14 +2120,14 @@ class ToxSuite(unittest.TestCase):
else:
LOG.info("passed test_tox_savedata")
def test_kill(self): #
def test_kill(self) -> None: #
import threading
LOG.info(f"THE END {threading.active_count()}")
self.tearDown()
LOG.info(f"THE END {threading.enumerate()}")
def vOargsToxPreamble(oArgs, Tox, ToxTest):
def vOargsToxPreamble(oArgs, Tox, ToxTest) -> None:
ts.vSetupLogging(oArgs)
@ -2142,7 +2146,7 @@ def vOargsToxPreamble(oArgs, Tox, ToxTest):
###
def iMain(oArgs, failfast=True):
def iMain(oArgs, failfast=True) -> int:
# collect_types.init_types_collection()
@ -2229,7 +2233,7 @@ def oArgparse(lArgv):
return oArgs
def main(lArgs=None):
def main(lArgs=None) -> int:
global oTOX_OARGS
if lArgs is None: lArgs = []
oArgs = oArgparse(lArgs)