diff --git a/src/qweechat/weechat/testproto.py b/src/qweechat/weechat/testproto.py
index 84d8c0a..54065fe 100755
--- a/src/qweechat/weechat/testproto.py
+++ b/src/qweechat/weechat/testproto.py
@@ -21,185 +21,219 @@
# along with QWeeChat. If not, see .
#
-#
-# Usage: python testproto.py [-h] [-v] [-6]
-#
-# With initial commands: echo "init password=xxxx" | python testproto.py localhost 5000
-# python testproto.py localhost 5000 < commands.txt
-#
-# Return code:
-# 0: OK
-# 1: missing/invalid arguments (hostname or port)
-# 2: connection to WeeChat/relay failed
-# 3: I/O error with WeeChat/relay
-#
+from __future__ import print_function
+
+import argparse
+import os
+import select
+import shlex
+import socket
+import struct
+import sys
+import time
+import traceback
-import os, sys, socket, select, struct, time
import protocol # WeeChat/relay protocol
-options = { 'h': 0, 'v': 0, '6': 0 }
-hostname = None
-port = None
-def usage():
- """Display usage."""
- print('\nSyntax: python %s [-h] [-v] [-6] \n' % sys.argv[0])
- print(' -h display this help')
- print(' -v verbose mode: long objects view (two -v: display raw messages)')
- print(' -6 connect using IPv6')
- print(' hostname, port hostname (or IP address) and port of machine running WeeChat relay')
- print('')
- print('Some commands can be piped to the script, for example:')
- print(' echo "init password=xxxx" | python %s localhost 5000' % sys.argv[0])
- print(' python %s localhost 5000 < commands.txt' % sys.argv[0])
- print('')
+class TestProto:
-def connect(address, ipv6):
- """Connect to WeeChat/relay."""
- inet = socket.AF_INET6 if ipv6 else socket.AF_INET
- sock = None
- try:
- sock = socket.socket(inet, socket.SOCK_STREAM)
- sock.connect(address)
- except:
- if sock:
- sock.close()
- print('Failed to connect to %s/%d using %s' % (address[0], address[1],
- 'IPv4' if inet == socket.AF_INET else 'IPv6'))
- return (None, None)
- print('Connected to %s/%d (%s)' % (hostname, port,
- 'IPv4' if inet == socket.AF_INET else 'IPv6'))
- return (sock, inet)
+ def __init__(self, args):
+ self.args = args
+ self.sock = None
+ self.has_quit = False
+ self.address = '{self.args.hostname}/{self.args.port} ' \
+ '(IPv{0})'.format(6 if self.args.ipv6 else 4, self=self)
-def send(sock, messages):
- """Send a text message to WeeChat/relay."""
- has_quit = False
- try:
- for msg in messages.split('\n'):
- if msg == 'quit':
- has_quit = True
- sock.sendall(msg + '\n')
- print('\x1b[33m<-- %s\x1b[0m' % msg)
- except:
- print('Failed to send message')
- return (False, has_quit)
- return (True, has_quit)
+ def connect(self):
+ """
+ Connect to WeeChat/relay.
+ Return True if OK, False if error.
+ """
+ inet = socket.AF_INET6 if self.args.ipv6 else socket.AF_INET
+ try:
+ self.sock = socket.socket(inet, socket.SOCK_STREAM)
+ self.sock.connect((self.args.hostname, self.args.port))
+ except:
+ if self.sock:
+ self.sock.close()
+ print('Failed to connect to', self.address)
+ return False
+ print('Connected to', self.address)
+ return True
-def decode(message):
- """Decode a binary message received from WeeChat/relay."""
- global options
- try:
- proto = protocol.Protocol()
- message = proto.decode(message, separator='\n' if options['v'] else ', ')
- print('')
- if options['v'] >= 2 and message.uncompressed:
- # display raw message
- print('\x1b[32m--> message uncompressed (%d bytes):\n%s\x1b[0m'
- % (message.size_uncompressed,
- protocol.hex_and_ascii(message.uncompressed, 20)))
- # display decoded message
- print('\x1b[32m--> %s\x1b[0m' % message)
- except:
- print('Error while decoding message from WeeChat')
- return False
- return True
+ def send(self, messages):
+ """
+ Send a text message to WeeChat/relay.
+ Return True if OK, False if error.
+ """
+ try:
+ for msg in messages.split('\n'):
+ if msg == 'quit':
+ self.has_quit = True
+ self.sock.sendall(msg + '\n')
+ print('\x1b[33m<-- ' + msg + '\x1b[0m')
+ except:
+ traceback.print_exc()
+ print('Failed to send message')
+ return False
+ return True
-def mainloop(sock):
- """Main loop: read keyboard, send commands, read socket and decode and display received binary messages."""
- message = ''
- recvbuf = ''
- prompt = '\x1b[36mrelay> \x1b[0m'
- sys.stdout.write(prompt)
- sys.stdout.flush()
- try:
- while True:
- inr, outr, exceptr = select.select([sys.stdin, sock], [], [], 1)
- for fd in inr:
- if fd == sys.stdin:
- buf = os.read(fd.fileno(), 4096)
- if buf:
- message += buf
- if '\n' in message:
- messages = message.split('\n')
- msgsent = '\n'.join(messages[:-1])
- if msgsent:
- (send_ok, has_quit) = send(sock, msgsent)
- if not send_ok:
- return 3
- if has_quit:
- return 0
- message = messages[-1]
+ def decode(self, message):
+ """
+ Decode a binary message received from WeeChat/relay.
+ Return True if OK, False if error.
+ """
+ try:
+ proto = protocol.Protocol()
+ msgd = proto.decode(message,
+ separator='\n' if self.args.verbose > 0
+ else ', ')
+ print('')
+ if self.args.verbose >= 2 and msgd.uncompressed:
+ # display raw message
+ print('\x1b[32m--> message uncompressed ({0} bytes):\n'
+ '{1}\x1b[0m'
+ ''.format(msgd.size_uncompressed,
+ protocol.hex_and_ascii(msgd.uncompressed, 20)))
+ # display decoded message
+ print('\x1b[32m--> {0}\x1b[0m'.format(msgd))
+ except:
+ traceback.print_exc()
+ print('Error while decoding message from WeeChat')
+ return False
+ return True
+
+ def send_stdin(self):
+ """
+ Send commands from standard input if some data is available.
+ Return True if OK (it's OK if stdin has no commands),
+ False if error.
+ """
+ inr, outr, exceptr = select.select([sys.stdin], [], [], 0)
+ if inr:
+ data = os.read(sys.stdin.fileno(), 4096)
+ if data:
+ if not test.send(data.strip()):
+ #self.sock.close()
+ return False
+ # open stdin to read user commands
+ sys.stdin = open('/dev/tty')
+ return True
+
+ def mainloop(self):
+ """
+ Main loop: read keyboard, send commands, read socket,
+ decode/display binary messages received from WeeChat/relay.
+ Return 0 if OK, 4 if send error, 5 if decode error.
+ """
+ if self.has_quit:
+ return 0
+ message = ''
+ recvbuf = ''
+ prompt = '\x1b[36mrelay> \x1b[0m'
+ sys.stdout.write(prompt)
+ sys.stdout.flush()
+ try:
+ while not self.has_quit:
+ inr, outr, exceptr = select.select([sys.stdin, self.sock],
+ [], [], 1)
+ for fd in inr:
+ if fd == sys.stdin:
+ buf = os.read(fd.fileno(), 4096)
+ if buf:
+ message += buf
+ if '\n' in message:
+ messages = message.split('\n')
+ msgsent = '\n'.join(messages[:-1])
+ if msgsent and not self.send(msgsent):
+ return 4
+ message = messages[-1]
+ sys.stdout.write(prompt + message)
+ sys.stdout.flush()
+ else:
+ buf = fd.recv(4096)
+ if buf:
+ recvbuf += buf
+ while len(recvbuf) >= 4:
+ remainder = None
+ length = struct.unpack('>i', recvbuf[0:4])[0]
+ if len(recvbuf) < length:
+ # partial message, just wait for the
+ # end of message
+ break
+ # more than one message?
+ if length < len(recvbuf):
+ # save beginning of another message
+ remainder = recvbuf[length:]
+ recvbuf = recvbuf[0:length]
+ if not self.decode(recvbuf):
+ return 5
+ if remainder:
+ recvbuf = remainder
+ else:
+ recvbuf = ''
sys.stdout.write(prompt + message)
sys.stdout.flush()
- else:
- buf = fd.recv(4096)
- if buf:
- recvbuf += buf
- while len(recvbuf) >= 4:
- remainder = None
- length = struct.unpack('>i', recvbuf[0:4])[0]
- if len(recvbuf) < length:
- # partial message, just wait for end of message
- break
- # more than one message?
- if length < len(recvbuf):
- # save beginning of another message
- remainder = recvbuf[length:]
- recvbuf = recvbuf[0:length]
- if not decode(recvbuf):
- return 3
- if remainder:
- recvbuf = remainder
- else:
- recvbuf = ''
- sys.stdout.write(prompt + message)
- sys.stdout.flush()
- except:
- send(sock, 'quit')
+ except:
+ traceback.print_exc()
+ self.send('quit')
+ return 0
-# display help if arguments are missing
-if len(sys.argv) < 3:
- usage()
- sys.exit(1)
+ def __del__(self):
+ print('Closing connection with', self.address)
+ time.sleep(0.5)
+ self.sock.close()
-# read command line arguments
-try:
- for arg in sys.argv[1:]:
- if arg[0] == '-':
- for opt in arg[1:]:
- options[opt] = options.get(opt, 0) + 1
- elif hostname:
- port = int(arg)
- else:
- hostname = arg
-except:
- print('Invalid arguments')
- sys.exit(1)
-if options['h']:
- usage()
- sys.exit(0)
+if __name__ == "__main__":
+ # parse command line arguments
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ fromfile_prefix_chars='@',
+ description='Command-line program for testing protocol WeeChat/relay.',
+ epilog='''
+Environment variable "TESTPROTO_OPTIONS" can be set with default options.
+Argument "@file.txt" can be used to read default options in a file.
-# connect to WeeChat/relay
-(sock, inet) = connect((hostname, port), options['6'])
-if not sock:
- sys.exit(2)
+Some commands can be piped to the script, for example:
+ echo "init password=xxxx" | python {0} localhost 5000
+ python {0} localhost 5000 < commands.txt
-# send commands from standard input if some data is available
-has_quit = False
-inr, outr, exceptr = select.select([sys.stdin], [], [], 0)
-if inr:
- data = os.read(sys.stdin.fileno(), 4096)
- if data:
- (send_ok, has_quit) = send(sock, data.strip())
- if not send_ok:
- sock.close()
- sys.exit(3)
- # open stdin to read user commands
- sys.stdin = open('/dev/tty')
+The script returns:
+ 0: OK
+ 2: wrong arguments (command line)
+ 3: connection error
+ 4: send error (message sent to WeeChat)
+ 5: decode error (message received from WeeChat)
+'''.format(sys.argv[0]))
+ parser.add_argument('-6', '--ipv6', action='store_true',
+ help='connect using IPv6')
+ parser.add_argument('-v', '--verbose', action='count', default=0,
+ help='verbose mode: long objects view '
+ '(-vv: display raw messages)')
+ parser.add_argument('hostname',
+ help='hostname (or IP address) of machine running '
+ 'WeeChat/relay')
+ parser.add_argument('port', type=int,
+ help='port of machine running WeeChat/relay')
+ if len(sys.argv) == 1:
+ parser.print_help()
+ sys.exit(0)
+ args = parser.parse_args(
+ shlex.split(os.getenv('TESTPROTO_OPTIONS') or '') + sys.argv[1:])
-# main loop (wait commands, display messages received)
-if not has_quit:
- mainloop(sock)
-time.sleep(0.5)
-sock.close()
+ test = TestProto(args)
+
+ # connect to WeeChat/relay
+ if not test.connect():
+ sys.exit(3)
+
+ # send commands from standard input if some data is available
+ if not test.send_stdin():
+ sys.exit(4)
+
+ # main loop (wait commands, display messages received)
+ rc = test.mainloop()
+ del test
+ sys.exit(rc)