From ef286afa5484b96f88b86f107fdd09ab16f4a972 Mon Sep 17 00:00:00 2001 From: Janus-sama <71907314+Janus-sama@users.noreply.github.com> Date: Wed, 24 Sep 2025 13:14:01 +0100 Subject: [PATCH 01/12] Update README.rst --- README.rst | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/README.rst b/README.rst index 4e9315d..ba5f6ca 100644 --- a/README.rst +++ b/README.rst @@ -1,25 +1,24 @@ clamd ===== -.. image:: https://travis-ci.org/graingert/python-clamd.png?branch=master - :alt: travis build status - :target: https://travis-ci.org/graingert/python-clamd - About ----- -`clamd` is a portable Python module to use the ClamAV anti-virus engine on +`python_clamd` is a portable Python module to use the ClamAV anti-virus engine on Windows, Linux, MacOSX and other platforms. It requires a running instance of the `clamd` daemon. -This is a fork of pyClamd v0.2.0 created by Philippe Lagadec and published on his website: http://www.decalage.info/en/python/pyclamd which in turn is a slightly improved version of pyClamd v0.1.1 created by Alexandre Norman and published on his website: http://xael.org/norman/python/pyclamd/ +This is a fork of python-clamd which is a fork of pyClamd v0.2.0 created by Philippe Lagadec and published on his website: http://www.decalage.info/en/python/pyclamd which in turn is a slightly improved version of pyClamd v0.1.1 created by Alexandre Norman and published on his website: http://xael.org/norman/python/pyclamd/. + +This fork exists to update the dependencies of clamd in python as `pkg_resources` has been deprecated and would be removed soon. +I will update and make patches to this wrapper, including typing and annotations. Usage ----- To use with a unix socket:: - >>> import clamd - >>> cd = clamd.ClamdUnixSocket() + >>> import python_clamd + >>> cd = python_clamd.ClamdUnixSocket() >>> cd.ping() 'PONG' >>> cd.version() # doctest: +ELLIPSIS From 27b1302da955af6bb4293fd8a17a0900849d1cd8 Mon Sep 17 00:00:00 2001 From: Janus-sama <71907314+Janus-sama@users.noreply.github.com> Date: Wed, 24 Sep 2025 13:26:14 +0100 Subject: [PATCH 02/12] Update and rename README.rst to README.md --- README.md | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ README.rst | 52 ------------------------------------------- 2 files changed, 65 insertions(+), 52 deletions(-) create mode 100644 README.md delete mode 100644 README.rst diff --git a/README.md b/README.md new file mode 100644 index 0000000..c7d2caa --- /dev/null +++ b/README.md @@ -0,0 +1,65 @@ +# clamd + +## About +`python_clamd` is a modernized Python wrapper for the [ClamAV](https://www.clamav.net/) anti-virus engine. +It allows you to interact with a running `clamd` daemon on **Linux, macOS, and Windows**. + +This project is a fork of `python-clamd` (last updated 2014), itself a fork of `pyClamd` v0.2.0 by Philippe Lagadec, which in turn extended `pyClamd` v0.1.1 by Alexandre Norman. + +### Why this fork? +- Updated for **Python 3.13+ compatibility** (replaced deprecated `pkg_resources` with `importlib.metadata`). +- Added **type hints and annotations** for better IDE/autocomplete and `mypy` support. +- Actively maintained with patches and modernization. + +--- + +## Usage + +### Connect via Unix socket +```python +import python_clamd +cd = python_clamd.ClamdUnixSocket() +cd.ping() # 'PONG' +cd.version() # 'ClamAV ...' +cd.reload() # 'RELOADING' +``` + +### Scan a file +```python +open('/tmp/EICAR','wb').write(python_clamd.EICAR) +cd.scan('/tmp/EICAR') +# {'/tmp/EICAR': ('FOUND', 'Eicar-Test-Signature')} +``` + +### Scan a stream +```python +from io import BytesIO +cd.instream(BytesIO(python_clamd.EICAR)) +# {'stream': ('FOUND', 'Eicar-Test-Signature')} +``` + +## Installation +### Python package + +Coming soon via PyPI: +```python +pip install python-clamd +``` + +### ClamAV daemon +On Ubuntu: +```bash +sudo apt-get install clamav-daemon clamav-freshclam clamav-unofficial-sigs +sudo freshclam +sudo service clamav-daemon start +``` + +## Supported Versions +* [x] Python 3.9 – 3.13 +* [ ] Python 2.x (dropped) + +## License +Released under the LGPL license. + +## Contributing +PRs and issues are welcome. This fork exists to keep ClamAV bindings usable on modern Python. diff --git a/README.rst b/README.rst deleted file mode 100644 index ba5f6ca..0000000 --- a/README.rst +++ /dev/null @@ -1,52 +0,0 @@ -clamd -===== - -About ------ -`python_clamd` is a portable Python module to use the ClamAV anti-virus engine on -Windows, Linux, MacOSX and other platforms. It requires a running instance of -the `clamd` daemon. - -This is a fork of python-clamd which is a fork of pyClamd v0.2.0 created by Philippe Lagadec and published on his website: http://www.decalage.info/en/python/pyclamd which in turn is a slightly improved version of pyClamd v0.1.1 created by Alexandre Norman and published on his website: http://xael.org/norman/python/pyclamd/. - -This fork exists to update the dependencies of clamd in python as `pkg_resources` has been deprecated and would be removed soon. -I will update and make patches to this wrapper, including typing and annotations. - -Usage ------ - -To use with a unix socket:: - - >>> import python_clamd - >>> cd = python_clamd.ClamdUnixSocket() - >>> cd.ping() - 'PONG' - >>> cd.version() # doctest: +ELLIPSIS - 'ClamAV ... - >>> cd.reload() - 'RELOADING' - -To scan a file:: - - >>> open('/tmp/EICAR','wb').write(clamd.EICAR) - >>> cd.scan('/tmp/EICAR') - {'/tmp/EICAR': ('FOUND', 'Eicar-Test-Signature')} - -To scan a stream:: - - >>> from io import BytesIO - >>> cd.instream(BytesIO(clamd.EICAR)) - {'stream': ('FOUND', 'Eicar-Test-Signature')} - - -License -------- -`clamd` is released as open-source software under the LGPL license. - -clamd Install -------------- -How to install the ClamAV daemon `clamd` under Ubuntu:: - - sudo apt-get install clamav-daemon clamav-freshclam clamav-unofficial-sigs - sudo freshclam - sudo service clamav-daemon start From fab40224de4d8cd0599409b68d10538efab32f87 Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:47:11 +0100 Subject: [PATCH 03/12] python clamd directory patch --- src/python_clamd/__init__.py | 325 +++++++++++++++++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 src/python_clamd/__init__.py diff --git a/src/python_clamd/__init__.py b/src/python_clamd/__init__.py new file mode 100644 index 0000000..ec16214 --- /dev/null +++ b/src/python_clamd/__init__.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from importlib import metadata + +try: + __version__ = metadata.version("pyclamd") +except metadata.PackageNotFoundError: + __version__ = "" + +import socket +import sys +import struct +import contextlib +import re +import base64 + +scan_response = re.compile( + r"^(?P.*): ((?P.+) )?(?P(FOUND|OK|ERROR))$") +EICAR = base64.b64decode( + b'WDVPIVAlQEFQWzRcUFpYNTQoUF4pN0NDKTd9JEVJQ0FSLVNUQU5E' + b'QVJELUFOVElWSVJVUy1URVNU\nLUZJTEUhJEgrSCo=\n' +) + + +class ClamdError(Exception): + pass + + +class ResponseError(ClamdError): + pass + + +class BufferTooLongError(ResponseError): + """Class for errors with clamd using INSTREAM with a buffer lenght > StreamMaxLength in /etc/clamav/clamd.conf""" + + +class ConnectionError(ClamdError): + """Class for errors communication with clamd""" + + +class ClamdNetworkSocket(object): + """ + Class for using clamd with a network socket + + :params str host: machine host default localhost + :params int port: clamd port default 3310 + :params int timeout: socket timeout default None + + """ + + def __init__(self, host: str = '127.0.0.1', port: int = 3310, timeout: float | None = None): + """ + class initialization + + :params str host: hostname or ip address + :params int port: TCP port + :params float|None timeout: socket timeout + """ + + self.host = host + self.port = port + self.timeout = timeout + + def _init_socket(self) -> None: + """ + internal use only + + :return None: + + :raises ConnectionError: + """ + try: + self.clamd_socket = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) + self.clamd_socket.connect((self.host, self.port)) + self.clamd_socket.settimeout(self.timeout) + + except socket.error: + e = sys.exc_info()[1] + raise ConnectionError(self._error_message(e)) + + def _error_message(self, exception: Exception) -> str: + # args for socket.error can either be (errno, "message") + # or just "message" + if len(exception.args) == 1: + return "Error connecting to {host}:{port}. {msg}.".format( + host=self.host, + port=self.port, + msg=exception.args[0] + ) + else: + return "Error {erno} connecting {host}:{port}. {msg}.".format( + erno=exception.args[0], + host=self.host, + port=self.port, + msg=exception.args[1] + ) + + def ping(self): + return self._basic_command("PING") + + def version(self): + return self._basic_command("VERSION") + + def reload(self): + return self._basic_command("RELOAD") + + def shutdown(self): + """ + Force Clamd to shutdown and exit + + :return None: + + :raises ConnectionError: if there is a of communication problem + """ + try: + self._init_socket() + self._send_command('SHUTDOWN') + # result = self._recv_response() + finally: + self._close_socket() + + def scan(self, file): + return self._file_system_scan('SCAN', file) + + def contscan(self, file): + return self._file_system_scan('CONTSCAN', file) + + def multiscan(self, file): + return self._file_system_scan('MULTISCAN', file) + + def _basic_command(self, command): + """ + Send a command to the clamav server, and return the reply. + """ + self._init_socket() + try: + self._send_command(command) + response = self._recv_response().rsplit("ERROR", 1) + if len(response) > 1: + raise ResponseError(response[0]) + else: + return response[0] + finally: + self._close_socket() + + def _file_system_scan(self, command, file): + """ + Scan a file or directory using multiple threads. + + This function performs a filesystem scan using ClamAV, + does not stop on errors or virus detection, and supports archives. + + :param str command: The ClamAV command to execute. + :param str file: filename or directory (MUST BE ABSOLUTE PATH !) + + :return dict: `{filename1: ('FOUND', 'virusname'), filename2: ('ERROR', 'reason')}` + + :raise ConnectionError: If there is a communication issue during scanning. + """ + + try: + self._init_socket() + self._send_command(command, file) + + dr = {} + for result in self._recv_response_multiline().split('\n'): + if result: + filename, reason, status = self._parse_response(result) + dr[filename] = (status, reason) + + return dr + + finally: + self._close_socket() + + def instream(self, buff): + """ + Scan a buffer + + :params file buff: buffer to scan + + :return dict: `{filename1: ("virusname", "status")}`: + + :raises BufferTooLongError: if the buffer size exceeds clamd limits + :raises ConnectionError: if there is a communication problem + """ + + try: + self._init_socket() + self._send_command('INSTREAM') + + max_chunk_size = 1024 # MUST be < StreamMaxLength in /etc/clamav/clamd.conf + + chunk = buff.read(max_chunk_size) + while chunk: + size = struct.pack(b'!L', len(chunk)) + self.clamd_socket.send(size + chunk) + chunk = buff.read(max_chunk_size) + + self.clamd_socket.send(struct.pack(b'!L', 0)) + + result = self._recv_response() + + if len(result) > 0: + if result == 'INSTREAM size limit exceeded. ERROR': + raise BufferTooLongError(result) + + filename, reason, status = self._parse_response(result) + return {filename: (status, reason)} + finally: + self._close_socket() + + def stats(self): + """ + Get Clamscan stats + + :return str: clamscan stats + + :raises ConnectionError: if there is a communication problem + """ + self._init_socket() + try: + self._send_command('STATS') + return self._recv_response_multiline() + finally: + self._close_socket() + + def _send_command(self, cmd, *args): + """ + `man clamd` recommends to prefix commands with z, but we will use \n + terminated strings, as python<->clamd has some problems with \0x00 + """ + concat_args = '' + if args: + concat_args = ' ' + ' '.join(args) + + cmd = 'n{cmd}{args}\n'.format( + cmd=cmd, args=concat_args).encode('utf-8') + self.clamd_socket.send(cmd) + + def _recv_response(self): + """ + receive line from clamd + """ + try: + with contextlib.closing(self.clamd_socket.makefile('rb')) as f: + return f.readline().decode('utf-8').strip() + except (socket.error, socket.timeout): + e = sys.exc_info()[1] + raise ConnectionError( + "Error while reading from socket: {0}".format(e.args)) + + def _recv_response_multiline(self): + """ + receive multiple line response from clamd and strip all whitespace characters + """ + try: + with contextlib.closing(self.clamd_socket.makefile('rb')) as f: + return f.read().decode('utf-8') + except (socket.error, socket.timeout): + e = sys.exc_info()[1] + raise ConnectionError( + "Error while reading from socket: {0}".format(e.args)) + + def _close_socket(self): + """ + close clamd socket + """ + self.clamd_socket.close() + return + + def _parse_response(self, msg): + """ + parses responses for SCAN, CONTSCAN, MULTISCAN and STREAM commands. + """ + try: + return scan_response.match(msg).group("path", "virus", "status") + except AttributeError: + raise ResponseError(msg.rsplit("ERROR", 1)[0]) + + +class ClamdUnixSocket(ClamdNetworkSocket): + """ + Class for using clamd with an unix socket + """ + + def __init__(self, path: str = "/var/run/clamav/clamd.ctl", timeout: float | None = None): + """ + class initialisation + + :params str path: unix socket path + :params float|None timeout: socket timeout + """ + + self.unix_socket = path + self.timeout = timeout + + def _init_socket(self): + """ + internal use only + """ + try: + self.clamd_socket = socket.socket( + socket.AF_UNIX, socket.SOCK_STREAM) + self.clamd_socket.connect(self.unix_socket) + self.clamd_socket.settimeout(self.timeout) + except socket.error: + e = sys.exc_info()[1] + raise ConnectionError(self._error_message(e)) + + def _error_message(self, exception): + # args for socket.error can either be (errno, "message") + # or just "message" + if len(exception.args) == 1: + return "Error connecting to {path}. {msg}.".format( + path=self.unix_socket, + msg=exception.args[0] + ) + else: + return "Error {erno} connecting {path}. {msg}.".format( + erno=exception.args[0], + path=self.unix_socket, + msg=exception.args[1] + ) From 9862d9573a0406209ad46d83e79ed17f3cd9da34 Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:47:26 +0100 Subject: [PATCH 04/12] tests updates --- src/tests/test_api.py | 103 ++++++++++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 40 deletions(-) diff --git a/src/tests/test_api.py b/src/tests/test_api.py index 42423b6..d12ba54 100644 --- a/src/tests/test_api.py +++ b/src/tests/test_api.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from __future__ import unicode_literals -import clamd +import python_clamd from io import BytesIO from contextlib import contextmanager import tempfile @@ -15,6 +14,8 @@ other = stat.S_IROTH execute = (stat.S_IEXEC | stat.S_IXOTH) +FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH + @contextmanager def mkdtemp(*args, **kwargs): @@ -25,57 +26,79 @@ def mkdtemp(*args, **kwargs): shutil.rmtree(temp_dir) -class TestUnixSocket(object): - kwargs = {} +@pytest.fixture(scope="module") +def clamd_unix_socket(): + socket_path = os.environ.get( + 'CLAMD_UNIX_SOCKET', + '/var/run/clamav/pyclamd.ctl' + ) + return python_clamd.ClamdUnixSocket(path=socket_path) - def setup(self): - self.cd = clamd.ClamdUnixSocket(**self.kwargs) - def test_ping(self): - assert self.cd.ping() +class TestUnixSocket: + kwargs = {} + + def test_ping(self, clamd_unix_socket): + assert clamd_unix_socket.ping() == 'PONG' - def test_version(self): - assert self.cd.version().startswith("ClamAV") + def test_version(self, clamd_unix_socket): + assert clamd_unix_socket.version().startswith("ClamAV") - def test_reload(self): - assert self.cd.reload() == 'RELOADING' + def test_reload(self, clamd_unix_socket): + assert clamd_unix_socket.reload().strip() == 'RELOADING' - def test_scan(self): - with tempfile.NamedTemporaryFile('wb', prefix="python-clamd") as f: - f.write(clamd.EICAR) + def test_scan_eicar_found(self, clamd_unix_socket): + with tempfile.NamedTemporaryFile('wb', prefix="python-pyclamd", delete=False) as f: + f.write(python_clamd.EICAR) f.flush() - os.fchmod(f.fileno(), (mine | other)) - expected = {f.name: ('FOUND', 'Eicar-Test-Signature')} + # Set permissions + os.chmod(f.name, FILE_PERMISSIONS) + file_name = f.name - assert self.cd.scan(f.name) == expected + expected = {file_name: ('FOUND', 'Win.Test.EICAR_HDB-1')} - def test_unicode_scan(self): - with tempfile.NamedTemporaryFile('wb', prefix=u"python-clamdλ") as f: - f.write(clamd.EICAR) + try: + assert clamd_unix_socket.scan(file_name) == expected + finally: + os.remove(file_name) + + def test_scan_unicode_path(self, clamd_unix_socket): + with tempfile.NamedTemporaryFile('wb', prefix="python-clamd_λ_ü", delete=False) as f: + f.write(python_clamd.EICAR) f.flush() - os.fchmod(f.fileno(), (mine | other)) - expected = {f.name: ('FOUND', 'Eicar-Test-Signature')} + os.chmod(f.name, FILE_PERMISSIONS) + file_name = f.name + + expected = {file_name: ('FOUND', 'Win.Test.EICAR_HDB-1')} - assert self.cd.scan(f.name) == expected + try: + assert clamd_unix_socket.scan(file_name) == expected + finally: + os.remove(file_name) - def test_multiscan(self): + def test_multiscan(self, clamd_unix_socket): expected = {} - with mkdtemp(prefix="python-clamd") as d: - for i in range(10): - with open(os.path.join(d, "file" + str(i)), 'wb') as f: - f.write(clamd.EICAR) - os.fchmod(f.fileno(), (mine | other)) - expected[f.name] = ('FOUND', 'Eicar-Test-Signature') - os.chmod(d, (mine | other | execute)) + with tempfile.TemporaryDirectory(prefix="python-clamd_multi") as d: + for i in range(3): # Reduced to 3 for faster testing + file_path = os.path.join(d, "file" + str(i)) + with open(file_path, 'wb') as f: + f.write(python_clamd.EICAR) + os.chmod(file_path, FILE_PERMISSIONS) + expected[file_path] = ('FOUND', 'Win.Test.EICAR_HDB-1') + + os.chmod(d, stat.S_IRWXU | stat.S_IRGRP | + stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) - assert self.cd.multiscan(d) == expected + assert clamd_unix_socket.multiscan(d) == expected - def test_instream(self): - expected = {'stream': ('FOUND', 'Eicar-Test-Signature')} - assert self.cd.instream(BytesIO(clamd.EICAR)) == expected + def test_instream_eicar_found(self, clamd_unix_socket): + expected = {'stream': ('FOUND', 'Win.Test.EICAR_HDB-1')} + assert clamd_unix_socket.instream( + BytesIO(python_clamd.EICAR)) == expected - def test_insteam_success(self): - assert self.cd.instream(BytesIO(b"foo")) == {'stream': ('OK', None)} + def test_instream_success(self, clamd_unix_socket): + assert clamd_unix_socket.instream(BytesIO(b"foo bar content")) == { + 'stream': ('OK', None)} class TestUnixSocketTimeout(TestUnixSocket): @@ -83,5 +106,5 @@ class TestUnixSocketTimeout(TestUnixSocket): def test_cannot_connect(): - with pytest.raises(clamd.ConnectionError): - clamd.ClamdUnixSocket(path="/tmp/404").ping() + with pytest.raises(python_clamd.ConnectionError): + python_clamd.ClamdUnixSocket(path="/tmp/404").ping() From 4c350e0c4b41bf516b40f8af540ec8436a1e43ac Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:47:51 +0100 Subject: [PATCH 05/12] Setup updates --- setup.cfg | 7 ++----- setup.py | 41 ++++++++++++++++++++++++----------------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/setup.cfg b/setup.cfg index c5f2335..68c61a2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,2 @@ -[nosetests] -with-doctest=1 - -[wheel] -universal=1 +[bdist_wheel] +universal=0 \ No newline at end of file diff --git a/setup.py b/setup.py index 901a206..6ca71ff 100755 --- a/setup.py +++ b/setup.py @@ -1,28 +1,35 @@ -#!/usr/bin/env python -from ez_setup import use_setuptools -use_setuptools() - from setuptools import setup, find_packages -readme = open('README.rst').read() -history = open('CHANGES.rst').read().replace('.. :changelog:', '') +try: + with open('README.rst', encoding='utf-8') as f: + readme = f.read() +except FileNotFoundError: + readme = '' + +try: + with open('CHANGES.rst', encoding='utf-8') as f: + history = f.read().replace('.. :changelog:', '') +except FileNotFoundError: + history = '' setup( - name="clamd", - version='1.0.3.dev0', - author="Thomas Grainger", - author_email="python-clamd@graingert.co.uk", - maintainer="Thomas Grainger", - maintainer_email = "python-clamd@graingert.co.uk", - keywords = "python, clamav, antivirus, scanner, virus, libclamav, clamd", - description = "Clamd is a python interface to Clamd (Clamav daemon).", + name="python_clamd", + version='0.0.1.dev0', + author="janus-sama", + author_email="zino4onowori@gmail.com", + maintainer="janus-sama", + maintainer_email="zino4onowori@gmail.com", + keywords="python, clamav, antivirus, scanner, virus, libclamav, clamd", + description="Updated Clamd is a python interface to Clamd (Clamav daemon).", long_description=readme + '\n\n' + history, - url="https://github.com/graingert/python-clamd", + url="https://github.com/janus-sama/python_clamd", package_dir={'': 'src'}, packages=find_packages('src', exclude="tests"), - classifiers = [ - "License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)", + classifiers=[ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", ], + license="LGPL-2.1-only", zip_safe=True, include_package_data=False, ) From 788dd1240973dbd159440cf8403f516f28b309de Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:48:01 +0100 Subject: [PATCH 06/12] Tox patch --- tox.ini | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/tox.ini b/tox.ini index a7dbf0c..91e02a5 100644 --- a/tox.ini +++ b/tox.ini @@ -1,15 +1,25 @@ [tox] -envlist = py{26,27,33,34,35}, lint +envlist = py{38,39,310,311,312}, lint +minversion = 3.8 [testenv] -commands = py.test {posargs} +commands = pytest {posargs} + deps = - pytest==2.8.2 + . + pytest + +setenv = + CLAMD_HOST = 127.0.0.1 + CLAMD_PORT = 3310 + CLAMD_UNIX_SOCKET = /var/run/clamav/clamd.ctl [testenv:lint] +description = Run linting checks using flake8 deps = - flake8==2.4.0 -commands=flake8 src + flake8 +commands = flake8 src [flake8] max-line-length = 117 +exclude = .git,__pycache__,.tox,venv,build \ No newline at end of file From 49d722da7b84dc81ce3e22a43579f3e432578f20 Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:48:20 +0100 Subject: [PATCH 07/12] Packaging tools patch --- .travis.yml | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index 06d046d..2ea41c6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,21 @@ language: python -python: 3.5 +python: + - 3.8 + - 3.9 + - 3.10 + - 3.11 + - 3.12 env: - - TOX_ENV=py26 - - TOX_ENV=py27 - - TOX_ENV=pypy - - TOX_ENV=py34 - - TOX_ENV=py35 + # Python 3 versions for Tox + - TOX_ENV=py38 + - TOX_ENV=py39 + - TOX_ENV=py310 + - TOX_ENV=py311 + - TOX_ENV=py312 - TOX_ENV=lint install: - - sudo apt-get install clamav-daemon clamav-freshclam clamav-unofficial-sigs + - sudo apt-get update -qq + - sudo apt-get install -y clamav-daemon clamav-freshclam - sudo freshclam --verbose - sudo service clamav-daemon start - pip install tox From e6c2b418c7b4af53f4b02e41b6e091030ecdeb4a Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:49:06 +0100 Subject: [PATCH 08/12] Manifest patch --- MANIFEST.in | 1 - 1 file changed, 1 deletion(-) diff --git a/MANIFEST.in b/MANIFEST.in index ad7f912..2e2d213 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,6 +1,5 @@ include CHANGES.rst include README.rst -include ez_setup.py recursive-exclude * __pycache__ recursive-exclude * *.py[co] From 4dab02650ade39f2fabbc3b1ea3b5ed0f5ff5cf6 Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:49:22 +0100 Subject: [PATCH 09/12] Change logs --- CHANGES.rst | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 0477c20..8625a4c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,40 +1,53 @@ Changes ========= +1.1.0 (2025-11-25) +------------------ + +* **Removed Python 2 support entirely.** The library is now **Python 3 only** (targeting Python 3.8+). +* Updated and enhanced all **docstrings** and type hints for improved clarity and maintainability. + +--- + 1.0.3 (unreleased) ------------------ -- Nothing changed yet. +* Nothing changed yet. +--- 1.0.2 (2014-08-21) ------------------ -- Remove all dependencies. clamd is now standalone! -- Use plain setuptools no d2to1. -- Create universal wheel. +* Remove all dependencies. `clamd` is now standalone! +* Use plain setuptools, no d2to1. +* Create universal wheel. +--- 1.0.1 (2013-03-06) ------------------ -- Updated d2to1 dependency +* Updated d2to1 dependency +--- 1.0.0 (2013-02-08) ------------------ -- Change public interface, including exceptions -- Support Python 3.3, withdraw 2.5 support +* Change public interface, including exceptions. +* Support Python 3.3, withdraw 2.5 support. +--- 0.3.4 (2013-02-01) ------------------ -- Use regex to parse file status reponse instead of complicated string split/join +* Use regex to parse file status response instead of complicated string split/join. +--- 0.3.3 (2013-01-28) ------------------ -- First version of clamd that can be installed from PyPI +* First version of clamd that can be installed from PyPI \ No newline at end of file From c561253969209608f96bade872fe7a3355ea47ad Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:49:37 +0100 Subject: [PATCH 10/12] read me corrections --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c7d2caa..8e8ec87 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ It allows you to interact with a running `clamd` daemon on **Linux, macOS, and W This project is a fork of `python-clamd` (last updated 2014), itself a fork of `pyClamd` v0.2.0 by Philippe Lagadec, which in turn extended `pyClamd` v0.1.1 by Alexandre Norman. ### Why this fork? -- Updated for **Python 3.13+ compatibility** (replaced deprecated `pkg_resources` with `importlib.metadata`). +- Updated for **Python 3.12+ compatibility** (replaced deprecated `pkg_resources` with `importlib.metadata`). - Added **type hints and annotations** for better IDE/autocomplete and `mypy` support. - Actively maintained with patches and modernization. From 82ddac8d3b11fae7b9217e86223fef6c20b9b278 Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:49:50 +0100 Subject: [PATCH 11/12] cleaning directory --- src/clamd/__init__.py | 315 ------------------------------------------ 1 file changed, 315 deletions(-) delete mode 100644 src/clamd/__init__.py diff --git a/src/clamd/__init__.py b/src/clamd/__init__.py deleted file mode 100644 index 92ff640..0000000 --- a/src/clamd/__init__.py +++ /dev/null @@ -1,315 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import unicode_literals - -try: - __version__ = __import__('pkg_resources').get_distribution('clamd').version -except: - __version__ = '' - -# $Source$ - - -import socket -import sys -import struct -import contextlib -import re -import base64 - -scan_response = re.compile(r"^(?P.*): ((?P.+) )?(?P(FOUND|OK|ERROR))$") -EICAR = base64.b64decode( - b'WDVPIVAlQEFQWzRcUFpYNTQoUF4pN0NDKTd9JEVJQ0FSLVNUQU5E' - b'QVJELUFOVElWSVJVUy1URVNU\nLUZJTEUhJEgrSCo=\n' -) - - -class ClamdError(Exception): - pass - - -class ResponseError(ClamdError): - pass - - -class BufferTooLongError(ResponseError): - """Class for errors with clamd using INSTREAM with a buffer lenght > StreamMaxLength in /etc/clamav/clamd.conf""" - - -class ConnectionError(ClamdError): - """Class for errors communication with clamd""" - - -class ClamdNetworkSocket(object): - """ - Class for using clamd with a network socket - """ - def __init__(self, host='127.0.0.1', port=3310, timeout=None): - """ - class initialisation - - host (string) : hostname or ip address - port (int) : TCP port - timeout (float or None) : socket timeout - """ - - self.host = host - self.port = port - self.timeout = timeout - - def _init_socket(self): - """ - internal use only - """ - try: - self.clamd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.clamd_socket.connect((self.host, self.port)) - self.clamd_socket.settimeout(self.timeout) - - except socket.error: - e = sys.exc_info()[1] - raise ConnectionError(self._error_message(e)) - - def _error_message(self, exception): - # args for socket.error can either be (errno, "message") - # or just "message" - if len(exception.args) == 1: - return "Error connecting to {host}:{port}. {msg}.".format( - host=self.host, - port=self.port, - msg=exception.args[0] - ) - else: - return "Error {erno} connecting {host}:{port}. {msg}.".format( - erno=exception.args[0], - host=self.host, - port=self.port, - msg=exception.args[1] - ) - - def ping(self): - return self._basic_command("PING") - - def version(self): - return self._basic_command("VERSION") - - def reload(self): - return self._basic_command("RELOAD") - - def shutdown(self): - """ - Force Clamd to shutdown and exit - - return: nothing - - May raise: - - ConnectionError: in case of communication problem - """ - try: - self._init_socket() - self._send_command('SHUTDOWN') - # result = self._recv_response() - finally: - self._close_socket() - - def scan(self, file): - return self._file_system_scan('SCAN', file) - - def contscan(self, file): - return self._file_system_scan('CONTSCAN', file) - - def multiscan(self, file): - return self._file_system_scan('MULTISCAN', file) - - def _basic_command(self, command): - """ - Send a command to the clamav server, and return the reply. - """ - self._init_socket() - try: - self._send_command(command) - response = self._recv_response().rsplit("ERROR", 1) - if len(response) > 1: - raise ResponseError(response[0]) - else: - return response[0] - finally: - self._close_socket() - - def _file_system_scan(self, command, file): - """ - Scan a file or directory given by filename using multiple threads (faster on SMP machines). - Do not stop on error or virus found. - Scan with archive support enabled. - - file (string): filename or directory (MUST BE ABSOLUTE PATH !) - - return: - - (dict): {filename1: ('FOUND', 'virusname'), filename2: ('ERROR', 'reason')} - - May raise: - - ConnectionError: in case of communication problem - """ - - try: - self._init_socket() - self._send_command(command, file) - - dr = {} - for result in self._recv_response_multiline().split('\n'): - if result: - filename, reason, status = self._parse_response(result) - dr[filename] = (status, reason) - - return dr - - finally: - self._close_socket() - - def instream(self, buff): - """ - Scan a buffer - - buff filelikeobj: buffer to scan - - return: - - (dict): {filename1: ("virusname", "status")} - - May raise : - - BufferTooLongError: if the buffer size exceeds clamd limits - - ConnectionError: in case of communication problem - """ - - try: - self._init_socket() - self._send_command('INSTREAM') - - max_chunk_size = 1024 # MUST be < StreamMaxLength in /etc/clamav/clamd.conf - - chunk = buff.read(max_chunk_size) - while chunk: - size = struct.pack(b'!L', len(chunk)) - self.clamd_socket.send(size + chunk) - chunk = buff.read(max_chunk_size) - - self.clamd_socket.send(struct.pack(b'!L', 0)) - - result = self._recv_response() - - if len(result) > 0: - if result == 'INSTREAM size limit exceeded. ERROR': - raise BufferTooLongError(result) - - filename, reason, status = self._parse_response(result) - return {filename: (status, reason)} - finally: - self._close_socket() - - def stats(self): - """ - Get Clamscan stats - - return: (string) clamscan stats - - May raise: - - ConnectionError: in case of communication problem - """ - self._init_socket() - try: - self._send_command('STATS') - return self._recv_response_multiline() - finally: - self._close_socket() - - def _send_command(self, cmd, *args): - """ - `man clamd` recommends to prefix commands with z, but we will use \n - terminated strings, as python<->clamd has some problems with \0x00 - """ - concat_args = '' - if args: - concat_args = ' ' + ' '.join(args) - - cmd = 'n{cmd}{args}\n'.format(cmd=cmd, args=concat_args).encode('utf-8') - self.clamd_socket.send(cmd) - - def _recv_response(self): - """ - receive line from clamd - """ - try: - with contextlib.closing(self.clamd_socket.makefile('rb')) as f: - return f.readline().decode('utf-8').strip() - except (socket.error, socket.timeout): - e = sys.exc_info()[1] - raise ConnectionError("Error while reading from socket: {0}".format(e.args)) - - def _recv_response_multiline(self): - """ - receive multiple line response from clamd and strip all whitespace characters - """ - try: - with contextlib.closing(self.clamd_socket.makefile('rb')) as f: - return f.read().decode('utf-8') - except (socket.error, socket.timeout): - e = sys.exc_info()[1] - raise ConnectionError("Error while reading from socket: {0}".format(e.args)) - - def _close_socket(self): - """ - close clamd socket - """ - self.clamd_socket.close() - return - - def _parse_response(self, msg): - """ - parses responses for SCAN, CONTSCAN, MULTISCAN and STREAM commands. - """ - try: - return scan_response.match(msg).group("path", "virus", "status") - except AttributeError: - raise ResponseError(msg.rsplit("ERROR", 1)[0]) - - -class ClamdUnixSocket(ClamdNetworkSocket): - """ - Class for using clamd with an unix socket - """ - def __init__(self, path="/var/run/clamav/clamd.ctl", timeout=None): - """ - class initialisation - - path (string) : unix socket path - timeout (float or None) : socket timeout - """ - - self.unix_socket = path - self.timeout = timeout - - def _init_socket(self): - """ - internal use only - """ - try: - self.clamd_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.clamd_socket.connect(self.unix_socket) - self.clamd_socket.settimeout(self.timeout) - except socket.error: - e = sys.exc_info()[1] - raise ConnectionError(self._error_message(e)) - - def _error_message(self, exception): - # args for socket.error can either be (errno, "message") - # or just "message" - if len(exception.args) == 1: - return "Error connecting to {path}. {msg}.".format( - path=self.unix_socket, - msg=exception.args[0] - ) - else: - return "Error {erno} connecting {path}. {msg}.".format( - erno=exception.args[0], - path=self.unix_socket, - msg=exception.args[1] - ) From 6d3f7e538566f146a08a1e709580aa0fc54e9d6c Mon Sep 17 00:00:00 2001 From: Janus-sama Date: Tue, 25 Nov 2025 22:50:03 +0100 Subject: [PATCH 12/12] deprecated code removed --- ez_setup.py | 332 ---------------------------------------------------- 1 file changed, 332 deletions(-) delete mode 100644 ez_setup.py diff --git a/ez_setup.py b/ez_setup.py deleted file mode 100644 index b2435fe..0000000 --- a/ez_setup.py +++ /dev/null @@ -1,332 +0,0 @@ -#!/usr/bin/env python -"""Bootstrap setuptools installation - -To use setuptools in your package's setup.py, include this -file in the same directory and add this to the top of your setup.py:: - - from ez_setup import use_setuptools - use_setuptools() - -To require a specific version of setuptools, set a download -mirror, or use an alternate download directory, simply supply -the appropriate options to ``use_setuptools()``. - -This file can also be run as a script to install or upgrade setuptools. -""" -import os -import shutil -import sys -import tempfile -import zipfile -import optparse -import subprocess -import platform -import textwrap -import contextlib - -from distutils import log - -try: - from urllib.request import urlopen -except ImportError: - from urllib2 import urlopen - -try: - from site import USER_SITE -except ImportError: - USER_SITE = None - -DEFAULT_VERSION = "5.4.2" -DEFAULT_URL = "https://pypi.python.org/packages/source/s/setuptools/" - -def _python_cmd(*args): - """ - Return True if the command succeeded. - """ - args = (sys.executable,) + args - return subprocess.call(args) == 0 - - -def _install(archive_filename, install_args=()): - with archive_context(archive_filename): - # installing - log.warn('Installing Setuptools') - if not _python_cmd('setup.py', 'install', *install_args): - log.warn('Something went wrong during the installation.') - log.warn('See the error message above.') - # exitcode will be 2 - return 2 - - -def _build_egg(egg, archive_filename, to_dir): - with archive_context(archive_filename): - # building an egg - log.warn('Building a Setuptools egg in %s', to_dir) - _python_cmd('setup.py', '-q', 'bdist_egg', '--dist-dir', to_dir) - # returning the result - log.warn(egg) - if not os.path.exists(egg): - raise IOError('Could not build the egg.') - - -class ContextualZipFile(zipfile.ZipFile): - """ - Supplement ZipFile class to support context manager for Python 2.6 - """ - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self.close() - - def __new__(cls, *args, **kwargs): - """ - Construct a ZipFile or ContextualZipFile as appropriate - """ - if hasattr(zipfile.ZipFile, '__exit__'): - return zipfile.ZipFile(*args, **kwargs) - return super(ContextualZipFile, cls).__new__(cls) - - -@contextlib.contextmanager -def archive_context(filename): - # extracting the archive - tmpdir = tempfile.mkdtemp() - log.warn('Extracting in %s', tmpdir) - old_wd = os.getcwd() - try: - os.chdir(tmpdir) - with ContextualZipFile(filename) as archive: - archive.extractall() - - # going in the directory - subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) - os.chdir(subdir) - log.warn('Now working in %s', subdir) - yield - - finally: - os.chdir(old_wd) - shutil.rmtree(tmpdir) - - -def _do_download(version, download_base, to_dir, download_delay): - egg = os.path.join(to_dir, 'setuptools-%s-py%d.%d.egg' - % (version, sys.version_info[0], sys.version_info[1])) - if not os.path.exists(egg): - archive = download_setuptools(version, download_base, - to_dir, download_delay) - _build_egg(egg, archive, to_dir) - sys.path.insert(0, egg) - - # Remove previously-imported pkg_resources if present (see - # https://bitbucket.org/pypa/setuptools/pull-request/7/ for details). - if 'pkg_resources' in sys.modules: - del sys.modules['pkg_resources'] - - import setuptools - setuptools.bootstrap_install_from = egg - - -def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL, - to_dir=os.curdir, download_delay=15): - to_dir = os.path.abspath(to_dir) - rep_modules = 'pkg_resources', 'setuptools' - imported = set(sys.modules).intersection(rep_modules) - try: - import pkg_resources - except ImportError: - return _do_download(version, download_base, to_dir, download_delay) - try: - pkg_resources.require("setuptools>=" + version) - return - except pkg_resources.DistributionNotFound: - return _do_download(version, download_base, to_dir, download_delay) - except pkg_resources.VersionConflict as VC_err: - if imported: - msg = textwrap.dedent(""" - The required version of setuptools (>={version}) is not available, - and can't be installed while this script is running. Please - install a more recent version first, using - 'easy_install -U setuptools'. - - (Currently using {VC_err.args[0]!r}) - """).format(VC_err=VC_err, version=version) - sys.stderr.write(msg) - sys.exit(2) - - # otherwise, reload ok - del pkg_resources, sys.modules['pkg_resources'] - return _do_download(version, download_base, to_dir, download_delay) - -def _clean_check(cmd, target): - """ - Run the command to download target. If the command fails, clean up before - re-raising the error. - """ - try: - subprocess.check_call(cmd) - except subprocess.CalledProcessError: - if os.access(target, os.F_OK): - os.unlink(target) - raise - -def download_file_powershell(url, target): - """ - Download the file at url to target using Powershell (which will validate - trust). Raise an exception if the command cannot complete. - """ - target = os.path.abspath(target) - ps_cmd = ( - "[System.Net.WebRequest]::DefaultWebProxy.Credentials = " - "[System.Net.CredentialCache]::DefaultCredentials; " - "(new-object System.Net.WebClient).DownloadFile(%(url)r, %(target)r)" - % vars() - ) - cmd = [ - 'powershell', - '-Command', - ps_cmd, - ] - _clean_check(cmd, target) - -def has_powershell(): - if platform.system() != 'Windows': - return False - cmd = ['powershell', '-Command', 'echo test'] - with open(os.path.devnull, 'wb') as devnull: - try: - subprocess.check_call(cmd, stdout=devnull, stderr=devnull) - except Exception: - return False - return True - -download_file_powershell.viable = has_powershell - -def download_file_curl(url, target): - cmd = ['curl', url, '--silent', '--output', target] - _clean_check(cmd, target) - -def has_curl(): - cmd = ['curl', '--version'] - with open(os.path.devnull, 'wb') as devnull: - try: - subprocess.check_call(cmd, stdout=devnull, stderr=devnull) - except Exception: - return False - return True - -download_file_curl.viable = has_curl - -def download_file_wget(url, target): - cmd = ['wget', url, '--quiet', '--output-document', target] - _clean_check(cmd, target) - -def has_wget(): - cmd = ['wget', '--version'] - with open(os.path.devnull, 'wb') as devnull: - try: - subprocess.check_call(cmd, stdout=devnull, stderr=devnull) - except Exception: - return False - return True - -download_file_wget.viable = has_wget - -def download_file_insecure(url, target): - """ - Use Python to download the file, even though it cannot authenticate the - connection. - """ - src = urlopen(url) - try: - # Read all the data in one block. - data = src.read() - finally: - src.close() - - # Write all the data in one block to avoid creating a partial file. - with open(target, "wb") as dst: - dst.write(data) - -download_file_insecure.viable = lambda: True - -def get_best_downloader(): - downloaders = ( - download_file_powershell, - download_file_curl, - download_file_wget, - download_file_insecure, - ) - viable_downloaders = (dl for dl in downloaders if dl.viable()) - return next(viable_downloaders, None) - -def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL, - to_dir=os.curdir, delay=15, downloader_factory=get_best_downloader): - """ - Download setuptools from a specified location and return its filename - - `version` should be a valid setuptools version number that is available - as an egg for download under the `download_base` URL (which should end - with a '/'). `to_dir` is the directory where the egg will be downloaded. - `delay` is the number of seconds to pause before an actual download - attempt. - - ``downloader_factory`` should be a function taking no arguments and - returning a function for downloading a URL to a target. - """ - # making sure we use the absolute path - to_dir = os.path.abspath(to_dir) - zip_name = "setuptools-%s.zip" % version - url = download_base + zip_name - saveto = os.path.join(to_dir, zip_name) - if not os.path.exists(saveto): # Avoid repeated downloads - log.warn("Downloading %s", url) - downloader = downloader_factory() - downloader(url, saveto) - return os.path.realpath(saveto) - -def _build_install_args(options): - """ - Build the arguments to 'python setup.py install' on the setuptools package - """ - return ['--user'] if options.user_install else [] - -def _parse_args(): - """ - Parse the command line for options - """ - parser = optparse.OptionParser() - parser.add_option( - '--user', dest='user_install', action='store_true', default=False, - help='install in user site package (requires Python 2.6 or later)') - parser.add_option( - '--download-base', dest='download_base', metavar="URL", - default=DEFAULT_URL, - help='alternative URL from where to download the setuptools package') - parser.add_option( - '--insecure', dest='downloader_factory', action='store_const', - const=lambda: download_file_insecure, default=get_best_downloader, - help='Use internal, non-validating downloader' - ) - parser.add_option( - '--version', help="Specify which version to download", - default=DEFAULT_VERSION, - ) - options, args = parser.parse_args() - # positional arguments are ignored - return options - -def main(): - """Install or upgrade setuptools and EasyInstall""" - options = _parse_args() - archive = download_setuptools( - version=options.version, - download_base=options.download_base, - downloader_factory=options.downloader_factory, - ) - return _install(archive, _build_install_args(options)) - -if __name__ == '__main__': - sys.exit(main())