diff --git a/.travis.yml b/.travis.yml index 06d046d..2ea41c6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,21 @@ language: python -python: 3.5 +python: + - 3.8 + - 3.9 + - 3.10 + - 3.11 + - 3.12 env: - - TOX_ENV=py26 - - TOX_ENV=py27 - - TOX_ENV=pypy - - TOX_ENV=py34 - - TOX_ENV=py35 + # Python 3 versions for Tox + - TOX_ENV=py38 + - TOX_ENV=py39 + - TOX_ENV=py310 + - TOX_ENV=py311 + - TOX_ENV=py312 - TOX_ENV=lint install: - - sudo apt-get install clamav-daemon clamav-freshclam clamav-unofficial-sigs + - sudo apt-get update -qq + - sudo apt-get install -y clamav-daemon clamav-freshclam - sudo freshclam --verbose - sudo service clamav-daemon start - pip install tox diff --git a/CHANGES.rst b/CHANGES.rst index 0477c20..8625a4c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,40 +1,53 @@ Changes ========= +1.1.0 (2025-11-25) +------------------ + +* **Removed Python 2 support entirely.** The library is now **Python 3 only** (targeting Python 3.8+). +* Updated and enhanced all **docstrings** and type hints for improved clarity and maintainability. + +--- + 1.0.3 (unreleased) ------------------ -- Nothing changed yet. +* Nothing changed yet. +--- 1.0.2 (2014-08-21) ------------------ -- Remove all dependencies. clamd is now standalone! -- Use plain setuptools no d2to1. -- Create universal wheel. +* Remove all dependencies. `clamd` is now standalone! +* Use plain setuptools, no d2to1. +* Create universal wheel. +--- 1.0.1 (2013-03-06) ------------------ -- Updated d2to1 dependency +* Updated d2to1 dependency +--- 1.0.0 (2013-02-08) ------------------ -- Change public interface, including exceptions -- Support Python 3.3, withdraw 2.5 support +* Change public interface, including exceptions. +* Support Python 3.3, withdraw 2.5 support. +--- 0.3.4 (2013-02-01) ------------------ -- Use regex to parse file status reponse instead of complicated string split/join +* Use regex to parse file status response instead of complicated string split/join. +--- 0.3.3 (2013-01-28) ------------------ -- First version of clamd that can be installed from PyPI +* First version of clamd that can be installed from PyPI \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in index ad7f912..2e2d213 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,6 +1,5 @@ include CHANGES.rst include README.rst -include ez_setup.py recursive-exclude * __pycache__ recursive-exclude * *.py[co] diff --git a/README.md b/README.md new file mode 100644 index 0000000..8e8ec87 --- /dev/null +++ b/README.md @@ -0,0 +1,65 @@ +# clamd + +## About +`python_clamd` is a modernized Python wrapper for the [ClamAV](https://www.clamav.net/) anti-virus engine. +It allows you to interact with a running `clamd` daemon on **Linux, macOS, and Windows**. + +This project is a fork of `python-clamd` (last updated 2014), itself a fork of `pyClamd` v0.2.0 by Philippe Lagadec, which in turn extended `pyClamd` v0.1.1 by Alexandre Norman. + +### Why this fork? +- Updated for **Python 3.12+ compatibility** (replaced deprecated `pkg_resources` with `importlib.metadata`). +- Added **type hints and annotations** for better IDE/autocomplete and `mypy` support. +- Actively maintained with patches and modernization. + +--- + +## Usage + +### Connect via Unix socket +```python +import python_clamd +cd = python_clamd.ClamdUnixSocket() +cd.ping() # 'PONG' +cd.version() # 'ClamAV ...' +cd.reload() # 'RELOADING' +``` + +### Scan a file +```python +open('/tmp/EICAR','wb').write(python_clamd.EICAR) +cd.scan('/tmp/EICAR') +# {'/tmp/EICAR': ('FOUND', 'Eicar-Test-Signature')} +``` + +### Scan a stream +```python +from io import BytesIO +cd.instream(BytesIO(python_clamd.EICAR)) +# {'stream': ('FOUND', 'Eicar-Test-Signature')} +``` + +## Installation +### Python package + +Coming soon via PyPI: +```python +pip install python-clamd +``` + +### ClamAV daemon +On Ubuntu: +```bash +sudo apt-get install clamav-daemon clamav-freshclam clamav-unofficial-sigs +sudo freshclam +sudo service clamav-daemon start +``` + +## Supported Versions +* [x] Python 3.9 – 3.13 +* [ ] Python 2.x (dropped) + +## License +Released under the LGPL license. + +## Contributing +PRs and issues are welcome. This fork exists to keep ClamAV bindings usable on modern Python. diff --git a/README.rst b/README.rst deleted file mode 100644 index 4e9315d..0000000 --- a/README.rst +++ /dev/null @@ -1,53 +0,0 @@ -clamd -===== - -.. image:: https://travis-ci.org/graingert/python-clamd.png?branch=master - :alt: travis build status - :target: https://travis-ci.org/graingert/python-clamd - -About ------ -`clamd` is a portable Python module to use the ClamAV anti-virus engine on -Windows, Linux, MacOSX and other platforms. It requires a running instance of -the `clamd` daemon. - -This is a fork of pyClamd v0.2.0 created by Philippe Lagadec and published on his website: http://www.decalage.info/en/python/pyclamd which in turn is a slightly improved version of pyClamd v0.1.1 created by Alexandre Norman and published on his website: http://xael.org/norman/python/pyclamd/ - -Usage ------ - -To use with a unix socket:: - - >>> import clamd - >>> cd = clamd.ClamdUnixSocket() - >>> cd.ping() - 'PONG' - >>> cd.version() # doctest: +ELLIPSIS - 'ClamAV ... - >>> cd.reload() - 'RELOADING' - -To scan a file:: - - >>> open('/tmp/EICAR','wb').write(clamd.EICAR) - >>> cd.scan('/tmp/EICAR') - {'/tmp/EICAR': ('FOUND', 'Eicar-Test-Signature')} - -To scan a stream:: - - >>> from io import BytesIO - >>> cd.instream(BytesIO(clamd.EICAR)) - {'stream': ('FOUND', 'Eicar-Test-Signature')} - - -License -------- -`clamd` is released as open-source software under the LGPL license. - -clamd Install -------------- -How to install the ClamAV daemon `clamd` under Ubuntu:: - - sudo apt-get install clamav-daemon clamav-freshclam clamav-unofficial-sigs - sudo freshclam - sudo service clamav-daemon start diff --git a/ez_setup.py b/ez_setup.py deleted file mode 100644 index b2435fe..0000000 --- a/ez_setup.py +++ /dev/null @@ -1,332 +0,0 @@ -#!/usr/bin/env python -"""Bootstrap setuptools installation - -To use setuptools in your package's setup.py, include this -file in the same directory and add this to the top of your setup.py:: - - from ez_setup import use_setuptools - use_setuptools() - -To require a specific version of setuptools, set a download -mirror, or use an alternate download directory, simply supply -the appropriate options to ``use_setuptools()``. - -This file can also be run as a script to install or upgrade setuptools. -""" -import os -import shutil -import sys -import tempfile -import zipfile -import optparse -import subprocess -import platform -import textwrap -import contextlib - -from distutils import log - -try: - from urllib.request import urlopen -except ImportError: - from urllib2 import urlopen - -try: - from site import USER_SITE -except ImportError: - USER_SITE = None - -DEFAULT_VERSION = "5.4.2" -DEFAULT_URL = "https://pypi.python.org/packages/source/s/setuptools/" - -def _python_cmd(*args): - """ - Return True if the command succeeded. - """ - args = (sys.executable,) + args - return subprocess.call(args) == 0 - - -def _install(archive_filename, install_args=()): - with archive_context(archive_filename): - # installing - log.warn('Installing Setuptools') - if not _python_cmd('setup.py', 'install', *install_args): - log.warn('Something went wrong during the installation.') - log.warn('See the error message above.') - # exitcode will be 2 - return 2 - - -def _build_egg(egg, archive_filename, to_dir): - with archive_context(archive_filename): - # building an egg - log.warn('Building a Setuptools egg in %s', to_dir) - _python_cmd('setup.py', '-q', 'bdist_egg', '--dist-dir', to_dir) - # returning the result - log.warn(egg) - if not os.path.exists(egg): - raise IOError('Could not build the egg.') - - -class ContextualZipFile(zipfile.ZipFile): - """ - Supplement ZipFile class to support context manager for Python 2.6 - """ - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self.close() - - def __new__(cls, *args, **kwargs): - """ - Construct a ZipFile or ContextualZipFile as appropriate - """ - if hasattr(zipfile.ZipFile, '__exit__'): - return zipfile.ZipFile(*args, **kwargs) - return super(ContextualZipFile, cls).__new__(cls) - - -@contextlib.contextmanager -def archive_context(filename): - # extracting the archive - tmpdir = tempfile.mkdtemp() - log.warn('Extracting in %s', tmpdir) - old_wd = os.getcwd() - try: - os.chdir(tmpdir) - with ContextualZipFile(filename) as archive: - archive.extractall() - - # going in the directory - subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) - os.chdir(subdir) - log.warn('Now working in %s', subdir) - yield - - finally: - os.chdir(old_wd) - shutil.rmtree(tmpdir) - - -def _do_download(version, download_base, to_dir, download_delay): - egg = os.path.join(to_dir, 'setuptools-%s-py%d.%d.egg' - % (version, sys.version_info[0], sys.version_info[1])) - if not os.path.exists(egg): - archive = download_setuptools(version, download_base, - to_dir, download_delay) - _build_egg(egg, archive, to_dir) - sys.path.insert(0, egg) - - # Remove previously-imported pkg_resources if present (see - # https://bitbucket.org/pypa/setuptools/pull-request/7/ for details). - if 'pkg_resources' in sys.modules: - del sys.modules['pkg_resources'] - - import setuptools - setuptools.bootstrap_install_from = egg - - -def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL, - to_dir=os.curdir, download_delay=15): - to_dir = os.path.abspath(to_dir) - rep_modules = 'pkg_resources', 'setuptools' - imported = set(sys.modules).intersection(rep_modules) - try: - import pkg_resources - except ImportError: - return _do_download(version, download_base, to_dir, download_delay) - try: - pkg_resources.require("setuptools>=" + version) - return - except pkg_resources.DistributionNotFound: - return _do_download(version, download_base, to_dir, download_delay) - except pkg_resources.VersionConflict as VC_err: - if imported: - msg = textwrap.dedent(""" - The required version of setuptools (>={version}) is not available, - and can't be installed while this script is running. Please - install a more recent version first, using - 'easy_install -U setuptools'. - - (Currently using {VC_err.args[0]!r}) - """).format(VC_err=VC_err, version=version) - sys.stderr.write(msg) - sys.exit(2) - - # otherwise, reload ok - del pkg_resources, sys.modules['pkg_resources'] - return _do_download(version, download_base, to_dir, download_delay) - -def _clean_check(cmd, target): - """ - Run the command to download target. If the command fails, clean up before - re-raising the error. - """ - try: - subprocess.check_call(cmd) - except subprocess.CalledProcessError: - if os.access(target, os.F_OK): - os.unlink(target) - raise - -def download_file_powershell(url, target): - """ - Download the file at url to target using Powershell (which will validate - trust). Raise an exception if the command cannot complete. - """ - target = os.path.abspath(target) - ps_cmd = ( - "[System.Net.WebRequest]::DefaultWebProxy.Credentials = " - "[System.Net.CredentialCache]::DefaultCredentials; " - "(new-object System.Net.WebClient).DownloadFile(%(url)r, %(target)r)" - % vars() - ) - cmd = [ - 'powershell', - '-Command', - ps_cmd, - ] - _clean_check(cmd, target) - -def has_powershell(): - if platform.system() != 'Windows': - return False - cmd = ['powershell', '-Command', 'echo test'] - with open(os.path.devnull, 'wb') as devnull: - try: - subprocess.check_call(cmd, stdout=devnull, stderr=devnull) - except Exception: - return False - return True - -download_file_powershell.viable = has_powershell - -def download_file_curl(url, target): - cmd = ['curl', url, '--silent', '--output', target] - _clean_check(cmd, target) - -def has_curl(): - cmd = ['curl', '--version'] - with open(os.path.devnull, 'wb') as devnull: - try: - subprocess.check_call(cmd, stdout=devnull, stderr=devnull) - except Exception: - return False - return True - -download_file_curl.viable = has_curl - -def download_file_wget(url, target): - cmd = ['wget', url, '--quiet', '--output-document', target] - _clean_check(cmd, target) - -def has_wget(): - cmd = ['wget', '--version'] - with open(os.path.devnull, 'wb') as devnull: - try: - subprocess.check_call(cmd, stdout=devnull, stderr=devnull) - except Exception: - return False - return True - -download_file_wget.viable = has_wget - -def download_file_insecure(url, target): - """ - Use Python to download the file, even though it cannot authenticate the - connection. - """ - src = urlopen(url) - try: - # Read all the data in one block. - data = src.read() - finally: - src.close() - - # Write all the data in one block to avoid creating a partial file. - with open(target, "wb") as dst: - dst.write(data) - -download_file_insecure.viable = lambda: True - -def get_best_downloader(): - downloaders = ( - download_file_powershell, - download_file_curl, - download_file_wget, - download_file_insecure, - ) - viable_downloaders = (dl for dl in downloaders if dl.viable()) - return next(viable_downloaders, None) - -def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL, - to_dir=os.curdir, delay=15, downloader_factory=get_best_downloader): - """ - Download setuptools from a specified location and return its filename - - `version` should be a valid setuptools version number that is available - as an egg for download under the `download_base` URL (which should end - with a '/'). `to_dir` is the directory where the egg will be downloaded. - `delay` is the number of seconds to pause before an actual download - attempt. - - ``downloader_factory`` should be a function taking no arguments and - returning a function for downloading a URL to a target. - """ - # making sure we use the absolute path - to_dir = os.path.abspath(to_dir) - zip_name = "setuptools-%s.zip" % version - url = download_base + zip_name - saveto = os.path.join(to_dir, zip_name) - if not os.path.exists(saveto): # Avoid repeated downloads - log.warn("Downloading %s", url) - downloader = downloader_factory() - downloader(url, saveto) - return os.path.realpath(saveto) - -def _build_install_args(options): - """ - Build the arguments to 'python setup.py install' on the setuptools package - """ - return ['--user'] if options.user_install else [] - -def _parse_args(): - """ - Parse the command line for options - """ - parser = optparse.OptionParser() - parser.add_option( - '--user', dest='user_install', action='store_true', default=False, - help='install in user site package (requires Python 2.6 or later)') - parser.add_option( - '--download-base', dest='download_base', metavar="URL", - default=DEFAULT_URL, - help='alternative URL from where to download the setuptools package') - parser.add_option( - '--insecure', dest='downloader_factory', action='store_const', - const=lambda: download_file_insecure, default=get_best_downloader, - help='Use internal, non-validating downloader' - ) - parser.add_option( - '--version', help="Specify which version to download", - default=DEFAULT_VERSION, - ) - options, args = parser.parse_args() - # positional arguments are ignored - return options - -def main(): - """Install or upgrade setuptools and EasyInstall""" - options = _parse_args() - archive = download_setuptools( - version=options.version, - download_base=options.download_base, - downloader_factory=options.downloader_factory, - ) - return _install(archive, _build_install_args(options)) - -if __name__ == '__main__': - sys.exit(main()) diff --git a/setup.cfg b/setup.cfg index c5f2335..68c61a2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,2 @@ -[nosetests] -with-doctest=1 - -[wheel] -universal=1 +[bdist_wheel] +universal=0 \ No newline at end of file diff --git a/setup.py b/setup.py index 901a206..6ca71ff 100755 --- a/setup.py +++ b/setup.py @@ -1,28 +1,35 @@ -#!/usr/bin/env python -from ez_setup import use_setuptools -use_setuptools() - from setuptools import setup, find_packages -readme = open('README.rst').read() -history = open('CHANGES.rst').read().replace('.. :changelog:', '') +try: + with open('README.rst', encoding='utf-8') as f: + readme = f.read() +except FileNotFoundError: + readme = '' + +try: + with open('CHANGES.rst', encoding='utf-8') as f: + history = f.read().replace('.. :changelog:', '') +except FileNotFoundError: + history = '' setup( - name="clamd", - version='1.0.3.dev0', - author="Thomas Grainger", - author_email="python-clamd@graingert.co.uk", - maintainer="Thomas Grainger", - maintainer_email = "python-clamd@graingert.co.uk", - keywords = "python, clamav, antivirus, scanner, virus, libclamav, clamd", - description = "Clamd is a python interface to Clamd (Clamav daemon).", + name="python_clamd", + version='0.0.1.dev0', + author="janus-sama", + author_email="zino4onowori@gmail.com", + maintainer="janus-sama", + maintainer_email="zino4onowori@gmail.com", + keywords="python, clamav, antivirus, scanner, virus, libclamav, clamd", + description="Updated Clamd is a python interface to Clamd (Clamav daemon).", long_description=readme + '\n\n' + history, - url="https://github.com/graingert/python-clamd", + url="https://github.com/janus-sama/python_clamd", package_dir={'': 'src'}, packages=find_packages('src', exclude="tests"), - classifiers = [ - "License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)", + classifiers=[ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", ], + license="LGPL-2.1-only", zip_safe=True, include_package_data=False, ) diff --git a/src/clamd/__init__.py b/src/python_clamd/__init__.py similarity index 74% rename from src/clamd/__init__.py rename to src/python_clamd/__init__.py index 92ff640..ec16214 100644 --- a/src/clamd/__init__.py +++ b/src/python_clamd/__init__.py @@ -1,14 +1,11 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from __future__ import unicode_literals +from importlib import metadata try: - __version__ = __import__('pkg_resources').get_distribution('clamd').version -except: - __version__ = '' - -# $Source$ - + __version__ = metadata.version("pyclamd") +except metadata.PackageNotFoundError: + __version__ = "" import socket import sys @@ -17,7 +14,8 @@ import re import base64 -scan_response = re.compile(r"^(?P.*): ((?P.+) )?(?P(FOUND|OK|ERROR))$") +scan_response = re.compile( + r"^(?P.*): ((?P.+) )?(?P(FOUND|OK|ERROR))$") EICAR = base64.b64decode( b'WDVPIVAlQEFQWzRcUFpYNTQoUF4pN0NDKTd9JEVJQ0FSLVNUQU5E' b'QVJELUFOVElWSVJVUy1URVNU\nLUZJTEUhJEgrSCo=\n' @@ -43,26 +41,37 @@ class ConnectionError(ClamdError): class ClamdNetworkSocket(object): """ Class for using clamd with a network socket + + :params str host: machine host default localhost + :params int port: clamd port default 3310 + :params int timeout: socket timeout default None + """ - def __init__(self, host='127.0.0.1', port=3310, timeout=None): + + def __init__(self, host: str = '127.0.0.1', port: int = 3310, timeout: float | None = None): """ - class initialisation + class initialization - host (string) : hostname or ip address - port (int) : TCP port - timeout (float or None) : socket timeout + :params str host: hostname or ip address + :params int port: TCP port + :params float|None timeout: socket timeout """ self.host = host self.port = port self.timeout = timeout - def _init_socket(self): + def _init_socket(self) -> None: """ internal use only + + :return None: + + :raises ConnectionError: """ try: - self.clamd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.clamd_socket = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) self.clamd_socket.connect((self.host, self.port)) self.clamd_socket.settimeout(self.timeout) @@ -70,7 +79,7 @@ def _init_socket(self): e = sys.exc_info()[1] raise ConnectionError(self._error_message(e)) - def _error_message(self, exception): + def _error_message(self, exception: Exception) -> str: # args for socket.error can either be (errno, "message") # or just "message" if len(exception.args) == 1: @@ -100,10 +109,9 @@ def shutdown(self): """ Force Clamd to shutdown and exit - return: nothing + :return None: - May raise: - - ConnectionError: in case of communication problem + :raises ConnectionError: if there is a of communication problem """ try: self._init_socket() @@ -138,17 +146,17 @@ def _basic_command(self, command): def _file_system_scan(self, command, file): """ - Scan a file or directory given by filename using multiple threads (faster on SMP machines). - Do not stop on error or virus found. - Scan with archive support enabled. + Scan a file or directory using multiple threads. + + This function performs a filesystem scan using ClamAV, + does not stop on errors or virus detection, and supports archives. - file (string): filename or directory (MUST BE ABSOLUTE PATH !) + :param str command: The ClamAV command to execute. + :param str file: filename or directory (MUST BE ABSOLUTE PATH !) - return: - - (dict): {filename1: ('FOUND', 'virusname'), filename2: ('ERROR', 'reason')} + :return dict: `{filename1: ('FOUND', 'virusname'), filename2: ('ERROR', 'reason')}` - May raise: - - ConnectionError: in case of communication problem + :raise ConnectionError: If there is a communication issue during scanning. """ try: @@ -170,14 +178,12 @@ def instream(self, buff): """ Scan a buffer - buff filelikeobj: buffer to scan + :params file buff: buffer to scan - return: - - (dict): {filename1: ("virusname", "status")} + :return dict: `{filename1: ("virusname", "status")}`: - May raise : - - BufferTooLongError: if the buffer size exceeds clamd limits - - ConnectionError: in case of communication problem + :raises BufferTooLongError: if the buffer size exceeds clamd limits + :raises ConnectionError: if there is a communication problem """ try: @@ -209,10 +215,9 @@ def stats(self): """ Get Clamscan stats - return: (string) clamscan stats + :return str: clamscan stats - May raise: - - ConnectionError: in case of communication problem + :raises ConnectionError: if there is a communication problem """ self._init_socket() try: @@ -230,7 +235,8 @@ def _send_command(self, cmd, *args): if args: concat_args = ' ' + ' '.join(args) - cmd = 'n{cmd}{args}\n'.format(cmd=cmd, args=concat_args).encode('utf-8') + cmd = 'n{cmd}{args}\n'.format( + cmd=cmd, args=concat_args).encode('utf-8') self.clamd_socket.send(cmd) def _recv_response(self): @@ -242,7 +248,8 @@ def _recv_response(self): return f.readline().decode('utf-8').strip() except (socket.error, socket.timeout): e = sys.exc_info()[1] - raise ConnectionError("Error while reading from socket: {0}".format(e.args)) + raise ConnectionError( + "Error while reading from socket: {0}".format(e.args)) def _recv_response_multiline(self): """ @@ -253,7 +260,8 @@ def _recv_response_multiline(self): return f.read().decode('utf-8') except (socket.error, socket.timeout): e = sys.exc_info()[1] - raise ConnectionError("Error while reading from socket: {0}".format(e.args)) + raise ConnectionError( + "Error while reading from socket: {0}".format(e.args)) def _close_socket(self): """ @@ -276,12 +284,13 @@ class ClamdUnixSocket(ClamdNetworkSocket): """ Class for using clamd with an unix socket """ - def __init__(self, path="/var/run/clamav/clamd.ctl", timeout=None): + + def __init__(self, path: str = "/var/run/clamav/clamd.ctl", timeout: float | None = None): """ class initialisation - path (string) : unix socket path - timeout (float or None) : socket timeout + :params str path: unix socket path + :params float|None timeout: socket timeout """ self.unix_socket = path @@ -292,7 +301,8 @@ def _init_socket(self): internal use only """ try: - self.clamd_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.clamd_socket = socket.socket( + socket.AF_UNIX, socket.SOCK_STREAM) self.clamd_socket.connect(self.unix_socket) self.clamd_socket.settimeout(self.timeout) except socket.error: diff --git a/src/tests/test_api.py b/src/tests/test_api.py index 42423b6..d12ba54 100644 --- a/src/tests/test_api.py +++ b/src/tests/test_api.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from __future__ import unicode_literals -import clamd +import python_clamd from io import BytesIO from contextlib import contextmanager import tempfile @@ -15,6 +14,8 @@ other = stat.S_IROTH execute = (stat.S_IEXEC | stat.S_IXOTH) +FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH + @contextmanager def mkdtemp(*args, **kwargs): @@ -25,57 +26,79 @@ def mkdtemp(*args, **kwargs): shutil.rmtree(temp_dir) -class TestUnixSocket(object): - kwargs = {} +@pytest.fixture(scope="module") +def clamd_unix_socket(): + socket_path = os.environ.get( + 'CLAMD_UNIX_SOCKET', + '/var/run/clamav/pyclamd.ctl' + ) + return python_clamd.ClamdUnixSocket(path=socket_path) - def setup(self): - self.cd = clamd.ClamdUnixSocket(**self.kwargs) - def test_ping(self): - assert self.cd.ping() +class TestUnixSocket: + kwargs = {} + + def test_ping(self, clamd_unix_socket): + assert clamd_unix_socket.ping() == 'PONG' - def test_version(self): - assert self.cd.version().startswith("ClamAV") + def test_version(self, clamd_unix_socket): + assert clamd_unix_socket.version().startswith("ClamAV") - def test_reload(self): - assert self.cd.reload() == 'RELOADING' + def test_reload(self, clamd_unix_socket): + assert clamd_unix_socket.reload().strip() == 'RELOADING' - def test_scan(self): - with tempfile.NamedTemporaryFile('wb', prefix="python-clamd") as f: - f.write(clamd.EICAR) + def test_scan_eicar_found(self, clamd_unix_socket): + with tempfile.NamedTemporaryFile('wb', prefix="python-pyclamd", delete=False) as f: + f.write(python_clamd.EICAR) f.flush() - os.fchmod(f.fileno(), (mine | other)) - expected = {f.name: ('FOUND', 'Eicar-Test-Signature')} + # Set permissions + os.chmod(f.name, FILE_PERMISSIONS) + file_name = f.name - assert self.cd.scan(f.name) == expected + expected = {file_name: ('FOUND', 'Win.Test.EICAR_HDB-1')} - def test_unicode_scan(self): - with tempfile.NamedTemporaryFile('wb', prefix=u"python-clamdλ") as f: - f.write(clamd.EICAR) + try: + assert clamd_unix_socket.scan(file_name) == expected + finally: + os.remove(file_name) + + def test_scan_unicode_path(self, clamd_unix_socket): + with tempfile.NamedTemporaryFile('wb', prefix="python-clamd_λ_ü", delete=False) as f: + f.write(python_clamd.EICAR) f.flush() - os.fchmod(f.fileno(), (mine | other)) - expected = {f.name: ('FOUND', 'Eicar-Test-Signature')} + os.chmod(f.name, FILE_PERMISSIONS) + file_name = f.name + + expected = {file_name: ('FOUND', 'Win.Test.EICAR_HDB-1')} - assert self.cd.scan(f.name) == expected + try: + assert clamd_unix_socket.scan(file_name) == expected + finally: + os.remove(file_name) - def test_multiscan(self): + def test_multiscan(self, clamd_unix_socket): expected = {} - with mkdtemp(prefix="python-clamd") as d: - for i in range(10): - with open(os.path.join(d, "file" + str(i)), 'wb') as f: - f.write(clamd.EICAR) - os.fchmod(f.fileno(), (mine | other)) - expected[f.name] = ('FOUND', 'Eicar-Test-Signature') - os.chmod(d, (mine | other | execute)) + with tempfile.TemporaryDirectory(prefix="python-clamd_multi") as d: + for i in range(3): # Reduced to 3 for faster testing + file_path = os.path.join(d, "file" + str(i)) + with open(file_path, 'wb') as f: + f.write(python_clamd.EICAR) + os.chmod(file_path, FILE_PERMISSIONS) + expected[file_path] = ('FOUND', 'Win.Test.EICAR_HDB-1') + + os.chmod(d, stat.S_IRWXU | stat.S_IRGRP | + stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) - assert self.cd.multiscan(d) == expected + assert clamd_unix_socket.multiscan(d) == expected - def test_instream(self): - expected = {'stream': ('FOUND', 'Eicar-Test-Signature')} - assert self.cd.instream(BytesIO(clamd.EICAR)) == expected + def test_instream_eicar_found(self, clamd_unix_socket): + expected = {'stream': ('FOUND', 'Win.Test.EICAR_HDB-1')} + assert clamd_unix_socket.instream( + BytesIO(python_clamd.EICAR)) == expected - def test_insteam_success(self): - assert self.cd.instream(BytesIO(b"foo")) == {'stream': ('OK', None)} + def test_instream_success(self, clamd_unix_socket): + assert clamd_unix_socket.instream(BytesIO(b"foo bar content")) == { + 'stream': ('OK', None)} class TestUnixSocketTimeout(TestUnixSocket): @@ -83,5 +106,5 @@ class TestUnixSocketTimeout(TestUnixSocket): def test_cannot_connect(): - with pytest.raises(clamd.ConnectionError): - clamd.ClamdUnixSocket(path="/tmp/404").ping() + with pytest.raises(python_clamd.ConnectionError): + python_clamd.ClamdUnixSocket(path="/tmp/404").ping() diff --git a/tox.ini b/tox.ini index a7dbf0c..91e02a5 100644 --- a/tox.ini +++ b/tox.ini @@ -1,15 +1,25 @@ [tox] -envlist = py{26,27,33,34,35}, lint +envlist = py{38,39,310,311,312}, lint +minversion = 3.8 [testenv] -commands = py.test {posargs} +commands = pytest {posargs} + deps = - pytest==2.8.2 + . + pytest + +setenv = + CLAMD_HOST = 127.0.0.1 + CLAMD_PORT = 3310 + CLAMD_UNIX_SOCKET = /var/run/clamav/clamd.ctl [testenv:lint] +description = Run linting checks using flake8 deps = - flake8==2.4.0 -commands=flake8 src + flake8 +commands = flake8 src [flake8] max-line-length = 117 +exclude = .git,__pycache__,.tox,venv,build \ No newline at end of file