# -*- mode: python; indent-tabs-mode: nil; py-indent-offset: 4; coding: utf-8 -*- import os import sys import traceback from random import shuffle import threading from time import sleep from gevent import monkey; monkey.patch_all(); del monkey # noqa import gevent import tests.support_testing as ts from user_data import settings IDLE_PERIOD = 0.10 try: import coloredlogs if 'COLOREDLOGS_LEVEL_STYLES' not in os.environ: os.environ['COLOREDLOGS_LEVEL_STYLES'] = 'spam=22;debug=28;verbose=34;notice=220;warning=202;success=118,bold;error=124;critical=background=red' # https://pypi.org/project/coloredlogs/ except ImportError as e: coloredlogs = False global LOG import logging LOG = logging.getLogger('app') def setup_logging(oArgs): global LOG if coloredlogs: aKw = dict(level=oArgs.loglevel, logger=LOG, fmt='%(name)s %(levelname)s %(message)s') if oArgs.logfile: oFd = open(oArgs.logfile, 'wt') setattr(oArgs, 'log_oFd', oFd) aKw['stream'] = oFd coloredlogs.install(**aKw) else: aKw = dict(level=oArgs.loglevel, format='%(name)s %(levelname)-4s %(message)s') if oArgs.logfile: aKw['filename'] = oArgs.logfile logging.basicConfig(**aKw) if oArgs.logfile: oHandler = logging.StreamHandler(stream=sys.stdout) LOG.addHandler(oHandler) logging._defaultFormatter = logging.Formatter(datefmt='%m-%d %H:%M:%S') logging._defaultFormatter.default_time_format = '%m-%d %H:%M:%S' logging._defaultFormatter.default_msec_format = '' LOG.setLevel(oArgs.loglevel) LOG.trace = lambda l: LOG.log(0, repr(l)) LOG.info(f"Setting loglevel to {oArgs.loglevel!s}") if oArgs.loglevel < 20: # opencv debug sys.OpenCV_LOADER_DEBUG = True #? with ignoreStderr(): for png # silence logging PyQt5.uic.uiparser logging.getLogger('PyQt5.uic').setLevel(logging.ERROR) logging.getLogger('PyQt5.uic.uiparser').setLevel(logging.ERROR) logging.getLogger('PyQt5.uic.properties').setLevel(logging.ERROR) from PyQt5 import QtWidgets, QtGui, QtCore from qtpy.QtCore import QTimer from qtpy.QtWidgets import QApplication try: import qdarkstylexxx except ImportError: qdarkstyle = None from middleware import threads import middleware.callbacks as callbacks import ui.password_screen as password_screen import updater.updater as updater from middleware.tox_factory import tox_factory import wrapper.toxencryptsave as tox_encrypt_save import user_data.toxes from user_data import settings from user_data.settings import get_user_config_path, merge_args_into_settings from user_data.settings import Settings from ui.login_screen import LoginScreen from user_data.profile_manager import ProfileManager from plugin_support.plugin_support import PluginLoader from ui.main_screen import MainWindow from ui import tray import utils.ui as util_ui import utils.util as util from contacts.profile import Profile from file_transfers.file_transfers_handler import FileTransfersHandler from contacts.contact_provider import ContactProvider from contacts.friend_factory import FriendFactory from contacts.group_factory import GroupFactory from contacts.contacts_manager import ContactsManager from av.calls_manager import CallsManager from history.database import Database from ui.widgets_factory import WidgetsFactory from smileys.smileys import SmileyLoader from ui.items_factories import MessagesItemsFactory, ContactItemsFactory from messenger.messenger import Messenger from network.tox_dns import ToxDns from history.history import History from file_transfers.file_transfers_messages_service import FileTransfersMessagesService from groups.groups_service import GroupsService from ui.create_profile_screen import CreateProfileScreen from common.provider import Provider from contacts.group_peer_factory import GroupPeerFactory from user_data.backup_service import BackupService import styles.style # TODO: dynamic loading from tests.support_testing import lLOCAL, lGOOD, lNEW, lRELAYS, inodeinfo_test from tests.bootstrap_node_info import iNodeInfo from tests.tests_socks import main as oTOX_OPTIONS, iMain, ToxOptions global iI iI = 0 sSTYLE = """ .QWidget {font-family Helvetica;} .QCheckBox { font-family Helvetica;} .QComboBox { font-family Helvetica;} .QGroupBox { font-family Helvetica;} .QLabel {font-family Helvetica;} .QLineEdit { font-family Helvetica;} .QListWidget { font-family Helvetica;} .QListWidgetItem { font-family Helvetica;} .QMainWindow {font-family Helvetica;} .QMenu {font-family Helvetica;} .QMenuBar {font-family Helvetica;} .QPlainText {font-family Courier; weight: 75;} .QPlainTextEdit {font-family Courier;} .QPushButton {font-family Helvetica;} .QRadioButton { font-family Helvetica; } .QText {font-family Courier; weight: 75; } .QTextBrowser {font-family Courier; weight: 75; } .QTextSingleLine {font-family Courier; weight: 75; } .QToolBar { font-weight: bold; } """ from copy import deepcopy class App: def __init__(self, version, args): global LOG self._args = args self._path = path_to_profile = args.profile uri = args.uri logfile = args.logfile loglevel = args.loglevel setup_logging(args) # sys.stderr.write( 'Command line args: ' +repr(oArgs) +'\n') LOG.info("Command line: " +' '.join(sys.argv[1:])) LOG.debug(f'oArgs = {args!r}') LOG.info("Starting toxygen version " +version) self._version = version self._app = self._settings = self._profile_manager = None self._plugin_loader = self._messenger = None self._tox = self._ms = self._init = self._main_loop = self._av_loop = None self._uri = self._toxes = self._tray = self._file_transfer_handler = self._contacts_provider = None self._friend_factory = self._calls_manager = None self._contacts_manager = self._smiley_loader = None self._group_peer_factory = self._tox_dns = self._backup_service = None self._group_factory = self._groups_service = self._profile = None if uri is not None and uri.startswith('tox:'): self._uri = uri[4:] # ----------------------------------------------------------------------------------------------------------------- # Public methods # ----------------------------------------------------------------------------------------------------------------- def set_trace(self): LOG.debug('pdb.set_trace ') sys.stdin = sys.__stdin__ sys.stdout = sys.__stdout__ import pdb; pdb.set_trace() def ten(self, i=0): global iI iI += 1 if logging.getLogger('app').getEffectiveLevel() != 10: sys.stderr.write('CHANGED '+str(logging.getLogger().level+'\n')) LOG.setLevel(10) LOG.root.setLevel(10) logging.getLogger('app').setLevel(10) #sys.stderr.write(f"ten '+str(iI)+' {i}"+' '+repr(LOG) +'\n') #LOG.debug('ten '+str(iI)) def iMain(self): """ Main function of app. loads login screen if needed and starts main screen """ self._app = QtWidgets.QApplication([]) self._load_icon() if util.get_platform() == 'Linux': QtCore.QCoreApplication.setAttribute(QtCore.Qt.AA_X11InitThreads) self._load_base_style() encrypt_save = tox_encrypt_save.ToxEncryptSave() self._toxes = user_data.toxes.ToxES(encrypt_save) try: # this throws everything as errors if not self._select_and_load_profile(): return 2 if hasattr(self._args, 'update') and self._args.update: if self._try_to_update(): return 3 self._load_app_styles() if self._args.language != 'English': # > /var/local/src/toxygen/toxygen/app.py(303)_load_app_translations()->None # -> self._app.translator = translator # (Pdb) Fatal Python error: Segmentation fault self._load_app_translations() self._create_dependencies() self._start_threads(True) if self._uri is not None: self._ms.add_contact(self._uri) except Exception as e: LOG.error(f"Error loading profile: {e!s}") sys.stderr.write(' iMain(): ' +f"Error loading profile: {e!s}" \ +'\n' + traceback.format_exc()+'\n') util_ui.message_box(str(e), util_ui.tr('Error loading profile')) return 4 self._app.lastWindowClosed.connect(self._app.quit) try: self._execute_app() self.quit() retval = 0 except KeyboardInterrupt: retval = 0 except Exception: retval = 1 return retval # ----------------------------------------------------------------------------------------------------------------- # App executing # ----------------------------------------------------------------------------------------------------------------- def _execute_app(self): LOG.debug("_execute_app") while True: try: self._app.exec_() except Exception as ex: LOG.error('Unhandled exception: ' + str(ex)) else: break def quit(self, retval=0): LOG.debug("quit") oArgs = self._args if hasattr(oArgs, 'log_oFd'): oArgs.log_oFd.close() delattr(oArgs, 'log_oFd') # failsafe: segfaults on exit if hasattr(self, '_tox'): if self._tox and hasattr(self._tox, 'kill'): self._tox.kill() del self._tox self._stop_app() if hasattr(self, '_app'): self._app.quit() del self._app.quit del self._app raise SystemExit(retval) def _stop_app(self): LOG.debug("_stop_app") self._plugin_loader.stop() try: self._stop_threads(is_app_closing=True) except (Exception, RuntimeError): # RuntimeError: cannot join current thread pass if hasattr(self, '_tray') and self._tray: self._tray.hide() self._save_profile() self._settings.close() self._kill_toxav() self._kill_tox() sys.stderr.write('_stop_app end' +'\n') # ----------------------------------------------------------------------------------------------------------------- # App loading # ----------------------------------------------------------------------------------------------------------------- def _load_base_style(self): if self._args.theme in ['', 'default']: return if qdarkstyle: LOG.debug("_load_base_style qdarkstyle " +self._args.theme) # QDarkStyleSheet if self._args.theme == 'light': from qdarkstyle.light.palette import LightPalette style = qdarkstyle.load_stylesheet(palette=LightPalette) else: from qdarkstyle.dark.palette import DarkPalette style = qdarkstyle.load_stylesheet(palette=DarkPalette) else: LOG.debug("_load_base_style qss " +self._args.theme) name = self._args.theme + '.qss' with open(util.join_path(util.get_styles_directory(), name)) as fl: style = fl.read() style += '\n' +sSTYLE self._app.setStyleSheet(style) def _load_app_styles(self): LOG.debug(f"_load_app_styles {list(settings.built_in_themes().keys())!r}") # application color scheme if self._settings['theme'] in ['', 'default']: return for theme in settings.built_in_themes().keys(): if self._settings['theme'] != theme: continue if qdarkstyle: LOG.debug("_load_base_style qdarkstyle " +self._args.theme) # QDarkStyleSheet if self._args.theme == 'light': from qdarkstyle.light.palette import LightPalette style = qdarkstyle.load_stylesheet(palette=LightPalette) else: from qdarkstyle.dark.palette import DarkPalette style = qdarkstyle.load_stylesheet(palette=DarkPalette) else: theme_path = settings.built_in_themes()[theme] file_path = util.join_path(util.get_styles_directory(), theme_path) if not os.path.isfile(file_path): LOG.warn('_load_app_styles: no theme file ' + file_path) continue with open(file_path) as fl: style = fl.read() LOG.debug('_load_app_styles: loading theme file ' + file_path) style += '\n' +sSTYLE self._app.setStyleSheet(style) LOG.info('_load_app_styles: loaded theme ' +self._args.theme) break def _load_login_screen_translations(self): LOG.debug("_load_login_screen_translations") current_language, supported_languages = self._get_languages() if current_language not in supported_languages: return lang_path = supported_languages[current_language] translator = QtCore.QTranslator() translator.load(util.get_translations_directory() + lang_path) self._app.installTranslator(translator) self._app.translator = translator def _load_icon(self): LOG.debug("_load_icon") icon_file = os.path.join(util.get_images_directory(), 'icon.png') self._app.setWindowIcon(QtGui.QIcon(icon_file)) @staticmethod def _get_languages(): LOG.debug("_get_languages") current_locale = QtCore.QLocale() curr_language = current_locale.languageToString(current_locale.language()) supported_languages = settings.supported_languages() return curr_language, supported_languages def _load_app_translations(self): LOG.debug("_load_app_translations") lang = settings.supported_languages()[self._settings['language']] translator = QtCore.QTranslator() translator.load(os.path.join(util.get_translations_directory(), lang)) self._app.installTranslator(translator) self._app.translator = translator def _select_and_load_profile(self): LOG.debug("_select_and_load_profile: " +repr(self._path)) if self._path is not None: # toxygen was started with path to profile try: assert os.path.exists(self._path), self._path self._load_existing_profile(self._path) except Exception as e: LOG.error('_load_existing_profile failed: ' + str(e)) title = 'Loading the profile failed ' if self._path: title += os.path.basename(self._path) text = 'Loading the profile failed - \n' +str(e) if 'Dis' == 'Abled': text += '\nLoading the profile failed - \n' \ +str(e) +'\nContinue with a default profile?' reply = util_ui.question(text, title) if not reply: LOG.debug('_load_existing_profile not continuing ') raise LOG.debug('_load_existing_profile continuing ') # drop through else: util_ui.message_box(text, title) raise else: auto_profile = Settings.get_auto_profile() if auto_profile is None: # no default profile LOG.debug('_select_and_load_profile no default profile ') result = self._select_profile() if result is None: LOG.debug('no selected profile ') return False if result.is_new_profile(): # create new profile if not self._create_new_profile(result.profile_path): LOG.warn('no new profile ') return False LOG.debug('created new profile ') else: # load existing profile self._load_existing_profile(result.profile_path) # drop through self._path = result.profile_path else: # default profile LOG.debug('loading default profile ') self._path = auto_profile self._load_existing_profile(auto_profile) if settings.is_active_profile(self._path): # profile is in use LOG.warn(f"_select_and_load_profile active: {self._path}") profile_name = util.get_profile_name_from_path(self._path) title = util_ui.tr('Profile {}').format(profile_name) text = util_ui.tr( 'Other instance of Toxygen uses this profile or profile was not properly closed. Continue?') reply = util_ui.question(text, title) if not reply: return False self._settings.set_active_profile() return True # ----------------------------------------------------------------------------------------------------------------- # Threads # ----------------------------------------------------------------------------------------------------------------- def _start_threads(self, initial_start=True): LOG.debug(f"_start_threads before: {threading.enumerate()!r}") # init thread self._init = threads.InitThread(self._tox, self._plugin_loader, self._settings, self, initial_start) self._init.start() def te(): return [t.name for t in threading.enumerate()] LOG.debug(f"_start_threads init: {te()!r}") # starting threads for tox iterate and toxav iterate self._main_loop = threads.ToxIterateThread(self._tox) self._main_loop.start() self._av_loop = threads.ToxAVIterateThread(self._tox.AV) self._av_loop.start() if initial_start: threads.start_file_transfer_thread() LOG.debug(f"_start_threads after: {[t.name for t in threading.enumerate()]!r}") def _stop_threads(self, is_app_closing=True): LOG.debug("_stop_threads") self._init.stop_thread(1.0) self._av_loop.stop_thread() self._main_loop.stop_thread() if is_app_closing: threads.stop_file_transfer_thread() def iterate(self, n=100): interval = self._tox.iteration_interval() for i in range(n): self._tox.iterate() gevent.sleep(interval / 1000.0) # ----------------------------------------------------------------------------------------------------------------- # Profiles # ----------------------------------------------------------------------------------------------------------------- def _select_profile(self): LOG.debug("_select_profile") if self._args.language != 'English': self._load_login_screen_translations() ls = LoginScreen() profiles = ProfileManager.find_profiles() ls.update_select(profiles) ls.show() self._app.exec_() return ls.result def _load_existing_profile(self, profile_path): LOG.info("_load_existing_profile " +repr(profile_path)) assert os.path.exists(profile_path), profile_path self._profile_manager = ProfileManager(self._toxes, profile_path) data = self._profile_manager.open_profile() if self._toxes.is_data_encrypted(data): LOG.debug("_entering password") data = self._enter_password(data) LOG.debug("_entered password") json_file = profile_path.replace('.tox', '.json') assert os.path.exists(json_file), json_file LOG.debug("creating _settings from: " +json_file) self._settings = Settings(self._toxes, json_file, self) self._tox = self._create_tox(data, self._settings) LOG.debug("created _tox") def _create_new_profile(self, profile_name): LOG.info("_create_new_profile " + profile_name) result = self._get_create_profile_screen_result() if result is None: return False if result.save_into_default_folder: profile_path = util.join_path(get_user_config_path(), profile_name + '.tox') else: profile_path = util.join_path(util.curr_directory(__file__), profile_name + '.tox') if os.path.isfile(profile_path): util_ui.message_box(util_ui.tr('Profile with this name already exists'), util_ui.tr('Error')) return False name = profile_name or 'toxygen_user' assert self._args self._path = profile_path if result.password: self._toxes.set_password(result.password) self._settings = Settings(self._toxes, self._path.replace('.tox', '.json'), app=self) self._tox = self._create_tox(None, self._settings) self._tox.self_set_name(name if name else 'Toxygen User') self._tox.self_set_status_message('Toxing on Toxygen') self._profile_manager = ProfileManager(self._toxes, profile_path) try: self._save_profile() except Exception as ex: #? print(ex) LOG.error('Profile creation exception: ' + str(ex)) text = util_ui.tr('Profile saving error! Does Toxygen have permission to write to this directory?') util_ui.message_box(text, util_ui.tr('Error')) return False current_language, supported_languages = self._get_languages() if current_language in supported_languages: self._settings['language'] = current_language self._settings.save() return True def _get_create_profile_screen_result(self): LOG.debug("_get_create_profile_screen_result") cps = CreateProfileScreen() cps.show() self._app.exec_() return cps.result def _save_profile(self, data=None): LOG.debug("_save_profile") data = data or self._tox.get_savedata() self._profile_manager.save_profile(data) # ----------------------------------------------------------------------------------------------------------------- # Other private methods # ----------------------------------------------------------------------------------------------------------------- def _enter_password(self, data): """ Show password screen """ LOG.debug("_enter_password") p = password_screen.PasswordScreen(self._toxes, data) p.show() self._app.lastWindowClosed.connect(self._app.quit) self._app.exec_() if p.result is not None: return p.result self._force_exit(0) return None def _reset(self): LOG.debug("_reset") """ Create new tox instance (new network settings) :return: tox instance """ self._contacts_manager.reset_contacts_statuses() self._stop_threads(False) data = self._tox.get_savedata() self._save_profile(data) self._kill_toxav() self._kill_tox() try: # create new tox instance self._tox = self._create_tox(data, self._settings) assert self._tox self._start_threads(False) tox_savers = [self._friend_factory, self._group_factory, self._plugin_loader, self._contacts_manager, self._contacts_provider, self._messenger, self._file_transfer_handler, self._groups_service, self._profile] for tox_saver in tox_savers: tox_saver.set_tox(self._tox) self._calls_manager.set_toxav(self._tox.AV) self._contacts_manager.update_friends_numbers() self._contacts_manager.update_groups_lists() self._contacts_manager.update_groups_numbers() self._init_callbacks() except BaseException as e: LOG.error(f"_reset : {e}") LOG.debug('_reset: ' \ +'\n' + traceback.format_exc()) title = util_ui.tr('Reset Error') text = util_ui.tr('Error:') + str(e) util_ui.message_box(text, title) def _create_dependencies(self): LOG.info(f"_create_dependencies toxygen version {self._version}") if hasattr(self._args, 'update') and self._args.update: self._backup_service = BackupService(self._settings, self._profile_manager) self._smiley_loader = SmileyLoader(self._settings) self._tox_dns = ToxDns(self._settings) self._ms = MainWindow(self._settings, self._tray, self) db_path = self._path.replace('.tox', '.db') db = Database(db_path, self._toxes) if os.path.exists(db_path) and hasattr(db, 'open'): db.open() assert self._tox contact_items_factory = ContactItemsFactory(self._settings, self._ms) self._friend_factory = FriendFactory(self._profile_manager, self._settings, self._tox, db, contact_items_factory) self._group_factory = GroupFactory(self._profile_manager, self._settings, self._tox, db, contact_items_factory) self._group_peer_factory = GroupPeerFactory(self._tox, self._profile_manager, db, contact_items_factory) self._contacts_provider = ContactProvider(self._tox, self._friend_factory, self._group_factory, self._group_peer_factory) self._profile = Profile(self._profile_manager, self._tox, self._ms, self._contacts_provider, self._reset) self._init_profile() self._plugin_loader = PluginLoader(self._settings, self) history = None messages_items_factory = MessagesItemsFactory(self._settings, self._plugin_loader, self._smiley_loader, self._ms, lambda m: history.delete_message(m)) history = History(self._contacts_provider, db, self._settings, self._ms, messages_items_factory) self._contacts_manager = ContactsManager(self._tox, self._settings, self._ms, self._profile_manager, self._contacts_provider, history, self._tox_dns, messages_items_factory) history.set_contacts_manager(self._contacts_manager) self._calls_manager = CallsManager(self._tox.AV, self._settings, self._ms, self._contacts_manager, self) self._messenger = Messenger(self._tox, self._plugin_loader, self._ms, self._contacts_manager, self._contacts_provider, messages_items_factory, self._profile, self._calls_manager) file_transfers_message_service = FileTransfersMessagesService(self._contacts_manager, messages_items_factory, self._profile, self._ms) self._file_transfer_handler = FileTransfersHandler(self._tox, self._settings, self._contacts_provider, file_transfers_message_service, self._profile) messages_items_factory.set_file_transfers_handler(self._file_transfer_handler) widgets_factory = None widgets_factory_provider = Provider(lambda: widgets_factory) self._groups_service = GroupsService(self._tox, self._contacts_manager, self._contacts_provider, self._ms, widgets_factory_provider) widgets_factory = WidgetsFactory(self._settings, self._profile, self._profile_manager, self._contacts_manager, self._file_transfer_handler, self._smiley_loader, self._plugin_loader, self._toxes, self._version, self._groups_service, history, self._contacts_provider) if False: self._tray = tray.init_tray(self._profile, self._settings, self._ms, self._toxes) self._ms.set_dependencies(widgets_factory, self._tray, self._contacts_manager, self._messenger, self._profile, self._plugin_loader, self._file_transfer_handler, history, self._calls_manager, self._groups_service, self._toxes, self) if False: # the tray icon does not die with the app self._tray.show() self._ms.show() # FixMe: self._log = lambda line: LOG.log(self._args.loglevel, self._ms.status(line)) self._ms._log = self._log # used in callbacks.py self.LOG = self._log # backwards if False: self.status_handler = logging.Handler() self.status_handler.setLevel(logging.INFO) # self._args.loglevel self.status_handler.handle = self._ms.status self._init_callbacks() LOG.info("_create_dependencies toxygen version " +self._version) def _try_to_update(self): LOG.debug("_try_to_update") updating = updater.start_update_if_needed(self._version, self._settings) if updating: LOG.info("Updating toxygen version " +self._version) self._save_profile() self._settings.close() self._kill_toxav() self._kill_tox() return updating def _create_tox(self, data, settings_): LOG.info("_create_tox calling tox_factory") assert self._args retval = tox_factory(data=data, settings=settings_, args=self._args, app=self) LOG.debug("_create_tox succeeded") return retval def _force_exit(self, retval=0): LOG.debug("_force_exit") sys.exit(0) def _init_callbacks(self, ms=None): LOG.debug("_init_callbacks") callbacks.init_callbacks(self._tox, self._profile, self._settings, self._plugin_loader, self._contacts_manager, self._calls_manager, self._file_transfer_handler, self._ms, self._tray, self._messenger, self._groups_service, self._contacts_provider, self._ms) def _init_profile(self): LOG.debug("_init_profile") if not self._profile.has_avatar(): self._profile.reset_avatar(self._settings['identicons']) def _kill_toxav(self): LOG.debug("_kill_toxav") self._calls_manager.set_toxav(None) self._tox.AV.kill() def _kill_tox(self): LOG.debug("_kill_tox") self._tox.kill() def _test_relays(self, lElts=None): env = self._test_env() if lElts is None: lElts = env['lElts'] # shuffle(env['lElts']) LOG.debug(f"_test_relays {len(env['lElts'])}") for host,port,key in env['lElts'][:10]: try: oRet = self._tox.add_tcp_relay(host, port, key) LOG.debug('add_tcp_relay to ' +host +':' +str(port) \ +' : ' +str(oRet)) except Exception as e: LOG.warn('tox_add_tcp_relay ' +host +' : ' +str(e)) # LOG.error(traceback.format_exc()) # LOG.info("Connected status: " +repr(self._tox.self_get_connection_status())) def _test_tox(self): self.test_net() self._ms.log_console() def test_net(self, lElts=None, oThread=None, iMax=4): LOG.debug("test_net " +self._args.network) # bootstrap LOG.debug('Calling generate_nodes: ') lNodes = ts.generate_nodes(oArgs=self._args) if lNodes: self._settings['current_nodes'] = lNodes else: LOG.warn('empty generate_nodes: ') # if oThread and oThread._stop_thread: return LOG.debug("test_net network=" +self._args.network +' iMax=' +str(iMax)) if self._args.network not in ['local', 'localnew', 'newlocal']: b = ts.bAreWeConnected() if b is None: i = os.system('ip route|grep ^def') if i > 0: b = False else: b = True if not b: LOG.warn("No default route for network " +self._args.network) text = 'You have no default route - are you connected?' reply = util_ui.question(text, "Are you connected?") if not reply: return iMax = 1 else: LOG.debug("Have default route for network " +self._args.network) LOG.debug(f"test_net {self._args.network} iMax= {iMax}") i = 0 while i < iMax: # if oThread and oThread._stop_thread: return i = i + 1 LOG.debug(f"bootstrapping status # {i}") self._test_bootstrap() if hasattr(self._args, 'proxy_type') and self._args.proxy_type > 0: LOG.debug(f"relaying status # {i}") self._test_relays() status = self._tox.self_get_connection_status() LOG.debug(f"connecting status # {i}" +' : ' +repr(status)) if status > 0: LOG.info(f"Connected # {i}" +' : ' +repr(status)) break QtCore.QThread.msleep(3000) # NO QtCore.QCoreApplication.processEvents() LOG.trace(f"Connected status #{i}: {status!r}") sleep(1) def _test_env(self): _settings = self._settings if 'proxy_type' not in _settings or _settings['proxy_type'] == 0 or \ not _settings['proxy_host'] or not _settings['proxy_port']: env = dict( prot = 'ipv4') elif _settings['proxy_type'] == 2: env = dict(prot = 'socks5', https_proxy='', \ socks_proxy='socks5://' \ +_settings['proxy_host'] +':' \ +str(_settings['proxy_port'])) elif _settings['proxy_type'] == 1: env = dict(prot = 'https', socks_proxy='', \ https_proxy='http://' \ +_settings['proxy_host'] +':' \ +str(_settings['proxy_port'])) if 'current_nodes' in _settings and _settings['current_nodes']: LOG.debug("Using current nodes "+' : ' +str(len(_settings['current_nodes']))) lElts = _settings['current_nodes'] elif _settings['network'] in ['local', 'newlocal']: lElts = lLOCAL elif _settings['network'] == 'old': lElts = lGOOD elif 'proxy_type' not in _settings or _settings['proxy_type'] == 0 or \ not _settings['proxy_host'] or not _settings['proxy_port']: lElts = lNEW else: lElts = lRELAYS env['lElts'] = lElts LOG.debug(f"test_env {len(env['lElts'])}") return env def _test_bootstrap(self, lElts=None): env = self._test_env() if lElts is None: lElts = env['lElts'] #shuffle(env['lElts']) LOG.debug(f"_test_bootstrap #Elts={len(lElts)}") LOG.trace(f"_test_bootstrap lElts={lElts[:10]}") for host,port,key in lElts[:10]: try: assert len(key) == 64, key assert len(host) <= 16, host if type(port) == str: port = int(port) oRet = self._tox.bootstrap(host, port, key) LOG.debug('bootstrap to ' +host +':' +str(port) \ +' : ' +repr(oRet)) except Exception as e: LOG.warn('self._tox.bootstrap host=' +host \ +' port=' +str(port) \ +' key=' +key \ +' : ' +str(e)) # LOG.error(traceback.format_exc()) LOG.debug("Connected status: " +repr(self._tox.self_get_connection_status())) def _test_socks(self, lElts=None): LOG.debug("_test_socks") if not self._tox: return title = 'Extended Test Suite' text = 'Run the Extended Test Suite?\nThe program may freeze for 1-10 minutes.' i = os.system('ip route|grep ^def >/dev/null') if i > 0: text += '\nYou have no default route - are you connected?' reply = util_ui.question(text, title) if not reply: return env = self._test_env() if lElts is None: lElts = env['lElts'] # shuffle(env['lElts']) try: inodeinfo_test(env['lElts'], env) except Exception as e: # json.decoder.JSONDecodeError LOG.error(f"test_tox ' +' : {e}") LOG.error('_test_tox(): ' \ +'\n' + traceback.format_exc()) title = 'Extended Test Suite Error' text = 'Error:' + str(e) util_ui.message_box(text, title) # LOG.info("Connected status: " +repr(self._tox.self_get_connection_status())) self._ms.log_console() def _test_main(self): from tests.tests_socks import main as tests_main LOG.debug("_test_socks") if not self._tox: return title = 'Extended Test Suite' text = 'Run the Extended Test Suite?\nThe program may freeze for 20-60 minutes.' reply = util_ui.question(text, title) if reply: if hasattr(self._args, 'proxy_type') and self._args.proxy_type: lArgs = ['--proxy_host', self._args.proxy_host, '--proxy_port', str(self._args.proxy_port), '--proxy_type', str(self._args.proxy_type), ] else: lArgs = list() try: tests_main(lArgs) except Exception as e: LOG.error(f"_test_socks(): {e}") LOG.error('_test_socks(): ' \ +'\n' + traceback.format_exc()) title = 'Extended Test Suite Error' text = 'Error:' + str(e) util_ui.message_box(text, title) self._ms.log_console() class GEventProcessing: """Interoperability class between Qt/gevent that allows processing gevent tasks during Qt idle periods.""" def __init__(self, idle_period=IDLE_PERIOD): # Limit the IDLE handler's frequency while still allow for gevent # to trigger a microthread anytime self._idle_period = idle_period # IDLE timer: on_idle is called whenever no Qt events left for # processing self._timer = QTimer() self._timer.timeout.connect(self.process_events) self._timer.start(0) def __enter__(self): pass def __exit__(self, *exc_info): self._timer.stop() def process_events(self, idle_period=None): if idle_period is None: idle_period = self._idle_period # Cooperative yield, allow gevent to monitor file handles via libevent gevent.sleep(idle_period)