diff --git a/Beast/CPS Network/broadcast.py b/Beast/CPS Network/broadcast.py new file mode 100644 index 0000000..d42513d --- /dev/null +++ b/Beast/CPS Network/broadcast.py @@ -0,0 +1,29 @@ +import socket + +class Broadcast: + # the constructor method that takes port number as an argument + def __init__(self, port): + self.port = port + # socket that is used for broadcasting + self.broadcaster = self.broadcast_socket() + # socket used to receive message from boradcast + self.broadcastReceiver = self.broadcast_socket() + self.broadcastReceiver.bind(('',self.port)) + + # creating a broadcast socket + def broadcast_socket(self): + # Initialize socket + b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + # Enable broadcast mode + b.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + return b + + # send to broadcast method + def send_to_broadcast(self, message): + broadcastIP = '' + self.broadcaster.sendto(message.encode(), (broadcastIP, self.port)) + + # receive from braodcast method + def read_from_broadcast(self): + data, addr = self.broadcastReceiver.recvfrom(1024) + return data, addr \ No newline at end of file diff --git a/Beast/CPS Network/tcpIPNode.py b/Beast/CPS Network/tcpIPNode.py new file mode 100644 index 0000000..e3325b9 --- /dev/null +++ b/Beast/CPS Network/tcpIPNode.py @@ -0,0 +1,23 @@ +import socket +from subprocess import check_output + +class TCP_IP_Node: + # the contructor method takes port number as an argument and intializes the port, IPAddrs and tcpSocket attributes. + def __init__(self, port): + self.port = port + self.IPAddrs = check_output(['hostname', '-I']).decode().strip(' \n') # the IPAddrs attribute is bind IP address of the local device + self.tcpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # creates a new socket object using the TCP protocol for network communication + + # method that makes the tcp socket a client that connects to the server of a given IP address and port number + def tcp_ip_connect(self, IPAddrs, port): + self.tcpSocket.connect((IPAddrs, port)) + + # method that enables the tcp socket to listen to and accept connections. In other words, making the tcp socket a server socket that listens for incoming connections from clients + def tcp_ip_accept(self): + self.tcpSocket.listen(1) + client, clientAddrs = self.tcpSocket.accept() + return client, clientAddrs + + # method that binds an IP address and port to the tcp socket before the socket can listen and accept connections + def tcp_ip_bind(self): + self.tcpSocket.bind((self.IPAddrs, self.port)) \ No newline at end of file diff --git a/Beast/CPS Network/transponderManager.py b/Beast/CPS Network/transponderManager.py new file mode 100644 index 0000000..c1b0388 --- /dev/null +++ b/Beast/CPS Network/transponderManager.py @@ -0,0 +1,244 @@ +from broadcast import Broadcast +from tcpIPNode import TCP_IP_Node +from xbeeNode import XbeeNode +from subprocess import check_output +from queue import Queue +from digi.xbee.devices import * +import time +import threading +import socket +import json + +# a threading function that takes a Broadcast object and a port number as its arguments and broadcast the port number every second +def broadcasting(broadcast, port): + while True: + msg = str(port) + broadcast.send_to_broadcast(msg) + time.sleep(1) + +# a threading function that creates TCP/IP connections with other devices on the network that are not already connected by finding the devices on the network via braodcast +# the function takes the following arguments: lock (the lock from the threading module), broadcast (a Broadcsat object), port (a port number), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def establish_tcp_ip_connection(lock, broadcast, port, tcpIpConnectionList, collectionOfTelemetryInfo): + while True: + msg, addrs = broadcast.read_from_broadcast() + connectingIPAddrs = addrs[0] + connectingPort = int(msg.decode()) + tcpIPNode = TCP_IP_Node(port) + if (tcpIPNode.IPAddrs != connectingIPAddrs) and (connectingIPAddrs not in tcpIpConnectionList): + try: + tcpIPNode.tcp_ip_connect(connectingIPAddrs, connectingPort) + lock.acquire() + tcpIpConnectionList.append(connectingIPAddrs) + lock.release() + print("(TCP/IP) Connected to: " + connectingIPAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + sendThread = threading.Thread(target=telemetry_send_via_tcp_ip, args=(tcpIPNode.tcpSocket, connectingIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + receiveThread = threading.Thread(target=telemetry_receive_via_tcp_ip, args=(lock, tcpIPNode.tcpSocket, tcpIPNode.IPAddrs, connectingIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + sendThread.start() + receiveThread.start() + except ConnectionRefusedError: + print("(TCP/IP) Connection Refused From " + connectingIPAddrs) + time.sleep(0.1) + +# a threading function that acts as a server that listens for connections and accepts connections from clients that are not yet already connected via TCP/IP +# the function takes the following arguments: lock (the lock from the threading module), port (a port number), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def tcp_ip_connection_listener(lock, port, tcpIpConnectionList, collectionOfTelemetryInfo): + tcpIPNode = TCP_IP_Node(port) + tcpIPNode.tcp_ip_bind() + while True: + client, clientAddrs = tcpIPNode.tcp_ip_accept() + clientIPAddrs = clientAddrs[0] + if clientIPAddrs not in tcpIpConnectionList: + lock.acquire() + tcpIpConnectionList.append(clientIPAddrs) + lock.release() + print("(TCP/IP) Connected to: " + clientIPAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + sendThread = threading.Thread(target=telemetry_send_via_tcp_ip, args=(client, clientIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + receiveThread = threading.Thread(target=telemetry_receive_via_tcp_ip, args=(lock, client, tcpIPNode.IPAddrs, clientIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + sendThread.start() + receiveThread.start() + else: + client.close() + time.sleep(0.1) + +# a threading function that send telemetry information to a given IP address every half a second via TCP/IP while the device of the IP address is still connected to the network +# the function takes the following arguments: tcpIpNode (a TCP_IP_Node object), clientIp (IP address of the connecting device), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def telemetry_send_via_tcp_ip(tcpIpNode, clientIp, tcpIpConnectionList, collectionOfTelemetryInfo): + while clientIp in tcpIpConnectionList: + msg = json.dumps(collectionOfTelemetryInfo) + try: + tcpIpNode.send(msg.encode()) + except (ConnectionResetError, BrokenPipeError, socket.timeout): + break + time.sleep(0.5) + +# a threading function that listens for and receives telemetry information from a connected device on the network via TCP/IP +# the thread stops when no information is recieved from the connected device within 5 seconds, signaling a disconnection +# the function takes the following arguments: lock (the lock from the threading module), tcpIpNode (a TCP_IP_Node object), selfIp (IP address of the local device), ipAddrs (IP address of the connecting device), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def telemetry_receive_via_tcp_ip(lock, tcpIpNode, selfIp, ipAddrs, tcpIpConnectionList, collectionOfTelemetryInfo): + while ipAddrs in tcpIpConnectionList: + tcpIpNode.settimeout(5) + try: + data = tcpIpNode.recv(1024) + if not data: + break + data = data.decode().split('}', 1)[0] + '}' + data = json.loads(data) + for k, v in data.items(): + if k != selfIp: + lock.acquire() + collectionOfTelemetryInfo[k] = v + lock.release() + except (ConnectionResetError, BrokenPipeError, socket.timeout): + break + lock.acquire() + tcpIpConnectionList.remove(ipAddrs) + lock.release() + print("(TCP/IP) Disconnected from: " + ipAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + +# a threading function that takes the an XbeeNode object and an IP address as its parameters and broadcasts the IP address every second via XBee +def xbee_broadcast(xbeeNode, ipAddrs): + time.sleep(2) + while True: + xbeeNode.xbee_broadcast_addrs(ipAddrs) + time.sleep(1) + +# a threading function that listens for and recieves message from the XBee module. If the message is an IP address, form a connection with device of the IP address if not already connected +# the function takes the following arguments: lock (the lock from the threading module), xbeeNode (an XbeeNode object), xbeeConnectionList (a list), tcpIpConnectionList (a list), q (a Queue object), collectionOfTelemetryInfo (a list), and selfIp (IP address of the local device) +def xbee_connection_maker(lock, xbeeNode, xbeeConnectionList, tcpIpConnectionList, q, collectionOfTelemetryInfo, selfIp): + time.sleep(2) + while True: + msg = xbeeNode.xbee_connection_listener() + if msg is not None: + msg = msg.data.decode() + key = "addr:" + addrs = msg[len(key):len(msg)] + separation = addrs.index(":") + addrs64Bit = addrs[0:separation] + IPAddrs = addrs[(separation + 1):len(addrs)] + if (msg[0:len(key)] == key) and (IPAddrs not in xbeeConnectionList) and (IPAddrs not in tcpIpConnectionList): + lock.acquire() + xbeeConnectionList.append(IPAddrs) + lock.release() + print("(Xbee) Connected to: " + IPAddrs) + print("(Xbee) Number of connected device: " + str(len(xbeeConnectionList))) + xbeeThreadedConnection = threading.Thread(target=xbee_connection, args=(lock, xbeeNode, addrs64Bit, selfIp, IPAddrs, tcpIpConnectionList, xbeeConnectionList, q, collectionOfTelemetryInfo, )) + xbeeThreadedConnection.start() + time.sleep(0.5) + +# the threading function that establishes a connection with another device discovered on the network. While connected the function also listens for and receives telemetry information from the connected XBee device +# the thread stops when the connection stopped or some sort of disconnection signal was received +# the function takes the following arguments: lock (the lock from the threading module), xbeeNode (an XbeeNode object), addrs64Bit (64 bit local XBee address), selfIp (IP address of the local device), IPAddrs (IP address of the connecting device), tcpIpConnectionList (a list), xbeeConnectionList (a list), q (a Queue object), and collectionOfTelemetryInfo (a list) +def xbee_connection(lock, xbeeNode, addrs64Bit, selfIp, IPAddrs, tcpIpConnectionList, xbeeConnectionList, q, collectionOfTelemetryInfo): + remote = RemoteXBeeDevice(xbeeNode.xbee, XBee64BitAddress.from_hex_string(addrs64Bit)) + timeout = 5 + while (IPAddrs not in tcpIpConnectionList): + try: + msg = xbeeNode.xbee.read_data_from(remote, timeout) + # read message received that is not broadcasted address. Broadcasted address always begins with "0013A2004" + if msg is not None: + lock.acquire() + if q.full(): + q.get() + q.put(msg.remote_device) + if remote not in q.queue: + lock.release() + break + else: + q.put(msg.remote_device) + + msg = msg.data.decode() + key = "info:" + if msg[0:len(key)] == key: + d = msg[len(key):len(msg)] + data = json.loads(d) + #combining the telemetry info received with locally known telemetry info + for k, v in data.items(): + if k != selfIp: + collectionOfTelemetryInfo[k] = v + lock.release() + except Exception as e: + print(e) + break + time.sleep(0.1) + lock.acquire() + xbeeConnectionList.remove(IPAddrs) + lock.release() + print("(Xbee) Threaded connection with " + IPAddrs + " stopped.") + print("(Xbee) List of connected device: " + str(xbeeConnectionList)) + +# a threading function that takes the an XbeeNode object and a collectionOfTelemetryInfo (a list) as its parameters and broadcasts the collectionOfTelemetryInfo every second via XBee +def xbee_broadcast_telemetry(xbeeNode, collectionOfTelemetryInfo): + time.sleep(2) + while True: + xbeeNode.xbee_broadcast_telemetry(collectionOfTelemetryInfo) + time.sleep(1) + +# a threading function that the user can interact with when prompt for a command +# Here are the list of commands and the kind of response the user receives: +# "data" - outputs a list of telemetry information of the connected devices in the network +# "tcpip" - outputs a list of IP addresses of the connected devices in the network that are communicating via TCP/IP +# "xbee" - outputs a list of IP addresses of the connected devices in the network that are communicating via XBee +# "help" - outputs the list of commands to use +def user_prompt(collectionOfTelemetryInfo, tcpIpConnectionList, xbeeConnectionList): + time.sleep(2) + while True: + userInput = input("Please enter a command: ") + print(userInput) + if userInput == 'data': + print(collectionOfTelemetryInfo) + elif userInput == "tcpip": + print(tcpIpConnectionList) + elif userInput == "xbee": + print(xbeeConnectionList) + elif userInput == "help": + print("Here are the commands to use:\n data\n tcpip\n xbee\n") + else: + print("Sorry this is not a valid command. Try again.") + +# a temporary threading function that generate fake telemetry information (used for testing purposes). The correct telemetry information should be obtained from the marvelmind devices. This function shall be changed or removed in the future!!!!!! +def counter(lock, collectionOfTelemetryInfo, ipAddrs): + i = 0 + while True: + lock.acquire() + collectionOfTelemetryInfo[ipAddrs] = str(i) + lock.release() + i+=1 + time.sleep(5) + +# a function that starts all the thraeding functions +def start(): + broadcastPort = 8025 + tcpIpPort = 8026 + IPAddrs = check_output(['hostname', '-I']).decode().strip(' \n') + lock = threading.Lock() + b = Broadcast(broadcastPort) + x = XbeeNode() + queueSize = 10 + q = Queue(queueSize) + tcpIpConnectionList = [] + xbeeConnectionList = [] + collectionOfTelemetryInfo = {} + broadcastingThread = threading.Thread(target=broadcasting, args=(b, tcpIpPort, )) + establishTcpIpConnectionThread = threading.Thread(target=establish_tcp_ip_connection, args=(lock, b, tcpIpPort, tcpIpConnectionList, collectionOfTelemetryInfo, )) + tcpIpConnectionListenerThread = threading.Thread(target=tcp_ip_connection_listener, args=(lock, tcpIpPort, tcpIpConnectionList, collectionOfTelemetryInfo, )) + xbeeBroadcastingThread = threading.Thread(target=xbee_broadcast, args=(x, IPAddrs, )) + xbeeConnectionMakerThread = threading.Thread(target=xbee_connection_maker, args=(lock, x, xbeeConnectionList, tcpIpConnectionList, q, collectionOfTelemetryInfo, IPAddrs, )) + xbeeBroadcastTelemetryThread = threading.Thread(target=xbee_broadcast_telemetry, args=(x, collectionOfTelemetryInfo, )) + counterThread = threading.Thread(target=counter, args=(lock, collectionOfTelemetryInfo, IPAddrs)) + userPromptThread = threading.Thread(target=user_prompt, args=(collectionOfTelemetryInfo, tcpIpConnectionList, xbeeConnectionList, )) + print("Local IP Address: " + IPAddrs) + print("Establishing connections...") + broadcastingThread.start() + establishTcpIpConnectionThread.start() + tcpIpConnectionListenerThread.start() + xbeeBroadcastingThread.start() + xbeeConnectionMakerThread.start() + xbeeBroadcastTelemetryThread.start() + counterThread.start() + userPromptThread.start() + + +start() \ No newline at end of file diff --git a/Beast/CPS Network/xbeeNode.py b/Beast/CPS Network/xbeeNode.py new file mode 100644 index 0000000..7bf2719 --- /dev/null +++ b/Beast/CPS Network/xbeeNode.py @@ -0,0 +1,29 @@ +from digi.xbee.devices import * +import json + +class XbeeNode: + # the constructor method initializes and creates a new xbee xbee device object + def __init__(self): + self.xbee = XBeeDevice("/dev/ttyUSB0", 9600) + self.xbee.open() + self.xbeeAddrs = str(self.xbee.get_64bit_addr()) + timeout = 100000000 + self.xbee.set_sync_ops_timeout(timeout) + + # this method takes an IP address as an argument and sends that IP address to the broadcast of the xbee network + def xbee_broadcast_addrs(self, ipAddrs): + addrs = "addr:" + self.xbeeAddrs + ":" + ipAddrs + self.xbee.send_data_broadcast(addrs) + + # this method takes some telemetry information as an argument and sends it to the broadcast of the xbee network + def xbee_broadcast_telemetry(self, telemetry): + t = json.dumps(telemetry) + msg = "info:" + t + self.xbee.send_data_broadcast(msg) + + # this method is used to read data from an xbee module + def xbee_connection_listener(self): + msg = self.xbee.read_data() + return msg + + diff --git a/Beast/Xbee/Beast_Xbee_Connection.py b/Beast/Xbee/Beast_Xbee_Connection.py new file mode 100644 index 0000000..04193f3 --- /dev/null +++ b/Beast/Xbee/Beast_Xbee_Connection.py @@ -0,0 +1,106 @@ +from digi.xbee.devices import * +from threading import Thread, Lock +from time import sleep +import json +from queue import Queue + +xbee = XBeeDevice("/dev/ttyUSB0", 9600) +xbee.open() + +address = str(xbee.get_64bit_addr()) +print("Local 64 bit address: " + address) +print("Waiting for connection...") + +addrsBook = [] +list_of_conn = [] +queueSize = 10 +q = Queue(queueSize) + +#global telemetry info variable/dictionary +telemetryInfo = {} + +timeout = 100000000 #set timeout to infinite or None +xbee.set_sync_ops_timeout(timeout) #allows for xbee sent_data method to run forever, Prevents "Packet listener is not running" error + +def broadcasting(localXbee, localAddress): + while True: + #broadcasting local 64 bit address + localAddr = "addr:" + localAddress + localXbee.send_data_broadcast(localAddr) + sleep(1) + + +def connection_listener(localXbee, lock): + while True: + msg = localXbee.read_data() + # read messages that begins with "0013A2004" (it means that its an address to connect to) and take only address that is not already connected to + if msg is not None: + data = msg.data.decode() + key = "addr:" + addr = data[len(key):len(data)] # actual message with key removed + if data[0:len(key)] == key and addr not in addrsBook: + addrsBook.append(addr) + print("Connected to: " + addr) + conn = Thread(target = threaded_connection, args = (localXbee, addr, lock, )) +# list_of_conn.append(conn) + conn.start() + sleep(0.5) + +# threaded location broadcasting method +def broadcast_location(localXbee, localAddress): + while True: + telemetryInfo[localAddress] = "beast" # add or update new local position to telemtryInfo + temp = list(telemetryInfo.items()) # avoid "dictionary chnaged size during iteration" error + for key, value in temp: + info = 'info:{"%s":"%s"}' % (key, value) + localXbee.send_data_broadcast(info) +# print(telemetryInfo) + sleep(1) + +def threaded_connection(localXbee, deviceAddr, lock): + remote = RemoteXBeeDevice(localXbee, XBee64BitAddress.from_hex_string(deviceAddr)) + timeout = 5 # in seconds + while True: + lock.acquire() + try: + d = localXbee.read_data_from(remote, timeout) + # read message received that is not broadcasted address. Broadcasted address always begins with "0013A2004" + if d is not None: + if q.full(): + q.get() + q.put(d.remote_device) + if remote not in q.queue: + break + else: + q.put(d.remote_device) + + data = d.data.decode() + key = "info:" + if data[0:len(key)] == key: + m = data[len(key):len(data)] + m_as_dict = json.loads(m) + #combining the telemetry info received with locally known telemetry info + telemetryInfo.update(m_as_dict) + except Exception as e: + print(e) + break + lock.release() + sleep(0.1) + + addrsBook.remove(deviceAddr) + telemetryInfo.clear() + print("Threaded connection with " + deviceAddr + " stopped.") + print("List of connected device: " + str(addrsBook)) + lock.release() + sleep(0.1) + +lock = Lock() +bCastLocalAddr = Thread(target = broadcasting, args = (xbee, address, )) +connListener = Thread(target = connection_listener, args = (xbee, lock, )) +bCastLocation = Thread(target = broadcast_location, args = (xbee, address, )) +bCastLocalAddr.start() +connListener.start() +bCastLocation.start() + + +xbee.close() \ No newline at end of file diff --git a/Domino/CPS Network/broadcast.py b/Domino/CPS Network/broadcast.py new file mode 100644 index 0000000..d42513d --- /dev/null +++ b/Domino/CPS Network/broadcast.py @@ -0,0 +1,29 @@ +import socket + +class Broadcast: + # the constructor method that takes port number as an argument + def __init__(self, port): + self.port = port + # socket that is used for broadcasting + self.broadcaster = self.broadcast_socket() + # socket used to receive message from boradcast + self.broadcastReceiver = self.broadcast_socket() + self.broadcastReceiver.bind(('',self.port)) + + # creating a broadcast socket + def broadcast_socket(self): + # Initialize socket + b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + # Enable broadcast mode + b.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + return b + + # send to broadcast method + def send_to_broadcast(self, message): + broadcastIP = '' + self.broadcaster.sendto(message.encode(), (broadcastIP, self.port)) + + # receive from braodcast method + def read_from_broadcast(self): + data, addr = self.broadcastReceiver.recvfrom(1024) + return data, addr \ No newline at end of file diff --git a/Domino/CPS Network/tcpIPNode.py b/Domino/CPS Network/tcpIPNode.py new file mode 100644 index 0000000..e3325b9 --- /dev/null +++ b/Domino/CPS Network/tcpIPNode.py @@ -0,0 +1,23 @@ +import socket +from subprocess import check_output + +class TCP_IP_Node: + # the contructor method takes port number as an argument and intializes the port, IPAddrs and tcpSocket attributes. + def __init__(self, port): + self.port = port + self.IPAddrs = check_output(['hostname', '-I']).decode().strip(' \n') # the IPAddrs attribute is bind IP address of the local device + self.tcpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # creates a new socket object using the TCP protocol for network communication + + # method that makes the tcp socket a client that connects to the server of a given IP address and port number + def tcp_ip_connect(self, IPAddrs, port): + self.tcpSocket.connect((IPAddrs, port)) + + # method that enables the tcp socket to listen to and accept connections. In other words, making the tcp socket a server socket that listens for incoming connections from clients + def tcp_ip_accept(self): + self.tcpSocket.listen(1) + client, clientAddrs = self.tcpSocket.accept() + return client, clientAddrs + + # method that binds an IP address and port to the tcp socket before the socket can listen and accept connections + def tcp_ip_bind(self): + self.tcpSocket.bind((self.IPAddrs, self.port)) \ No newline at end of file diff --git a/Domino/CPS Network/transponderManager.py b/Domino/CPS Network/transponderManager.py new file mode 100644 index 0000000..c1b0388 --- /dev/null +++ b/Domino/CPS Network/transponderManager.py @@ -0,0 +1,244 @@ +from broadcast import Broadcast +from tcpIPNode import TCP_IP_Node +from xbeeNode import XbeeNode +from subprocess import check_output +from queue import Queue +from digi.xbee.devices import * +import time +import threading +import socket +import json + +# a threading function that takes a Broadcast object and a port number as its arguments and broadcast the port number every second +def broadcasting(broadcast, port): + while True: + msg = str(port) + broadcast.send_to_broadcast(msg) + time.sleep(1) + +# a threading function that creates TCP/IP connections with other devices on the network that are not already connected by finding the devices on the network via braodcast +# the function takes the following arguments: lock (the lock from the threading module), broadcast (a Broadcsat object), port (a port number), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def establish_tcp_ip_connection(lock, broadcast, port, tcpIpConnectionList, collectionOfTelemetryInfo): + while True: + msg, addrs = broadcast.read_from_broadcast() + connectingIPAddrs = addrs[0] + connectingPort = int(msg.decode()) + tcpIPNode = TCP_IP_Node(port) + if (tcpIPNode.IPAddrs != connectingIPAddrs) and (connectingIPAddrs not in tcpIpConnectionList): + try: + tcpIPNode.tcp_ip_connect(connectingIPAddrs, connectingPort) + lock.acquire() + tcpIpConnectionList.append(connectingIPAddrs) + lock.release() + print("(TCP/IP) Connected to: " + connectingIPAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + sendThread = threading.Thread(target=telemetry_send_via_tcp_ip, args=(tcpIPNode.tcpSocket, connectingIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + receiveThread = threading.Thread(target=telemetry_receive_via_tcp_ip, args=(lock, tcpIPNode.tcpSocket, tcpIPNode.IPAddrs, connectingIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + sendThread.start() + receiveThread.start() + except ConnectionRefusedError: + print("(TCP/IP) Connection Refused From " + connectingIPAddrs) + time.sleep(0.1) + +# a threading function that acts as a server that listens for connections and accepts connections from clients that are not yet already connected via TCP/IP +# the function takes the following arguments: lock (the lock from the threading module), port (a port number), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def tcp_ip_connection_listener(lock, port, tcpIpConnectionList, collectionOfTelemetryInfo): + tcpIPNode = TCP_IP_Node(port) + tcpIPNode.tcp_ip_bind() + while True: + client, clientAddrs = tcpIPNode.tcp_ip_accept() + clientIPAddrs = clientAddrs[0] + if clientIPAddrs not in tcpIpConnectionList: + lock.acquire() + tcpIpConnectionList.append(clientIPAddrs) + lock.release() + print("(TCP/IP) Connected to: " + clientIPAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + sendThread = threading.Thread(target=telemetry_send_via_tcp_ip, args=(client, clientIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + receiveThread = threading.Thread(target=telemetry_receive_via_tcp_ip, args=(lock, client, tcpIPNode.IPAddrs, clientIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + sendThread.start() + receiveThread.start() + else: + client.close() + time.sleep(0.1) + +# a threading function that send telemetry information to a given IP address every half a second via TCP/IP while the device of the IP address is still connected to the network +# the function takes the following arguments: tcpIpNode (a TCP_IP_Node object), clientIp (IP address of the connecting device), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def telemetry_send_via_tcp_ip(tcpIpNode, clientIp, tcpIpConnectionList, collectionOfTelemetryInfo): + while clientIp in tcpIpConnectionList: + msg = json.dumps(collectionOfTelemetryInfo) + try: + tcpIpNode.send(msg.encode()) + except (ConnectionResetError, BrokenPipeError, socket.timeout): + break + time.sleep(0.5) + +# a threading function that listens for and receives telemetry information from a connected device on the network via TCP/IP +# the thread stops when no information is recieved from the connected device within 5 seconds, signaling a disconnection +# the function takes the following arguments: lock (the lock from the threading module), tcpIpNode (a TCP_IP_Node object), selfIp (IP address of the local device), ipAddrs (IP address of the connecting device), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def telemetry_receive_via_tcp_ip(lock, tcpIpNode, selfIp, ipAddrs, tcpIpConnectionList, collectionOfTelemetryInfo): + while ipAddrs in tcpIpConnectionList: + tcpIpNode.settimeout(5) + try: + data = tcpIpNode.recv(1024) + if not data: + break + data = data.decode().split('}', 1)[0] + '}' + data = json.loads(data) + for k, v in data.items(): + if k != selfIp: + lock.acquire() + collectionOfTelemetryInfo[k] = v + lock.release() + except (ConnectionResetError, BrokenPipeError, socket.timeout): + break + lock.acquire() + tcpIpConnectionList.remove(ipAddrs) + lock.release() + print("(TCP/IP) Disconnected from: " + ipAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + +# a threading function that takes the an XbeeNode object and an IP address as its parameters and broadcasts the IP address every second via XBee +def xbee_broadcast(xbeeNode, ipAddrs): + time.sleep(2) + while True: + xbeeNode.xbee_broadcast_addrs(ipAddrs) + time.sleep(1) + +# a threading function that listens for and recieves message from the XBee module. If the message is an IP address, form a connection with device of the IP address if not already connected +# the function takes the following arguments: lock (the lock from the threading module), xbeeNode (an XbeeNode object), xbeeConnectionList (a list), tcpIpConnectionList (a list), q (a Queue object), collectionOfTelemetryInfo (a list), and selfIp (IP address of the local device) +def xbee_connection_maker(lock, xbeeNode, xbeeConnectionList, tcpIpConnectionList, q, collectionOfTelemetryInfo, selfIp): + time.sleep(2) + while True: + msg = xbeeNode.xbee_connection_listener() + if msg is not None: + msg = msg.data.decode() + key = "addr:" + addrs = msg[len(key):len(msg)] + separation = addrs.index(":") + addrs64Bit = addrs[0:separation] + IPAddrs = addrs[(separation + 1):len(addrs)] + if (msg[0:len(key)] == key) and (IPAddrs not in xbeeConnectionList) and (IPAddrs not in tcpIpConnectionList): + lock.acquire() + xbeeConnectionList.append(IPAddrs) + lock.release() + print("(Xbee) Connected to: " + IPAddrs) + print("(Xbee) Number of connected device: " + str(len(xbeeConnectionList))) + xbeeThreadedConnection = threading.Thread(target=xbee_connection, args=(lock, xbeeNode, addrs64Bit, selfIp, IPAddrs, tcpIpConnectionList, xbeeConnectionList, q, collectionOfTelemetryInfo, )) + xbeeThreadedConnection.start() + time.sleep(0.5) + +# the threading function that establishes a connection with another device discovered on the network. While connected the function also listens for and receives telemetry information from the connected XBee device +# the thread stops when the connection stopped or some sort of disconnection signal was received +# the function takes the following arguments: lock (the lock from the threading module), xbeeNode (an XbeeNode object), addrs64Bit (64 bit local XBee address), selfIp (IP address of the local device), IPAddrs (IP address of the connecting device), tcpIpConnectionList (a list), xbeeConnectionList (a list), q (a Queue object), and collectionOfTelemetryInfo (a list) +def xbee_connection(lock, xbeeNode, addrs64Bit, selfIp, IPAddrs, tcpIpConnectionList, xbeeConnectionList, q, collectionOfTelemetryInfo): + remote = RemoteXBeeDevice(xbeeNode.xbee, XBee64BitAddress.from_hex_string(addrs64Bit)) + timeout = 5 + while (IPAddrs not in tcpIpConnectionList): + try: + msg = xbeeNode.xbee.read_data_from(remote, timeout) + # read message received that is not broadcasted address. Broadcasted address always begins with "0013A2004" + if msg is not None: + lock.acquire() + if q.full(): + q.get() + q.put(msg.remote_device) + if remote not in q.queue: + lock.release() + break + else: + q.put(msg.remote_device) + + msg = msg.data.decode() + key = "info:" + if msg[0:len(key)] == key: + d = msg[len(key):len(msg)] + data = json.loads(d) + #combining the telemetry info received with locally known telemetry info + for k, v in data.items(): + if k != selfIp: + collectionOfTelemetryInfo[k] = v + lock.release() + except Exception as e: + print(e) + break + time.sleep(0.1) + lock.acquire() + xbeeConnectionList.remove(IPAddrs) + lock.release() + print("(Xbee) Threaded connection with " + IPAddrs + " stopped.") + print("(Xbee) List of connected device: " + str(xbeeConnectionList)) + +# a threading function that takes the an XbeeNode object and a collectionOfTelemetryInfo (a list) as its parameters and broadcasts the collectionOfTelemetryInfo every second via XBee +def xbee_broadcast_telemetry(xbeeNode, collectionOfTelemetryInfo): + time.sleep(2) + while True: + xbeeNode.xbee_broadcast_telemetry(collectionOfTelemetryInfo) + time.sleep(1) + +# a threading function that the user can interact with when prompt for a command +# Here are the list of commands and the kind of response the user receives: +# "data" - outputs a list of telemetry information of the connected devices in the network +# "tcpip" - outputs a list of IP addresses of the connected devices in the network that are communicating via TCP/IP +# "xbee" - outputs a list of IP addresses of the connected devices in the network that are communicating via XBee +# "help" - outputs the list of commands to use +def user_prompt(collectionOfTelemetryInfo, tcpIpConnectionList, xbeeConnectionList): + time.sleep(2) + while True: + userInput = input("Please enter a command: ") + print(userInput) + if userInput == 'data': + print(collectionOfTelemetryInfo) + elif userInput == "tcpip": + print(tcpIpConnectionList) + elif userInput == "xbee": + print(xbeeConnectionList) + elif userInput == "help": + print("Here are the commands to use:\n data\n tcpip\n xbee\n") + else: + print("Sorry this is not a valid command. Try again.") + +# a temporary threading function that generate fake telemetry information (used for testing purposes). The correct telemetry information should be obtained from the marvelmind devices. This function shall be changed or removed in the future!!!!!! +def counter(lock, collectionOfTelemetryInfo, ipAddrs): + i = 0 + while True: + lock.acquire() + collectionOfTelemetryInfo[ipAddrs] = str(i) + lock.release() + i+=1 + time.sleep(5) + +# a function that starts all the thraeding functions +def start(): + broadcastPort = 8025 + tcpIpPort = 8026 + IPAddrs = check_output(['hostname', '-I']).decode().strip(' \n') + lock = threading.Lock() + b = Broadcast(broadcastPort) + x = XbeeNode() + queueSize = 10 + q = Queue(queueSize) + tcpIpConnectionList = [] + xbeeConnectionList = [] + collectionOfTelemetryInfo = {} + broadcastingThread = threading.Thread(target=broadcasting, args=(b, tcpIpPort, )) + establishTcpIpConnectionThread = threading.Thread(target=establish_tcp_ip_connection, args=(lock, b, tcpIpPort, tcpIpConnectionList, collectionOfTelemetryInfo, )) + tcpIpConnectionListenerThread = threading.Thread(target=tcp_ip_connection_listener, args=(lock, tcpIpPort, tcpIpConnectionList, collectionOfTelemetryInfo, )) + xbeeBroadcastingThread = threading.Thread(target=xbee_broadcast, args=(x, IPAddrs, )) + xbeeConnectionMakerThread = threading.Thread(target=xbee_connection_maker, args=(lock, x, xbeeConnectionList, tcpIpConnectionList, q, collectionOfTelemetryInfo, IPAddrs, )) + xbeeBroadcastTelemetryThread = threading.Thread(target=xbee_broadcast_telemetry, args=(x, collectionOfTelemetryInfo, )) + counterThread = threading.Thread(target=counter, args=(lock, collectionOfTelemetryInfo, IPAddrs)) + userPromptThread = threading.Thread(target=user_prompt, args=(collectionOfTelemetryInfo, tcpIpConnectionList, xbeeConnectionList, )) + print("Local IP Address: " + IPAddrs) + print("Establishing connections...") + broadcastingThread.start() + establishTcpIpConnectionThread.start() + tcpIpConnectionListenerThread.start() + xbeeBroadcastingThread.start() + xbeeConnectionMakerThread.start() + xbeeBroadcastTelemetryThread.start() + counterThread.start() + userPromptThread.start() + + +start() \ No newline at end of file diff --git a/Domino/CPS Network/xbeeNode.py b/Domino/CPS Network/xbeeNode.py new file mode 100644 index 0000000..7bf2719 --- /dev/null +++ b/Domino/CPS Network/xbeeNode.py @@ -0,0 +1,29 @@ +from digi.xbee.devices import * +import json + +class XbeeNode: + # the constructor method initializes and creates a new xbee xbee device object + def __init__(self): + self.xbee = XBeeDevice("/dev/ttyUSB0", 9600) + self.xbee.open() + self.xbeeAddrs = str(self.xbee.get_64bit_addr()) + timeout = 100000000 + self.xbee.set_sync_ops_timeout(timeout) + + # this method takes an IP address as an argument and sends that IP address to the broadcast of the xbee network + def xbee_broadcast_addrs(self, ipAddrs): + addrs = "addr:" + self.xbeeAddrs + ":" + ipAddrs + self.xbee.send_data_broadcast(addrs) + + # this method takes some telemetry information as an argument and sends it to the broadcast of the xbee network + def xbee_broadcast_telemetry(self, telemetry): + t = json.dumps(telemetry) + msg = "info:" + t + self.xbee.send_data_broadcast(msg) + + # this method is used to read data from an xbee module + def xbee_connection_listener(self): + msg = self.xbee.read_data() + return msg + + diff --git a/Domino/Xbee/Domino_Xbee_Connection.py b/Domino/Xbee/Domino_Xbee_Connection.py new file mode 100644 index 0000000..d608330 --- /dev/null +++ b/Domino/Xbee/Domino_Xbee_Connection.py @@ -0,0 +1,106 @@ +from digi.xbee.devices import * +from threading import Thread, Lock +from time import sleep +import json +from queue import Queue + +xbee = XBeeDevice("/dev/ttyUSB0", 9600) +xbee.open() + +address = str(xbee.get_64bit_addr()) +print("Local 64 bit address: " + address) +print("Waiting for connection...") + +addrsBook = [] +list_of_conn = [] +queueSize = 10 +q = Queue(queueSize) + +#global telemetry info variable/dictionary +telemetryInfo = {} + +timeout = 100000000 #set timeout to infinite or None +xbee.set_sync_ops_timeout(timeout) #allows for xbee sent_data method to run forever, Prevents "Packet listener is not running" error + +def broadcasting(localXbee, localAddress): + while True: + #broadcasting local 64 bit address + localAddr = "addr:" + localAddress + localXbee.send_data_broadcast(localAddr) + sleep(1) +## print(addrsBook) +## print(telemetryInfo) + + +def connection_listener(localXbee, lock): + while True: + msg = localXbee.read_data() + # read messages that begins with "0013A2004" (it means that its an address to connect to) and take only address that is not already connected to + if msg is not None: + data = msg.data.decode() + key = "addr:" + addr = data[len(key):len(data)] # actual message with key removed + if data[0:len(key)] == key and addr not in addrsBook: + addrsBook.append(addr) + print("Connected to: " + addr) + conn = Thread(target = threaded_connection, args = (localXbee, addr, lock, )) +## list_of_conn.append(conn) + conn.start() + sleep(0.5) + +# threaded location broadcasting method +def broadcast_location(localXbee, localAddress): + while True: + telemetryInfo[localAddress] = "domino" # add or update new local position to telemtryInfo + temp = list(telemetryInfo.items()) + for key, value in temp: + info = 'info:{"%s":"%s"}' % (key, value) + localXbee.send_data_broadcast(info) + sleep(1) + +def threaded_connection(localXbee, deviceAddr, lock): + remote = RemoteXBeeDevice(localXbee, XBee64BitAddress.from_hex_string(deviceAddr)) + timeout = 5 # in seconds + while True: + lock.acquire() + try: + d = localXbee.read_data_from(remote, timeout) + # read message received that is not broadcasted address. Broadcasted address always begins with "0013A2004" + if d is not None: + if q.full(): + q.get() + q.put(d.remote_device) + if remote not in q.queue: + break + else: + q.put(d.remote_device) + + data = d.data.decode() + key = "info:" + if data[0:len(key)] == key: + m = data[len(key):len(data)] + m_as_dict = json.loads(m) + #combining the telemetry info received with locally known telemetry info + telemetryInfo.update(m_as_dict) + except Exception as e: + print(e) + break + lock.release() + sleep(0.1) + addrsBook.remove(deviceAddr) + telemetryInfo.clear() + print("Threaded connection with " + deviceAddr + " stopped.") + print("List of connected device: " + str(addrsBook)) + lock.release() + sleep(0.1) + +lock = Lock() +bCastLocalAddr = Thread(target = broadcasting, args = (xbee, address, )) +connListener = Thread(target = connection_listener, args = (xbee, lock, )) +bCastLocation = Thread(target = broadcast_location, args = (xbee, address, )) +bCastLocalAddr.start() +connListener.start() +bCastLocation.start() + + +xbee.close() \ No newline at end of file diff --git a/Kro/CPS Network/broadcast.py b/Kro/CPS Network/broadcast.py new file mode 100644 index 0000000..d42513d --- /dev/null +++ b/Kro/CPS Network/broadcast.py @@ -0,0 +1,29 @@ +import socket + +class Broadcast: + # the constructor method that takes port number as an argument + def __init__(self, port): + self.port = port + # socket that is used for broadcasting + self.broadcaster = self.broadcast_socket() + # socket used to receive message from boradcast + self.broadcastReceiver = self.broadcast_socket() + self.broadcastReceiver.bind(('',self.port)) + + # creating a broadcast socket + def broadcast_socket(self): + # Initialize socket + b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + # Enable broadcast mode + b.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + return b + + # send to broadcast method + def send_to_broadcast(self, message): + broadcastIP = '' + self.broadcaster.sendto(message.encode(), (broadcastIP, self.port)) + + # receive from braodcast method + def read_from_broadcast(self): + data, addr = self.broadcastReceiver.recvfrom(1024) + return data, addr \ No newline at end of file diff --git a/Kro/CPS Network/tcpIPNode.py b/Kro/CPS Network/tcpIPNode.py new file mode 100644 index 0000000..e3325b9 --- /dev/null +++ b/Kro/CPS Network/tcpIPNode.py @@ -0,0 +1,23 @@ +import socket +from subprocess import check_output + +class TCP_IP_Node: + # the contructor method takes port number as an argument and intializes the port, IPAddrs and tcpSocket attributes. + def __init__(self, port): + self.port = port + self.IPAddrs = check_output(['hostname', '-I']).decode().strip(' \n') # the IPAddrs attribute is bind IP address of the local device + self.tcpSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # creates a new socket object using the TCP protocol for network communication + + # method that makes the tcp socket a client that connects to the server of a given IP address and port number + def tcp_ip_connect(self, IPAddrs, port): + self.tcpSocket.connect((IPAddrs, port)) + + # method that enables the tcp socket to listen to and accept connections. In other words, making the tcp socket a server socket that listens for incoming connections from clients + def tcp_ip_accept(self): + self.tcpSocket.listen(1) + client, clientAddrs = self.tcpSocket.accept() + return client, clientAddrs + + # method that binds an IP address and port to the tcp socket before the socket can listen and accept connections + def tcp_ip_bind(self): + self.tcpSocket.bind((self.IPAddrs, self.port)) \ No newline at end of file diff --git a/Kro/CPS Network/transponderManager.py b/Kro/CPS Network/transponderManager.py new file mode 100644 index 0000000..c1b0388 --- /dev/null +++ b/Kro/CPS Network/transponderManager.py @@ -0,0 +1,244 @@ +from broadcast import Broadcast +from tcpIPNode import TCP_IP_Node +from xbeeNode import XbeeNode +from subprocess import check_output +from queue import Queue +from digi.xbee.devices import * +import time +import threading +import socket +import json + +# a threading function that takes a Broadcast object and a port number as its arguments and broadcast the port number every second +def broadcasting(broadcast, port): + while True: + msg = str(port) + broadcast.send_to_broadcast(msg) + time.sleep(1) + +# a threading function that creates TCP/IP connections with other devices on the network that are not already connected by finding the devices on the network via braodcast +# the function takes the following arguments: lock (the lock from the threading module), broadcast (a Broadcsat object), port (a port number), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def establish_tcp_ip_connection(lock, broadcast, port, tcpIpConnectionList, collectionOfTelemetryInfo): + while True: + msg, addrs = broadcast.read_from_broadcast() + connectingIPAddrs = addrs[0] + connectingPort = int(msg.decode()) + tcpIPNode = TCP_IP_Node(port) + if (tcpIPNode.IPAddrs != connectingIPAddrs) and (connectingIPAddrs not in tcpIpConnectionList): + try: + tcpIPNode.tcp_ip_connect(connectingIPAddrs, connectingPort) + lock.acquire() + tcpIpConnectionList.append(connectingIPAddrs) + lock.release() + print("(TCP/IP) Connected to: " + connectingIPAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + sendThread = threading.Thread(target=telemetry_send_via_tcp_ip, args=(tcpIPNode.tcpSocket, connectingIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + receiveThread = threading.Thread(target=telemetry_receive_via_tcp_ip, args=(lock, tcpIPNode.tcpSocket, tcpIPNode.IPAddrs, connectingIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + sendThread.start() + receiveThread.start() + except ConnectionRefusedError: + print("(TCP/IP) Connection Refused From " + connectingIPAddrs) + time.sleep(0.1) + +# a threading function that acts as a server that listens for connections and accepts connections from clients that are not yet already connected via TCP/IP +# the function takes the following arguments: lock (the lock from the threading module), port (a port number), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def tcp_ip_connection_listener(lock, port, tcpIpConnectionList, collectionOfTelemetryInfo): + tcpIPNode = TCP_IP_Node(port) + tcpIPNode.tcp_ip_bind() + while True: + client, clientAddrs = tcpIPNode.tcp_ip_accept() + clientIPAddrs = clientAddrs[0] + if clientIPAddrs not in tcpIpConnectionList: + lock.acquire() + tcpIpConnectionList.append(clientIPAddrs) + lock.release() + print("(TCP/IP) Connected to: " + clientIPAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + sendThread = threading.Thread(target=telemetry_send_via_tcp_ip, args=(client, clientIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + receiveThread = threading.Thread(target=telemetry_receive_via_tcp_ip, args=(lock, client, tcpIPNode.IPAddrs, clientIPAddrs, tcpIpConnectionList, collectionOfTelemetryInfo, )) + sendThread.start() + receiveThread.start() + else: + client.close() + time.sleep(0.1) + +# a threading function that send telemetry information to a given IP address every half a second via TCP/IP while the device of the IP address is still connected to the network +# the function takes the following arguments: tcpIpNode (a TCP_IP_Node object), clientIp (IP address of the connecting device), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def telemetry_send_via_tcp_ip(tcpIpNode, clientIp, tcpIpConnectionList, collectionOfTelemetryInfo): + while clientIp in tcpIpConnectionList: + msg = json.dumps(collectionOfTelemetryInfo) + try: + tcpIpNode.send(msg.encode()) + except (ConnectionResetError, BrokenPipeError, socket.timeout): + break + time.sleep(0.5) + +# a threading function that listens for and receives telemetry information from a connected device on the network via TCP/IP +# the thread stops when no information is recieved from the connected device within 5 seconds, signaling a disconnection +# the function takes the following arguments: lock (the lock from the threading module), tcpIpNode (a TCP_IP_Node object), selfIp (IP address of the local device), ipAddrs (IP address of the connecting device), tcpIpConnectionList (a list), and collectionOfTelemetryInfo (a list) +def telemetry_receive_via_tcp_ip(lock, tcpIpNode, selfIp, ipAddrs, tcpIpConnectionList, collectionOfTelemetryInfo): + while ipAddrs in tcpIpConnectionList: + tcpIpNode.settimeout(5) + try: + data = tcpIpNode.recv(1024) + if not data: + break + data = data.decode().split('}', 1)[0] + '}' + data = json.loads(data) + for k, v in data.items(): + if k != selfIp: + lock.acquire() + collectionOfTelemetryInfo[k] = v + lock.release() + except (ConnectionResetError, BrokenPipeError, socket.timeout): + break + lock.acquire() + tcpIpConnectionList.remove(ipAddrs) + lock.release() + print("(TCP/IP) Disconnected from: " + ipAddrs) + print("(TCP/IP) Number of connected device: " + str(len(tcpIpConnectionList))) + +# a threading function that takes the an XbeeNode object and an IP address as its parameters and broadcasts the IP address every second via XBee +def xbee_broadcast(xbeeNode, ipAddrs): + time.sleep(2) + while True: + xbeeNode.xbee_broadcast_addrs(ipAddrs) + time.sleep(1) + +# a threading function that listens for and recieves message from the XBee module. If the message is an IP address, form a connection with device of the IP address if not already connected +# the function takes the following arguments: lock (the lock from the threading module), xbeeNode (an XbeeNode object), xbeeConnectionList (a list), tcpIpConnectionList (a list), q (a Queue object), collectionOfTelemetryInfo (a list), and selfIp (IP address of the local device) +def xbee_connection_maker(lock, xbeeNode, xbeeConnectionList, tcpIpConnectionList, q, collectionOfTelemetryInfo, selfIp): + time.sleep(2) + while True: + msg = xbeeNode.xbee_connection_listener() + if msg is not None: + msg = msg.data.decode() + key = "addr:" + addrs = msg[len(key):len(msg)] + separation = addrs.index(":") + addrs64Bit = addrs[0:separation] + IPAddrs = addrs[(separation + 1):len(addrs)] + if (msg[0:len(key)] == key) and (IPAddrs not in xbeeConnectionList) and (IPAddrs not in tcpIpConnectionList): + lock.acquire() + xbeeConnectionList.append(IPAddrs) + lock.release() + print("(Xbee) Connected to: " + IPAddrs) + print("(Xbee) Number of connected device: " + str(len(xbeeConnectionList))) + xbeeThreadedConnection = threading.Thread(target=xbee_connection, args=(lock, xbeeNode, addrs64Bit, selfIp, IPAddrs, tcpIpConnectionList, xbeeConnectionList, q, collectionOfTelemetryInfo, )) + xbeeThreadedConnection.start() + time.sleep(0.5) + +# the threading function that establishes a connection with another device discovered on the network. While connected the function also listens for and receives telemetry information from the connected XBee device +# the thread stops when the connection stopped or some sort of disconnection signal was received +# the function takes the following arguments: lock (the lock from the threading module), xbeeNode (an XbeeNode object), addrs64Bit (64 bit local XBee address), selfIp (IP address of the local device), IPAddrs (IP address of the connecting device), tcpIpConnectionList (a list), xbeeConnectionList (a list), q (a Queue object), and collectionOfTelemetryInfo (a list) +def xbee_connection(lock, xbeeNode, addrs64Bit, selfIp, IPAddrs, tcpIpConnectionList, xbeeConnectionList, q, collectionOfTelemetryInfo): + remote = RemoteXBeeDevice(xbeeNode.xbee, XBee64BitAddress.from_hex_string(addrs64Bit)) + timeout = 5 + while (IPAddrs not in tcpIpConnectionList): + try: + msg = xbeeNode.xbee.read_data_from(remote, timeout) + # read message received that is not broadcasted address. Broadcasted address always begins with "0013A2004" + if msg is not None: + lock.acquire() + if q.full(): + q.get() + q.put(msg.remote_device) + if remote not in q.queue: + lock.release() + break + else: + q.put(msg.remote_device) + + msg = msg.data.decode() + key = "info:" + if msg[0:len(key)] == key: + d = msg[len(key):len(msg)] + data = json.loads(d) + #combining the telemetry info received with locally known telemetry info + for k, v in data.items(): + if k != selfIp: + collectionOfTelemetryInfo[k] = v + lock.release() + except Exception as e: + print(e) + break + time.sleep(0.1) + lock.acquire() + xbeeConnectionList.remove(IPAddrs) + lock.release() + print("(Xbee) Threaded connection with " + IPAddrs + " stopped.") + print("(Xbee) List of connected device: " + str(xbeeConnectionList)) + +# a threading function that takes the an XbeeNode object and a collectionOfTelemetryInfo (a list) as its parameters and broadcasts the collectionOfTelemetryInfo every second via XBee +def xbee_broadcast_telemetry(xbeeNode, collectionOfTelemetryInfo): + time.sleep(2) + while True: + xbeeNode.xbee_broadcast_telemetry(collectionOfTelemetryInfo) + time.sleep(1) + +# a threading function that the user can interact with when prompt for a command +# Here are the list of commands and the kind of response the user receives: +# "data" - outputs a list of telemetry information of the connected devices in the network +# "tcpip" - outputs a list of IP addresses of the connected devices in the network that are communicating via TCP/IP +# "xbee" - outputs a list of IP addresses of the connected devices in the network that are communicating via XBee +# "help" - outputs the list of commands to use +def user_prompt(collectionOfTelemetryInfo, tcpIpConnectionList, xbeeConnectionList): + time.sleep(2) + while True: + userInput = input("Please enter a command: ") + print(userInput) + if userInput == 'data': + print(collectionOfTelemetryInfo) + elif userInput == "tcpip": + print(tcpIpConnectionList) + elif userInput == "xbee": + print(xbeeConnectionList) + elif userInput == "help": + print("Here are the commands to use:\n data\n tcpip\n xbee\n") + else: + print("Sorry this is not a valid command. Try again.") + +# a temporary threading function that generate fake telemetry information (used for testing purposes). The correct telemetry information should be obtained from the marvelmind devices. This function shall be changed or removed in the future!!!!!! +def counter(lock, collectionOfTelemetryInfo, ipAddrs): + i = 0 + while True: + lock.acquire() + collectionOfTelemetryInfo[ipAddrs] = str(i) + lock.release() + i+=1 + time.sleep(5) + +# a function that starts all the thraeding functions +def start(): + broadcastPort = 8025 + tcpIpPort = 8026 + IPAddrs = check_output(['hostname', '-I']).decode().strip(' \n') + lock = threading.Lock() + b = Broadcast(broadcastPort) + x = XbeeNode() + queueSize = 10 + q = Queue(queueSize) + tcpIpConnectionList = [] + xbeeConnectionList = [] + collectionOfTelemetryInfo = {} + broadcastingThread = threading.Thread(target=broadcasting, args=(b, tcpIpPort, )) + establishTcpIpConnectionThread = threading.Thread(target=establish_tcp_ip_connection, args=(lock, b, tcpIpPort, tcpIpConnectionList, collectionOfTelemetryInfo, )) + tcpIpConnectionListenerThread = threading.Thread(target=tcp_ip_connection_listener, args=(lock, tcpIpPort, tcpIpConnectionList, collectionOfTelemetryInfo, )) + xbeeBroadcastingThread = threading.Thread(target=xbee_broadcast, args=(x, IPAddrs, )) + xbeeConnectionMakerThread = threading.Thread(target=xbee_connection_maker, args=(lock, x, xbeeConnectionList, tcpIpConnectionList, q, collectionOfTelemetryInfo, IPAddrs, )) + xbeeBroadcastTelemetryThread = threading.Thread(target=xbee_broadcast_telemetry, args=(x, collectionOfTelemetryInfo, )) + counterThread = threading.Thread(target=counter, args=(lock, collectionOfTelemetryInfo, IPAddrs)) + userPromptThread = threading.Thread(target=user_prompt, args=(collectionOfTelemetryInfo, tcpIpConnectionList, xbeeConnectionList, )) + print("Local IP Address: " + IPAddrs) + print("Establishing connections...") + broadcastingThread.start() + establishTcpIpConnectionThread.start() + tcpIpConnectionListenerThread.start() + xbeeBroadcastingThread.start() + xbeeConnectionMakerThread.start() + xbeeBroadcastTelemetryThread.start() + counterThread.start() + userPromptThread.start() + + +start() \ No newline at end of file diff --git a/Kro/CPS Network/xbeeNode.py b/Kro/CPS Network/xbeeNode.py new file mode 100644 index 0000000..7bf2719 --- /dev/null +++ b/Kro/CPS Network/xbeeNode.py @@ -0,0 +1,29 @@ +from digi.xbee.devices import * +import json + +class XbeeNode: + # the constructor method initializes and creates a new xbee xbee device object + def __init__(self): + self.xbee = XBeeDevice("/dev/ttyUSB0", 9600) + self.xbee.open() + self.xbeeAddrs = str(self.xbee.get_64bit_addr()) + timeout = 100000000 + self.xbee.set_sync_ops_timeout(timeout) + + # this method takes an IP address as an argument and sends that IP address to the broadcast of the xbee network + def xbee_broadcast_addrs(self, ipAddrs): + addrs = "addr:" + self.xbeeAddrs + ":" + ipAddrs + self.xbee.send_data_broadcast(addrs) + + # this method takes some telemetry information as an argument and sends it to the broadcast of the xbee network + def xbee_broadcast_telemetry(self, telemetry): + t = json.dumps(telemetry) + msg = "info:" + t + self.xbee.send_data_broadcast(msg) + + # this method is used to read data from an xbee module + def xbee_connection_listener(self): + msg = self.xbee.read_data() + return msg + + diff --git a/Kro/Xbee/Kro_Xbee_Connection.py b/Kro/Xbee/Kro_Xbee_Connection.py new file mode 100644 index 0000000..dba35c3 --- /dev/null +++ b/Kro/Xbee/Kro_Xbee_Connection.py @@ -0,0 +1,102 @@ +from digi.xbee.devices import * +from threading import Thread, Lock +from time import sleep +import json +from queue import Queue + +xbee = XBeeDevice("/dev/ttyUSB0", 9600) +xbee.open() + +address = str(xbee.get_64bit_addr()) +print("Local 64 bit address: " + address) +print("Waiting for connection...") + +addrsBook = [] +list_of_conn = [] +queueSize = 10 +q = Queue(queueSize) + +#global telemetry info variable/dictionary +telemetryInfo = {} + +timeout = 100000000 # set timeout to infinite or None +xbee.set_sync_ops_timeout(timeout) # allows for xbee sent_data() method to run forever. Prevents "Packet listener is not running" error + +def broadcasting(localXbee, localAddress): + while True: + localAddr = "addr:" + localAddress + localXbee.send_data_broadcast(localAddr) + sleep(1) +# print(addrsBook) +# print(telemetryInfo) + +def connection_listener(localXbee, lock): + while True: + msg = localXbee.read_data() + if msg is not None: + data = msg.data.decode() + key = "addr:" + addr = data[len(key):len(data)] + if data[0:len(key)] == key and addr not in addrsBook: + addrsBook.append(addr) + print("Conncted to: " + addr) + conn = Thread(target = threaded_connection, args = (localXbee, addr, lock, )) +# list_of_conn.append(conn) + conn.start() + sleep(0.5) + +def broadcast_location(localXbee, localAddress): + while True: + #add or update new local position to telemetryInfo + telemetryInfo[localAddress] = "kro" + temp = list(telemetryInfo.items()) + for key, value in temp: + info = 'info:{"%s":"%s"}' % (key, value) + localXbee.send_data_broadcast(info) + sleep(1) + +def threaded_connection(localXbee, deviceAddr, lock): + remote = RemoteXBeeDevice(localXbee, XBee64BitAddress.from_hex_string(deviceAddr)) + timeout = 5 # in seconds + while True: + lock.acquire() + try: + d = localXbee.read_data_from(remote, timeout) + if (d is not None): + if q.full(): + q.get() + q.put(d.remote_device) + if remote not in q.queue: + break + else: + q.put(d.remote_device) + + data = d.data.decode() + key = "info:" + if data[0:len(key)] == key: + m = data[len(key):len(data)] + m_as_dict = json.loads(m) + #combining the telemetry info received with locally known telemetry info + telemetryInfo.update(m_as_dict) + except Exception as e: + print(e) + break + lock.release() + sleep(0.1) + addrsBook.remove(deviceAddr) + telemetryInfo.clear() + print(" Threaded connection with " + deviceAddr + " stopped") + print("List of connected device: " + str(addrsBook)) + lock.release() + sleep(0.1) + +lock = Lock() +bCastLocalAddr = Thread(target = broadcasting, args = (xbee, address, )) +connListener = Thread(target = connection_listener, args = (xbee, lock, )) +bCastLocation = Thread(target = broadcast_location, args = (xbee, address, )) +bCastLocalAddr.start() +connListener.start() +bCastLocation.start() + + +xbee.close() \ No newline at end of file