diff --git a/.gitignore b/.gitignore index 51cbe85..9b0255b 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,5 @@ coverage.xml # Sphinx documentation docs/_build/ +# Jetbrains IDE +.idea \ No newline at end of file diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..d46b949 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "auto_version"] + path = auto_version + url = https://github.com/moble/auto_version.git diff --git a/README.md b/README.md index 3dae46a..58621a0 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,16 @@ jsonsocket ========== -This is a small Python library for sending data over sockets. +This is a single-file Python3 library for sending json-like data over sockets. -It allows sending lists, dictionaries, strings, etc. It can handle very large data (I've tested it with 10GB of data). Any JSON-serializable data is accepted. +(Original repos was Python2, forked it to support Python3) -Examples: +It allows sending lists, dictionaries, strings, etc. Any JSON-serializable data is accepted. -```python -from jsonsocket import Client, Server - -host = 'localhost' -port = '8000' - -# Client code: -client = Client() -client.connect(host, port).send({'some_list': [123, 456]}) -response = client.recv() -# response now is {'data': {'some_list': [123, 456]}} -client.close() - - -# Server code: -server = Server(host, port) -server.accept() -data = server.recv() -# data now is: {'some_list': [123, 456]} -server.send({'data': data}).close() +Example & test: +```bash +$python3 server_test.py & +$python3 client_test.py ``` + diff --git a/auto_version b/auto_version new file mode 160000 index 0000000..b9da163 --- /dev/null +++ b/auto_version @@ -0,0 +1 @@ +Subproject commit b9da1638e033b0f6266be56898f6ff4dfa9d8363 diff --git a/jsonsocket.py b/jsonsocket.py deleted file mode 100644 index 9dfe7a6..0000000 --- a/jsonsocket.py +++ /dev/null @@ -1,134 +0,0 @@ -import json, socket - -class Server(object): - """ - A JSON socket server used to communicate with a JSON socket client. All the - data is serialized in JSON. How to use it: - - server = Server(host, port) - while True: - server.accept() - data = server.recv() - # shortcut: data = server.accept().recv() - server.send({'status': 'ok'}) - """ - - backlog = 5 - client = None - - def __init__(self, host, port): - self.socket = socket.socket() - self.socket.bind((host, port)) - self.socket.listen(self.backlog) - - def __del__(self): - self.close() - - def accept(self): - # if a client is already connected, disconnect it - if self.client: - self.client.close() - self.client, self.client_addr = self.socket.accept() - return self - - def send(self, data): - if not self.client: - raise Exception('Cannot send data, no client is connected') - _send(self.client, data) - return self - - def recv(self): - if not self.client: - raise Exception('Cannot receive data, no client is connected') - return _recv(self.client) - - def close(self): - if self.client: - self.client.close() - self.client = None - if self.socket: - self.socket.close() - self.socket = None - - -class Client(object): - """ - A JSON socket client used to communicate with a JSON socket server. All the - data is serialized in JSON. How to use it: - - data = { - 'name': 'Patrick Jane', - 'age': 45, - 'children': ['Susie', 'Mike', 'Philip'] - } - client = Client() - client.connect(host, port) - client.send(data) - response = client.recv() - # or in one line: - response = Client().connect(host, port).send(data).recv() - """ - - socket = None - - def __del__(self): - self.close() - - def connect(self, host, port): - self.socket = socket.socket() - self.socket.connect((host, port)) - return self - - def send(self, data): - if not self.socket: - raise Exception('You have to connect first before sending data') - _send(self.socket, data) - return self - - def recv(self): - if not self.socket: - raise Exception('You have to connect first before receiving data') - return _recv(self.socket) - - def recv_and_close(self): - data = self.recv() - self.close() - return data - - def close(self): - if self.socket: - self.socket.close() - self.socket = None - - -## helper functions ## - -def _send(socket, data): - try: - serialized = json.dumps(data) - except (TypeError, ValueError), e: - raise Exception('You can only send JSON-serializable data') - # send the length of the serialized data first - socket.send('%d\n' % len(serialized)) - # send the serialized data - socket.sendall(serialized) - -def _recv(socket): - # read the length of the data, letter by letter until we reach EOL - length_str = '' - char = socket.recv(1) - while char != '\n': - length_str += char - char = socket.recv(1) - total = int(length_str) - # use a memoryview to receive the data chunk by chunk efficiently - view = memoryview(bytearray(total)) - next_offset = 0 - while total - next_offset > 0: - recv_size = socket.recv_into(view[next_offset:], total - next_offset) - next_offset += recv_size - try: - deserialized = json.loads(view.tobytes()) - except (TypeError, ValueError), e: - raise Exception('Data received was not in JSON format') - return deserialized diff --git a/jsonsocket/__init__.py b/jsonsocket/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jsonsocket/constants.py b/jsonsocket/constants.py new file mode 100644 index 0000000..730460e --- /dev/null +++ b/jsonsocket/constants.py @@ -0,0 +1,4 @@ +import sys + +broadcast_address = "" +python3 = sys.version_info[0]>=3 \ No newline at end of file diff --git a/jsonsocket/errors.py b/jsonsocket/errors.py new file mode 100644 index 0000000..f76fdb2 --- /dev/null +++ b/jsonsocket/errors.py @@ -0,0 +1,22 @@ +class IncorrectLength(Exception): + def __init__(self,message,length): + super(IncorrectLength, self).__init__("Incorrect transmitted length") + self.content = message + self.length = length + + +class NoClient(Exception): + def __init__(self): + super(NoClient, self).__init__('Cannot send data, no client is connected') + + +class ConnectFirst(Exception): + def __init__(self): + super(ConnectFirst, self).__init__('You have to connect first before sending data') + + +class AddressAlreadyInUse(Exception): + def __init__(self, host, port): + super(AddressAlreadyInUse, self).__init__("Address %s:%i already in use" % (host, port)) + self.host = host + self.port = port \ No newline at end of file diff --git a/jsonsocket/helpers.py b/jsonsocket/helpers.py new file mode 100644 index 0000000..b81fd01 --- /dev/null +++ b/jsonsocket/helpers.py @@ -0,0 +1,69 @@ +import sys +from time import time + +from jsonsocket.constants import python3 +from jsonsocket.errors import IncorrectLength +from jsonsocket.serialize import serialize, deserialize +from socket import timeout as tmout + + +class TimeoutError(Exception): + pass + + +def send(socket, data, socket_type="tcp", *args): + serialized = serialize(data) + # send the length of the serialized data first + message = '{}\n'.format(len(serialized)).encode('utf-8') + + if socket_type == "tcp": + socket.send(message) + # send the serialized data + socket.sendall(serialized) + elif socket_type == "udp": + padding = "\n" + if python3: + padding = bytes(padding, "utf-8") + content = message + serialized + padding + socket.sendto(content, args) + + +def receive(socket, socket_type="tcp", timeout=None, skip_size_info=False): + # read the length of the data, letter by letter until we reach EOL + length_str = '' + tcp = socket_type == "tcp" + t0 = time() + if tcp: + char = socket.recv(1) + while char != b'\n' and (timeout is None or (timeout is not None and (time() - t0) < timeout)): + length_str += char.decode('utf-8') + char = socket.recv(1) + if length_str == '': + raise TimeoutError("Timeout listening for data") + total = int(length_str) + # use a memoryview to receive the data chunk by chunk efficiently + view = memoryview(bytearray(total)) + next_offset = 0 + while total - next_offset > 0: + recv_size = socket.recv_into(view[next_offset:], total - next_offset) + next_offset += recv_size + deserialized = deserialize(view.tobytes()) + return deserialized + else: + if timeout: + socket.settimeout(timeout) + try: + char, addr = socket.recvfrom(2048 ** 2) + if type(char)==bytes: + char = char.decode("utf-8") + try: + if not skip_size_info: + length, char = char.split("\n")[:2] + if len(char) != int(length): + raise IncorrectLength(char,length) + except ValueError: + pass + deserialized = deserialize(char) + return deserialized, addr + except tmout: + raise TimeoutError("Timout listening for data") diff --git a/jsonsocket/serialize.py b/jsonsocket/serialize.py new file mode 100644 index 0000000..18de3c5 --- /dev/null +++ b/jsonsocket/serialize.py @@ -0,0 +1,95 @@ +import json +import struct +from base64 import b64encode, b64decode + +use_numpy, use_pandas = True, True +try: + import numpy as np +except ImportError: + use_numpy = False + +try: + import pandas as pd +except ImportError: + use_pandas = False + +complex_number_length = len(b64encode(struct.pack("ff", 0, 0)).decode("utf-8")) + 1 +complex1 = np.array([1, 1j]) + + +class BetterEncoder(json.JSONEncoder): + def default(self, obj): + if use_numpy and isinstance(obj, np.ndarray): + if "complex" in str(obj.dtype): + shape = obj.shape + values = obj.ravel() + values = np.concatenate([np.real(values), np.imag(values)]).reshape((2, len(values))).T.ravel() + encoded = b64encode(struct.pack("f" * len(values), *values)).decode("utf-8") + res = dict(_decode_type="numpy_complex", _shape=shape, _content=encoded) + return res + return obj.tolist() + + if use_pandas: + typename = obj.__class__.__name__ + res = None + if isinstance(obj, pd.DataFrame): + content = dict(zip(obj.columns, obj.values.T.tolist())) + for key, value in content.items(): + unique = np.unique(value) + if len(unique) == 1: + content[key] = unique[0] + res = dict(_decode_type=typename, _content=content) + elif isinstance(obj, pd.Series): + res = dict(_decode_type=typename, _content=obj.to_dict()) + if res: + return res + if "int" == type(obj).__name__[:3]: + return int(obj) + if "float" == type(obj).__name__[:5]: + return float(obj) + if "complex" == type(obj).__name__[:7]: + encoded = b64encode(struct.pack("ff", np.real(obj), np.imag(obj))) + encoded = "c" + encoded.decode("utf-8") + return encoded + return json.JSONEncoder.default(self, obj) + + +class BetterDecoder(json.JSONDecoder): + def __init__(self, *args, **kwargs): + super(BetterDecoder, self).__init__(object_hook=self.default, *args, **kwargs) + + def default(self, obj): + if type(obj) == str and len(obj) == complex_number_length and obj[0] == "c": + obj = np.array(struct.unpack("ff", b64decode(obj[1:]))) + if "_decode_type" in obj and obj["_decode_type"] == "numpy_complex": + content = obj["_content"] + shape = obj["_shape"] + content = b64decode(content) + content = struct.unpack("f" * (len(content) // 4), content) + content = np.reshape(content, (-1, 2)) + content = content * complex1[None, :] + content = np.sum(content,axis=1) + content = np.reshape(content, shape) + return content + if use_pandas and isinstance(obj, dict): + if "_decode_type" in obj and "_content" in obj: + obj = eval("pd.%s(obj[\"_content\"])" % obj["_decode_type"]) + return obj + + +def serialize(data): + try: + res = json.dumps(data, cls=BetterEncoder).encode('utf-8') + except (TypeError, ValueError) as e: + raise Exception('You can only send JSON-serializable data. Error is : %s' % e) + return res + + +def deserialize(data): + if type(data) == bytes: + data = data.decode('utf-8') + try: + res = json.loads(data, cls=BetterDecoder) + except (TypeError, ValueError) as e: + raise Exception('Data received was not in JSON format. Error is %s' % str(e)) + return res diff --git a/jsonsocket/tcp.py b/jsonsocket/tcp.py new file mode 100644 index 0000000..5aee03c --- /dev/null +++ b/jsonsocket/tcp.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +import socket +from threading import Thread + +from jsonsocket.errors import NoClient, ConnectFirst +from jsonsocket.helpers import send as _send, receive as _recv, TimeoutError + + +class Server(object): + """ + A JSON socket server used to communicate with a JSON socket client. All the + data is serialized in JSON. How to use it: + + server = Server(host, port) + while True: + server.accept() + data = server.recv() + # shortcut: data = server.accept().recv() + server.send({'status': 'ok'}) + """ + + backlog = 5 + client = None + + def __init__(self, host, port): + self.socket = socket.socket() + self.socket.bind((host, port)) + self.host=host + self.port=port + self.socket.listen(self.backlog) + self.__accepting=False + + def __del__(self): + self.close() + + @property + def client_connected(self): + return self.client is not None + + @property + def accepting_connexions(self): + return self.__accepting + + def accept(self): + # if a client is already connected, disconnect it + if self.client: + self.client.close() + self.__accepting = True + self.client, self.client_addr = self.socket.accept() + self.__accepting = False + return self + + def send(self, data): + if not self.client: + raise NoClient() + _send(self.client, data, socket_type="tcp") + return self + + def recv(self, close_on_timeout=False, **kwargs): + if not self.client: + raise NoClient() + try: + res = _recv(self.client, socket_type="tcp", **kwargs) + except TimeoutError: + if close_on_timeout: + self.close() + return None + return res + + def close(self): + if self.client: + self.client.close() + self.client = None + if self.socket: + if self.accepting_connexions: + c = Client() + c.connect("localhost", self.port) + c.close() + self.socket.close() + self.socket = None + + +class Client(object): + """ + A JSON socket client used to communicate with a JSON socket server. All the + data is serialized in JSON. How to use it: + + data = { + 'name': 'Patrick Jane', + 'age': 45, + 'children': ['Susie', 'Mike', 'Philip'] + } + client = Client() + client.connect(host, port) + client.send(data) + response = client.recv() + # or in one line: + response = Client().connect(host, port).send(data).recv() + """ + + socket = None + + def __del__(self): + self.close() + + def connect(self, host, port): + self.socket = socket.socket() + self.socket.connect((host, port)) + return self + + def send(self, data): + if not self.socket: + raise ConnectFirst() + try: + _send(self.socket, data, socket_type="tcp") + except socket.error as e: + if "reset by peer" in e.message: + raise NoClient() + raise + return self + + def recv(self, **kwargs): + if not self.socket: + raise ConnectFirst() + return _recv(self.socket, socket_type="tcp", **kwargs) + + def recv_and_close(self, **kwargs): + data = self.recv(**kwargs) + self.close() + return data + + def close(self): + if self.socket: + self.socket.close() + self.socket = None + + +class ServerAsync(Thread): + def __init__(self, host, port, new_client_callback, new_message_callback, client_disconnect_callback=None, + exception_callback=None, timeout=5): + super(ServerAsync, self).__init__() + self.exception_callback = exception_callback + self.client_disconnect_callback = client_disconnect_callback + self.timeout = timeout + self.new_message_callback = new_message_callback + self.new_client_callback = new_client_callback + self.server = Server(host, port) + self.__running = True + + def stop(self): + self.__running = False + self.server.close() + + def run(self): + try: + self.__running = True + + while self.__running: + self.server.accept() + if not self.__running: break + client_addr = self.server.client_addr + if self.new_client_callback: + self.new_client_callback(client_addr, self) + while 1: + try: + data = self.server.recv(timeout=self.timeout) + except (NoClient, socket.error): + break + if data is not None: + self.new_message_callback(data, self) + else: + break + if self.client_disconnect_callback: + self.client_disconnect_callback(client_addr, self) + except Exception as e: + if self.exception_callback: + self.exception_callback(e) + else: + raise + + def send(self, data): + self.server.send(data) + + @property + def client_addr(self): + return self.server.client_addr + + @property + def client(self): + return self.server.client + + def __enter__(self): + self.start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + self.server.close() + self.join() diff --git a/jsonsocket/udp.py b/jsonsocket/udp.py new file mode 100644 index 0000000..c639a6e --- /dev/null +++ b/jsonsocket/udp.py @@ -0,0 +1,71 @@ +import socket + +from jsonsocket.constants import broadcast_address +from jsonsocket.helpers import send as _send, receive as _recv + +from jsonsocket.udp_async import Receiver, Advertiser + + +class UDP(object): + def __init__(self): + self.__threads = [None, None] # receive and advertise + + def receive(self, host, port, timeout=None, **kwargs): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + if host == broadcast_address: + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + s.bind((host, port)) + except socket.error: + raise Exception("Address %s:%i already in use" % (host,port)) + data, addr = _recv(s, timeout=timeout,socket_type="udp", **kwargs) + return data, addr + + def send(self, data, ip, port): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + if ip == broadcast_address: + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + _send(s, data, "udp", ip, port) + + @property + def receiving(self): + return self.__threads[1] is not None + + def receive_async(self, host, port, callback, timeout=5): + if self.receiving: + raise Exception("Already receiving") + self.__threads[1] = t = Receiver(host, port, callback, timeout, self) + t.start() + return t + + def stop_receive_async(self): + if not self.receiving: + raise Exception("Not receiving") + t = self.__threads[1] + t.stop() + self.__threads[1] = None + + @property + def advertising(self): + return self.__threads[0] is not None + + def advertise(self, data, period, ip, port): + if self.advertising: + raise ValueError("Already advertising!") + self.__threads[0] = t = Advertiser(ip, port, data, period, self) + t.start() + return t + + def stop_advertising(self): + if not self.advertising: + raise Exception("Not advertising") + t = self.__threads[0] + t.stop() + t.join() + self.__threads[0] = None + + def join(self): + for t in self.__threads: + if t is not None: + t.join() diff --git a/jsonsocket/udp_async.py b/jsonsocket/udp_async.py new file mode 100644 index 0000000..e3ff45f --- /dev/null +++ b/jsonsocket/udp_async.py @@ -0,0 +1,71 @@ +import socket +from threading import Thread +from time import sleep + +import schedule + +from jsonsocket.helpers import TimeoutError + + +class UDPThread(Thread): + def __init__(self, host, port, client): + super(UDPThread, self).__init__() + self.client = client + self.host = host + self.port = port + self.__stop = False + + @property + def stopped(self): + return self.__stop + + def stop(self): + self.__stop = True + + def __del__(self): + self.stop() + self.join() + + def __enter__(self): + self.start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + self.join() + + +class Receiver(UDPThread): + def __init__(self, host, port, callback, timeout, client): + super(Receiver, self).__init__(host, port, client) + self.timeout = timeout + self.callback = callback + + def receive(self): + res = self.client.receive(self.host, self.port, timeout=self.timeout) + return res + + def run(self): + while not self.stopped: + res = None + try: + res = self.receive() + except TimeoutError: + pass + if res is not None: + self.callback(res[0], res[1], self.client) + + +class Advertiser(UDPThread): + def __init__(self, host, port, data, interval, client): + super(Advertiser, self).__init__(host, port, client) + self.interval = interval + self.data = data + + def send(self): + self.client.send(self.data, self.host, self.port) + + def run(self): + schedule.every(self.interval).seconds.do(self.send) + while not self.stopped: + schedule.run_pending() + sleep(schedule.idle_seconds()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fee9790 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +schedule \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..7f6b18a --- /dev/null +++ b/setup.py @@ -0,0 +1,16 @@ +from setuptools import setup + +from auto_version import calculate_version + +name = 'jsonsocket' + +version = calculate_version() +setup( + name=name, + version=version, + description='This is a small Python library for sending data over sockets. ', + author='github', + author_email='', + packages=[name], #same as name + requirements=["schedule"] +) \ No newline at end of file diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test_TCP.py b/test/test_TCP.py new file mode 100644 index 0000000..1de3cd5 --- /dev/null +++ b/test/test_TCP.py @@ -0,0 +1,67 @@ +from unittest import TestCase + +from jsonsocket.tcp import ServerAsync, Client + + +def new_client(addr, _srv): + print("New client: %s" % str(addr)) + + +def new_data(data, _srv): + print("New data : %s" % str(data)) + # Echo + _srv.send(data) + + +def disconnect_client(addr, _srv): + print("Client disconnected: %s" % str(addr)) + + +class TestTCP(TestCase): + def test_general(self): + port = 1234 + srv = ServerAsync("localhost", port, new_client, new_data, disconnect_client, timeout=1) + + with srv: + cl = Client() + cl.connect("localhost", port) + cl.send({"text": "Bonjour le monde!"}) + res = cl.recv() + print("Received the following echo:") + print(res) + print("Disconnecting") + cl.close() + + def test_numpy(self): + import numpy as np + port = 1237 + srv = ServerAsync("localhost", port, new_client, new_data, disconnect_client, timeout=1) + + payload = {"numpy": np.linspace(0, 1, 9)} + with srv: + cl = Client() + cl.connect("localhost", port) + cl.send(payload) + res = cl.recv() + print("Received the following echo:") + print(res) + print("Disconnecting") + cl.close() + + def test_pandas(self): + import numpy as np + from pandas import DataFrame + + port = 1236 + srv = ServerAsync("localhost", port, new_client, new_data, disconnect_client, timeout=1) + + payload = DataFrame(dict(A=[1, 2, 3], B=[4, 5, 6])) + with srv: + cl = Client() + cl.connect("localhost", port) + cl.send(payload) + res = cl.recv() + print("Received the following echo:") + print(res) + print("Disconnecting") + cl.close() diff --git a/test/test_UDP.py b/test/test_UDP.py new file mode 100644 index 0000000..9a37ef0 --- /dev/null +++ b/test/test_UDP.py @@ -0,0 +1,44 @@ +from time import sleep +from unittest import TestCase + +from jsonsocket.udp import UDP + + +def new_message(data, addr, _srv): + new_message.n_received += 1 + print("SERVER: Received data:") + print(data) + # Echo + _srv.send(data,*addr) + if new_message.n_received>=4 and _srv.receiving: + _srv.stop_receive_async() + +new_message.n_received=0 + + +class TestUDP(TestCase): + def test_general(self): + port = 1234 + data = {"data": "Bonjour!"} + s = UDP() + c = UDP() + + s.receive_async("0.0.0.0",port,new_message) + sleep(1) + c.send(data, "127.0.0.1", port) + + s.stop_receive_async() + s.join() + + def test_advertize(self): + port = 1234 + data = {"data": "Bonjour!"} + s = UDP() + c = UDP() + + s.receive_async("0.0.0.0", port, new_message) + sleep(1) + c.advertise(data, 1, "127.0.0.1", port) + + s.join() + c.stop_advertising()