This commit is contained in:
emdee@spm.plastiras.org 2024-02-19 14:01:39 +00:00
commit 66a31110b3
17 changed files with 14834 additions and 0 deletions

33
.gitignore vendored Normal file
View File

@ -0,0 +1,33 @@
.pylint.err
.pylint.out
*.pyc
*.pyo
libs/
*.egg-info
*.log
*.out
.idea
*~
#*
*.iml
*.junk
*.so
*.log
toxygen/build
toxygen/dist
*.spec
dist
toxygen/avatars
toxygen/__pycache__
/*.egg-info
/*.egg
html
Toxygen.egg-info
*.tox
.cache
*.db
*~
Makefile

7
.rsync.sh Normal file
View File

@ -0,0 +1,7 @@
#!/bin/sh
# find * -name \*.py | xargs grep -l '[ ]*$' | xargs sed -i -e 's/[ ]*$//'
rsync "$@" -vax --exclude \*.dst --exclude \*.log --exclude \*.out \
--exclude \*.egg-info --exclude libs --exclude dist --exclude build \
--exclude \*.pyc --exclude .pyl\* --exclude \*~ --exclude \*.err \
./ ../tox_wrapper.git/|grep -v /$

101
README.md Normal file
View File

@ -0,0 +1,101 @@
# tox_wrapper
[ctypes](https://docs.python.org/3/library/ctypes.html)
wrapping of [Tox](https://tox.chat/)
[```libtoxcore```](https://github.com/TokTok/c-toxcore) into Python
using [ctypesgen](https://github.com/ctypesgen/ctypesgen)
The full c-toxcore library is covered.
The code is typed so that every call in ```tox*.py``` should have the
right signature.
It has been tested with UDP and TCP proxy (Tor). It has ***not*** been
tested on Windows, and there may be some minor breakage, which should be
easy to fix. There is a good coverage integration testsuite in
```tox_wrapper/tests```. Change to that directory and run
```tests_wrapper.py --help```; the test suite gives a good set of examples of usage.
## Install
Run ```make install``` or put the parent of the wrapper directory on
your PYTHONPATH and touch a file called `__init__.py` in its parent
directory.
Set the ```TOXCORE_LIBS``` environment variable to say where to find
your ```libtoxcore.so``` and ```libtoxav.so``` and ```libtoxencryptsave.so```
files. Link all 3 filenames to ```libtoxcore.so``` if you have only
```libtoxcore.so``` (which is usually the case if you built
```c-toxcore``` with ```cmake``` rather than
```autogen/configure```). The environment variable TOXCORE_LIBS overrides;
look in the file ```tox_wrapper/libtox.py``` for the details.
# Tests
To test, run ```python3 tox_wrapper/tests/tests_wrapper.py --help```
As is, the code in ```tox.py``` is very verbose. Edit the file to change
```
def LOG_ERROR(a): print('EROR> '+a)
def LOG_WARN(a): print('WARN> '+a)
def LOG_INFO(a): print('INFO> '+a)
def LOG_DEBUG(a): print('DBUG> '+a)
def LOG_TRACE(a): pass # print('TRAC> '+a)
```
to all ```pass #``` or use ```logging.logger``` to suite your tastes.
```logging.logger``` can be dangerous in callbacks in ```Qt``` applications,
so we use simple print statements as default. The same applies to
```tox_wrapper/tests/tests_wrapper.py```.
## Prerequisites
No prerequisites in Python3 other than ctypesgen.
## Other wrappers
There are a number of other wrappings into Python of Tox core.
This one uses [ctypes](https://docs.python.org/3/library/ctypes.html)
which has its merits - there is no need to recompile anything as with
Cython - change the Python file and it's done. And you can follow things
in a Python debugger, or with the utterly stupendous Python feature of
```gdb``` (```gdb -ex r --args /usr/bin/python3.11 <pyfile>```).
CTYPES code can be brittle, segfaulting if you've got things wrong,
but if your wrapping is right, it is very efficient and easy to work on.
The [faulthandler](https://docs.python.org/3/library/faulthandler.html)
module can be helpful in debugging crashes
(e.g. from segmentation faults produced by erroneous C library wrapping).
Others include:
* <https://github.com/TokTok/py-toxcore-c> Cython bindings.
Incomplete and not really actively supported. Maybe it will get
worked on in the future, but TokTok seems to be working on
java, rust, scalla, go, etc. bindings instead.
No support for NGC groups or toxencryptsave.
* <https://github.com/oxij/PyTox>
forked from https://github.com/aitjcize/PyTox
by Wei-Ning Huang <aitjcize@gmail.com>.
Hardcore C wrapping which is not easy to keep up to date.
No support for NGC or toxencryptsave. Abandonned.
This was the basis for the TokTok/py-toxcore-c code until recently.
To our point of view, the ability of CTYPEs to follow code in the
debugger is a crucial advantage.
## Updates
Although Tox works over Tor, we do not recommend its usage for
anonymity as it leaks DNS requests due to a 6-year old known security
issue: https://github.com/TokTok/c-toxcore/issues/469 unless your Tox client
does hostname lookups before calling Tox (like toxygen does). Otherwise,
do not use it for anonymous communication unless you have a firewall in place.
The Tox project does not follow semantic versioning of its main structures
so the project may break the underlying ctypes wrapper at any time, so
you should run ctypesgen each time you install a new version of c-toxcore.
Up-to-date code is on https://git.plastiras.org/emdee/tox_wrapper
Work on this project is suspended until the
[MultiDevice](https://git.plastiras.org/emdee/tox_profile/wiki/MultiDevice-Announcements-POC) problem is solved. Fork me!

43
pyproject.toml Normal file
View File

@ -0,0 +1,43 @@
[project]
name = "tox_wrapper"
description = "A Python3 ctypesgen wrapping of c-toxcore into Python."
authors = [{ name = "Ingvar", email = "Ingvar@gitgub.com" } ]
requires-python = ">3.7"
keywords = ["tox", "python3", "ctypes"]
classifiers = [
"License :: OSI Approved",
"Operating System :: POSIX :: BSD :: FreeBSD",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: Implementation :: CPython",
]
dynamic = ["version", "readme", ] # cannot be dynamic ['license']
[project.scripts]
toxygen_wrapper_tests = "toxygen_wrapper.tests.tests_wrapper:main"
toxygen_echo = "toxygen_wrapper.toxygen_echo:main"
[tool.setuptools.dynamic]
version = {attr = "toxygen_wrapper.__version__"}
readme = {file = ["README.md"]}
[project.license]
file = "LICENSE.md"
[project.urls]
repository = "https://git.plastiras.org/emdee/tox_wrapper"
[build-system]
# >= 61.0
requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["toxygen_wrapper", "toxygen_wrapper.tests"]

53
setup.cfg Normal file
View File

@ -0,0 +1,53 @@
[metadata]
classifiers =
License :: OSI Approved
Intended Audience :: Web Developers
Operating System :: POSIX :: BSD :: FreeBSD
Operating System :: POSIX :: Linux
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Programming Language :: Python :: Implementation :: CPython
description='Tox ctypesgen wrapping into Python'
long_description='Tox ctypesgen wrapping of c-toxcore into Python3'
url='https://git.plastiras.org/emdee/tox_wrapper/'
keywords='ctypes Tox messenger'
[options]
zip_safe = false
python_requires = >=3.6
package_dir=
=src
packages = ["tox_wrapper", "tox_wrapper.tests"]
[easy_install]
zip_ok = false
[flake8]
jobs = 1
max-line-length = 88
ignore =
E111
E114
E128
E225
E261
E302
E305
E402
E501
E502
E541
E701
E702
E704
E722
E741
F508
F541
W503
W601

View File

@ -0,0 +1,87 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
import os
import sys
from ctypes import CDLL
# You need a libs directory beside this directory
# and you need to link your libtoxcore.so and libtoxav.so
# and libtoxencryptsave.so into ../libs/
# Link all 3 to libtoxcore.so if you have only libtoxcore.so
try:
import utils.util as util
sLIBS_DIR = util.get_libs_directory()
except ImportError:
sLIBS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)),
'libs')
# environment variable TOXCORE_LIBS overrides
d = os.environ.get('TOXCORE_LIBS', '')
if d and os.path.exists(d):
sLIBS_DIR = d
if os.environ.get('DEBUG', ''):
print ('DBUG: Setting TOXCORE_LIBS to ' +d)
del d
class LibToxCore:
def __init__(self):
platform = sys.platform
if platform == 'win32':
libtoxcore = 'libtox.dll'
elif platform == 'darwin':
libtoxcore = 'libtoxcore.dylib'
else:
libtoxcore = 'libtoxcore.so'
# libtoxcore and libsodium may be installed in your os
# give libs/ precedence
libFile = os.path.join(sLIBS_DIR, libtoxcore)
if os.path.isfile(libFile):
self._libtoxcore = CDLL(libFile)
else:
self._libtoxcore = CDLL(libtoxcore)
def __getattr__(self, item):
return self._libtoxcore.__getattr__(item)
class LibToxAV:
def __init__(self):
platform = sys.platform
if platform == 'win32':
# on Windows av api is in libtox.dll
self._libtoxav = CDLL(os.path.join(sLIBS_DIR, 'libtox.dll'))
elif platform == 'darwin':
self._libtoxav = CDLL('libtoxcore.dylib')
else:
libFile = os.path.join(sLIBS_DIR, 'libtoxav.so')
if os.path.isfile(libFile):
self._libtoxav = CDLL(libFile)
else:
self._libtoxav = CDLL('libtoxav.so')
def __getattr__(self, item):
return self._libtoxav.__getattr__(item)
# figure out how to see if we have a combined library
class LibToxEncryptSave:
def __init__(self):
platform = sys.platform
if platform == 'win32':
# on Windows profile encryption api is in libtox.dll
self._lib_tox_encrypt_save = CDLL(os.path.join(sLIBS_DIR, 'libtox.dll'))
elif platform == 'darwin':
self._lib_tox_encrypt_save = CDLL('libtoxcore.dylib')
else:
libFile = os.path.join(sLIBS_DIR, 'libtoxencryptsave.so')
if os.path.isfile(libFile):
self._lib_tox_encrypt_save = CDLL(libFile)
else:
self._lib_tox_encrypt_save = CDLL('libtoxencryptsave.so')
def __getattr__(self, item):
return self._lib_tox_encrypt_save.__getattr__(item)
# figure out how to see if we have a combined library

View File

View File

@ -0,0 +1,391 @@
"""SocksiPy - Python SOCKS module.
Version 1.00
Copyright 2006 Dan-Haim. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of Dan Haim nor the names of his contributors may be used
to endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY DAN HAIM "AS IS" AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
EVENT SHALL DAN HAIM OR HIS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMANGE.
This module provides a standard socket-like interface for Python
for tunneling connections through SOCKS proxies.
"""
"""
Minor modifications made by Christopher Gilbert (http://motomastyle.com/)
for use in PyLoris (http://pyloris.sourceforge.net/)
Minor modifications made by Mario Vilas (http://breakingcode.wordpress.com/)
mainly to merge bug fixes found in Sourceforge
Minor modifications made by Eugene Dementiev (http://www.dementiev.eu/)
"""
import socket
import struct
import sys
PROXY_TYPE_SOCKS4 = 1
PROXY_TYPE_SOCKS5 = 2
PROXY_TYPE_HTTP = 3
_defaultproxy = None
_orgsocket = socket.socket
class ProxyError(Exception): pass
class GeneralProxyError(ProxyError): pass
class Socks5AuthError(ProxyError): pass
class Socks5Error(ProxyError): pass
class Socks4Error(ProxyError): pass
class HTTPError(ProxyError): pass
_generalerrors = ("success",
"invalid data",
"not connected",
"not available",
"bad proxy type",
"bad input")
_socks5errors = ("succeeded",
"general SOCKS server failure",
"connection not allowed by ruleset",
"Network unreachable",
"Host unreachable",
"Connection refused",
"TTL expired",
"Command not supported",
"Address type not supported",
"Unknown error")
_socks5autherrors = ("succeeded",
"authentication is required",
"all offered authentication methods were rejected",
"unknown username or invalid password",
"unknown error")
_socks4errors = ("request granted",
"request rejected or failed",
"request rejected because SOCKS server cannot connect to identd on the client",
"request rejected because the client program and identd report different user-ids",
"unknown error")
def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=None, password=None) -> None:
"""setdefaultproxy(proxytype, addr[, port[, rdns[, username[, password]]]])
Sets a default proxy which all further socksocket objects will use,
unless explicitly changed.
"""
global _defaultproxy
_defaultproxy = (proxytype, addr, port, rdns, username, password)
def wrapmodule(module) -> None:
"""wrapmodule(module)
Attempts to replace a module's socket library with a SOCKS socket. Must set
a default proxy using setdefaultproxy(...) first.
This will only work on modules that import socket directly into the namespace;
most of the Python Standard Library falls into this category.
"""
if _defaultproxy != None:
module.socket.socket = socksocket
else:
raise GeneralProxyError((4, "no proxy specified"))
class socksocket(socket.socket):
"""socksocket([family[, type[, proto]]]) -> socket object
Open a SOCKS enabled socket. The parameters are the same as
those of the standard socket init. In order for SOCKS to work,
you must specify family=AF_INET, type=SOCK_STREAM and proto=0.
"""
def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None):
_orgsocket.__init__(self, family, type, proto, _sock)
if _defaultproxy != None:
self.__proxy = _defaultproxy
else:
self.__proxy = (None, None, None, None, None, None)
self.__proxysockname = None
self.__proxypeername = None
def __recvall(self, count):
"""__recvall(count) -> data
Receive EXACTLY the number of bytes requested from the socket.
Blocks until the required number of bytes have been received.
"""
data = self.recv(count)
while len(data) < count:
d = self.recv(count-len(data))
if not d: raise GeneralProxyError((0, "connection closed unexpectedly"))
data = data + d
return data
def setproxy(self, proxytype=None, addr=None, port=None, rdns=True, username=None, password=None):
"""setproxy(proxytype, addr[, port[, rdns[, username[, password]]]])
Sets the proxy to be used.
proxytype - The type of the proxy to be used. Three types
are supported: PROXY_TYPE_SOCKS4 (including socks4a),
PROXY_TYPE_SOCKS5 and PROXY_TYPE_HTTP
addr - The address of the server (IP or DNS).
port - The port of the server. Defaults to 1080 for SOCKS
servers and 8080 for HTTP proxy servers.
rdns - Should DNS queries be preformed on the remote side
(rather than the local side). The default is True.
Note: This has no effect with SOCKS4 servers.
username - Username to authenticate with to the server.
The default is no authentication.
password - Password to authenticate with to the server.
Only relevant when username is also provided.
"""
self.__proxy = (proxytype, addr, port, rdns, username, password)
def __negotiatesocks5(self, destaddr, destport):
"""__negotiatesocks5(self,destaddr,destport)
Negotiates a connection through a SOCKS5 server.
"""
# First we'll send the authentication packages we support.
if (self.__proxy[4]!=None) and (self.__proxy[5]!=None):
# The username/password details were supplied to the
# setproxy method so we support the USERNAME/PASSWORD
# authentication (in addition to the standard none).
self.sendall(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02))
else:
# No username/password were entered, therefore we
# only support connections with no authentication.
self.sendall(struct.pack('BBB', 0x05, 0x01, 0x00))
# We'll receive the server's response to determine which
# method was selected
chosenauth = self.__recvall(2)
if chosenauth[0:1] != chr(0x05).encode():
self.close()
raise GeneralProxyError((1, _generalerrors[1]))
# Check the chosen authentication method
if chosenauth[1:2] == chr(0x00).encode():
# No authentication is required
pass
elif chosenauth[1:2] == chr(0x02).encode():
# Okay, we need to perform a basic username/password
# authentication.
self.sendall(chr(0x01).encode() + chr(len(self.__proxy[4])) + self.__proxy[4] + chr(len(self.__proxy[5])) + self.__proxy[5])
authstat = self.__recvall(2)
if authstat[0:1] != chr(0x01).encode():
# Bad response
self.close()
raise GeneralProxyError((1, _generalerrors[1]))
if authstat[1:2] != chr(0x00).encode():
# Authentication failed
self.close()
raise Socks5AuthError((3, _socks5autherrors[3]))
# Authentication succeeded
else:
# Reaching here is always bad
self.close()
if chosenauth[1] == chr(0xFF).encode():
raise Socks5AuthError((2, _socks5autherrors[2]))
else:
raise GeneralProxyError((1, _generalerrors[1]))
# Now we can request the actual connection
req = struct.pack('BBB', 0x05, 0x01, 0x00)
# If the given destination address is an IP address, we'll
# use the IPv4 address request even if remote resolving was specified.
try:
ipaddr = socket.inet_aton(destaddr)
req = req + chr(0x01).encode() + ipaddr
except socket.error:
# Well it's not an IP number, so it's probably a DNS name.
if self.__proxy[3]:
# Resolve remotely
ipaddr = None
if type(destaddr) != type(b''): # python3
destaddr_bytes = destaddr.encode(encoding='idna')
else:
destaddr_bytes = destaddr
req = req + chr(0x03).encode() + chr(len(destaddr_bytes)).encode() + destaddr_bytes
else:
# Resolve locally
ipaddr = socket.inet_aton(socket.gethostbyname(destaddr))
req = req + chr(0x01).encode() + ipaddr
req = req + struct.pack(">H", destport)
self.sendall(req)
# Get the response
resp = self.__recvall(4)
if resp[0:1] != chr(0x05).encode():
self.close()
raise GeneralProxyError((1, _generalerrors[1]))
elif resp[1:2] != chr(0x00).encode():
# Connection failed
self.close()
if ord(resp[1:2])<=8:
raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])]))
else:
raise Socks5Error((9, _socks5errors[9]))
# Get the bound address/port
elif resp[3:4] == chr(0x01).encode():
boundaddr = self.__recvall(4)
elif resp[3:4] == chr(0x03).encode():
resp = resp + self.recv(1)
boundaddr = self.__recvall(ord(resp[4:5]))
else:
self.close()
raise GeneralProxyError((1,_generalerrors[1]))
boundport = struct.unpack(">H", self.__recvall(2))[0]
self.__proxysockname = (boundaddr, boundport)
if ipaddr != None:
self.__proxypeername = (socket.inet_ntoa(ipaddr), destport)
else:
self.__proxypeername = (destaddr, destport)
def getproxysockname(self):
"""getsockname() -> address info
Returns the bound IP address and port number at the proxy.
"""
return self.__proxysockname
def getproxypeername(self):
"""getproxypeername() -> address info
Returns the IP and port number of the proxy.
"""
return _orgsocket.getpeername(self)
def getpeername(self):
"""getpeername() -> address info
Returns the IP address and port number of the destination
machine (note: getproxypeername returns the proxy)
"""
return self.__proxypeername
def __negotiatesocks4(self,destaddr,destport) -> None:
"""__negotiatesocks4(self,destaddr,destport)
Negotiates a connection through a SOCKS4 server.
"""
# Check if the destination address provided is an IP address
rmtrslv = False
try:
ipaddr = socket.inet_aton(destaddr)
except socket.error:
# It's a DNS name. Check where it should be resolved.
if self.__proxy[3]:
ipaddr = struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)
rmtrslv = True
else:
ipaddr = socket.inet_aton(socket.gethostbyname(destaddr))
# Construct the request packet
req = struct.pack(">BBH", 0x04, 0x01, destport) + ipaddr
# The username parameter is considered userid for SOCKS4
if self.__proxy[4] != None:
req = req + self.__proxy[4]
req = req + chr(0x00).encode()
# DNS name if remote resolving is required
# NOTE: This is actually an extension to the SOCKS4 protocol
# called SOCKS4A and may not be supported in all cases.
if rmtrslv:
req = req + destaddr + chr(0x00).encode()
self.sendall(req)
# Get the response from the server
resp = self.__recvall(8)
if resp[0:1] != chr(0x00).encode():
# Bad data
self.close()
raise GeneralProxyError((1,_generalerrors[1]))
if resp[1:2] != chr(0x5A).encode():
# Server returned an error
self.close()
if ord(resp[1:2]) in (91, 92, 93):
self.close()
raise Socks4Error((ord(resp[1:2]), _socks4errors[ord(resp[1:2]) - 90]))
else:
raise Socks4Error((94, _socks4errors[4]))
# Get the bound address/port
self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
if rmtrslv != None:
self.__proxypeername = (socket.inet_ntoa(ipaddr), destport)
else:
self.__proxypeername = (destaddr, destport)
def __negotiatehttp(self, destaddr, destport) -> None:
"""__negotiatehttp(self,destaddr,destport)
Negotiates a connection through an HTTP server.
"""
# If we need to resolve locally, we do this now
if not self.__proxy[3]:
addr = socket.gethostbyname(destaddr)
else:
addr = destaddr
self.sendall(("CONNECT " + addr + ":" + str(destport) + " HTTP/1.1\r\n" + "Host: " + destaddr + "\r\n\r\n").encode())
# We read the response until we get the string "\r\n\r\n"
resp = self.recv(1)
while resp.find("\r\n\r\n".encode()) == -1:
recv = self.recv(1)
if not recv:
raise GeneralProxyError((1, _generalerrors[1]))
resp = resp + recv
# We just need the first line to check if the connection
# was successful
statusline = resp.splitlines()[0].split(" ".encode(), 2)
if statusline[0] not in ("HTTP/1.0".encode(), "HTTP/1.1".encode()):
self.close()
raise GeneralProxyError((1, _generalerrors[1]))
try:
statuscode = int(statusline[1])
except ValueError:
self.close()
raise GeneralProxyError((1, _generalerrors[1]))
if statuscode != 200:
self.close()
raise HTTPError((statuscode, statusline[2]))
self.__proxysockname = ("0.0.0.0", 0)
self.__proxypeername = (addr, destport)
def connect(self, destpair) -> None:
"""connect(self, despair)
Connects to the specified destination through a proxy.
destpar - A tuple of the IP/DNS address and the port number.
(identical to socket's connect).
To select the proxy server use setproxy().
"""
# Do a minimal input check first
if (not type(destpair) in (list,tuple)) or (len(destpair) < 2) or (type(destpair[0]) != type('')) or (type(destpair[1]) != int):
raise GeneralProxyError((5, _generalerrors[5]))
if self.__proxy[0] == PROXY_TYPE_SOCKS5:
if self.__proxy[2] != None:
portnum = int(self.__proxy[2])
else:
portnum = 1080
_orgsocket.connect(self, (self.__proxy[1], portnum))
self.__negotiatesocks5(destpair[0], destpair[1])
elif self.__proxy[0] == PROXY_TYPE_SOCKS4:
if self.__proxy[2] != None:
portnum = self.__proxy[2]
else:
portnum = 1080
_orgsocket.connect(self,(self.__proxy[1], portnum))
self.__negotiatesocks4(destpair[0], destpair[1])
elif self.__proxy[0] == PROXY_TYPE_HTTP:
if self.__proxy[2] != None:
portnum = self.__proxy[2]
else:
portnum = 8080
_orgsocket.connect(self,(self.__proxy[1], portnum))
self.__negotiatehttp(destpair[0], destpair[1])
elif self.__proxy[0] == None:
_orgsocket.connect(self, (destpair[0], destpair[1]))
else:
raise GeneralProxyError((4, _generalerrors[4]))

View File

@ -0,0 +1,163 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
import os
import sys
import logging
from io import BytesIO
import urllib
import traceback
global LOG
LOG = logging.getLogger('TestS')
try:
import pycurl
except ImportError:
pycurl = None
try:
import requests
except ImportError:
requests = None
lNO_PROXY = ['localhost', '127.0.0.1']
CONNECT_TIMEOUT = 20.0
def bAreWeConnected() -> bool:
# FixMe: Linux only
sFile = f"/proc/{os.getpid()}/net/route"
if not os.path.isfile(sFile): return None
i = 0
for elt in open(sFile, "r").readlines():
if elt.startswith('Iface'): continue
if elt.startswith('lo'): continue
i += 1
return i > 0
def pick_up_proxy_from_environ() -> dict:
retval = dict()
if os.environ.get('socks_proxy', ''):
# socks_proxy takes precedence over https/http
proxy = os.environ.get('socks_proxy', '')
i = proxy.find('//')
if i >= 0: proxy = proxy[i+2:]
retval['proxy_host'] = proxy.split(':')[0]
retval['proxy_port'] = proxy.split(':')[-1]
retval['proxy_type'] = 2
retval['udp_enabled'] = False
elif os.environ.get('https_proxy', ''):
# https takes precedence over http
proxy = os.environ.get('https_proxy', '')
i = proxy.find('//')
if i >= 0: proxy = proxy[i+2:]
retval['proxy_host'] = proxy.split(':')[0]
retval['proxy_port'] = proxy.split(':')[-1]
retval['proxy_type'] = 1
retval['udp_enabled'] = False
elif os.environ.get('http_proxy', ''):
proxy = os.environ.get('http_proxy', '')
i = proxy.find('//')
if i >= 0: proxy = proxy[i+2:]
retval['proxy_host'] = proxy.split(':')[0]
retval['proxy_port'] = proxy.split(':')[-1]
retval['proxy_type'] = 1
retval['udp_enabled'] = False
else:
retval['proxy_host'] = ''
retval['proxy_port'] = ''
retval['proxy_type'] = 0
retval['udp_enabled'] = True
return retval
def download_url(url:str, settings:str = None) -> None:
if not bAreWeConnected(): return ''
if settings is None:
settings = pick_up_proxy_from_environ()
if pycurl:
LOG.debug('Downloading with pycurl: ' + str(url))
buffer = BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, url)
c.setopt(c.WRITEDATA, buffer)
# Follow redirect.
c.setopt(c.FOLLOWLOCATION, True)
# cookie jar
cjar = os.path.join(os.environ['HOME'], '.local', 'jar.cookie')
if os.path.isfile(cjar):
c.setopt(c.COOKIEFILE, cjar)
# LARGS+=( --cookie-jar --junk-session-cookies )
#? c.setopt(c.ALTSVC_CTRL, 16)
c.setopt(c.NOPROXY, ','.join(lNO_PROXY))
#? c.setopt(c.CAINFO, certifi.where())
if settings['proxy_type'] == 2 and settings['proxy_host']:
socks_proxy = 'socks5h://'+settings['proxy_host']+':'+str(settings['proxy_port'])
settings['udp_enabled'] = False
c.setopt(c.PROXY, socks_proxy)
c.setopt(c.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME)
elif settings['proxy_type'] == 1 and settings['proxy_host']:
https_proxy = 'https://'+settings['proxy_host']+':'+str(settings['proxy_port'])
c.setopt(c.PROXY, https_proxy)
elif settings['proxy_type'] == 1 and settings['proxy_host']:
http_proxy = 'http://'+settings['proxy_host']+':'+str(settings['proxy_port'])
c.setopt(c.PROXY, http_proxy)
c.setopt(c.PROTOCOLS, c.PROTO_HTTPS)
try:
c.perform()
c.close()
#? assert c.getinfo(c.RESPONSE_CODE) < 300
result = buffer.getvalue()
# Body is a byte string.
LOG.info('nodes loaded with pycurl: ' + str(url))
return result
except Exception as ex:
LOG.error('TOX Downloading error with pycurl: ' + str(ex))
LOG.error('\n' + traceback.format_exc())
# drop through
if requests:
LOG.debug('Downloading with requests: ' + str(url))
try:
headers = dict()
headers['Content-Type'] = 'application/json'
proxies = dict()
if settings['proxy_type'] == 2 and settings['proxy_host']:
socks_proxy = 'socks5://'+settings['proxy_host']+':'+str(settings['proxy_port'])
settings['udp_enabled'] = False
proxies['https'] = socks_proxy
elif settings['proxy_type'] == 1 and settings['proxy_host']:
https_proxy = 'https://'+settings['proxy_host']+':'+str(settings['proxy_port'])
proxies['https'] = https_proxy
elif settings['proxy_type'] == 1 and settings['proxy_host']:
http_proxy = 'http://'+settings['proxy_host']+':'+str(settings['proxy_port'])
proxies['http'] = http_proxy
req = requests.get(url,
headers=headers,
proxies=proxies,
timeout=CONNECT_TIMEOUT)
# max_retries=3
assert req.status_code < 300
result = req.content
LOG.info('nodes loaded with requests: ' + str(url))
return result
except Exception as ex:
LOG.error('TOX Downloading error with requests: ' + str(ex))
# drop through
if not settings['proxy_type']: # no proxy
LOG.debug('Downloading with urllib no proxy: ' + str(url))
try:
req = urllib.request.Request(url)
req.add_header('Content-Type', 'application/json')
response = urllib.request.urlopen(req)
result = response.read()
LOG.info('nodes loaded with no proxy: ' + str(url))
return result
except Exception as ex:
LOG.error('TOX Downloading ' + str(ex))
return ''
return ''

View File

@ -0,0 +1,584 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
"""
Code to interact with a running to using the stem library.
"""
import getpass
import os
import re
import select
import shutil
import socket
import sys
import logging
import time
from typing import Union, Callable, Union
import warnings
import stem
from stem.connection import MissingPassword
from stem.control import Controller
from stem.util.tor_tools import is_valid_fingerprint
global LOG
from toxygen_wrapper.tests.support_http import bAreWeConnected
warnings.filterwarnings('ignore')
LOG = logging.getLogger('TestS')
bHAVE_TORR = shutil.which('tor-resolve')
oSTEM_CONTROLER = None
yKNOWN_ONIONS = """
- facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd # facebook
- duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad # ddg
- zkaan2xfbuxia2wpf7ofnkbz6r5zdbbvxbunvp5g2iebopbfc4iqmbad # keys.openpgp
"""
# grep -B 1 '<li><a href="' /tmp/tor.html |sed -e 's/<li><a href="http:../ - /' -e 's/.onion.*//' -e 's/<li id=./ # /' -e 's/".*//' -e '/^--/d' -e '/<li id/d'
# This will slow things down 1-2 min each
yKNOWN_ONIONS_TOR = """
# 2019.www.torproject.org
- jqyzxhjk6psc6ul5jnfwloamhtyh7si74b4743k2qgpskwwxrzhsxmad
# api.donate.torproject.org
- rbi3fpvpz4vlrx67scoqef2zxz7k4xyiludszg655favvkygjmhz6wyd
# archive.torproject.org
- uy3qxvwzwoeztnellvvhxh7ju7kfvlsauka7avilcjg7domzxptbq7qd
# aus1.torproject.org
- ot3ivcdxmalbsbponeeq5222hftpf3pqil24q3s5ejwo5t52l65qusid
# aus2.torproject.org
- b5t7emfr2rn3ixr4lvizpi3stnni4j4p6goxho7lldf4qg4hz5hvpqid
# blog.torproject.org
- pzhdfe7jraknpj2qgu5cz2u3i4deuyfwmonvzu5i3nyw4t4bmg7o5pad
# bridges.torproject.org
- yq5jjvr7drkjrelzhut7kgclfuro65jjlivyzfmxiq2kyv5lickrl4qd
# cloud.torproject.org
- ui3cpcohcoko6aydhuhlkwqqtvadhaflcc5zb7mwandqmcal7sbwzwqd
# collector.torproject.org
- pgmrispjerzzf2tdzbfp624cg5vpbvdw2q5a3hvtsbsx25vnni767yad
# collector2.torproject.org
- 3srlmjzbyyzz62jvdfqwn5ldqmh6mwnqxre2zamxveb75uz2qrqkrkyd
# community.torproject.org
- xmrhfasfg5suueegrnc4gsgyi2tyclcy5oz7f5drnrodmdtob6t2ioyd
# consensus-health.torproject.org
- tkskz5dkjel4xqyw5d5l3k52kgglotwn6vgb5wrl2oa5yi2szvywiyid
# crm.torproject.org
- 6ojylpznauimd2fga3m7g24vd7ebkzlemxdprxckevqpzs347ugmynqd
# deb.torproject.org
- apow7mjfryruh65chtdydfmqfpj5btws7nbocgtaovhvezgccyjazpqd
# dev.crm.torproject.org
- eewp4iydzyu2a5d6bvqadadkozxdbhsdtmsoczu2joexfrjjsheaecad
# dist.torproject.org
- scpalcwstkydpa3y7dbpkjs2dtr7zvtvdbyj3dqwkucfrwyixcl5ptqd
# donate-api.torproject.org
- lkfkuhcx62yfvzuz5o3ju4divuf4bsh2bybwd3oierq47kyp2ig2gvid
# donate.torproject.org
- yoaenchicimox2qdc47p36zm3cuclq7s7qxx6kvxqaxjodigfifljqqd
# exonerator.torproject.org
- pm46i5h2lfewyx6l7pnicbxhts2sxzacvsbmqiemqaspredf2gm3dpad
# extra.torproject.org
- kkr72iohlfix5ipjg776eyhplnl2oiv5tz4h2y2bkhjix3quafvjd5ad
# gettor.torproject.org
- ueghr2hzndecdntou33mhymbbxj7pir74nwzhqr6drhxpbz3j272p4id
# git.torproject.org
- xtlfhaspqtkeeqxk6umggfbr3gyfznvf4jhrge2fujz53433i2fcs3id
# gitlab.torproject.org
- eweiibe6tdjsdprb4px6rqrzzcsi22m4koia44kc5pcjr7nec2rlxyad
# gitweb.torproject.org
- gzgme7ov25seqjbphab4fkcph3jkobfwwpivt5kzbv3kqx2y2qttl4yd
# grafana1.torproject.org
- 7zjnw5lx2x27rwiocxkqdquo7fawj46mf2wiu2l7e6z6ng6nivmdxnad
# grafana2.torproject.org
- f3vd6fyiccuppybkxiblgigej3pfvvqzjnhd3wyv7h4ee5asawf2fhqd
# ircbouncer.torproject.org
- moz5kotsnjony4oxccxfo4lwk3pvoxmdoljibhgoonzgzjs5oemtjmqd
# metabase.metrics.torproject.org
- gr5pseamigereei4c6654hafzhid5z2c3oqzn6cfnx7yfyelt47znhad
# metrics.torproject.org
- hctxrvjzfpvmzh2jllqhgvvkoepxb4kfzdjm6h7egcwlumggtktiftid
# moat.torproject.org
- z7m7ogzdhu43nosvjtsuplfmuqa3ge5obahixydhmzdox6owwxfoxzid
# nagios.torproject.org
- w6vizvw4ckesva5fvlkrepynemxdq6pgo5sh4r76ec6msq5notkhqryd
# newsletter.torproject.org
- a4ygisnerpgtc5ayerl22pll6cls3oyj54qgpm7qrmb66xrxts6y3lyd
# nightlies.tbb.torproject.org
- umj4zbqdfcyevlkgqgpq6foxk3z75zzxsbgt5jqmfxofrbrjh3crbnad
# nyx.torproject.org
- 3ewfgrt4gzfccp6bnquhqb266r3zepiqpnsk3falwygkegtluwuyevid
- xao2lxsmia2edq2n5zxg6uahx6xox2t7bfjw6b5vdzsxi7ezmqob6qid
- dud2sxm6feahhuwj4y4lzktduy7v3qpaqsfkggtj2ojmzathttkegoid
# openpgpkey.torproject.org
- 2yldcptk56shc7lwieozoglw3t5ghty7m6mf2faysvfnzccqavbu2mad
# people.torproject.org
- 5ecey6oe4rocdsfoigr4idu42cecm2j7zfogc3xc7kfn4uriehwrs6qd
# prometheus1.torproject.org
- ydok5jiruh3ak6hcfdlm2g7iuraaxcomeckj2nucjsxif6qmrrda2byd
# prometheus2.torproject.org
- vyo6yrqhl3by7d6n5t6hjkflaqbarjpqjnvapr5u5rafk4imnfrmcjyd
# rbm.torproject.org
- nkuz2tpok7ctwd5ueer5bytj3bm42vp7lgjcsnznal3stotg6vyaakyd
# research.torproject.org
- xhqthou6scpfnwjyzc3ekdgcbvj76ccgyjyxp6cgypxjlcuhnxiktnqd
# review.torproject.net
- zhkhhhnppc5k6xju7n25rjba3wuip73jnodicxl65qdpchrwvvsilcyd
# rpm.torproject.org
- 4ayyzfoh5qdrokqaejis3rdredhvf22n3migyxfudpkpunngfc7g4lqd
# snowflake.torproject.org
- oljlphash3bpqtrvqpr5gwzrhroziw4mddidi5d2qa4qjejcbrmoypqd
# spec.torproject.org
- i3xi5qxvbrngh3g6o7czwjfxwjzigook7zxzjmgwg5b7xnjcn5hzciad
# staging-api.donate.torproject.org
- vorwws6g6mx23djlznmlqva4t5olulpnet6fxyiyytcu5dorp3fstdqd
# staging.crm.torproject.org
- pt34uujusar4arrvsqljndqlt7tck2d5cosaav5xni4nh7bmvshyp2yd
# staging.donate-api.torproject.org
- 7niqsyixinnhxvh33zh5dqnplxnc2yd6ktvats3zmtbbpzcphpbsa6qd
# status.torproject.org
- eixoaclv7qvnmu5rolbdwba65xpdiditdoyp6edsre3fitad777jr3ad
# stem.torproject.org
- mf34jlghauz5pxjcmdymdqbe5pva4v24logeys446tdrgd5lpsrocmqd
# styleguide.torproject.org
- 7khzpw47s35pwo3lvtctwf2szvnq3kgglvzc22elx7of2awdzpovqmqd
# submission.torproject.org
- givpjczyrb5jjseful3o5tn3tg7tidbu4gydl4sa5ekpcipivqaqnpad
# support.torproject.org
- rzuwtpc4wb3xdzrj3yeajsvm3fkq4vbeubm2tdxaqruzzzgs5dwemlad
# survey.torproject.org
- eh5esdnd6fkbkapfc6nuyvkjgbtnzq2is72lmpwbdbxepd2z7zbgzsqd
# svn-archive.torproject.org
- b63iq6es4biaawfilwftlfkw6a6putogxh4iakei2ioppb7dsfucekyd
# tb-manual.torproject.org
- dsbqrprgkqqifztta6h3w7i2htjhnq7d3qkh3c7gvc35e66rrcv66did
# test-api.donate.torproject.org
- wiofesr5qt2k7qrlljpk53isgedxi6ddw6z3o7iay2l7ne3ziyagxaid
# test-data.tbb.torproject.org
- umbk3kbgov4ekg264yulvbrpykfye7ohguqbds53qn547mdpt6o4qkad
# test.crm.torproject.org
- a4d52y2erv4eijii66cpnyqn7rsnnq3gmtrsdxzt2laoutvu4gz7fwid
# test.donate-api.torproject.org
- i4zhrn4md3ucd5dfgeo5lnqd3jy2z2kzp3lt4tdisvivzoqqtlrymkid
# www
- tttyx2vwp7ihml3vkhywwcizv6nbwrikpgeciy3qrow7l7muak2pnhad
# www.torproject.org
- 2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid
"""
# we check these each time but we got them by sorting bad relays
# in the wild we'll keep a copy here so we can avoid restesting
yKNOWN_NODNS = """
- 0x0.is
- a9.wtf
- apt96.com
- axims.net
- backup.spekadyon.org
- dfri.se
- dotsrc.org
- dtf.contact
- ezyn.de
- for-privacy.net
- galtland.network
- heraldonion.org
- interfesse.net
- kryptonit.org
- linkspartei.org
- mkg20001.io
- nicdex.com
- nx42.de
- pineapple.cx
- privacylayer.xyz
- privacysvcs.net
- prsv.ch
- sebastian-elisa-pfeifer.eu
- thingtohide.nl
- tor-exit-2.aa78i2efsewr0neeknk.xyz
- tor-exit-3.aa78i2efsewr0neeknk.xyz
- tor.dlecan.com
- tor.skankhunt42.pw
- transliberation.today
- tuxli.org
- unzane.com
- verification-for-nusenu.net
- www.defcon.org
"""
# - aklad5.com
# - artikel5ev.de
# - arvanode.net
# - dodo.pm
# - erjan.net
# - galtland.network
# - lonet.sh
# - moneneis.de
# - olonet.sh
# - or-exit-2.aa78i2efsewr0neeknk.xyz
# - or.wowplanet.de
# - ormycloud.org
# - plied-privacy.net
# - rivacysvcs.net
# - redacted.org
# - rofl.cat
# - sv.ch
# - tikel10.org
# - tor.wowplanet.de
# - torix-relays.org
# - tse.com
# - w.digidow.eu
# - w.cccs.de
def oMakeController(sSock:str = '', port:int = 9051, password:[str,None] = None):
global oSTEM_CONTROLER
from getpass import unix_getpass
if sSock and os.path.exists(sSock):
controller = Controller.from_socket_file(path=sSock)
else:
controller = Controller.from_port(port=port)
try:
controller.authenticate()
except (Exception, MissingPassword):
if password is None:
password = os.environ.get('TOR_CONTROLLER_PASSWORD', '')
if password:
p = password
else:
sys.stdout.flush()
p = unix_getpass(prompt='TOR_CONTROLLER_PASSWORD: ', stream=sys.stderr)
controller.authenticate(p)
oSTEM_CONTROLER = controller
return controller
def oGetStemController(log_level:int = 10, sock_or_pair:str = '/run/tor/control', password:[str,None] = None):
global oSTEM_CONTROLER
if oSTEM_CONTROLER: return oSTEM_CONTROLER
import stem.util.log
# stem.util.log.Runlevel = 'DEBUG' = 20 # log_level
if os.path.exists(sock_or_pair):
LOG.info(f"controller from socket {sock_or_pair}")
controller = Controller.from_socket_file(path=sock_or_pair)
else:
if type(sock_or_pair) == int:
port = sock_or_pair
elif ':' in sock_or_pair:
port = sock_or_pair.split(':')[1]
else:
port = sock_or_pair
try:
port = int(port)
except: port = 9051
LOG.info(f"controller from port {port}")
controller = Controller.from_port(port=port)
try:
controller.authenticate()
except (Exception, MissingPassword):
if password is None:
password = os.environ.get('TOR_CONTROLLER_PASSWORD', '')
if password:
p = password
else:
sys.stdout.flush()
p = getpass.unix_getpass(prompt='TOR_CONTROLLER_PASSWORD: ', stream=sys.stderr)
controller.authenticate(p)
oSTEM_CONTROLER = controller
LOG.debug(f"{controller}")
return oSTEM_CONTROLER
def sMapaddressResolv(target:str, iPort:int = 9051, log_level:int = 10, password:[str,None] = None) -> str:
if not stem:
LOG.warn('please install the stem Python package')
return ''
try:
controller = oGetStemController(log_level=log_level, password=password)
map_dict = {"0.0.0.0": target}
map_ret = controller.map_address(map_dict)
return map_ret
except Exception as e:
LOG.exception(e)
return ''
def vwait_for_controller(controller, wait_boot:int = 10) -> None:
if bAreWeConnected() is False:
raise RuntimeError("we are not connected")
percent = i = 0
# You can call this while boostrapping
while percent < 100 and i < wait_boot:
bootstrap_status = controller.get_info("status/bootstrap-phase")
progress_percent = re.match('.* PROGRESS=([0-9]+).*', bootstrap_status)
percent = int(progress_percent.group(1))
LOG.info(f"Bootstrapping {percent}%")
time.sleep(5)
i += 5
def bin_to_hex(raw_id:int, length: Union[int, None] = None) -> str:
if length is None: length = len(raw_id)
res = ''.join('{:02x}'.format(raw_id[i]) for i in range(length))
return res.upper()
def lIntroductionPoints(controller=None, lOnions:[list,None] = None, itimeout:int = 120, log_level:int = 10, password:[str,None] = None):
"""now working !!! stem 1.8.x timeout must be huge >120
'Provides the descriptor for a hidden service. The **address** is the
'.onion' address of the hidden service '
What about Services?
"""
if lOnions is None: lOnions = []
try:
from cryptography.utils import int_from_bytes
except ImportError:
import cryptography.utils
# guessing - not in the current cryptography but stem expects it
def int_from_bytes(**args): return int.to_bytes(*args)
cryptography.utils.int_from_bytes = int_from_bytes
# this will fai if the trick above didnt work
from stem.prereq import is_crypto_available
is_crypto_available(ed25519=True)
from queue import Empty
from stem import Timeout
from stem.client.datatype import LinkByFingerprint
from stem.descriptor.hidden_service import HiddenServiceDescriptorV3
if type(lOnions) not in [set, tuple, list]:
lOnions = list(lOnions)
if controller is None:
controller = oGetStemController(log_level=log_level, password=password)
l = []
for elt in lOnions:
LOG.info(f"controller.get_hidden_service_descriptor {elt}")
try:
desc = controller.get_hidden_service_descriptor(elt,
await_result=True,
timeout=itimeout)
# LOG.log(40, f"{dir(desc)} get_hidden_service_descriptor")
# timeouts 20 sec
# mistakenly a HSv2 descriptor
hs_address = HiddenServiceDescriptorV3.from_str(str(desc)) # reparse as HSv3
oInnerLayer = hs_address.decrypt(elt)
# LOG.log(40, f"{dir(oInnerLayer)}")
# IntroductionPointV3
n = oInnerLayer.introduction_points
if not n:
LOG.warn(f"NO introduction points for {elt}")
continue
LOG.info(f"{elt} {len(n)} introduction points")
lp = []
for introduction_point in n:
for linkspecifier in introduction_point.link_specifiers:
if isinstance(linkspecifier, LinkByFingerprint):
# LOG.log(40, f"Getting fingerprint for {linkspecifier}")
if hasattr(linkspecifier, 'fingerprint'):
assert len(linkspecifier.value) == 20
lp += [bin_to_hex(linkspecifier.value)]
LOG.info(f"{len(lp)} introduction points for {elt}")
l += lp
except (Empty, Timeout,) as e: # noqa
LOG.warn(f"Timed out getting introduction points for {elt}")
except stem.DescriptorUnavailable as e:
LOG.error(e)
except Exception as e:
LOG.exception(e)
return l
def zResolveDomain(domain:str) -> int:
try:
ip = sTorResolve(domain)
except Exception as e: # noqa
ip = ''
if ip == '':
try:
lpair = getaddrinfo(domain, 443)
except Exception as e:
LOG.warn(f"{e}")
lpair = None
if lpair is None:
LOG.warn(f"TorResolv and getaddrinfo failed for {domain}")
return ''
ip = lpair[0]
return ip
def sTorResolve(target:str,
verbose:bool = False,
sHost:str = '127.0.0.1',
iPort:int = 9050,
SOCK_TIMEOUT_SECONDS:float = 10.0,
SOCK_TIMEOUT_TRIES:int = 3,
) -> str:
MAX_INFO_RESPONSE_PACKET_LENGTH = 8
if '@' in target:
LOG.warn(f"sTorResolve failed invalid hostname {target}")
return ''
target = target.strip('/')
seb = b"\x04\xf0\x00\x00\x00\x00\x00\x01\x00"
seb += bytes(target, 'US-ASCII') + b"\x00"
assert len(seb) == 10 + len(target), str(len(seb)) + repr(seb)
# LOG.debug(f"0 Sending {len(seb)} to The TOR proxy {seb}")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((sHost, iPort))
sock.settimeout(SOCK_TIMEOUT_SECONDS)
oRet = sock.sendall(seb) # noqa
i = 0
data = ''
while i < SOCK_TIMEOUT_TRIES:
i += 1
time.sleep(3)
lReady = select.select([sock.fileno()], [], [],
SOCK_TIMEOUT_SECONDS)
if not lReady[0]: continue
try:
flags = socket.MSG_WAITALL
data = sock.recv(MAX_INFO_RESPONSE_PACKET_LENGTH, flags)
except socket.timeout:
LOG.warn(f"4 The TOR proxy {(sHost, iPort)}" \
+" didnt reply in " + str(SOCK_TIMEOUT_SECONDS) + " sec."
+" #" +str(i))
except Exception as e:
LOG.error("4 The TOR proxy " \
+repr((sHost, iPort)) \
+" errored with " + str(e)
+" #" +str(i))
sock.close()
return ''
else:
if len(data) > 0: break
if len(data) == 0:
if i > SOCK_TIMEOUT_TRIES:
sLabel = "5 No reply #"
else:
sLabel = "5 No data #"
LOG.warn(f"sTorResolve: {sLabel} {i} on {sHost}:{iPort}")
sock.close()
return ''
assert len(data) >= 8
packet_sf = data[1]
if packet_sf == 90:
# , "%d" % packet_sf
assert f"{packet_sf}" == "90", f"packet_sf = {packet_sf}"
return f"{data[4]}.{data[5]}.{data[6]}.{data[7]}"
else:
# 91
LOG.warn(f"tor-resolve failed for {target} on {sHost}:{iPort}")
sout = f"/tmp/{os.getpid}.tmp"
iRet = os.system(f"tor-resolve -4 {target} > {sout} 2>/dev/null")
# os.system("strace tor-resolve -4 "+target+" 2>&1|grep '^sen\|^rec'")
if iRet == 0:
sAns = open(sout, 'rt').read().strip()
return sAns
return ''
def getaddrinfo(sHost:str, sPort:str) -> list:
# do this the explicit way = Ive seen the compact connect fail
# >>> sHost, sPort = 'l27.0.0.1', 33446
# >>> sock.connect((sHost, sPort))
# socket.gaierror: [Errno -2] Name or service not known
try:
lElts = socket.getaddrinfo(sHost, int(sPort), socket.AF_INET)
lElts = list(filter(lambda elt: elt[1] == socket.SOCK_DGRAM, lElts))
assert len(lElts) == 1, repr(lElts)
lPair = lElts[0][-1]
assert len(lPair) == 2, repr(lPair)
assert type(lPair[1]) == int, repr(lPair)
except (socket.gaierror, OSError, BaseException) as e:
LOG.error(e)
return None
return lPair
def icheck_torrc(sFile:str, oArgs) -> int:
l = open(sFile, 'rt').readlines()
a = {}
for elt in l:
elt = elt.strip()
if not elt or ' ' not in elt: continue
(k, v,) = elt.split(' ', 1)
a[k] = v
keys = a
if 'HashedControlPassword' not in keys:
LOG.info('Add HashedControlPassword for security')
print('run: tor --hashcontrolpassword <TopSecretWord>')
if 'ExcludeExitNodes' in keys:
elt = 'BadNodes.ExcludeExitNodes.BadExit'
LOG.warn(f"Remove ExcludeNodes and move then to {oArgs.bad_nodes}")
print(f"move to the {elt} section as a list")
if 'GuardNodes' in keys:
elt = 'GoodNodes.GuardNodes'
LOG.warn(f"Remove GuardNodes and move then to {oArgs.good_nodes}")
print(f"move to the {elt} section as a list")
if 'ExcludeNodes' in keys:
elt = 'BadNodes.ExcludeNodes.BadExit'
LOG.warn(f"Remove ExcludeNodes and move then to {oArgs.bad_nodes}")
print(f"move to the {elt} section as a list")
if 'ControlSocket' not in keys and os.path.exists('/run/tor/control'):
LOG.info('Add ControlSocket /run/tor/control for us')
print('ControlSocket /run/tor/control GroupWritable RelaxDirModeCheck')
if 'UseMicrodescriptors' not in keys or keys['UseMicrodescriptors'] != '1':
LOG.info('Add UseMicrodescriptors 0 for us')
print('UseMicrodescriptors 0')
if 'AutomapHostsSuffixes' not in keys:
LOG.info('Add AutomapHostsSuffixes for onions')
print('AutomapHostsSuffixes .exit,.onion')
if 'AutoMapHostsOnResolve' not in keys:
LOG.info('Add AutoMapHostsOnResolve for onions')
print('AutoMapHostsOnResolve 1')
if 'VirtualAddrNetworkIPv4' not in keys:
LOG.info('Add VirtualAddrNetworkIPv4 for onions')
print('VirtualAddrNetworkIPv4 172.16.0.0/12')
return 0
def lExitExcluder(oArgs, iPort:int = 9051, log_level:int = 10, password:[str,None] = None) -> list:
"""
https://raw.githubusercontent.com/nusenu/noContactInfo_Exit_Excluder/main/exclude_noContactInfo_Exits.py
"""
if not stem:
LOG.warn('please install the stem Python package')
return ''
LOG.debug('lExcludeExitNodes')
try:
controller = oGetStemController(log_level=log_level, password=password)
# generator
relays = controller.get_server_descriptors()
except Exception as e:
LOG.error(f'Failed to get relay descriptors {e}')
return None
if controller.is_set('ExcludeExitNodes'):
LOG.info('ExcludeExitNodes is in use already.')
return None
exit_excludelist=[]
LOG.debug("Excluded exit relays:")
for relay in relays:
if relay.exit_policy.is_exiting_allowed() and not relay.contact:
if is_valid_fingerprint(relay.fingerprint):
exit_excludelist.append(relay.fingerprint)
LOG.debug("https://metrics.torproject.org/rs.html#details/%s" % relay.fingerprint)
else:
LOG.warn('Invalid Fingerprint: %s' % relay.fingerprint)
try:
controller.set_conf('ExcludeExitNodes', exit_excludelist)
LOG.info('Excluded a total of %s exit relays without ContactInfo from the exit position.' % len(exit_excludelist))
except Exception as e:
LOG.exception('ExcludeExitNodes ' +str(e))
return exit_excludelist
if __name__ == '__main__':
if len(sys.argv) <= 1:
targets = ['zkaan2xfbuxia2wpf7ofnkbz6r5zdbbvxbunvp5g2iebopbfc4iqmbad', # keys.openpgp.org
'facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd', # facebook
'libera75jm6of4wxpxt4aynol3xjmbtxgfyjpu34ss4d7r7q2v5zrpyd', # libera
]
else:
targets = sys.argv[1:]
controller = oGetStemController(log_level=10)
for target in targets:
print(target, lIntroductionPoints(controller, [target], itimeout=120))

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,689 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
import logging
import os
import random
import re
import sys
import time
import threading
from ctypes import *
from typing import Union, Callable
import toxygen_wrapper.toxcore_enums_and_consts as enums
from toxygen_wrapper.tox import Tox, UINT32_MAX, ToxError
from toxygen_wrapper.toxcore_enums_and_consts import (TOX_ADDRESS_SIZE, TOX_CONNECTION,
TOX_FILE_CONTROL,
TOX_MESSAGE_TYPE,
TOX_SECRET_KEY_SIZE,
TOX_USER_STATUS)
try:
import support_testing as ts
except ImportError:
import toxygen_wrapper.tests.support_testing as ts
sleep = time.sleep
ADDR_SIZE = 38 * 2
CLIENT_ID_SIZE = 32 * 2
THRESHOLD = 120 # >25
fSOCKET_TIMEOUT = 15.0
iLOOP_N = 50
iN = 6
global LOG
LOG = logging.getLogger('TestS')
if False:
def LOG_ERROR(l: str) -> None: LOG.error('+ '+l)
def LOG_WARN(l: str) -> None: LOG.warn('+ '+l)
def LOG_INFO(l: str) -> None: LOG.info('+ '+l)
def LOG_DEBUG(l: str) -> None: LOG.debug('+ '+l)
def LOG_TRACE(l: str) -> None: pass # print('+ '+l)
else:
# just print to stdout so there is NO complications from logging.
def LOG_ERROR(l: str) -> None: print('EROR+ '+l)
def LOG_WARN(l: str) -> None: print('WARN+ '+l)
def LOG_INFO(l: str) -> None: print('INFO+ '+l)
def LOG_DEBUG(l: str) -> None: print('DEBUG+ '+l)
def LOG_TRACE(l: str) -> None: pass # print('TRAC+ '+l)
class WrapperMixin():
def bBobNeedAlice(self) -> bool:
"""
"""
if hasattr(self, 'baid') and self.baid >= 0 and \
self.baid in self.bob.self_get_friend_list():
LOG.warn(f"setUp ALICE IS ALREADY IN BOBS FRIEND LIST")
return False
elif self.bob.self_get_friend_list_size() >= 1:
LOG.warn(f"setUp BOB STILL HAS A FRIEND LIST")
return False
return True
def bAliceNeedAddBob (self) -> bool:
if hasattr(self, 'abid') and self.abid >= 0 and \
self.abid in self.alice.self_get_friend_list():
LOG.warn(f"setUp BOB IS ALREADY IN ALICES FRIEND LIST")
return False
if self.alice.self_get_friend_list_size() >= 1:
LOG.warn(f"setUp ALICE STILL HAS A FRIEND LIST")
return False
return True
def get_connection_status(self) -> bool:
if self.bob.mycon_time <= 1 or self.alice.mycon_time <= 1:
pass
# drop through
elif self.bob.dht_connected == TOX_CONNECTION['NONE']:
return False
elif self.alice.dht_connected == TOX_CONNECTION['NONE']:
return False
# if not self.connected
if self.bob.self_get_connection_status() == TOX_CONNECTION['NONE']:
return False
if self.alice.self_get_connection_status() == TOX_CONNECTION['NONE']:
return False
return True
def loop(self, n) -> None:
"""
t:iterate
t:iteration_interval
"""
interval = self.bob.iteration_interval()
for i in range(n):
self.alice.iterate()
self.bob.iterate()
sleep(interval / 1000.0)
def call_bootstrap(self, num: Union[int, None] = None, lToxes:Union[list[int], None] =None, i:int =0, fsocket_timeout:float = fSOCKET_TIMEOUT) -> None:
if num == None: num=ts.iNODES
if lToxes is None:
lToxes = [self.alice, self.bob]
# LOG.debug(f"call_bootstrap network={oTOX_OARGS.network}")
otox = lToxes[0]
if otox._args.network in ['new', 'newlocal', 'localnew']:
ts.bootstrap_local(self.lUdp, lToxes)
elif not ts.bAreWeConnected():
LOG.warning('we are NOT CONNECTED')
else:
random.shuffle(self.lUdp)
if otox._args.proxy_port > 0:
lElts = self.lUdp[:1]
else:
lElts = self.lUdp[:num+i]
LOG.debug(f"call_bootstrap ts.bootstrap_udp {len(lElts)}")
ts.bootstrap_udp(lElts, lToxes, fsocket_timeout=fsocket_timeout)
random.shuffle(self.lTcp)
lElts = self.lTcp[:num+i]
LOG.debug(f"call_bootstrap ts.bootstrap_tcp {len(lElts)}")
ts.bootstrap_tcp(lElts, lToxes, fsocket_timeout=fsocket_timeout)
def group_until_connected(self, otox, group_number:int, num: Union[int, None] = None, iMax:int = THRESHOLD, fsocket_timeout:float = fSOCKET_TIMEOUT) -> bool:
"""
"""
i = 0
bRet = None
while i <= iMax :
i += 1
iRet = otox.group_is_connected(group_number)
if iRet == True or iRet == 0:
bRet = True
break
if i % 5 == 0:
j = i//5 + 1
self.call_bootstrap(num, lToxes=None, i=j, fsocket_timeout=fsocket_timeout)
s = ''
if i == 0: s = '\n'
LOG.info(s+"group_until_connected " \
+" #" + str(i) \
+" iRet=" +repr(iRet) \
+f" BOBS={otox.mycon_status}" \
+f" last={int(otox.mycon_time)}" )
self.loop(iLOOP_N)
else:
bRet = False
if bRet:
LOG.info(f"group_until_connected True i={i}" \
+f" iMax={iMax}" \
+f" BOB={otox.self_get_connection_status()}" \
+f" last={int(otox.mycon_time)}" )
return True
else:
LOG.warning(f"group_until_connected False i={i}" \
+f" iMax={iMax}" \
+f" BOB={otox.self_get_connection_status()}" \
+f" last={int(otox.mycon_time)}" )
return False
def loop_until_connected(self, otox=None, num: Union[int, None] = None, fsocket_timeout:float = fSOCKET_TIMEOUT) -> bool:
"""
t:on_self_connection_status
t:self_get_connection_status
"""
i = 0
num = 4
bRet = None
if otox is None: otox = self.bob
while i <= otox._args.test_timeout :
i += 1
if (self.alice.mycon_status and self.bob.mycon_status):
bRet = True
break
if i % 5 == 0:
j = i//5 + 1
self.call_bootstrap(num, lToxes=None, i=j, fsocket_timeout=fsocket_timeout)
s = ''
if i == 0: s = '\n'
LOG.info(s+"loop_until_connected " \
+" #" + str(i) \
+" BOB=" +repr(self.bob.self_get_connection_status()) \
+" ALICE=" +repr(self.alice.self_get_connection_status())
+f" BOBS={self.bob.mycon_status}" \
+f" ALICES={self.alice.mycon_status}" \
+f" last={int(self.bob.mycon_time)}" )
if (self.alice.mycon_status and self.bob.mycon_status):
bRet = True
break
if (self.alice.self_get_connection_status() and
self.bob.self_get_connection_status()):
LOG_WARN(f"loop_until_connected disagree status() DISAGREE" \
+f' self.bob.mycon_status={self.bob.mycon_status}' \
+f' alice.mycon_status={self.alice.mycon_status}' \
+f" last={int(self.bob.mycon_time)}" )
bRet = True
break
self.loop(iLOOP_N)
else:
bRet = False
if bRet or \
( self.bob.self_get_connection_status() != TOX_CONNECTION['NONE'] and \
self.alice.self_get_connection_status() != TOX_CONNECTION['NONE'] ):
LOG.info(f"loop_until_connected returning True i={i}" \
+f" BOB={self.bob.self_get_connection_status()}" \
+f" ALICE={self.alice.self_get_connection_status()}" \
+f" last={int(self.bob.mycon_time)}" )
return True
otox._args.test_timeout += 5
LOG.warning(f"loop_until_connected returning False i={i}" \
+f" BOB={self.bob.self_get_connection_status()}" \
+f" ALICE={self.alice.self_get_connection_status()}" \
+f" last={int(self.bob.mycon_time)}" )
return False
def wait_objs_attr(self, objs: list, attr: str, fsocket_timeout:float = fSOCKET_TIMEOUT) -> bool:
i = 0
otox = objs[0]
while i <= otox._args.test_timeout:
i += 1
if i % 5 == 0:
num = None
j = 0
j = i//5
self.call_bootstrap(num, lToxes=objs, i=j, fsocket_timeout=fsocket_timeout)
LOG.debug(f"wait_objs_attr {objs} for {attr} {i}")
if all([getattr(obj, attr) for obj in objs]):
return True
self.loop(iLOOP_N)
else:
otox._args.test_timeout += 1
LOG.warn(f"wait_objs_attr for {attr} i >= {otox._args.test_timeout}")
return all([getattr(obj, attr) is not None for obj in objs])
def wait_otox_attrs(self, obj, attrs: list[str], fsocket_timeout:float = fSOCKET_TIMEOUT) -> bool:
assert all(attrs), f"wait_otox_attrs {attrs}"
i = 0
otox = obj
while i <= otox._args.test_timeout:
i += 1
if i % 5 == 0:
num = None
j = 0
if obj.mycon_time == 1:
# start with 4 random nodes ti bootstrap
num = 4
# every 10 sec add another random nodes to bootstrap
j = i//10 + 1
if obj.self_get_connection_status() == TOX_CONNECTION['NONE']:
self.call_bootstrap(num, lToxes=[obj], i=j, fsocket_timeout=fsocket_timeout)
LOG.debug(f"wait_otox_attrs {obj.name} for {attrs} {i}" \
+f" last={int(obj.mycon_time)}")
if all([getattr(obj, attr) is not None for attr in attrs]):
return True
self.loop(iLOOP_N)
else:
LOG.warning(f"wait_otox_attrs i >= {otox._args.test_timeout} attrs={attrs} results={[getattr(obj, attr) for attr in attrs]}")
return all([getattr(obj, attr) for attr in attrs])
def wait_ensure_exec(self, method, args:list, fsocket_timeout:float = fSOCKET_TIMEOUT) -> bool:
i = 0
oRet = None
while i <= self.bob._args.test_timeout:
i += 1
if i % 5 == 0:
# every 10 sec add another random nodes to bootstrap
j = i//10 + 1
self.call_bootstrap(num=None, lToxes=None, i=j, fsocket_timeout=fsocket_timeout)
LOG.debug("wait_ensure_exec " \
+" " +str(method)
+" " +str(i))
try:
oRet = method(*args)
if oRet:
LOG.info(f"wait_ensure_exec oRet {oRet!r}")
return True
except ArgumentError as e:
# ArgumentError('This client is currently NOT CONNECTED to the friend.')
# dunno
LOG.warning(f"wait_ensure_exec ArgumentError {e}")
return False
except Exception as e:
LOG.warning(f"wait_ensure_exec EXCEPTION {e}")
return False
sleep(3)
else:
LOG.error(f"wait_ensure_exec i >= {1*self.bob._args.test_timeout}")
return False
return oRet
def bob_add_alice_as_friend_norequest(self) -> bool:
if not self.bBobNeedAlice(): return True
apk = self.alice.self_get_public_key()
iRet = self.bob.friend_add_norequest(apk)
if iRet < 0:
return False
self.baid = self.bob.friend_by_public_key(apk)
assert self.baid >= 0, self.baid
assert self.bob.friend_exists(self.baid), "bob.friend_exists"
assert not self.bob.friend_exists(self.baid + 1)
assert self.baid in self.bob.self_get_friend_list()
assert self.bob.self_get_friend_list_size() >= 1
return True
def alice_add_bob_as_friend_norequest(self) -> bool:
if not self.bAliceNeedAddBob(): return True
bpk = self.bob.self_get_public_key()
iRet = self.alice.friend_add_norequest(bpk)
if iRet < 0:
return False
self.abid = self.alice.friend_by_public_key(bpk)
assert self.abid >= 0, self.abid
assert self.abid in self.alice.self_get_friend_list()
assert self.alice.friend_exists(self.abid), "alice.friend_exists"
assert not self.alice.friend_exists(self.abid + 1)
assert self.alice.self_get_friend_list_size() >= 1
return True
def both_add_as_friend(self) -> bool:
if self.bob._args.norequest:
assert self.bob_add_alice_as_friend_norequest()
assert self.alice_add_bob_as_friend_norequest()
else:
assert self.bob_add_alice_as_friend()
assert self.alice_add_bob_as_friend()
if not hasattr(self, 'baid') or self.baid < 0:
LOG.warn("both_add_as_friend no bob, baid")
if not hasattr(self, 'abid') or self.abid < 0:
LOG.warn("both_add_as_friend no alice, abid")
return True
def both_add_as_friend_norequest(self) -> bool:
if True or self.bBobNeedAlice():
assert self.bob_add_alice_as_friend_norequest()
if True or self.bAliceNeedAddBob():
assert self.alice_add_bob_as_friend_norequest()
if not hasattr(self.bob, 'baid') or self.bob.baid < 0:
LOG.warn("both_add_as_friend_norequest no bob, baid")
if not hasattr(self.alice, 'abid') or self.alice.abid < 0:
LOG.warn("both_add_as_friend_norequest no alice, abid")
#: Test last online
#? assert self.alice.friend_get_last_online(self.abid) is not None
#? assert self.bob.friend_get_last_online(self.baid) is not None
return True
def bob_add_alice_as_friend(self) -> bool:
"""
t:friend_add
t:on_friend_request
t:friend_by_public_key
"""
MSG = 'Alice, this is Bob.'
sSlot = 'friend_request'
if not self.bBobNeedAlice(): return True
def alices_on_friend_request(iTox,
public_key,
message_data,
message_data_size,
*largs) -> None:
try:
assert str(message_data, 'UTF-8') == MSG
LOG_INFO(f"alices_on_friend_request: {sSlot} = True ")
except Exception as e:
LOG_WARN(f"alices_on_friend_request: EXCEPTION {e}")
# return
setattr(self.bob, sSlot, True)
setattr(self.bob, sSlot, None)
apk = self.alice.self_get_public_key()
inum = -1
try:
inum = self.bob.friend_add(self.alice._address, bytes(MSG, 'UTF-8'))
assert inum >= 0, f"bob_add_alice_as_friend !>= 0 {inum}"
# need a friend connected?
if not self.get_connection_status():
LOG.warning(f"test_groups_join NOT CONNECTED")
self.loop_until_connected(self.bob)
self.alice.callback_friend_request(alices_on_friend_request)
if not self.wait_otox_attrs(self.bob, [sSlot]):
LOG_WARN(f"bob_add_alice_as_friend NO setting {sSlot}")
return False
self.baid = self.bob.friend_by_public_key(apk)
assert self.baid >= 0, self.baid
assert self.bob.friend_exists(self.baid)
assert not self.bob.friend_exists(self.baid + 1)
assert self.bob.self_get_friend_list_size() >= 1
assert self.baid in self.bob.self_get_friend_list()
except Exception as e:
LOG.error(f"bob_add_alice_as_friend EXCEPTION {e}")
return False
finally:
self.bob.callback_friend_message(None)
return True
def alice_add_bob_as_friend(self) -> bool:
"""
t:friend_add
t:on_friend_request
t:friend_by_public_key
"""
MSG = 'Bob, this is Alice.'
sSlot = 'friend_request'
if not self.bAliceNeedAddBob(): return True
def bobs_on_friend_request(iTox,
public_key,
message_data,
message_data_size,
*largs) -> None:
LOG_DEBUG(f"bobs_on_friend_request: " +repr(message_data))
try:
assert str(message_data, 'UTF-8') == MSG
except Exception as e:
LOG_WARN(f"bobs_on_friend_request: Exception {e}")
# return
setattr(self.alice, sSlot, True)
LOG_INFO(f"bobs_on_friend_request: {sSlot} = True ")
setattr(self.alice, sSlot, None)
bpk = self.bob.self_get_public_key()
inum = -1
try:
inum = self.alice.friend_add(self.bob._address, bytes(MSG, 'UTF-8'))
assert inum >= 0, f"alice.friend_add !>= 0 {inum}"
self.bob.callback_friend_request(bobs_on_friend_request)
if not self.wait_otox_attrs(self.alice, [sSlot]):
LOG_WARN(f"alice.friend_add NO wait {sSlot}")
return False
self.abid = self.alice.friend_by_public_key(bpk)
assert self.abid >= 0, self.abid
assert self.alice.friend_exists(self.abid), "not exists"
assert not self.alice.friend_exists(self.abid + 1), "exists +1"
assert self.abid in self.alice.self_get_friend_list(), "not in list"
assert self.alice.self_get_friend_list_size() >= 1, "list size"
except Exception as e:
LOG.error(f"alice.friend_add EXCEPTION {e}")
return False
finally:
self.bob.callback_friend_message(None)
return True
def bob_add_alice_as_friend_and_status(self) -> bool:
if self.bob._args.norequest:
assert self.bob_add_alice_as_friend_norequest()
else:
assert self.bob_add_alice_as_friend()
#: Wait until both are online
sSlot = 'friend_conn_status'
setattr(self.bob, sSlot, False)
def bobs_on_friend_connection_status(iTox, friend_id, iStatus, *largs) -> None:
LOG_INFO(f"bobs_on_friend_connection_status {friend_id} ?>=0" +repr(iStatus))
setattr(self.bob, sSlot, False)
sSlot = 'friend_status'
setattr(self.bob, sSlot, None)
def bobs_on_friend_status(iTox, friend_id, iStatus, *largs) -> None:
LOG_INFO(f"bobs_on_friend_status {friend_id} ?>=0 iS={iStatus}")
setattr(self.bob, sSlot, False)
sSlot = 'friend_conn_status'
setattr(self.alice, sSlot, None)
def alices_on_friend_connection_status(iTox, friend_id, iStatus, *largs) -> None:
LOG_INFO(f"alices_on_friend_connection_status {friend_id} ?>=0 " +repr(iStatus))
setattr(self.alice, sSlot, False)
sSlot = 'friend_status'
setattr(self.alice, sSlot, None)
def alices_on_friend_status(iTox, friend_id, iStatus, *largs) -> None:
LOG_INFO(f"alices_on_friend_status {friend_id} ?>=0 iS={iStatus}")
setattr(self.alice, sSlot, False)
try:
# need a friend connected?
if not self.get_connection_status():
self.loop_until_connected(self.bob)
LOG.info("bob_add_alice_as_friend_and_status waiting for alice connections")
if not self.wait_otox_attrs(self.alice,
['friend_conn_status',
'friend_status']):
return False
self.bob.callback_friend_connection_status(bobs_on_friend_connection_status)
self.bob.callback_friend_status(bobs_on_friend_status)
self.alice.callback_friend_connection_status(alices_on_friend_connection_status)
self.alice.callback_friend_status(alices_on_friend_status)
LOG.info("bob_add_alice_as_friend_and_status waiting for bob connections")
if not self.wait_otox_attrs(self.bob,
['friend_conn_status',
'friend_status']):
LOG_WARN('bob_add_alice_as_friend_and_status NO')
# return False
except Exception as e:
LOG.error(f"bob_add_alice_as_friend_and_status ERROR {e}")
return False
finally:
self.alice.callback_friend_connection_status(None)
self.bob.callback_friend_connection_status(None)
self.alice.callback_friend_status(None)
self.bob.callback_friend_status(None)
return True
def bob_to_alice_connected(self) -> bool:
assert hasattr(self, 'baid')
iRet = self.bob.friend_get_connection_status(self.baid)
if iRet == TOX_CONNECTION['NONE']:
LOG.warn("bob.friend_get_connection_status")
return False
return True
def alice_to_bob_connected(self) -> bool:
assert hasattr(self, 'abid')
iRet = self.alice.friend_get_connection_status(self.abid)
if iRet == TOX_CONNECTION['NONE']:
LOG.error("alice.friend_get_connection_status")
return False
return True
def otox_test_groups_create(self,
otox,
group_name='test_group',
nick='test_nick',
topic='Test Topic', # str
) -> int:
privacy_state = enums.TOX_GROUP_PRIVACY_STATE['PUBLIC']
iGrp = otox.group_new(privacy_state, group_name, nick)
assert iGrp >= 0
LOG.info(f"group iGrp={iGrp}")
otox.group_set_topic(iGrp, topic)
assert otox.group_get_topic(iGrp) == topic
assert otox.group_get_topic_size(iGrp) == len(topic)
name = otox.group_get_name(iGrp)
if type(name) == bytes:
name = str(name, 'utf-8')
assert name == group_name, name
assert otox.group_get_name_size(iGrp) == len(group_name)
sPk = otox.group_self_get_public_key(iGrp)
assert otox.group_get_password_size(iGrp) >= 0
sP = otox.group_get_password(iGrp)
assert otox.group_get_privacy_state(iGrp) == privacy_state
assert otox.group_get_number_groups() > 0, "numg={otox.group_get_number_groups()}"
LOG.info(f"group pK={sPk} iGrp={iGrp} numg={otox.group_get_number_groups()}")
return iGrp
def otox_verify_group(self, otox, iGrp) -> None:
"""
group_self_get_name
group_self_get_peer_id
group_self_get_public_key
group_self_get_role
group_self_get_status
group_self_set_name
"""
group_number = iGrp
try:
assert type(iGrp) == int, "otox_test_groups_join iGrp not an int"
assert iGrp < UINT32_MAX, "otox_test_groups_join iGrp failure UINT32_MAX"
assert iGrp >= 0, f"otox_test_groups_join iGrp={iGrp} < 0"
sGrp = otox.group_get_chat_id(iGrp)
assert len(sGrp) == enums.TOX_GROUP_CHAT_ID_SIZE * 2, \
f"group sGrp={sGrp} {len(sGrp)} != {enums.TOX_GROUP_CHAT_ID_SIZE * 2}"
sPk = otox.group_self_get_public_key(iGrp)
LOG.info(f"otox_verify_group sPk={sPk} iGrp={iGrp} n={otox.group_get_number_groups()}")
sName = otox.group_self_get_name(iGrp)
iStat = otox.group_self_get_status(iGrp)
iId = otox.group_self_get_peer_id(iGrp)
iRole = otox.group_self_get_role(iGrp)
iStat = otox.group_self_get_status(iGrp)
LOG.info(f"otox_verify_group sName={sName} iStat={iStat} iId={iId} iRole={iRole} iStat={iStat}")
assert otox.group_self_set_name(iGrp, "NewName")
bRet = otox.group_is_connected(iGrp)
except Exception as e:
LOG.warn(f"group_is_connected EXCEPTION {e}")
return -1
# chat->connection_state == CS_CONNECTED || chat->connection_state == CS_CONNECTING;
if not bRet:
LOG.warn(f"group_is_connected WARN not connected iGrp={iGrp} n={otox.group_get_number_groups()}")
else:
LOG.info(f"group_is_connected SUCCESS connected iGrp={iGrp} n={otox.group_get_number_groups()}")
try:
bRet = self.group_until_connected(otox, iGrp, iMax=2*otox._args.test_timeout)
except Exception as e:
LOG.error(f"group_until_connected EXCEPTION {e}")
return -1
# chat->connection_state == CS_CONNECTED || chat->connection_state == CS_CONNECTING;
if bRet:
LOG.warn(f"group_until_connected WARN not connected iGrp={iGrp} n={otox.group_get_number_groups()}")
else:
LOG.info(f"group_until_connected SUCCESS connected iGrp={iGrp} n={otox.group_get_number_groups()}")
message = bytes('hello', 'utf-8')
bRet = otox.group_send_message(iGrp, TOX_MESSAGE_TYPE['NORMAL'], message)
if not bRet:
LOG.warn(f"group_send_message {bRet}")
else:
LOG.debug(f"group_send_message {bRet}")
# 360497DA684BCE2A500C1AF9B3A5CE949BBB9F6FB1F91589806FB04CA039E313
# 75D2163C19FEFFE51508046398202DDC321E6F9B6654E99BAE45FFEC134F05DE
def otox_test_groups_join(self, otox,
chat_id="75d2163c19feffe51508046398202ddc321e6f9b6654e99bae45ffec134f05de",
nick='nick',
topic='Test Topic', # str
):
status = ''
password = ''
LOG.debug(f"group_join nick={nick} chat_id={chat_id}")
try:
iGrp = otox.group_join(chat_id, password, nick, status)
LOG.info(f"otox_test_groups_join SUCCESS iGrp={iGrp} chat_id={chat_id}")
self.otox_verify_group(otox, iGrp)
except Exception as e:
# gui
LOG.error(f"otox_test_groups_join EXCEPTION {e}")
raise
return iGrp
def otox_test_groups(self,
otox,
group_name='test_group',
nick='test_nick',
topic='Test Topic', # str
) -> int:
try:
iGrp = self.otox_test_groups_create(otox, group_name, nick, topic)
self.otox_verify_group(otox, iGrp)
except Exception as e:
LOG.error(f"otox_test_groups ERROR {e}")
raise
# unfinished
# tox_group_peer_exit_cb
# tox_callback_group_peer_join
# tox.callback_group_peer_status
# tox.callback_group_peer_name
# tox.callback_group_peer_exit
# tox.callback_group_peer_join
return iGrp
def wait_friend_get_connection_status(self, otox, fid:int, n:int = iN) -> int:
i = 0
while i < n:
i += 1
iRet = otox.friend_get_connection_status(fid)
if iRet == TOX_CONNECTION['NONE']:
LOG.debug(f"wait_friend_get_connection_status NOT CONNECTED i={i} fid={fid} {iRet}")
self.loop_until_connected(otox)
else:
LOG.info(f"wait_friend_get_connection_status fid={fid} {iRet}")
return True
else:
LOG.error(f"wait_friend_get_connection_status fid={fid} n={n}")
return False
def warn_if_no_cb(self, alice, sSlot:str) -> None:
if not hasattr(alice, sSlot+'_cb') or \
not getattr(alice, sSlot+'_cb'):
LOG.warning(f"self.bob.{sSlot}_cb NOT EXIST")
def warn_if_cb(self, alice, sSlot:str) -> None:
if hasattr(self.bob, sSlot+'_cb') and \
getattr(self.bob, sSlot+'_cb'):
LOG.warning(f"self.bob.{sSlot}_cb EXIST")

3619
src/tox_wrapper/tox.py Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

409
src/tox_wrapper/toxav.py Normal file
View File

@ -0,0 +1,409 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
from ctypes import (CFUNCTYPE, POINTER, ArgumentError, byref, c_bool, c_char_p,
c_int, c_int32, c_size_t, c_uint8, c_uint16, c_uint32,
c_void_p, cast)
from typing import Union, Callable
try:
from toxygen_wrapper.libtox import LibToxAV
import toxygen_wrapper.toxav_enums as enum
except:
from libtox import LibToxAV
import toxav_enums as enum
class ToxError(ArgumentError): pass
def LOG_ERROR(a: str) -> None: print('EROR> '+a)
def LOG_WARN(a: str) -> None: print('WARN> '+a)
def LOG_INFO(a: str) -> None: print('INFO> '+a)
def LOG_DEBUG(a: str) -> None: print('DBUG> '+a)
def LOG_TRACE(a: str) -> None: pass # print('DEBUGx: '+a)
class ToxAV:
"""
The ToxAV instance type. Each ToxAV instance can be bound to only one Tox instance, and Tox instance can have only
one ToxAV instance. One must make sure to close ToxAV instance prior closing Tox instance otherwise undefined
behaviour occurs. Upon closing of ToxAV instance, all active calls will be forcibly terminated without notifying
peers.
"""
# Creation and destruction
def __init__(self, tox_pointer):
"""
Start new A/V session. There can only be only one session per Tox instance.
:param tox_pointer: pointer to Tox instance
"""
self.libtoxav = LibToxAV()
toxav_err_new = c_int()
f = self.libtoxav.toxav_new
f.restype = POINTER(c_void_p)
self._toxav_pointer = f(tox_pointer, byref(toxav_err_new))
toxav_err_new = toxav_err_new.value
if toxav_err_new == enum.TOXAV_ERR_NEW['NULL']:
raise ArgumentError('One of the arguments to the function was NULL when it was not expected.')
if toxav_err_new == enum.TOXAV_ERR_NEW['MALLOC']:
raise MemoryError('Memory allocation failure while trying to allocate structures required for the A/V '
'session.')
if toxav_err_new == enum.TOXAV_ERR_NEW['MULTIPLE']:
raise ToxError('Attempted to create a second session for the same Tox instance.')
self.call_state_cb = None
self.audio_receive_frame_cb = None
self.video_receive_frame_cb = None
self.call_cb = None
def kill(self) -> None:
"""
Releases all resources associated with the A/V session.
If any calls were ongoing, these will be forcibly terminated without notifying peers. After calling this
function, no other functions may be called and the av pointer becomes invalid.
"""
self.libtoxav.toxav_kill(self._toxav_pointer)
def get_tox_pointer(self):
"""
Returns the Tox instance the A/V object was created for.
:return: pointer to the Tox instance
"""
self.libtoxav.toxav_get_tox.restype = POINTER(c_void_p)
return self.libtoxav.toxav_get_tox(self._toxav_pointer)
# A/V event loop
def iteration_interval(self) -> int:
"""
Returns the interval in milliseconds when the next toxav_iterate call should be. If no call is active at the
moment, this function returns 200.
:return: interval in milliseconds
"""
return int(self.libtoxav.toxav_iteration_interval(self._toxav_pointer))
def iterate(self) -> None:
"""
Main loop for the session. This function needs to be called in intervals of toxav_iteration_interval()
milliseconds. It is best called in the separate thread from tox_iterate.
"""
self.libtoxav.toxav_iterate(self._toxav_pointer)
# Call setup
def call(self, friend_number: int, audio_bit_rate: int, video_bit_rate: int) -> bool:
"""
Call a friend. This will start ringing the friend.
It is the client's responsibility to stop ringing after a certain timeout, if such behaviour is desired. If the
client does not stop ringing, the library will not stop until the friend is disconnected. Audio and video
receiving are both enabled by default.
:param friend_number: The friend number of the friend that should be called.
:param audio_bit_rate: Audio bit rate in Kb/sec. Set this to 0 to disable audio sending.
:param video_bit_rate: Video bit rate in Kb/sec. Set this to 0 to disable video sending.
:return: True on success.
"""
toxav_err_call = c_int()
LOG_DEBUG(f"toxav_call")
result = self.libtoxav.toxav_call(self._toxav_pointer, c_uint32(friend_number), c_uint32(audio_bit_rate),
c_uint32(video_bit_rate), byref(toxav_err_call))
toxav_err_call = toxav_err_call.value
if toxav_err_call == enum.TOXAV_ERR_CALL['OK']:
return bool(result)
if toxav_err_call == enum.TOXAV_ERR_CALL['MALLOC']:
raise MemoryError('A resource allocation error occurred while trying to create the structures required for '
'the call.')
if toxav_err_call == enum.TOXAV_ERR_CALL['SYNC']:
raise ToxError('Synchronization error occurred.')
if toxav_err_call == enum.TOXAV_ERR_CALL['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend number did not designate a valid friend.')
if toxav_err_call == enum.TOXAV_ERR_CALL['FRIEND_NOT_CONNECTED']:
raise ArgumentError('The friend was valid, but not currently connected.')
if toxav_err_call == enum.TOXAV_ERR_CALL['FRIEND_ALREADY_IN_CALL']:
raise ArgumentError('Attempted to call a friend while already in an audio or video call with them.')
if toxav_err_call == enum.TOXAV_ERR_CALL['INVALID_BIT_RATE']:
raise ArgumentError('Audio or video bit rate is invalid.')
raise ArgumentError('The function did not return OK')
def callback_call(self, callback: Union[Callable,None], user_data) -> None:
"""
Set the callback for the `call` event. Pass None to unset.
:param callback: The function for the call callback.
Should take pointer (c_void_p) to ToxAV object,
The friend number (c_uint32) from which the call is incoming.
True (c_bool) if friend is sending audio.
True (c_bool) if friend is sending video.
pointer (c_void_p) to user_data
:param user_data: pointer (c_void_p) to user data
"""
if callback is None:
self.libtoxav.toxav_callback_call(self._toxav_pointer, POINTER(None)(), user_data)
self.call_cb = None
return
LOG_DEBUG(f"toxav_callback_call")
c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_bool, c_bool, c_void_p)
self.call_cb = c_callback(callback)
self.libtoxav.toxav_callback_call(self._toxav_pointer, self.call_cb, user_data)
def answer(self, friend_number: int, audio_bit_rate: int, video_bit_rate: int) -> bool:
"""
Accept an incoming call.
If answering fails for any reason, the call will still be pending and it is possible to try and answer it later.
Audio and video receiving are both enabled by default.
:param friend_number: The friend number of the friend that is calling.
:param audio_bit_rate: Audio bit rate in Kb/sec. Set this to 0 to disable audio sending.
:param video_bit_rate: Video bit rate in Kb/sec. Set this to 0 to disable video sending.
:return: True on success.
"""
toxav_err_answer = c_int()
LOG_DEBUG(f"toxav_answer")
result = self.libtoxav.toxav_answer(self._toxav_pointer,
c_uint32(friend_number),
c_uint32(audio_bit_rate),
c_uint32(video_bit_rate),
byref(toxav_err_answer))
toxav_err_answer = toxav_err_answer.value
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['OK']:
return bool(result)
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['SYNC']:
raise ToxError('Synchronization error occurred.')
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['CODEC_INITIALIZATION']:
raise ToxError('Failed to initialize codecs for call session. Note that codec initiation will fail if '
'there is no receive callback registered for either audio or video.')
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend number did not designate a valid friend.')
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['FRIEND_NOT_CALLING']:
raise ArgumentError('The friend was valid, but they are not currently trying to initiate a call. This is '
'also returned if this client is already in a call with the friend.')
if toxav_err_answer == enum.TOXAV_ERR_ANSWER['INVALID_BIT_RATE']:
raise ArgumentError('Audio or video bit rate is invalid.')
raise ToxError('The function did not return OK')
# Call state graph
def callback_call_state(self, callback: Union[Callable,None], user_data) -> None:
"""
Set the callback for the `call_state` event. Pass None to unset.
:param callback: Python function.
The function for the call_state callback.
Should take pointer (c_void_p) to ToxAV object,
The friend number (c_uint32) for which the call state changed.
The bitmask of the new call state which is guaranteed to be different than the previous state. The state is set
to 0 when the call is paused. The bitmask represents all the activities currently performed by the friend.
pointer (c_void_p) to user_data
:param user_data: pointer (c_void_p) to user data
"""
if callback is None:
self.libtoxav.toxav_callback_call_state(self._toxav_pointer, POINTER(None)(), user_data)
self.call_state_cb = None
return
LOG_DEBUG(f"callback_call_state")
c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_uint32, c_void_p)
self.call_state_cb = c_callback(callback)
self.libtoxav.toxav_callback_call_state(self._toxav_pointer, self.call_state_cb, user_data)
# Call control
def call_control(self, friend_number: int, control: int) -> bool:
"""
Sends a call control command to a friend.
:param friend_number: The friend number of the friend this client is in a call with.
:param control: The control command to send.
:return: True on success.
"""
toxav_err_call_control = c_int()
LOG_DEBUG(f"call_control")
result = self.libtoxav.toxav_call_control(self._toxav_pointer,
c_uint32(friend_number),
c_int(control),
byref(toxav_err_call_control))
toxav_err_call_control = toxav_err_call_control.value
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['OK']:
return True
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['SYNC']:
raise ToxError('Synchronization error occurred.')
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number passed did not designate a valid friend.')
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['FRIEND_NOT_IN_CALL']:
raise ToxError('This client is currently not in a call with the friend. Before the call is answered, '
'only CANCEL is a valid control.')
if toxav_err_call_control == enum.TOXAV_ERR_CALL_CONTROL['INVALID_TRANSITION']:
raise ToxError('Happens if user tried to pause an already paused call or if trying to resume a call '
'that is not paused.')
raise ToxError('The function did not return OK.')
# TODO Controlling bit rates
# A/V sending
def audio_send_frame(self, friend_number: int, pcm, sample_count: int, channels: int, sampling_rate: int) -> bool:
"""
Send an audio frame to a friend.
The expected format of the PCM data is: [s1c1][s1c2][...][s2c1][s2c2][...]...
Meaning: sample 1 for channel 1, sample 1 for channel 2, ...
For mono audio, this has no meaning, every sample is subsequent. For stereo, this means the expected format is
LRLRLR... with samples for left and right alternating.
:param friend_number: The friend number of the friend to which to send an audio frame.
:param pcm: An array of audio samples. The size of this array must be sample_count * channels.
:param sample_count: Number of samples in this frame. Valid numbers here are
((sample rate) * (audio length) / 1000), where audio length can be 2.5, 5, 10, 20, 40 or 60 milliseconds.
:param channels: Number of audio channels. Sulpported values are 1 and 2.
:param sampling_rate: Audio sampling rate used in this frame. Valid sampling rates are 8000, 12000, 16000,
24000, or 48000.
"""
toxav_err_send_frame = c_int()
LOG_TRACE(f"toxav_audio_send_frame")
assert sampling_rate in [8000, 12000, 16000, 24000, 48000]
result = self.libtoxav.toxav_audio_send_frame(self._toxav_pointer,
c_uint32(friend_number),
cast(pcm, c_void_p),
c_size_t(sample_count), c_uint8(channels),
c_uint32(sampling_rate), byref(toxav_err_send_frame))
toxav_err_send_frame = toxav_err_send_frame.value
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['OK']:
return bool(result)
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['NULL']:
raise ArgumentError('The samples data pointer was NULL.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number passed did not designate a valid friend.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_IN_CALL']:
raise ToxError('This client is currently not in a call with the friend.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['SYNC']:
raise ToxError('Synchronization error occurred.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['INVALID']:
raise ArgumentError('One of the frame parameters was invalid. E.g. the resolution may be too small or too '
'large, or the audio sampling rate may be unsupported.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['PAYLOAD_TYPE_DISABLED']:
raise ToxError('Either friend turned off audio or video receiving or we turned off sending for the said'
'payload.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['RTP_FAILED']:
ToxError('Failed to push frame through rtp interface.')
raise ToxError('The function did not return OK.')
def video_send_frame(self, friend_number: int, width: int, height: int, y, u, v) -> bool:
"""
Send a video frame to a friend.
Y - plane should be of size: height * width
U - plane should be of size: (height/2) * (width/2)
V - plane should be of size: (height/2) * (width/2)
:param friend_number: The friend number of the friend to which to send a video frame.
:param width: Width of the frame in pixels.
:param height: Height of the frame in pixels.
:param y: Y (Luminance) plane data.
:param u: U (Chroma) plane data.
:param v: V (Chroma) plane data.
"""
toxav_err_send_frame = c_int()
LOG_TRACE(f"toxav_video_send_frame")
result = self.libtoxav.toxav_video_send_frame(self._toxav_pointer,
c_uint32(friend_number),
c_uint16(width),
c_uint16(height),
c_char_p(y),
c_char_p(u),
c_char_p(v),
byref(toxav_err_send_frame))
toxav_err_send_frame = toxav_err_send_frame.value
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['OK']:
return bool(result)
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['NULL']:
raise ArgumentError('One of Y, U, or V was NULL.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_FOUND']:
raise ArgumentError('The friend_number passed did not designate a valid friend.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['FRIEND_NOT_IN_CALL']:
raise ToxError('This client is currently not in a call with the friend.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['SYNC']:
raise ToxError('Synchronization error occurred.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['INVALID']:
raise ArgumentError('One of the frame parameters was invalid. E.g. the resolution may be too small or too '
'large, or the audio sampling rate may be unsupported.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['PAYLOAD_TYPE_DISABLED']:
raise ToxError('Either friend turned off audio or video receiving or we turned off sending for the said'
'payload.')
if toxav_err_send_frame == enum.TOXAV_ERR_SEND_FRAME['RTP_FAILED']:
ToxError('Failed to push frame through rtp interface.')
raise ToxError('The function did not return OK.')
# A/V receiving
def callback_audio_receive_frame(self, callback: Union[Callable,None], user_data) -> None:
"""
Set the callback for the `audio_receive_frame` event. Pass None to unset.
:param callback: Python function.
Function for the audio_receive_frame callback. The callback can be called multiple times per single
iteration depending on the amount of queued frames in the buffer. The received format is the same as in send
function.
Should take pointer (c_void_p) to ToxAV object,
The friend number (c_uint32) of the friend who sent an audio frame.
An array (c_uint8) of audio samples (sample_count * channels elements).
The number (c_size_t) of audio samples per channel in the PCM array.
Number (c_uint8) of audio channels.
Sampling rate (c_uint32) used in this frame.
pointer (c_void_p) to user_data
:param user_data: pointer (c_void_p) to user data
"""
if callback is None:
self.libtoxav.toxav_callback_audio_receive_frame(self._toxav_pointer,
POINTER(None)(),
user_data)
self.audio_receive_frame_cb = None
return
LOG_DEBUG(f"toxav_callback_audio_receive_frame")
c_callback = CFUNCTYPE(None, c_void_p, c_uint32, POINTER(c_uint8), c_size_t, c_uint8, c_uint32, c_void_p)
self.audio_receive_frame_cb = c_callback(callback)
self.libtoxav.toxav_callback_audio_receive_frame(self._toxav_pointer, self.audio_receive_frame_cb, user_data)
def callback_video_receive_frame(self, callback: Union[Callable,None], user_data) -> None:
"""
Set the callback for the `video_receive_frame` event. Pass None to unset.
:param callback: Python function.
The function type for the video_receive_frame callback.
Should take
toxAV pointer (c_void_p) to ToxAV object,
friend_number The friend number (c_uint32) of the friend who sent a video frame.
width Width (c_uint16) of the frame in pixels.
height Height (c_uint16) of the frame in pixels.
y
u
v Plane data (POINTER(c_uint8)).
The size of plane data is derived from width and height where
Y = MAX(width, abs(ystride)) * height,
U = MAX(width/2, abs(ustride)) * (height/2) and
V = MAX(width/2, abs(vstride)) * (height/2).
ystride
ustride
vstride Strides data (c_int32). Strides represent padding for each plane that may or may not be present. You must
handle strides in your image processing code. Strides are negative if the image is bottom-up
hence why you MUST abs() it when calculating plane buffer size.
user_data pointer (c_void_p) to user_data
:param user_data: pointer (c_void_p) to user data
"""
if callback is None:
self.libtoxav.toxav_callback_video_receive_frame(self._toxav_pointer, POINTER(None)(), user_data)
self.video_receive_frame_cb = None
return
LOG_DEBUG(f"toxav_callback_video_receive_frame")
c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_uint16, c_uint16,
POINTER(c_uint8), POINTER(c_uint8), POINTER(c_uint8),
c_int32, c_int32, c_int32,
c_void_p)
self.video_receive_frame_cb = c_callback(callback)
self.libtoxav.toxav_callback_video_receive_frame(self._toxav_pointer, self.video_receive_frame_cb, user_data)

View File

@ -0,0 +1,983 @@
# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*-
from tox_wrapper.tox_ctypesgen import *
TOX_USER_STATUS = {
'NONE': 0,
'AWAY': 1,
'BUSY': 2,
}
TOX_MESSAGE_TYPE = {
'NORMAL': 0,
'ACTION': 1,
}
TOX_PROXY_TYPE = {
'NONE': 0,
'HTTP': 1,
'SOCKS5': 2,
}
TOX_SAVEDATA_TYPE = {
'NONE': 0,
'TOX_SAVE': 1,
'SECRET_KEY': 2,
}
TOX_ERR_OPTIONS_NEW = {
'OK': 0,
'MALLOC': 1,
}
TOX_ERR_NEW = {
'OK': 0,
'NULL': 1,
'MALLOC': 2,
'PORT_ALLOC': 3,
'PROXY_BAD_TYPE': 4,
'PROXY_BAD_HOST': 5,
'PROXY_BAD_PORT': 6,
'PROXY_NOT_FOUND': 7,
'LOAD_ENCRYPTED': 8,
'LOAD_BAD_FORMAT': 9,
'TCP_SERVER_ALLOC': 10,
}
TOX_ERR_BOOTSTRAP = {
'OK': 0,
'NULL': 1,
'BAD_HOST': 2,
'BAD_PORT': 3,
}
TOX_CONNECTION = {
'NONE': 0,
'TCP': 1,
'UDP': 2,
}
TOX_ERR_SET_INFO = {
'OK': 0,
'NULL': 1,
'TOO_LONG': 2,
# The function returned successfully.
'TOX_ERR_SET_INFO_OK': 0,
# One of the arguments to the function was NULL when it was not expected.
'TOX_ERR_SET_INFO_NULL': 1,
# Information length exceeded maximum permissible size.
'TOX_ERR_SET_INFO_TOO_LONG': 2,
}
TOX_ERR_FRIEND_ADD = {
'OK': 0,
'NULL': 1,
'TOO_LONG': 2,
'NO_MESSAGE': 3,
'OWN_KEY': 4,
'ALREADY_SENT': 5,
'BAD_CHECKSUM': 6,
'SET_NEW_NOSPAM': 7,
'MALLOC': 8,
}
TOX_ERR_FRIEND_DELETE = {
'OK': 0,
'FRIEND_NOT_FOUND': 1,
}
TOX_ERR_FRIEND_BY_PUBLIC_KEY = {
'OK': 0,
'NULL': 1,
'NOT_FOUND': 2,
}
TOX_ERR_FRIEND_GET_PUBLIC_KEY = {
'OK': 0,
'FRIEND_NOT_FOUND': 1,
}
TOX_ERR_FRIEND_GET_LAST_ONLINE = {
'OK': 0,
'FRIEND_NOT_FOUND': 1,
}
TOX_ERR_FRIEND_QUERY = {
'OK': 0,
'NULL': 1,
'FRIEND_NOT_FOUND': 2,
}
TOX_ERR_SET_TYPING = {
'OK': 0,
'FRIEND_NOT_FOUND': 1,
}
TOX_ERR_FRIEND_SEND_MESSAGE = {
'OK': 0,
'NULL': 1,
'FRIEND_NOT_FOUND': 2,
'FRIEND_NOT_CONNECTED': 3,
'SENDQ': 4,
'TOO_LONG': 5,
'EMPTY': 6,
}
TOX_FILE_KIND = {
'DATA': 0,
'AVATAR': 1,
}
TOX_FILE_CONTROL = {
'RESUME': 0,
'PAUSE': 1,
'CANCEL': 2,
}
TOX_ERR_FILE_CONTROL = {
'OK': 0,
'FRIEND_NOT_FOUND': 1,
'FRIEND_NOT_CONNECTED': 2,
'NOT_FOUND': 3,
'NOT_PAUSED': 4,
'DENIED': 5,
'ALREADY_PAUSED': 6,
'SENDQ': 7,
}
TOX_ERR_FILE_SEEK = {
'OK': 0,
'FRIEND_NOT_FOUND': 1,
'FRIEND_NOT_CONNECTED': 2,
'NOT_FOUND': 3,
'DENIED': 4,
'INVALID_POSITION': 5,
'SENDQ': 6,
}
TOX_ERR_FILE_GET = {
'OK': 0,
'NULL': 1,
'FRIEND_NOT_FOUND': 2,
'NOT_FOUND': 3,
}
TOX_ERR_FILE_SEND = {
'OK': 0,
'NULL': 1,
'FRIEND_NOT_FOUND': 2,
'FRIEND_NOT_CONNECTED': 3,
'NAME_TOO_LONG': 4,
'TOO_MANY': 5,
}
TOX_ERR_FILE_SEND_CHUNK = {
'OK': 0,
'NULL': 1,
'FRIEND_NOT_FOUND': 2,
'FRIEND_NOT_CONNECTED': 3,
'NOT_FOUND': 4,
'NOT_TRANSFERRING': 5,
'INVALID_LENGTH': 6,
'SENDQ': 7,
'WRONG_POSITION': 8,
}
TOX_ERR_FRIEND_CUSTOM_PACKET = {
'OK': 0,
'NULL': 1,
'FRIEND_NOT_FOUND': 2,
'FRIEND_NOT_CONNECTED': 3,
'INVALID': 4,
'EMPTY': 5,
'TOO_LONG': 6,
'SENDQ': 7,
}
TOX_ERR_GET_PORT = {
'OK': 0,
'NOT_BOUND': 1,
}
TOX_GROUP_PRIVACY_STATE = {
#
# The group is considered to be public. Anyone may join the group using the Chat ID.
#
# If the group is in this state, even if the Chat ID is never explicitly shared
# with someone outside of the group, information including the Chat ID, IP addresses,
# and peer ID's (but not Tox ID's) is visible to anyone with access to a node
# storing a DHT entry for the given group.
#
'PUBLIC': 0,
#
# The group is considered to be private. The only way to join the group is by having
# someone in your contact list send you an invite.
#
# If the group is in this state, no group information (mentioned above) is present in the DHT;
# the DHT is not used for any purpose at all. If a public group is set to private,
# all DHT information related to the group will expire shortly.
#
'PRIVATE': 1
}
TOX_GROUP_ROLE = {
#
# May kick and ban all other peers as well as set their role to anything (except founder).
# Founders may also set the group password, toggle the privacy state, and set the peer limit.
#
'FOUNDER': 0,
#
# May kick, ban and set the user and observer roles for peers below this role.
# May also set the group topic.
#
'MODERATOR': 1,
#
# May communicate with other peers normally.
#
'USER': 2,
#
# May observe the group and ignore peers; may not communicate with other peers or with the group.
#
'OBSERVER': 3
}
TOX_ERR_GROUP_NEW = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_NEW_OK': 0,
#
# The group name exceeded TOX_GROUP_MAX_GROUP_NAME_LENGTH.
#
'TOX_ERR_GROUP_NEW_TOO_LONG': 1,
#
# group_name is NULL or length is zero.
#
'TOX_ERR_GROUP_NEW_EMPTY': 2,
#
# TOX_GROUP_PRIVACY_STATE is an invalid type.
#
'TOX_ERR_GROUP_NEW_PRIVACY': 3,
#
# The group instance failed to initialize.
#
'TOX_ERR_GROUP_NEW_INIT': 4,
#
# The group state failed to initialize. This usually indicates that something went wrong
# related to cryptographic signing.
#
'TOX_ERR_GROUP_NEW_STATE': 5,
#
# The group failed to announce to the DHT. This indicates a network related error.
#
'TOX_ERR_GROUP_NEW_ANNOUNCE': 6,
}
TOX_ERR_GROUP_JOIN = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_JOIN_OK': 0,
#
# The group instance failed to initialize.
#
'TOX_ERR_GROUP_JOIN_INIT': 1,
#
# The chat_id pointer is set to NULL or a group with chat_id already exists. This usually
# happens if the client attempts to create multiple sessions for the same group.
#
'TOX_ERR_GROUP_JOIN_BAD_CHAT_ID': 2,
#
# Password length exceeded TOX_GROUP_MAX_PASSWORD_SIZE.
#
'TOX_ERR_GROUP_JOIN_TOO_LONG': 3,
}
TOX_ERR_GROUP_IS_CONNECTED = {
'TOX_ERR_GROUP_IS_CONNECTED_OK': 0,
'TOX_ERR_GROUP_IS_CONNECTED_GROUP_NOT_FOUND': 1
}
TOX_ERR_GROUP_RECONNECT = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_RECONNECT_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_RECONNECT_GROUP_NOT_FOUND': 1,
}
TOX_ERR_GROUP_DISCONNECT = {
# The function returned successfully.
'TOX_ERR_GROUP_DISCONNECT_OK': 0,
# The group number passed did not designate a valid group.
'TOX_ERR_GROUP_DISCONNECT_GROUP_NOT_FOUND': 1,
# The group is already disconnected.
'TOX_ERR_GROUP_DISCONNECT_ALREADY_DISCONNECTED': 2,
}
TOX_ERR_GROUP_LEAVE = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_LEAVE_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_LEAVE_GROUP_NOT_FOUND': 1,
#
# Message length exceeded 'TOX_GROUP_MAX_PART_LENGTH.
#
'TOX_ERR_GROUP_LEAVE_TOO_LONG': 2,
#
# The parting packet failed to send.
#
'TOX_ERR_GROUP_LEAVE_FAIL_SEND': 3,
#
# The group chat instance failed to be deleted. This may occur due to memory related errors.
#
'TOX_ERR_GROUP_LEAVE_DELETE_FAIL': 4,
}
TOX_ERR_GROUP_SELF_QUERY = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_SELF_QUERY_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_SELF_QUERY_GROUP_NOT_FOUND': 1,
}
TOX_ERR_GROUP_SELF_NAME_SET = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_SELF_NAME_SET_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_SELF_NAME_SET_GROUP_NOT_FOUND': 1,
#
# Name length exceeded 'TOX_MAX_NAME_LENGTH.
#
'TOX_ERR_GROUP_SELF_NAME_SET_TOO_LONG': 2,
#
# The length given to the set function is zero or name is a NULL pointer.
#
'TOX_ERR_GROUP_SELF_NAME_SET_INVALID': 3,
#
# The name is already taken by another peer in the group.
#
'TOX_ERR_GROUP_SELF_NAME_SET_TAKEN': 4,
#
# The packet failed to send.
#
'TOX_ERR_GROUP_SELF_NAME_SET_FAIL_SEND': 5
}
TOX_ERR_GROUP_SELF_STATUS_SET = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_SELF_STATUS_SET_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_SELF_STATUS_SET_GROUP_NOT_FOUND': 1,
#
# An invalid type was passed to the set function.
#
'TOX_ERR_GROUP_SELF_STATUS_SET_INVALID': 2,
#
# The packet failed to send.
#
'TOX_ERR_GROUP_SELF_STATUS_SET_FAIL_SEND': 3
}
TOX_ERR_GROUP_PEER_QUERY = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_PEER_QUERY_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_PEER_QUERY_GROUP_NOT_FOUND': 1,
#
# The ID passed did not designate a valid peer.
#
'TOX_ERR_GROUP_PEER_QUERY_PEER_NOT_FOUND': 2
}
TOX_ERR_GROUP_STATE_QUERIES = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_STATE_QUERIES_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_STATE_QUERIES_GROUP_NOT_FOUND': 1
}
TOX_ERR_GROUP_TOPIC_SET = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_TOPIC_SET_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_TOPIC_SET_GROUP_NOT_FOUND': 1,
#
# Topic length exceeded 'TOX_GROUP_MAX_TOPIC_LENGTH.
#
'TOX_ERR_GROUP_TOPIC_SET_TOO_LONG': 2,
#
# The caller does not have the required permissions to set the topic.
#
'TOX_ERR_GROUP_TOPIC_SET_PERMISSIONS': 3,
#
# The packet could not be created. This error is usually related to cryptographic signing.
#
'TOX_ERR_GROUP_TOPIC_SET_FAIL_CREATE': 4,
#
# The packet failed to send.
#
'TOX_ERR_GROUP_TOPIC_SET_FAIL_SEND': 5
}
TOX_ERR_GROUP_SEND_MESSAGE = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_SEND_MESSAGE_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_SEND_MESSAGE_GROUP_NOT_FOUND': 1,
#
# Message length exceeded 'TOX_MAX_MESSAGE_LENGTH.
#
'TOX_ERR_GROUP_SEND_MESSAGE_TOO_LONG': 2,
#
# The message pointer is null or length is zero.
#
'TOX_ERR_GROUP_SEND_MESSAGE_EMPTY': 3,
#
# The message type is invalid.
#
'TOX_ERR_GROUP_SEND_MESSAGE_BAD_TYPE': 4,
#
# The caller does not have the required permissions to send group messages.
#
'TOX_ERR_GROUP_SEND_MESSAGE_PERMISSIONS': 5,
#
# Packet failed to send.
#
'TOX_ERR_GROUP_SEND_MESSAGE_FAIL_SEND': 6
}
TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE_GROUP_NOT_FOUND': 1,
#
# The ID passed did not designate a valid peer.
#
'TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE_PEER_NOT_FOUND': 2,
#
# Message length exceeded 'TOX_MAX_MESSAGE_LENGTH.
#
'TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE_TOO_LONG': 3,
#
# The message pointer is null or length is zero.
#
'TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE_EMPTY': 4,
#
# The caller does not have the required permissions to send group messages.
#
'TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE_PERMISSIONS': 5,
#
# Packet failed to send.
#
'TOX_ERR_GROUP_SEND_PRIVATE_MESSAGE_FAIL_SEND': 6
}
TOX_ERR_GROUP_SEND_CUSTOM_PACKET = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_SEND_CUSTOM_PACKET_GROUP_NOT_FOUND': 1,
#
# Message length exceeded 'TOX_MAX_MESSAGE_LENGTH.
#
'TOX_ERR_GROUP_SEND_CUSTOM_PACKET_TOO_LONG': 2,
#
# The message pointer is null or length is zero.
#
'TOX_ERR_GROUP_SEND_CUSTOM_PACKET_EMPTY': 3,
#
# The caller does not have the required permissions to send group messages.
#
'TOX_ERR_GROUP_SEND_CUSTOM_PACKET_PERMISSIONS': 4
}
TOX_ERR_GROUP_INVITE_FRIEND = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_INVITE_FRIEND_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_INVITE_FRIEND_GROUP_NOT_FOUND': 1,
#
# The friend number passed did not designate a valid friend.
#
'TOX_ERR_GROUP_INVITE_FRIEND_FRIEND_NOT_FOUND': 2,
#
# Creation of the invite packet failed. This indicates a network related error.
#
'TOX_ERR_GROUP_INVITE_FRIEND_INVITE_FAIL': 3,
#
# Packet failed to send.
#
'TOX_ERR_GROUP_INVITE_FRIEND_FAIL_SEND': 4
}
TOX_ERR_GROUP_INVITE_ACCEPT = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_INVITE_ACCEPT_OK': 0,
#
# The invite data is not in the expected format.
#
'TOX_ERR_GROUP_INVITE_ACCEPT_BAD_INVITE': 1,
#
# The group instance failed to initialize.
#
'TOX_ERR_GROUP_INVITE_ACCEPT_INIT_FAILED': 2,
#
# Password length exceeded 'TOX_GROUP_MAX_PASSWORD_SIZE.
#
'TOX_ERR_GROUP_INVITE_ACCEPT_TOO_LONG': 3
}
TOX_GROUP_JOIN_FAIL = {
#
# You are using the same nickname as someone who is already in the group.
#
'TOX_GROUP_JOIN_FAIL_NAME_TAKEN': 0,
#
# The group peer limit has been reached.
#
'TOX_GROUP_JOIN_FAIL_PEER_LIMIT': 1,
#
# You have supplied an invalid password.
#
'TOX_GROUP_JOIN_FAIL_INVALID_PASSWORD': 2,
#
# The join attempt failed due to an unspecified error. This often occurs when the group is
# not found in the DHT.
#
'TOX_GROUP_JOIN_FAIL_UNKNOWN': 3
}
TOX_ERR_GROUP_FOUNDER_SET_PASSWORD = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_FOUNDER_SET_PASSWORD_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_FOUNDER_SET_PASSWORD_GROUP_NOT_FOUND': 1,
#
# The caller does not have the required permissions to set the password.
#
'TOX_ERR_GROUP_FOUNDER_SET_PASSWORD_PERMISSIONS': 2,
#
# Password length exceeded 'TOX_GROUP_MAX_PASSWORD_SIZE.
#
'TOX_ERR_GROUP_FOUNDER_SET_PASSWORD_TOO_LONG': 3,
#
# The packet failed to send.
#
'TOX_ERR_GROUP_FOUNDER_SET_PASSWORD_FAIL_SEND': 4
}
TOX_ERR_GROUP_FOUNDER_SET_PRIVACY_STATE = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_FOUNDER_SET_PRIVACY_STATE_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_FOUNDER_SET_PRIVACY_STATE_GROUP_NOT_FOUND': 1,
#
# 'TOX_GROUP_PRIVACY_STATE is an invalid type.
#
'TOX_ERR_GROUP_FOUNDER_SET_PRIVACY_STATE_INVALID': 2,
#
# The caller does not have the required permissions to set the privacy state.
#
'TOX_ERR_GROUP_FOUNDER_SET_PRIVACY_STATE_PERMISSIONS': 3,
#
# The privacy state could not be set. This may occur due to an error related to
# cryptographic signing of the new shared state.
#
'TOX_ERR_GROUP_FOUNDER_SET_PRIVACY_STATE_FAIL_SET': 4,
#
# The packet failed to send.
#
'TOX_ERR_GROUP_FOUNDER_SET_PRIVACY_STATE_FAIL_SEND': 5
}
TOX_ERR_GROUP_FOUNDER_SET_PEER_LIMIT = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_FOUNDER_SET_PEER_LIMIT_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_FOUNDER_SET_PEER_LIMIT_GROUP_NOT_FOUND': 1,
#
# The caller does not have the required permissions to set the peer limit.
#
'TOX_ERR_GROUP_FOUNDER_SET_PEER_LIMIT_PERMISSIONS': 2,
#
# The peer limit could not be set. This may occur due to an error related to
# cryptographic signing of the new shared state.
#
'TOX_ERR_GROUP_FOUNDER_SET_PEER_LIMIT_FAIL_SET': 3,
#
# The packet failed to send.
#
'TOX_ERR_GROUP_FOUNDER_SET_PEER_LIMIT_FAIL_SEND': 4
}
TOX_ERR_GROUP_TOGGLE_IGNORE = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_TOGGLE_IGNORE_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_TOGGLE_IGNORE_GROUP_NOT_FOUND': 1,
#
# The ID passed did not designate a valid peer.
#
'TOX_ERR_GROUP_TOGGLE_IGNORE_PEER_NOT_FOUND': 2
}
TOX_ERR_GROUP_MOD_SET_ROLE = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_MOD_SET_ROLE_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_MOD_SET_ROLE_GROUP_NOT_FOUND': 1,
#
# The ID passed did not designate a valid peer. Note: you cannot set your own role.
#
'TOX_ERR_GROUP_MOD_SET_ROLE_PEER_NOT_FOUND': 2,
#
# The caller does not have the required permissions for this action.
#
'TOX_ERR_GROUP_MOD_SET_ROLE_PERMISSIONS': 3,
#
# The role assignment is invalid. This will occur if you try to set a peer's role to
# the role they already have.
#
'TOX_ERR_GROUP_MOD_SET_ROLE_ASSIGNMENT': 4,
#
# The role was not successfully set. This may occur if something goes wrong with role setting': ,
# or if the packet fails to send.
#
'TOX_ERR_GROUP_MOD_SET_ROLE_FAIL_ACTION': 5
}
TOX_ERR_GROUP_MOD_REMOVE_PEER = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_MOD_REMOVE_PEER_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_MOD_REMOVE_PEER_GROUP_NOT_FOUND': 1,
#
# The ID passed did not designate a valid peer.
#
'TOX_ERR_GROUP_MOD_REMOVE_PEER_PEER_NOT_FOUND': 2,
#
# The caller does not have the required permissions for this action.
#
'TOX_ERR_GROUP_MOD_REMOVE_PEER_PERMISSIONS': 3,
#
# The peer could not be removed from the group.
#
# If a ban was set': , this error indicates that the ban entry could not be created.
# This is usually due to the peer's IP address already occurring in the ban list. It may also
# be due to the entry containing invalid peer information': , or a failure to cryptographically
# authenticate the entry.
#
'TOX_ERR_GROUP_MOD_REMOVE_PEER_FAIL_ACTION': 4,
#
# The packet failed to send.
#
'TOX_ERR_GROUP_MOD_REMOVE_PEER_FAIL_SEND': 5
}
TOX_ERR_GROUP_MOD_REMOVE_BAN = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_MOD_REMOVE_BAN_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_MOD_REMOVE_BAN_GROUP_NOT_FOUND': 1,
#
# The caller does not have the required permissions for this action.
#
'TOX_ERR_GROUP_MOD_REMOVE_BAN_PERMISSIONS': 2,
#
# The ban entry could not be removed. This may occur if ban_id does not designate
# a valid ban entry.
#
'TOX_ERR_GROUP_MOD_REMOVE_BAN_FAIL_ACTION': 3,
#
# The packet failed to send.
#
'TOX_ERR_GROUP_MOD_REMOVE_BAN_FAIL_SEND': 4
}
TOX_GROUP_MOD_EVENT = {
#
# A peer has been kicked from the group.
#
'KICK': 0,
#
# A peer has been banned from the group.
#
'BAN': 1,
#
# A peer as been given the observer role.
#
'OBSERVER': 2,
#
# A peer has been given the user role.
#
'USER': 3,
#
# A peer has been given the moderator role.
#
'MODERATOR': 4,
}
TOX_ERR_GROUP_BAN_QUERY = {
#
# The function returned successfully.
#
'TOX_ERR_GROUP_BAN_QUERY_OK': 0,
#
# The group number passed did not designate a valid group.
#
'TOX_ERR_GROUP_BAN_QUERY_GROUP_NOT_FOUND': 1,
#
# The ban_id does not designate a valid ban list entry.
#
'TOX_ERR_GROUP_BAN_QUERY_BAD_ID': 2,
}
TOX_GROUP_BAN_TYPE = {
'IP_PORT': 0,
'PUBLIC_KEY': 1,
'NICK': 2
}
TOX_PUBLIC_KEY_SIZE = 32
TOX_ADDRESS_SIZE = TOX_PUBLIC_KEY_SIZE + 6
TOX_MAX_FRIEND_REQUEST_LENGTH = 1016
TOX_MAX_MESSAGE_LENGTH = 1372
TOX_GROUP_MAX_TOPIC_LENGTH = 512
TOX_GROUP_MAX_PART_LENGTH = 128
TOX_GROUP_MAX_GROUP_NAME_LENGTH = 48
TOX_GROUP_MAX_PASSWORD_SIZE = 32
TOX_GROUP_CHAT_ID_SIZE = 32
TOX_GROUP_PEER_PUBLIC_KEY_SIZE = 32
TOX_MAX_NAME_LENGTH = 128
TOX_MAX_STATUS_MESSAGE_LENGTH = 1007
TOX_SECRET_KEY_SIZE = 32
TOX_FILE_ID_LENGTH = 32
TOX_HASH_LENGTH = 32
TOX_MAX_CUSTOM_PACKET_SIZE = 1373