forked from Tecnologias-multimedia/InterCom
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathintercom_empty.py
More file actions
70 lines (58 loc) · 3.08 KB
/
intercom_empty.py
File metadata and controls
70 lines (58 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# Don't send empty bitplanes.
#
# The sender adds to the number of received bitplanes the number of
# skipped (zero) bitplanes of the chunk sent.
# The receiver computes the first received
# bitplane (apart from the bitplane with the signs) and report a
# number of bitplanes received equal to the real number of received
# bitplanes plus the number of skipped bitplanes.
import struct
import numpy as np
from intercom import Intercom
from intercom_dfc import Intercom_DFC
if __debug__:
import sys
class Intercom_empty(Intercom_DFC):
def init(self, args):
Intercom_DFC.init(self, args)
self.ignored_bps = 0
self.packet_format = f"!BHBB{self.frames_per_chunk//8}B"
def receive_and_buffer(self):
message, source_address = self.receiving_sock.recvfrom(Intercom.MAX_MESSAGE_SIZE)
ig_bps, received_chunk_number, received_bitplane_number, self.NORB, *bitplane = struct.unpack(self.packet_format, message)
bitplane = np.asarray(bitplane, dtype=np.uint8)
bitplane = np.unpackbits(bitplane)
bitplane = bitplane.astype(np.uint16)
self._buffer[received_chunk_number % self.cells_in_buffer][:, received_bitplane_number%self.number_of_channels] |= (bitplane << received_bitplane_number//self.number_of_channels)
self.received_bitplanes_per_chunk[received_chunk_number % self.cells_in_buffer] += 1 + ig_bps
return received_chunk_number
def send_bitplane(self, indata, bitplane_number, last_BPTS):
bitplane = (indata[:, bitplane_number%self.number_of_channels] >> bitplane_number//self.number_of_channels) & 1
if not np.any(bitplane) and bitplane_number != last_BPTS:
self.ignored_bps += 1
else:
bitplane = bitplane.astype(np.uint8)
bitplane = np.packbits(bitplane)
message = struct.pack(self.packet_format, self.ignored_bps, self.recorded_chunk_number, bitplane_number, self.received_bitplanes_per_chunk[(self.played_chunk_number+1) % self.cells_in_buffer]+1, *bitplane)
self.sending_sock.sendto(message, (self.destination_IP_addr, self.destination_port))
self.ignored_bps = 0
def send(self, indata):
signs = indata & 0x8000
magnitudes = abs(indata)
indata = signs | magnitudes
self.NOBPTS = int(0.75*self.NOBPTS + 0.25*self.NORB)
self.NOBPTS += 1
if self.NOBPTS > self.max_NOBPTS:
self.NOBPTS = self.max_NOBPTS
last_BPTS = self.max_NOBPTS - self.NOBPTS - 1
self.send_bitplane(indata, self.max_NOBPTS-1, last_BPTS)
self.send_bitplane(indata, self.max_NOBPTS-2, last_BPTS)
for bitplane_number in range(self.max_NOBPTS-3, last_BPTS, -1):
self.send_bitplane(indata, bitplane_number, last_BPTS)
self.recorded_chunk_number = (self.recorded_chunk_number + 1) % self.MAX_CHUNK_NUMBER
if __name__ == "__main__":
intercom = Intercom_empty()
parser = intercom.add_args()
args = parser.parse_args()
intercom.init(args)
intercom.run()