From 3ce614c84616f3efd787914924dab2d61693a99c Mon Sep 17 00:00:00 2001 From: Chintan Choksi Date: Sat, 14 Feb 2015 19:42:30 -0500 Subject: [PATCH] Added files related to transparent tcp relay --- components/transparenttcprelayaffix.r2py | 540 ++++++++++++++ .../restrictions.tcp_relay | 259 +++++++ ...parenttcprelay_socketcloseduringsetup.r2py | 77 ++ .../transparent_tcp_relay.r2py | 660 ++++++++++++++++++ 4 files changed, 1536 insertions(+) create mode 100644 components/transparenttcprelayaffix.r2py create mode 100644 services/transparent_tcp_relay/restrictions.tcp_relay create mode 100644 services/transparent_tcp_relay/tests/ut_affix_transparenttcprelay_socketcloseduringsetup.r2py create mode 100644 services/transparent_tcp_relay/transparent_tcp_relay.r2py diff --git a/components/transparenttcprelayaffix.r2py b/components/transparenttcprelayaffix.r2py new file mode 100644 index 0000000..7090a35 --- /dev/null +++ b/components/transparenttcprelayaffix.r2py @@ -0,0 +1,540 @@ +""" +tcprelayaffix.r2py + +The purpose of the TCPRelayAffix is to allow a machine that is behind +a NAT or firewall to be accessed. The way it works is that the machine +to be connected to will make a connection to a TCP relay machine and +wait for incoming traffic. A TCP relay is a special machine that is not +behind NAT or firewall and can be accessed by both the server and the client. + +TODO: +Remove references to "NAT forwarding". +""" + +session = dy_import_module("session.r2py") +baseaffix = dy_import_module("baseaffix.r2py") + +# Needed to do stack_id-to-IP-address translation +namingandresolveraffix = dy_import_module("namingandresolveraffix.r2py") + +sockettimeout = dy_import_module("sockettimeout.r2py") +affix_stack = dy_import_module("affix_stack.r2py") +affix_wrapper_lib = dy_import_module("affix_wrapper_lib.r2py") +dy_import_module_symbols("tcp_relay_common_lib.r2py") +advertise = dy_import_module("advertise.r2py") # for AdvertiseError +cachedadvertise = dy_import_module("cachedadvertise.r2py") +random = dy_import_module("random.r2py") +advertisepipe = dy_import_module("advertisepipe.r2py") + + +# The default timeout that is used when connecting to forwarder +# when doing listenforconnection. +_NAT_AFFIX_DEFAULT_TIMEOUT = 10 + +# A timeout value for all send()/recv() calls. +_NAT_SOCKET_TIMEOUT = 10 + +# Whether we want to print debug statements. +_NAT_AFFIX_DEBUG_MODE = False + + + +class NatSockObj: + def __init__(self, sockobj, server_id): + """ + + The purpose of this NatSockObj is just to store + the server_id as it will be required later when + it needs to be used by tcpserver_getconnection() + """ + self._sockobj = sockobj + self._server_id = server_id + self._closed = False + + def close(self): + if self._closed: + return False + else: + self._closed = True + self._sockobj.close() + return True + + def recv(self, bytes): + return self._sockobj.recv(bytes) + + def send(self, message): + return self._sockobj.send(message) + + def getserver_id(self): + return self._server_id + + +class TransparentTCPRelayAffix(baseaffix.BaseAffix): + + + def __init__(self, next_affix, optional_args = None): + """ + + Initialize the TCPRelayAffix. + + + next_affix - the Affix component underneath us. + + optional_args - the optional args if provided will be the + forwarder ip address and port that the user wants to use. + + + None + + + AffixInternalError raised if the optional args provided is not of + the proper format. + + + None + """ + + # If optional arguments are provided, then store it as the default forwarder. + if optional_args: + assert(isinstance(optional_args, list)), "Bad optional args. Optional arg must be a list." + assert(len(optional_args) == 1), "Bad optional args. Optional args must contain 1 value. (IP:port:CLIENT_PORT_KEY)" + + try: + forwarder_ip, forwarder_port, mycontext['CLIENT_PORT_KEY'] = optional_args[0].split(':') + self.default_forwarder = [forwarder_ip, int(forwarder_port)] + except ValueError: + raise AffixInternalError("Optional arg provided is not of valid format. Must be IP:port.") + + # If no default forwarder is provided. + else: + self.default_forwarder = [] + + baseaffix.BaseAffix.__init__(self, next_affix, optional_args) + + + """ + def openconnection(self, destip, destport, localip, localport, timeout): + + + Opens a connection to the server through the NAT forwarder. The client + opens a connection to the forwarder and sends and receives all message + from it. The forwarder in turn will forward all the traffic to and forth + from the server. + + + The same arguments that the RepyV2 openconnection accepts. + + + Opens a connection to the forwarder rather then to the server directly. + + + Exceptions that are raised by the Repy V2 api call openconnection() + + + A socket like object. + + + # If there are no forwarders provided, then we raise a ConnectionRefusedError. + if not self.default_forwarder: + raise ConnectionRefusedError("NAT forwarder is not specified. Cannot establish connection.") + + + # Make a connection to the forwarder. Exceptions may be raised, if they are then we raise them. + forwarder_ip, forwarder_port = self.default_forwarder + if _NAT_AFFIX_DEBUG_MODE: + log(self, "Connecting with %s, %d, %s, %d" % (forwarder_ip, forwarder_port, localip, localport) + "\n") + + + # Use timeouts during connection setup with the forwarder (so + # a rogue forwarder can't block us indefinitely). Once the + # connection to the actual destination is set up, drop + # timeouts again as our caller will not be expecting them. + sockobj = sockettimeout.timeout_openconnection(dest_ip, + dest_port, localip, localport, timeout) + + # Let the forwarder know which server we want to connect to. + #server_id = "%s:%d" % (destip, destport) + + try: + #session.session_sendmessage(sockobj, "%s,%s" % (CONNECT_CLIENT_TAG, server_id)) + confirmation_msg = session.session_recvmessage(sockobj) + except sockettimeout.SocketTimeoutError: + # If the sockettimeout expires, raise this as a TimeoutError + # (emulating what happens when the plain openconnection + # timeout is reached). This "casting" is required as our caller + # will not expect a timeout wrapper exception from a plain + # Repy API call. + raise TimeoutError("Connection timed out while negotiating connection " + + "with the forwarder '%s:%d'." % (dest_ip, dest_port)) + + # If the forwarder confirms that we have successfully connected, then we + # return the socket like object with the server_id. + # Drop the timeout wrapping performed above (by extracting the + # non-timeout socket object), as our caller will not expect a + # timing-out sockobj! + if confirmation_msg.startswith(CONNECT_SUCCESS): + return sockobj.socket + else: + raise ConnectionRefusedError("Could not connect to a NAT forwarder.") + """ + + + def listenforconnection(self, localip, localport): + """ + + Instead of doing a listenforconnection we do an openconnection + to the next layer of the affix. We open a connection to one of + the forwarders. Returns a tcpserversocket. Note that each time + an application calls listenforconnection, a new AffixStack instance + is created with a new instance of the TCPRelayAffix, therefore we + can store the localip, localport globally in the instance of + this affix. + + + + localip - the local ip address. + + localport - the port that should be used to listen on. + + + None + + + NATServerSocket object. + """ + + server_id = "%s:%d" % (localip, localport) + + # Try to connect to a forwarder so we can receive connections + # in the future. + try: + connection_socket = self.connect_to_forwarder(localip, localport) + except (RepyArgumentError, AddressBindingError), err: + # If we get these two arguments we want to raise them as AddressBindingError. + raise AddressBindingError(repr(err)) + except SocketClosedRemote, err: + raise DuplicateTupleError(repr(err)) + + + mycontext['client_facing_port'] = session.session_recvmessage(connection_socket) + advertisepipe.add_to_pipe(mycontext['CLIENT_PORT_KEY'], mycontext['client_facing_port']) + + connection_control_sockobj = NatSockObj(connection_socket, server_id) + return affix_wrapper_lib.AffixTCPServerSocket(connection_control_sockobj, self) + + + + + + + def tcpserversocket_getconnection(self, nat_serversocket): + """ + + When we attempt to do a getconnection from a machine that is behind + NAT, we connect to the forwarder instead, and keep listening from + the forwarder, checking if a new connection needs to be made. A + connection to the forwarder has already been made, so we just need + to ask the forwarder if a connection need to be established. + + + nat_serversocket - This is a socket like object. It is an object that + contains the actual socket that is connected to a NAT forwarder as + well as keeps track of what the server_id is.T + + + None + + + Repy V2 exceptions for tcpserversocket.getconnection() + + + (remoteip, remoteport, socketobj) - for the connection. + """ + + if nat_serversocket._closed: + raise SocketClosedLocal("The NAT TCPServerSocket has been closed.") + + server_id = nat_serversocket.getserver_id() + + # Ask the server if there is any connection that can be made and get back + # the response. + try: + #session.session_sendmessage(nat_serversocket, CHECK_SERVER_CONN) + nat_response = session.session_recvmessage(nat_serversocket) + + except (SocketClosedRemote, sockettimeout.SocketTimeoutError), e: + # Trying to reconnect doesn't make sense (DuplicateTupleError, or + # maybe that forwarder doesn't exist anymore). Pretend the socket + # was closed locally even when it wasn't. + nat_serversocket.close() + + if _NAT_AFFIX_DEBUG_MODE: + log("[TCPRelayAffix] Forwarder closed connection with ", repr(e), "\n") + + raise TCPServerSocketInvalidError("TCPRelayAffix: This socket is no longer available for listening as we lost the connection to our listening proxy with error '" + repr(e) + "'.") + + + # If there is an available client, we try to establish a connection. + if nat_response == CLIENT_AVAILABLE: + theclient = self.connect_available_client(server_id) + if _NAT_AFFIX_DEBUG_MODE: + log("[TCPRelayAffix] Found available client", repr(theclient), "waiting to connect.") + return theclient + else: + raise SocketWouldBlockError("Unable to connect to any client currently.") + + + + + def tcpserver_close(self, serversocket): + # Note that we simply just close the control socket that is connected + # to the Nat Forwarder. When the connection is closed, the NAT forwarder + # automatically removes this server from the registered_server list that + # the forwarder has. + serversocket.close() + + + + def connect_available_client(self, server_id): + """ + + There is an available client that is waiting to be connected + to the server. Therefore we open up a connection to the NAT + Forwarder that we are registered with and ask the NAT forwarder + to connect us with the waiting client. + + + server_id - the unique id of this connection (IP:Port) + + + None + + + SocketWouldBlockError will be raised if we are unable to connect + successfully to a client. + + + A tuple contaiing the socket object of the connection, the client ip + and the client port. + """ + + # Extract the forwarder ip and port. + (forwarder_ip, forwarder_port) = self.default_forwarder + + # Connect to the forwarder, trying connports in random order. + possible_ports = list(getresources()[0]['connport']) + random.random_shuffle(possible_ports) + errorlist = [] + + for localport in possible_ports: + try: + log(self, "Trying to connect_available_client using", forwarder_ip, + forwarder_port, getmyip(), localport, "\n") + # Use a timeout socket during connection setup so that we don't + # hand forever if problems occur. + new_sockobj = sockettimeout.timeout_openconnection(forwarder_ip, forwarder_port, getmyip(), localport, _NAT_AFFIX_DEFAULT_TIMEOUT) + break + except (AddressBindingError, DuplicateTupleError, AlreadyListeningError, + CleanupInProgressError, ConnectionRefusedError, TimeoutError), err: + errorlist.append(repr(err)) + else: # We didn't succeed connecting via any of our local ports + if _NAT_AFFIX_DEBUG_MODE: + log("[TCPRelayAffix] Cannot accept anymore clients due to exhausted local ports.") + raise ResourceExhaustedError("TCPRelayAffix unable to connect to a client currently. Errors encountered: " + str(errorlist)) + + # Request the Nat Forwarder to connect a client to this new socket that + # we just opened up. Then we check if we were successfully connected to + # a client + try: + session.session_sendmessage(new_sockobj, CONNECT_SERVER_TAG + ',' + server_id) + response = session.session_recvmessage(new_sockobj) + except (sockettimeout.SocketTimeoutError, SocketClosedRemote, + session.SessionEOF), e: + raise TCPServerSocketInvalidError( + "Cannot get further client connections from TCP relay " + + forwarder_ip + ":" + str(forwarder_port) + " using server ID " + + server_id + " due to error " + repr(e)) + + # We successfully made a connection! So we return the socket along with + # the ip address and port of the client. + if response.startswith(CONNECT_SUCCESS): + client_id = response.split(',')[1] + client_ip, client_port_str = client_id.split(':') + client_port = int(client_port_str) + + # Everything was a success! Remove the timeout wrapper from + # the socket object as our caller won't be expecting it. + return (client_ip, client_port, new_sockobj.socket) + elif response.startswith(CONNECT_FAIL): + raise SocketWouldBlockError("Unable to connect to any client currently.") + + + + + def connect_to_forwarder(self, localip, localport): + """ + + To establish a connection to the forwarder. This connection is + later used when tcpserversocket_getconnection() is called later. + + + localip + The local IP of this node; most likely a Zenodotus name. + localport + The perceived local port for this listening TCP socket; + actually just a number so we can demultiplex different + listening sockets within the Affix. + + + None + + + A socket like object representing the control connection to + a TCP relay. Timeouts are used on send/recv calls so that a + stale control connection does not lock up the entire Affix + (and prohibits setting up a connection to a new relay). + """ + + # We use the localip and localport provided to create an ID for + # this listening (aka ```server'') socket. Each listening socket + # will have a different ID, so the same relay can be used for + # relayed listening on multiple ports. + server_id = localip + ':' + str(localport) + + # The localip must be resolved so we can use it in our + # actual network calls. + actual_ip = namingandresolveraffix.resolve_identifier(localip) + + # If a default forwarder is provided then we only try to use it. + # Otherwise we do a lookup for all the forwarders that are available. + if self.default_forwarder: + # We add the default forwarder in the forwarder list in the format that + # forwarders are usually advertised. + forwarder_list = [self.default_forwarder[0] + ':' + + str(self.default_forwarder[1])] + else: + try: + forwarder_list = cachedadvertise.lookup(NAT_FORWARDER_KEY) + # We will randomize the forwarder list to try and distribute servers + # across all the forwarders. + random.random_shuffle(forwarder_list) + except (advertise.AdvertiseError, TimeoutError), e: + raise AddressBindingError("Unable to connect to any NAT Forwarder due to error " + repr(e)) + + + # Now that we have a list of forwarders, try to connect to each one until + # we successfully do. + for cur_forwarder in forwarder_list: + # If there are no results, we sometimes get the empty string back + # from the advertising lookup. + if not cur_forwarder: + continue + + forwarder_ip, forwarder_port_str = cur_forwarder.split(':') + forwarder_port = int(forwarder_port_str) + + try: + # Use timeouts so that we don't hang indefinitely if the + # control connection (or the relay) hangs. + sockobj = sockettimeout.timeout_openconnection(forwarder_ip, + forwarder_port, actual_ip, localport, _NAT_AFFIX_DEFAULT_TIMEOUT) + # MMM: Should we also catch RepyArgumentError and AddressbindingError here? Currently we raise those. + except (AlreadyListeningError, CleanupInProgressError, + ConnectionRefusedError, DuplicateTupleError, + InternetConnectivityError, TimeoutError), err: + if _NAT_AFFIX_DEBUG_MODE: + log(self, "Unable to connect to forwarder '%s:%d' from %s. Error: %s\n" % (forwarder_ip, forwarder_port, server_id, repr(err))) + continue + except Exception, err: + if _NAT_AFFIX_DEBUG_MODE: + log(self, "TCPRelayAffix found unexpected error: %s\n. Local Address is: '%s:%d'. Desired forwarder and server_ip: %s:%d, %s" % + (repr(err), actual_ip, localport, forwarder_ip, + forwarder_port, server_id)) + raise + else: + # The control connection is established. The application-layer + # part of the control connection is managed in the huge + # try/except block below. If anything goes wrong, we move on + # and try the next relay. + # XXX We should also try to close the current control connection + # XXX in case of an error. Letting the TCP stack expire them + # XXX occupies them for much longer than required. + try: + # This is the initial message that we send to the NAT forwarder + # once we have connected and want to register the server with the + # forwarder. + register_serv_msg = SERVER_REGISTER + ',' + str(server_id) + + if _NAT_AFFIX_DEBUG_MODE: + log(self, "TCPRelayAffix successfully connected to forwarder '%s:%d'\n" % (forwarder_ip, forwarder_port)) + log(self, "TCPRelayAffix requesting to register with msg: %s\n" % register_serv_msg) + + # Register with the relay. We do this by sending + # the forwarder the unique ID for this particular connection. If the + # server returns a confirmation msg then we are connected and we can + # return the socket object. It is possible that the forwarder will + # return other messages such as SERVER_FULL or CONNECTION_REFUSED msg. + # In which case we move on to the next forwarder. + session.session_sendmessage(sockobj, register_serv_msg) + confirmation_msg = session.session_recvmessage(sockobj) + + if confirmation_msg == CONNECT_SUCCESS: + if not self.default_forwarder: + self.default_forwarder.append(forwarder_ip) + self.default_forwarder.append(forwarder_port) + + if _NAT_AFFIX_DEBUG_MODE: + log(self, "Connected to forwarder, returning. Default forwarder is: " + str(self.default_forwarder) + "\n") + return sockobj + else: + if _NAT_AFFIX_DEBUG_MODE: + log(self, "NAT Forwarder refused connection. Return msg: %s\n" % confirmation_msg) + continue + except (SocketClosedRemote, sockettimeout.SocketTimeoutError): + continue + # We have exhausted all the forwarders in the forwarder list. + else: + raise AddressBindingError("Unable to connect to any NAT Forwarder (" + + str(forwarder_list) + ") using localip/actual_ip:localport " + + str(localip) + "/" + str(actual_ip) + ":" + str(localport)) + + + + +# ---------------------- Required Public functions ------------------------ + + def copy(self): + """ + Make a copy of self. + """ + if self.affix_context['next_affix']: + affix_stack_copy = self.affix_context['next_affix'].copy() + else: + affix_stack_copy = None + + optional_args_copy = self.affix_context['optional_args'] + + affix_copy = TCPRelayAffix(affix_stack_copy, optional_args_copy) + affix_copy.default_forwarder = self.default_forwarder + + return affix_copy + + + + + def get_advertisement_string(self): + """ + The TCPRelayAffix will advertise the forwarder it is using if it has + a default forwarder. Ther server should always fill this in when + doing tcpserver_getconnection. + """ + if _NAT_AFFIX_DEBUG_MODE: + log(self, "Default forwarder is: " + str(self.default_forwarder) + "\n") + if self.default_forwarder: + forwarder_ip, forwarder_port = self.default_forwarder + nat_ad_string = "(TCPRelayAffix,%s:%d)" % (forwarder_ip, forwarder_port) + else: + nat_ad_string = "(TCPRelayAffix)" + + return nat_ad_string + self.peek().get_advertisement_string() + diff --git a/services/transparent_tcp_relay/restrictions.tcp_relay b/services/transparent_tcp_relay/restrictions.tcp_relay new file mode 100644 index 0000000..7e701df --- /dev/null +++ b/services/transparent_tcp_relay/restrictions.tcp_relay @@ -0,0 +1,259 @@ +resource cpu 1 +resource memory 150000000 +resource diskused 1000000000 +resource events 1000 +resource filewrite 10000000 +resource fileread 10000000 +resource filesopened 250 +resource insockets 500 +resource outsockets 500 +resource netsend 300000000 +resource netrecv 300000000 +resource loopsend 10000000 +resource looprecv 10000000 +resource lograte 300000 +resource random 100000 +resource messport 63100 +resource connport 63100 +resource messport 63101 +resource connport 63101 +resource messport 63102 +resource connport 63102 +resource messport 63103 +resource connport 63103 +resource messport 63104 +resource connport 63104 +resource messport 63105 +resource connport 63105 +resource messport 63106 +resource connport 63106 +resource messport 63107 +resource connport 63107 +resource messport 63108 +resource connport 63108 +resource messport 63109 +resource connport 63109 +resource messport 63110 +resource connport 63110 +resource messport 63111 +resource connport 63111 +resource messport 63112 +resource connport 63112 +resource messport 63113 +resource connport 63113 +resource messport 63114 +resource connport 63114 +resource messport 63115 +resource connport 63115 +resource messport 63116 +resource connport 63116 +resource messport 63117 +resource connport 63117 +resource messport 63118 +resource connport 63118 +resource messport 63119 +resource connport 63119 +resource messport 63120 +resource connport 63120 +resource messport 63121 +resource connport 63121 +resource messport 63122 +resource connport 63122 +resource messport 63123 +resource connport 63123 +resource messport 63124 +resource connport 63124 +resource messport 63125 +resource connport 63125 +resource messport 63126 +resource connport 63126 +resource messport 63127 +resource connport 63127 +resource messport 63128 +resource connport 63128 +resource messport 63129 +resource connport 63129 +resource messport 63130 +resource connport 63130 +resource messport 63131 +resource connport 63131 +resource messport 63132 +resource connport 63132 +resource messport 63133 +resource connport 63133 +resource messport 63134 +resource connport 63134 +resource messport 63135 +resource connport 63135 +resource messport 63136 +resource connport 63136 +resource messport 63137 +resource connport 63137 +resource messport 63138 +resource connport 63138 +resource messport 63139 +resource connport 63139 +resource messport 63140 +resource connport 63140 +resource messport 63141 +resource connport 63141 +resource messport 63142 +resource connport 63142 +resource messport 63143 +resource connport 63143 +resource messport 63144 +resource connport 63144 +resource messport 63145 +resource connport 63145 +resource messport 63146 +resource connport 63146 +resource messport 63147 +resource connport 63147 +resource messport 63148 +resource connport 63148 +resource messport 63149 +resource connport 63149 +resource messport 63150 +resource connport 63150 +resource messport 63151 +resource connport 63151 +resource messport 63152 +resource connport 63152 +resource messport 63153 +resource connport 63153 +resource messport 63154 +resource connport 63154 +resource messport 63155 +resource connport 63155 +resource messport 63156 +resource connport 63156 +resource messport 63157 +resource connport 63157 +resource messport 63158 +resource connport 63158 +resource messport 63159 +resource connport 63159 +resource messport 63160 +resource connport 63160 +resource messport 63161 +resource connport 63161 +resource messport 63162 +resource connport 63162 +resource messport 63163 +resource connport 63163 +resource messport 63164 +resource connport 63164 +resource messport 63165 +resource connport 63165 +resource messport 63166 +resource connport 63166 +resource messport 63167 +resource connport 63167 +resource messport 63168 +resource connport 63168 +resource messport 63169 +resource connport 63169 +resource messport 63170 +resource connport 63170 +resource messport 63171 +resource connport 63171 +resource messport 63172 +resource connport 63172 +resource messport 63173 +resource connport 63173 +resource messport 63174 +resource connport 63174 +resource messport 63175 +resource connport 63175 +resource messport 63176 +resource connport 63176 +resource messport 63177 +resource connport 63177 +resource messport 63178 +resource connport 63178 +resource messport 63179 +resource connport 63179 +resource messport 63180 +resource connport 63180 +resource messport 63181 +resource connport 63181 +resource messport 63182 +resource connport 63182 +resource messport 63183 +resource connport 63183 +resource messport 63184 +resource connport 63184 +resource messport 63185 +resource connport 63185 +resource messport 63186 +resource connport 63186 +resource messport 63187 +resource connport 63187 +resource messport 63188 +resource connport 63188 +resource messport 63189 +resource connport 63189 +resource messport 63190 +resource connport 63190 +resource messport 63191 +resource connport 63191 +resource messport 63192 +resource connport 63192 +resource messport 63193 +resource connport 63193 +resource messport 63194 +resource connport 63194 +resource messport 63195 +resource connport 63195 +resource messport 63196 +resource connport 63196 +resource messport 63197 +resource connport 63197 +resource messport 63198 +resource connport 63198 +resource messport 63199 +resource connport 63199 + +call gethostbyname_ex allow +call sendmess allow +call stopcomm allow # it doesn't make sense to restrict +call recvmess allow +call openconn allow +call waitforconn allow +call socket.close allow # let's not restrict +call socket.send allow # let's not restrict +call socket.recv allow # let's not restrict +# open and file.__init__ both have built in restrictions... +call open arg 0 is junk_test.out allow # can write to junk_test.out +call open arg 1 is r allow # allow an explicit read +call open arg 1 is rb allow # allow an explicit read +call open noargs is 1 allow # allow an implicit read +call file.__init__ arg 0 is junk_test.out allow # can write to junk_test.out +call file.__init__ arg 1 is r allow # allow an explicit read +call file.__init__ arg 1 is rb allow # allow an explicit read +call file.__init__ noargs is 1 allow # allow an implicit read +call file.close allow # shouldn't restrict +call file.flush allow # they are free to use +call file.next allow # free to use as well... +call file.read allow # allow read +call file.readline allow # shouldn't restrict +call file.readlines allow # shouldn't restrict +call file.seek allow # seek doesn't restrict +call file.write allow # shouldn't restrict (open restricts) +call file.writelines allow # shouldn't restrict (open restricts) +call sleep allow # harmless +call settimer allow # we can't really do anything smart +call canceltimer allow # should be okay +call exitall allow # should be harmless + +call log.write allow +call log.writelines allow +call getmyip allow # They can get the external IP address +call listdir allow # They can list the files they created +call removefile allow # They can remove the files they create +call randomfloat allow # can get random numbers +call getruntime allow # can get the elapsed time +call getlock allow # can get a mutex +call get_thread_name allow # Allow getting the thread name +call VirtualNamespace allow # Allow using VirtualNamespace's + diff --git a/services/transparent_tcp_relay/tests/ut_affix_transparenttcprelay_socketcloseduringsetup.r2py b/services/transparent_tcp_relay/tests/ut_affix_transparenttcprelay_socketcloseduringsetup.r2py new file mode 100644 index 0000000..b8d17f1 --- /dev/null +++ b/services/transparent_tcp_relay/tests/ut_affix_transparenttcprelay_socketcloseduringsetup.r2py @@ -0,0 +1,77 @@ +""" +Overloads a nat forwarder with a low max-client limitation. + +Makes sure that the forwarder and affix stack behave in an expected manner +when this event occurs. We expect to see an AddressBindingError on the +server when this occurs. + +""" +#pragma out + +#pragma repy restrictions.affix dylink.r2py + + +# required for logging +time = dy_import_module('time.r2py') +#time1 = dy_import_module('time_interface.r2py') +#ntp_time = dy_import_module('ntp_time.r2py') +#time1.time_register_method('ntp', ntp_time.ntp_time_updatetime) +time.time_updatetime(34612) + +# Make a list of usable source ports for connections +random = dy_import_module("random.r2py") +available_ports = list(getresources()[0]["connport"]) +random.random_shuffle(available_ports) + + +tcp_relay = dy_import_module('transparent_tcp_relay.r2py') +tcp_relay.MAX_CLIENTS_PER_SERVER = 1 +tcp_relay.MAX_SERVERS = 1 + +tcp_relay.mycontext['listenport_tcp'] = 12345 +createthread(tcp_relay.tcp_forwarder_listener) +forwarder_port = 12345 + + +log("Setting up NAT forwarder.\n") +log("NAT forwarder started.\n") +log("Starting botched setup attempt.\n") +# Emulate TCPRelayAffix's NAT forwarder connection setup, +# see TCPRelayAffix.connect_to_forwarder() +server_port = available_ports.pop() +sockobj = openconnection(getmyip(), forwarder_port, getmyip(), server_port, 10) +# We would now use a timeoutsocket to register etc., but we're in for trouble! +# Send an incomplete message, then drop the connection unexpectedly. +sockobj.send("100\n" + 'NAT_SERVER_REGISTER' + "," + + getmyip() + "," + str(server_port)) +sockobj.close() +log("Done with botched setup attempt.\n") +log("Setting up RepyV2 TCP connection test now.\n") + +# The forwarder should (1) stay alive and (2) dergister the previous server +# so that the MAX_SERVERS limit is restored to 1. We'll check by +# re-registering and running the TCP connection test. + + + +dy_import_module_symbols("affix_stack.r2py") + +# Set up the Affix stack to use, and override the network calls we need. +# We'll host the forwarder ourselves for this test. +affix_object = AffixStack("(TransparentTCPRelayAffix," + getmyip() + ":" + + str(forwarder_port) + ":" + str(25) + ")") +listenforconnection = affix_object.listenforconnection +#openconnection = affix_object.openconnection + + +# Connect to the forwarder, send/receive a test message. +dy_import_module_symbols("ut_repyv2api_tcpconnectiontest.r2py") +createthread(launch_server) +sleep(5) + +""" +createthread(connect_and_send_message) +sleep(10) +""" + +exitall() \ No newline at end of file diff --git a/services/transparent_tcp_relay/transparent_tcp_relay.r2py b/services/transparent_tcp_relay/transparent_tcp_relay.r2py new file mode 100644 index 0000000..e63eef0 --- /dev/null +++ b/services/transparent_tcp_relay/transparent_tcp_relay.r2py @@ -0,0 +1,660 @@ +""" + + tcp_relay.r2py + + + The purpose of this program is to act as a nat forwarder. + Messages from nodeA to nodeB could be forwarded through + this forwarder. + + + python repy.py RESTRICTION_FILE dylink.repy tcp_relay.repy TCP_PORT + [ NAT_FORWARDER_KEY ] +""" + +session = dy_import_module("session.r2py") +advertisepipe = dy_import_module("advertisepipe.r2py") +affix_stack = dy_import_module("affix_stack.r2py") +dy_import_module_symbols("tcp_relay_common_lib.r2py") +canilisten = dy_import_module("canilisten.r2py") +time = dy_import_module("time.r2py") +random = dy_import_module("random.r2py") + +SLEEP_TIME = 0.001 +MAX_BUFFER_LEN = 1400 + +buffer = 4 # main and advertise thread, and two spares +available_threads = getresources()[0]['events'] - buffer +MAX_CLIENTS_PER_SERVER = 5 +#MAX_SERVERS = max(1, int(available_threads/MAX_CLIENTS_PER_SERVER)) +MAX_SERVERS = 5 +INFO_MSG = 1 +ERR_MSG = 2 +DEBUG_MSG = 3 + + +registered_server = {} +register_lock = createlock() + + +def logmsg(message, msg_type): + + header = "[%.4f] " % time.time_getunixtime() + + if msg_type == INFO_MSG: + header += "INFO: " + elif msg_type == ERR_MSG: + header += "ERROR: " + elif msg_type == DEBUG_MSG: + header += "DEBUG: " + + log(header + message + '\n') + + + + +# ==================================================== +# TCP NAT Forwarder - Common Entry Point. +# ==================================================== +def tcp_forwarder_listener(): + + # Create a TCP server socket. + tcp_forwarder_sock = listenforconnection(getmyip(), + mycontext['listenport_tcp']) + + logmsg("Started TCP NAT Forwarder listener on '%s' port '%d'" % + (getmyip(), mycontext['listenport_tcp']), INFO_MSG) + + + while True: + + try: + # Try to see if there is any connection waiting. + remote_ip, remote_port, sockobj = tcp_forwarder_sock.getconnection() # XXX Use a timeout socket instead to avoid malicious clients/servers blocking sockets forever + logmsg("Incoming connection from '%s:%d'" % (remote_ip, remote_port), + INFO_MSG) + except SocketWouldBlockError: + sleep(SLEEP_TIME) + except Exception, err: + logmsg("Error in getconnection: " + str(err), ERR_MSG) + else: + logmsg("Got connection from " + str(remote_ip) + ":" + str(remote_port), INFO_MSG) + try: + conn_init_message = session.session_recvmessage(sockobj) + logmsg(str(remote_ip) + ":" + str(remote_port) + " said " + + conn_init_message, DEBUG_MSG) + (conn_type, conn_id) = conn_init_message.split(',') + except Exception, err: # XXX Likely too broad an except + logmsg("Error in connection establishment with " + str(remote_ip) + + ":" + str(remote_port) + ": " + + str(type(err)) + " " + str(err), DEBUG_MSG) + sockobj.close() + continue + + + if conn_type == SERVER_REGISTER: + # This is the case where a new server wants to register to this + # NAT Forwarder. + client_port_opened=createthread(register_new_server(remote_ip, remote_port, conn_id, sockobj)) + logmsg("Registered server " + remote_ip + ":" + str(remote_port), DEBUG_MSG) + return client_port_opened + elif conn_type == CONNECT_SERVER_TAG: + # This is the case when a registered server opens up a connection to + # the forwarder in order for it to be connected to a client. + logmsg("Received request from server '%s:%d' to connect to a client." % (remote_ip, remote_port), INFO_MSG) + createthread(handle_server_conn_request(remote_ip, remote_port, conn_id, sockobj)) + else: # XXX We should also close the control connection on errors! + logmsg("Incorrect connection type received from '%s:%d': %s" % + (remote_ip, remote_port, conn_type), ERR_MSG) + + + + +def register_new_server(remote_ip, remote_port, server_id, sockobj): + + def _register_server_helper(): + """ + + Register a new server with the forwarder. If we are here + then a server node has called openconnection to the NAT + forwarder for the first time in order to open up a connection + such that the NAT forwarder can make future connections. + + + remote_ip - The ip address of the node that made the initial connection. + remote_port - The port number of the node that made the initial connection. + sockobj - The socket that will be used for communication. + + + None. + + + None. + + + None. + """ + + logmsg("Server '%s' at %s:%s requesting to register" % + (server_id, remote_ip, str(remote_port)), INFO_MSG) + + # Ensure that the registered_server dict doesn't change while + # we are modifying it + register_lock.acquire(True) + + # Check to see if the server is already registered. If it is then + # we just return. + + if server_id in registered_server.keys(): + # Make sure that it is in the proper format. + if 'connected_clients' not in registered_server[server_id].keys(): + registered_server[server_id]['connected_clients'] = [] + if 'waiting_clients' not in registered_server[server_id].keys(): + registered_server[server_id]['waiting_clients'] = [] + if 'client_lock' not in registered_server[server_id].keys(): + registered_server[server_id]['client_lock'] = createlock() + # The server entry is in the correct format. + register_lock.release() + return + + # The server was not in the dict already. Add it, start + # serving threads, etc. + try: + if len(registered_server.keys()) < MAX_SERVERS: + # The server_id does not exist in our registered_server dict. + registered_server[server_id] = {} + registered_server[server_id]['connected_clients'] = [] + registered_server[server_id]['waiting_clients'] = [] + registered_server[server_id]['client_lock'] = createlock() + # All changes done. Release the lock. + register_lock.release() + + # Launch a thread that waits for any communication from the server. + # Such that if a server checks to see if there is any client waiting, + # it responds. + createthread(launch_server_communication_thread(sockobj, server_id)) + + try: + session.session_sendmessage(sockobj, CONNECT_SUCCESS) + except SocketClosedRemote, err: + logmsg("Socket closed while registering server. " + repr(err), DEBUG_MSG) + unregister_server(server_id) + + + logmsg("Registered server '%s' at %s:%s successfully. %i server slots remain." % + (server_id, remote_ip, str(remote_port), + MAX_SERVERS - len(registered_server)), INFO_MSG) + else: + try: + session.session_sendmessage(sockobj, CONNECT_FAIL) + except SocketClosedRemote: + # Too bad, now they won't receive the error message. Ignore! + pass + logmsg("Unable to register server '%s' on %s:%s. Max servers reached." % + (server_id, remote_ip, str(remote_port)), INFO_MSG) + except Exception, e: + logmsg("Error " + repr(e), INFO_MSG) + + return _register_server_helper + + + + + +# ==================================================== +# TCP Server Control Socket +# ==================================================== + +def launch_server_communication_thread(sockobj, server_id): + + def _server_communication_helper(): + """ + + This thread is launched after a server has registered with + the nat forwarder. This thread will keep running until the + control socket for the server has been closed. It is used + for communicating with the server. The server may ping from + time to time to check if there are any clients waiting for + it. If there are then the server will create a connection + with the Nat Forwarder. + + + sockobj - The socket object that is used for communication. + server_id - The id of the server + + + None + + + None + + + None + """ + + # Keep this thread alive as long as the socket object is open. + # We break out of the loop if there is any socket closed exceptions + # or any unexpected errors that arise. + + + # Selecting a random port to be opened for client-side communication + + allowed_tcp_ports = list(getresources()[0]["connport"]) + random.random_shuffle(allowed_tcp_ports) + + for client_port in allowed_tcp_ports: + try: + client_socket = listenforconnection(getmyip(), client_port) + logmsg("Opening Client Port:" + str(client_port), INFO_MSG) + #createthread(listen_for_client(client_socket, server_id, sockobj)) + #client_port_opened = client_port + break + except Exception, e: + logmsg("Error in listenforconnection on " + str(getmyip()) + " port " + + str(client_port), DEBUG_MSG) + sleep(1) + break + + # Send the information of selected-port back to the server + # so that server can advertise the port to the client + + try: + session.session_sendmessage(sockobj, str(client_port)) + except SocketClosedRemote, err: + logmsg("Socket closed. " + repr(err), DEBUG_MSG) + unregister_server(server_id) + + createthread(listen_for_client(client_socket, server_id, sockobj)) + + return _server_communication_helper + + + +# ==================================================== +# Listen for client +# ==================================================== + +def listen_for_client(client_socket, server_id, serversockobj): + + def _client_listener(): + while True: + try: + # Try to see if there is any connection waiting. + client_ip, client_port, clientsockobj = client_socket.getconnection() # XXX Use a timeout socket instead to avoid malicious clients/servers blocking sockets forever + logmsg("Incoming connection from '%s:%d'" % (client_ip, client_port), + INFO_MSG) + session.session_sendmessage(serversockobj, CLIENT_AVAILABLE) + createthread(handle_client_request(client_ip, client_port, server_id, clientsockobj)) + except SocketWouldBlockError: + sleep(SLEEP_TIME) + except Exception, err: + logmsg("Error in getconnection: " + str(err), ERR_MSG) + break + + return _client_listener + + +def handle_client_request(remote_ip, remote_port, server_id, sockobj): + + def _handle_client_request_helper(): + """ + + Take an incoming connection from a client and append it to the + registered servers waiting list. Then wait for the server to be + connected to sthe client. + + + remote_ip - The ip address of the node that made the initial connection. + remote_port - The port number of the node that made the initial connection. + sockobj - The socket that will be used for communication. + + + None. + + + None. + + + None. + """ + client_id = "%s:%d" % (remote_ip, remote_port) + logmsg("Incoming TCP connection request from '%s' for server '%s'" % + (client_id, server_id), INFO_MSG) + + # Are they connecting to an unregistered server? + if server_id not in registered_server.keys(): + try: + session.session_sendmessage(sockobj, CONNECT_FAIL) + except SocketClosedRemote: + # We don't really care if we couldn't send back the error message... + pass + + logmsg("Client '%s' attempting to connect to unregistered server '%s'" % + (client_id, server_id), ERR_MSG) + return + + + # Acquire the client lock for the server and add the socket to + # the waiting queue. Note that we add the client at index 0 since + # pop() will remove the client from the queue from the tail of the + # list. + registered_server[server_id]['client_lock'].acquire(True) + try: + registered_server[server_id]['waiting_clients'].insert(0, (sockobj, + client_id)) + finally: + registered_server[server_id]['client_lock'].release() + + + return _handle_client_request_helper + + + + +def handle_server_conn_request(remote_ip, remote_port, server_id, sockobj): + + def _handle_server_conn_request_helper(): + """ + + A connection is made to the NAT Forwarder because there + is a client that is waiting to be connected to the server. + Once this connection is made, the server and client will + be connected through the forwarder and will be able to + communicate with each other. + + + remote_ip - The ip address of the node that made the initial connection. + remote_port - The port number of the node that made the initial connection. + sockobj - The socket that will be used for communication. This is the + server socket. + + + None. + + + None. + + + None. + """ + logmsg("Server '%s' at %s:%s has made a connection in order for a client to connect." % + (server_id, remote_ip, str(remote_port)), INFO_MSG) + + # Check to make sure that the server has registered already. + if server_id not in registered_server.keys(): + try: + session.session_sendmessage(sockobj, CONNECT_FAIL) + except SocketClosedRemote: + # It's not our problem if they don't receive the error message... + pass + logmsg("Server '%s' attempting to connect to client before registering" % + server_id, ERR_MSG) + return + + + # Acquire a client lock to ensure there is no contention. + # We keep a local reference to the lock in case the server_id is + # deregistered while we still handle this connection request. + # XXX Check how the register_lock and the client_lock are used. I believe both were implemented to avoid this problem we are working around here by storing the reference. + client_lock = registered_server[server_id]['client_lock'] + client_lock.acquire(True) + + # Initialize the client ID in case the ``blanket except'' below + # is reached before an actual ID could be assigned. + client_id = "(client_id not initialized)" + + try: + if len(registered_server[server_id]['connected_clients']) < MAX_CLIENTS_PER_SERVER: + # Retrieve the list of clients that are waiting. + client_queue = registered_server[server_id]['waiting_clients'] + (cur_client_sockobj, client_id) = client_queue.pop() + + # Start up a thread that forwards data between the server and client. + # XXX We should wait with thread creation until we know that client and server have received our CONN_SUCCESS notification! + createthread(forward_data(server_id, client_id, cur_client_sockobj, sockobj)) + + # Place the client socket in the connected clients list. + registered_server[server_id]['connected_clients'].append( + cur_client_sockobj) + + # We have made a connection so we send a Connect Success message + # to both the server and the client. However the client might + # have already closed the connection due to having to wait for a + # long time. In this case we have to catch the exception. If an + # exception is raised, we do nothing as everything will be cleaned + # up when the forward_data thread tries to forward any + # messages. + try: + session.session_sendmessage(sockobj, CONNECT_SUCCESS + ',' + client_id) + except (SocketClosedRemote, SocketClosedLocal), err: + # If the server connection has been closed, we unregister + # the server. + unregister_server(server_id) + # XXX Continuing below after unregister_server tricks the client + # XXX into believing that all is well. It will need to time out + # XXX to note that it's not. Shouldn't we actively notify it and/or + # XXX close the cur_client_sockobj? + + try: + # Caveat, see above. Ending up here doesn't mean the + # connection to the server is still good. + session.session_sendmessage(cur_client_sockobj, CONNECT_SUCCESS) + except (SocketClosedRemote, SocketClosedLocal), err: + logmsg("Couldn't connect server '" + str(server_id) + + "' to client '" + str(client_id) + + "'. Connection closed with '" + repr(err) + "'.", ERR_MSG) + else: + logmsg("Made connection to '%s' from '%s'" % (server_id, client_id), + INFO_MSG) + else: + try: + session.session_sendmessage(sockobj, CONNECT_FAIL) + except SocketClosedRemote: + # They won't see the error message then. We don't care! + pass + logmsg("Unable to register additional client '" + str(client_id) + + "' to server '" + str(server_id) + + "'. Max number of clients (" + str(MAX_CLIENTS_PER_SERVER)+ + ") reached on server.", INFO_MSG) + except Exception, err: + try: + session.session_sendmessage(sockobj, CONNECT_FAIL) + except SocketClosedRemote: + # Same here, they won't see the error message. + pass + logmsg("Unable to connect client '" + str(client_id) + + "' to server '" + str(server_id) + "' due to err. " + + str(type(err)) + " " + str(err), ERR_MSG) + finally: + client_lock.release() + + return _handle_server_conn_request_helper + + +def unregister_server(server_id): + """ + The purpose of this function is to remove a registered + server from the registered server dictionary. Thus un- + registering the server. + """ + register_lock.acquire(True) + try: + registered_server.pop(server_id) + logmsg("Unregistered server '" + server_id + "'. " + + str(MAX_SERVERS - len(registered_server)) + + " server slots free.", INFO_MSG) + except KeyError: + # If the server id does not exist, we don't + # need to worry about it. + logmsg("Attempted to unregister unknow server '%s'" % server_id, DEBUG_MSG) + pass + except Exception, err: + logmsg("Unexpected error while unregistering server '%s': %s" % (str(server_id), repr(err)), DEBUG_MSG) + finally: + register_lock.release() + + + +# ============================================================== +# TCP Forwarder - Actual Message Forwarding +# ============================================================== +def forward_data(server_id, client_id, client_facing_socket, server_facing_socket): + + def _forward_data_helper(): + """ + + The function forwards all the incoming messages from one + socket to another (and the other way around). + To be started via createthread. + + + client_facing_socket - The socket connected to by the client seeking + to contact the server on whose behalf we are listening. + server_facing_socket - The socket between the server and us, created + when the server connected in in response to a CONN_AVAILABLE + message on the control connection. + + + None + + + None + + + None + """ + + is_relayed_connection_alive = True + + server_facing_send_buffer = "" + client_facing_send_buffer = "" + + while True: + # XXX Should we back off if encountering multiple SocketWouldBlockError's in a row? + if is_relayed_connection_alive: + free_server_buffer_space = MAX_BUFFER_LEN - len(server_facing_send_buffer) + if free_server_buffer_space > 0: + try: + server_facing_send_buffer += client_facing_socket.recv(free_server_buffer_space) + except SocketWouldBlockError: + pass + except (SocketClosedLocal, SocketClosedRemote): + is_relayed_connection_alive = False + client_facing_send_buffer = "" + + try: + bytes_sent = server_facing_socket.send(server_facing_send_buffer) + server_facing_send_buffer = server_facing_send_buffer[bytes_sent:] + except SocketWouldBlockError: + pass + except (SocketClosedLocal, SocketClosedRemote): + is_relayed_connection_alive = False + server_facing_send_buffer = "" + + + if is_relayed_connection_alive: + free_client_buffer_space = MAX_BUFFER_LEN - len(client_facing_send_buffer) + if free_client_buffer_space > 0: + try: + client_facing_send_buffer += server_facing_socket.recv(free_client_buffer_space) + except SocketWouldBlockError: + pass + except (SocketClosedLocal, SocketClosedRemote): + is_relayed_connection_alive = False + server_facing_send_buffer = "" + + try: + bytes_sent = client_facing_socket.send(client_facing_send_buffer) + client_facing_send_buffer = client_facing_send_buffer[bytes_sent:] + except SocketWouldBlockError: + pass + except (SocketClosedLocal, SocketClosedRemote): + is_relayed_connection_alive = False + client_facing_send_buffer = "" + + # If either side has hung up and the data it sent has been forwarded, + # close down the remaining connection. + if not is_relayed_connection_alive and \ + len(client_facing_send_buffer) == 0 and \ + len(server_facing_send_buffer) == 0: + try: + server_facing_socket.close() + except (SocketClosedLocal, SocketClosedRemote): + pass + try: + client_facing_socket.close() + except (SocketClosedLocal, SocketClosedRemote): + pass + break + + + # Remove the client socket from the connected clients list for + # the server. + try: + registered_server[server_id]['connected_clients'].remove(client_facing_socket) + except: + pass + + logmsg("Connection terminated between server '%s' and client '%s'" % + (server_id, client_id), INFO_MSG) + + + # Return the helper function. + return _forward_data_helper + + + + + +# ==================================================== +# Program Entry +# ==================================================== +if callfunc == 'initialize': + log("Getting an NTP timestamp...\n") + # Try all UDP ports; ignore TCP time servers. + for local_port in list(getresources()[0]["messport"]): + try: + time.time_updatetime(local_port) + break + except: + continue + else: + log("Could not updatetime() on any of the allowed UDP ports. Continuing with time.time_getunixtime = getruntime.\n") + time.time_getunixtime = getruntime + + + logmsg("Starting TCP relay.", INFO_MSG) + logmsg("MAX_SERVERS: " + str(MAX_SERVERS) + + ", MAX_CLIENTS_PER_SERVER: " + str(MAX_CLIENTS_PER_SERVER), INFO_MSG) + + + if len(callargs) < 1: + log("Usage:\n\tpython repy.py restrictionsfile dylink.r2py tcp_relay.r2py TCP_PORT [FORWARDER_KEY]\n") # XXX Allow for port autoconfiguration! + exitall() + + mycontext['listenport_tcp'] = int(callargs[0]) + + if len(callargs) >= 2: + # Override tcp_relay_common_lib's key + NAT_FORWARDER_KEY = callargs[1] + + myip, myport = getmyip(), mycontext['listenport_tcp'] + + (i_can_listen, ignore, ignore) = canilisten.check_specific_port(myport) + + if not i_can_listen: + logmsg( +"""NOTE WELL: It seems that your node is not able to receive incoming +connections from the public Internet. This probably leaves the TCP relay +uncontactable unless you set up port forwarding etc. on your gateway. +I'll let you proceed regardless. You hopefully know what you do.""", ERR_MSG) + +# Launch the TCP Forwarder. + logmsg("Creating forwarder thread on " + myip + ":" + str(myport), INFO_MSG) + client_port_opened=createthread(tcp_forwarder_listener) + # XXX Create a status thread to show the contents of the registered_server dict every ten minutes or so. + # Launch advertiser and advertise this forwarders ip address, tcp port. + advertise_value = myip + ':' + str(myport) + logmsg("Starting advertise thread for " + NAT_FORWARDER_KEY + + ": " + advertise_value, INFO_MSG) + advertisepipe.add_to_pipe(NAT_FORWARDER_KEY, advertise_value)