From 5661dc3555700220e7c7432df2adc149fe8cc7a6 Mon Sep 17 00:00:00 2001 From: Xavi Hernandez Date: Fri, 24 Jan 2025 18:36:12 +0100 Subject: [PATCH 1/2] Implement SMB2 LOCK request Signed-off-by: Xavi Hernandez --- src/smbprotocol/open.py | 126 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/src/smbprotocol/open.py b/src/smbprotocol/open.py index 14467d4c..076ae83b 100644 --- a/src/smbprotocol/open.py +++ b/src/smbprotocol/open.py @@ -257,6 +257,19 @@ class WriteFlags: SMB2_WRITEFLAG_WRITE_UNBUFFERED = 0x00000002 +class LockFlags: + """ + [MS-SMB2] v53.0 2017-09-15 + + 2.2.26.1 SMB2 LOCK Request Flags + """ + + SMB2_LOCKFLAG_SHARED_LOCK = 0x1 + SMB2_LOCKFLAG_EXCLUSIVE_LOCK = 0x2 + SMB2_LOCKFLAG_UNLOCK = 0x4 + SMB2_LOCKFLAG_FAIL_IMMEDIATELY = 0x10 + + class QueryDirectoryFlags: """ [MS-SMB2] v53.0 2017-09-15 @@ -766,6 +779,72 @@ def unpack_response(file_information_class, buffer): return query_results +class SMB2LockElement(Structure): + """ + [MS-SMB2] v53.0 2017-09-15 + + 2.2.26.1 SMB2 LOCK_ELEMENT Structure + """ + + def __init__(self): + self.fields = OrderedDict( + [ + ("offset", IntField(size=8)), + ("length", IntField(size=8, unsigned=False)), + ("flags", FlagField(size=4, flag_type=LockFlags)), + ("reserved", IntField(size=4, default=0)), + ] + ) + super(SMB2LockElement, self).__init__() + + +class SMB2LockRequest(Structure): + """ + [MS-SMB2] v53.0 2017-09-15 + + 2.2.26 SMB2 LOCK Request + """ + + COMMAND = Commands.SMB2_LOCK + + def __init__(self): + self.fields = OrderedDict( + [ + ("structure_size", IntField(size=2, default=48)), + ("lock_count", IntField(size=2, default=lambda s: len(s["locks"].get_value()))), + ("lock_sequence", IntField(size=4, default=0)), + ("file_id", BytesField(size=16)), + ( + "locks", + ListField( + list_type=StructureField(size=24, structure_type=SMB2LockElement), + list_count=lambda s: s["lock_count"].get_value(), + ), + ), + ] + ) + super(SMB2LockRequest, self).__init__() + + +class SMB2LockResponse(Structure): + """ + [MS-SMB2] v53.0 2017-09-15 + + 2.2.27 SMB2 LOCK Response + """ + + COMMAND = Commands.SMB2_LOCK + + def __init__(self): + self.fields = OrderedDict( + [ + ("structure_size", IntField(size=2, default=4)), + ("reserved", IntField(size=2, default=0)), + ] + ) + super(SMB2LockResponse, self).__init__() + + class SMB2QueryDirectoryResponse(Structure): """ [MS-SMB2] v53.0 2017-09-15 @@ -1523,3 +1602,50 @@ def _close_response(self, request): self.end_of_file = c_resp["end_of_file"].get_value() self.file_attributes = c_resp["file_attributes"].get_value() return c_resp + + def lock(self, locks, lsn=0, lsi=0, wait=True, send=True): + """ + Locks or unlocks byte regions of a file. + + Supports out of band send function, call this function with send=False + to return a tuple of (SMB2LockRequest, receive_func) instead of + sending the the request and waiting for the response. The receive_func + can be used to get the response from the server by passing in the + Request that was used to sent it out of band. + + :param locks: (List) byte ranges to lock or unlock + :param lsn: LockSequenceNumber. Only used for SMB dialects > 2.0.2 + :param lsi: LockSequenceIndex. Only used for SMB dialects > 2.0.2 + :param wait: If send=True, whether to wait for a response if + STATUS_PENDING was received from the server or fail. + :param send: Whether to send the request in the same call or return the + message to the caller and the unpack function + :return: SMB2LockResponse message received from the server + """ + + lock = SMB2LockRequest() + lock["file_id"] = self.file_id + lock["locks"] = locks + + if self.connection.dialect > Dialects.SMB_2_0_2: + if (lsn < 0) or (lsn > 15): + raise ValueError("lsn (LockSequenceNumber) must be between 0 and 15") + + # MS-SMB2 2.2.26 requires that 0 <= lsi <= 64 + if (lsi < 0) or (lsi > 64): + raise ValueError("lsi (LockSequenceIndex) must be between 0 and 64") + + lock["lock_sequence"] = (lsn << 28) + lsi + + if not send: + return lock, self._lock_response + + request = self.connection.send(lock, self.tree_connect.session.session_id, self.tree_connect.tree_connect_id) + return self._lock_response(request, wait) + + def _lock_response(self, request, wait=True): + response = self.connection.receive(request, wait=wait) + lock_response = SMB2LockResponse() + lock_response.unpack(response["data"].get_value()) + + return lock_response From 9983bff63b500ca85ed8a5efd48a181d8a32e715 Mon Sep 17 00:00:00 2001 From: Xavi Hernandez Date: Mon, 27 Jan 2025 11:35:24 +0100 Subject: [PATCH 2/2] Add tests for LOCK operation Signed-off-by: Xavi Hernandez --- tests/test_open.py | 134 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/tests/test_open.py b/tests/test_open.py index 56dab543..7de7719a 100644 --- a/tests/test_open.py +++ b/tests/test_open.py @@ -42,6 +42,7 @@ FilePipePrinterAccessMask, ImpersonationLevel, InfoType, + LockFlags, Open, ReadWriteChannel, RequestedOplockLevel, @@ -52,6 +53,9 @@ SMB2CreateResponse, SMB2FlushRequest, SMB2FlushResponse, + SMB2LockElement, + SMB2LockRequest, + SMB2LockResponse, SMB2QueryDirectoryRequest, SMB2QueryDirectoryResponse, SMB2QueryInfoRequest, @@ -1161,6 +1165,78 @@ def test_parse_message(self): assert actual["structure_size"].get_value() == 2 +class TestSMB2LockRequest: + def test_create_message(self): + lock = SMB2LockElement() + lock["offset"] = b"\x11" * 8 + lock["length"] = -1 + lock["flags"] = LockFlags.SMB2_LOCKFLAG_EXCLUSIVE_LOCK + + message = SMB2LockRequest() + message["file_id"] = b"\xff" * 16 + message["locks"] = [lock] + expected = ( + b"\x30\x00" + b"\x01\x00" + b"\x00\x00\x00\x00" + b"\xff\xff\xff\xff\xff\xff\xff\xff" + b"\xff\xff\xff\xff\xff\xff\xff\xff" + b"\x11\x11\x11\x11\x11\x11\x11\x11" + b"\xff\xff\xff\xff\xff\xff\xff\xff" + b"\x02\x00\x00\x00" + b"\x00\x00\x00\x00" + ) + actual = message.pack() + assert len(message) == 48 + assert actual == expected + + def test_parse_message(self): + actual = SMB2LockRequest() + data = ( + b"\x30\x00" + b"\x01\x00" + b"\x00\x00\x00\x00" + b"\xff\xff\xff\xff\xff\xff\xff\xff" + b"\xff\xff\xff\xff\xff\xff\xff\xff" + b"\x11\x11\x11\x11\x11\x11\x11\x11" + b"\xff\xff\xff\xff\xff\xff\xff\xff" + b"\x02\x00\x00\x00" + b"\x00\x00\x00\x00" + ) + actual.unpack(data) + assert len(actual) == 48 + assert actual["structure_size"].get_value() == 48 + assert actual["lock_count"].get_value() == 1 + assert actual["lock_sequence"].get_value() == 0 + assert actual["file_id"].pack() == b"\xff" * 16 + + locks = actual["locks"].get_value() + assert isinstance(locks, list) + assert len(locks) == 1 + + assert locks[0]["offset"].pack() == b"\x11" * 8 + assert locks[0]["length"].get_value() == -1 + assert locks[0]["flags"].get_value() == LockFlags.SMB2_LOCKFLAG_EXCLUSIVE_LOCK + assert locks[0]["reserved"].get_value() == 0 + + +class TestSMB2LockResponse: + def test_create_message(self): + message = SMB2LockResponse() + expected = b"\x04\x00" b"\x00\x00" + actual = message.pack() + assert len(message) == 4 + assert actual == expected + + def test_parse_message(self): + actual = SMB2LockResponse() + data = b"\x04\x00" b"\x00\x00" + actual.unpack(data) + assert len(actual) == 4 + assert actual["structure_size"].get_value() == 4 + assert actual["reserved"].get_value() == 0 + + class TestOpen: # basic file open tests for each dialect def test_dialect_2_0_2(self, smb_real): @@ -2400,3 +2476,61 @@ def truncate(open, size): assert read_and_eof(open) == (b"\x01\x02\x03", 3) finally: connection.disconnect(True) + + def test_lock_file(self, smb_real): + connection = Connection(uuid.uuid4(), smb_real[2], smb_real[3]) + connection.connect() + session = Session(connection, smb_real[0], smb_real[1]) + tree = TreeConnect(session, smb_real[4]) + open = Open(tree, "lock-file.txt") + + try: + session.connect() + tree.connect() + + open.create( + ImpersonationLevel.Impersonation, + FilePipePrinterAccessMask.MAXIMUM_ALLOWED, + FileAttributes.FILE_ATTRIBUTE_NORMAL, + 0, + CreateDisposition.FILE_OVERWRITE_IF, + CreateOptions.FILE_NON_DIRECTORY_FILE, + ) + + def do_lock(open, locks, send=True): + if send: + response = open.lock(locks, send=True) + else: + req, resp = open.lock(locks, send=False) + request = open.connection.send( + req, open.tree_connect.session.session_id, open.tree_connect.tree_connect_id + ) + response = resp(request) + assert isinstance(response, SMB2LockResponse) + + def lock(open, offset, length, send=True): + lockelem = SMB2LockElement() + lockelem["offset"] = offset + lockelem["length"] = length + lockelem["flags"] = LockFlags.SMB2_LOCKFLAG_EXCLUSIVE_LOCK + + return do_lock(open, [lockelem], send=send) + + def unlock(open, offset, length, send=True): + lockelem = SMB2LockElement() + lockelem["offset"] = offset + lockelem["length"] = length + lockelem["flags"] = LockFlags.SMB2_LOCKFLAG_UNLOCK + + return do_lock(open, [lockelem], send=send) + + # Lock and unlock the entire file + lock(open, 0, -1) + unlock(open, 0, -1) + + # Lock and unlock the entire file deferring send + lock(open, 0, -1, False) + unlock(open, 0, -1, False) + + finally: + connection.disconnect(True)