#!/usr/local/bin/python3.7 # # simple telnet client. relies on python standard telnetlib. # # + optional SSL wrapping. # + user escape character. (like the standard bsd telnet client). # + debug logging via syslog. # # $Id: tn.py,v 1.25 2019/09/19 04:07:18 pkern Exp $ from __future__ import print_function from types import MethodType import argparse import atexit import sys import telnetlib import select import socket import ssl import os import logging.handlers import tty, termios ##### # see tn_fill_rawq() # telnetlib setting: recv_deflt = 50 recvnbytes = recv_deflt ##### def GetArgs(): parser = argparse.ArgumentParser(description='basic telnet client with optional SSL [defaults]') parser.add_argument('-d', '--debug', type=int, default=0, action='store', help='set debugging level') eHex = "0x1d" parser.add_argument('-x', '--escape', default=eHex, action='store', help="input escape char [" + eHex + "]") parser.add_argument('-S', '--SSL', dest='SSL', action='store_true', help='connect with SSL [no]') parser.add_argument("hostIP", help='remote IP address') parser.add_argument("portNo", type=int, help='remote port number') parser.set_defaults(SSL=False) args = parser.parse_args() return args #end-def ### # see interact() in /usr/local/lib/python2.7/telnetlib.py def my_interact_raw(tn, ttyf, esc): tn.msg("esc %r", esc) while 1: rfd, wfd, xfd = select.select([tn, ttyf], [], []) if tn in rfd: try: rcvd = tn.read_very_eager() except EOFError: print('\r\n*** Connection closed by server ***\r\n') return if rcvd: sys.stdout.buffer.write(rcvd) sys.stdout.flush() # # if ttyf in rfd: ch = os.read(ttyf.fileno(), 1) if not ch: print('\r\n\n # input empty. exiting.\r\n') return if ch == esc: print('\r\n\n # escape received on input. exiting.\r\n') return # tn.write(ch) # # #end-def ## # alternate telnetlib open() to add SSL wrapper. # see https://stackoverflow.com/questions/1647586/is-it-possible-to-change-an-instances-method-implementation-without-changing-al def tn_ssl_open(self, host, port=0, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): """Connect to a host via SSL. The optional second argument is the port number, which defaults to the standard telnet port (23). Don't try to reopen an already connected instance. """ self.eof = 0 if not port: port = TELNET_PORT self.host = host self.port = port self.timeout = timeout sock = socket.create_connection((host, port), timeout) # ssl.wrap_socket(sock, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE, ssl_version={see docs}, ca_certs=None, do_handshake_on_connect=True, suppress_ragged_eofs=True, ciphers=None) sslsock = ssl.wrap_socket(sock) self.sock = sslsock self.SSL = True print(" # telnet + SSL " + repr(sslsock.cipher()) + "\r\n") #end-def ## # alternate telnetlib msg() to use syslogging. def tn_msg(self, msg, *args): """Send telnetlib debug messages via syslog. """ rem = self.host + "," + repr(self.port) fmt = rem + " " + msg if self.debuglevel > 0: if args: logging.debug(fmt % args) else: logging.debug(fmt) # # #end-def ## # alternate telnetlib sock_avail() to handle SSL leftovers. # https://stackoverflow.com/questions/3187565/select-and-ssl-in-python def tn_sock_avail(self): """Test whether data is available on the socket or pending within the SSL processing pipeline. """ global recvnbytes, recv_deflt if select.select([self], [], [], 0) == ([self], [], []): recvnbytes = recv_deflt return True if hasattr(self.sock, 'pending'): ssl_left = self.sock.pending() if ssl_left > 0: self.msg("tn_sock_avail ssl_left %d", ssl_left) # ... for recv() in fill_rawq() ... recvnbytes = ssl_left return True # # return False #end-def ## # alternate telnetlib fill_rawq() to change recv() size. def tn_fill_rawq(self): """Fill raw queue from exactly one recv() system call. Block if no data is immediately available. Set self.eof when connection is closed. Same as telnetlib fill_rawq(), but with variable recv-nbytes. """ global recvnbytes, recv_deflt if self.irawq >= len(self.rawq): self.rawq = b'' self.irawq = 0 # The buffer size should be fairly small so as to avoid quadratic # behavior in process_rawq() above buf = self.sock.recv(recvnbytes) self.msg("recv %d/%d %r", len(buf), recvnbytes, buf) self.eof = (not buf) self.rawq = self.rawq + buf recvnbytes = recv_deflt #end-def ## # Start program def main(): global recvnbytes, recv_deflt args = GetArgs() pathbase = os.path.basename(sys.argv[0]) progname = os.path.splitext(pathbase)[0] logger = logging.getLogger('') logger.setLevel(logging.DEBUG if args.debug else logging.INFO) # default is to send logs to localhost/514. syslhnd = logging.handlers.SysLogHandler() # https://docs.python.org/2/library/logging.html#logrecord-attributes syslfmt = logging.Formatter("%s[%%(process)d]: %%(message)s" % progname) syslhnd.formatter = syslfmt logger.addHandler(syslhnd) tn = telnetlib.Telnet() tn.msg = MethodType(tn_msg, tn) tn.fill_rawq = MethodType(tn_fill_rawq, tn) if args.SSL: tn.open = MethodType(tn_ssl_open, tn) tn.sock_avail = MethodType(tn_sock_avail, tn) # esc = int(args.escape, 16) if esc < 0 or esc > 255: print("\r\n*** unexpected escape: " + hex(esc) + " ***\r\n", file=sys.stderr) return # # print("\r\n esc " + str(esc) + ".\r\n") ctrl = chr(esc + ord('@')) escMsg = " # escape character is '^" + str(ctrl) + "'." print("\r\n" + escMsg + "\r\n") if args.debug > 0: print(" # debug= " + repr(args.debug) + "\r\n") print(" # NOTE: debug mode will reveal typed passwords.\r\n") ttyf = open("/dev/tty") omode = termios.tcgetattr(ttyf.fileno()) try: tty.setraw(ttyf.fileno()) tn.open(args.hostIP, args.portNo) tn.set_debuglevel(args.debug) my_interact_raw(tn, ttyf, bytes([ esc ])) tn.close() except Exception as e: print("tn exception: " + str(e), file=sys.stderr) finally: termios.tcsetattr(ttyf.fileno(), termios.TCSADRAIN, omode) ttyf.close() # #end-def ## # Start program if __name__ == "__main__": main()