Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
17c138c
Add initial commit
jozef-sabo Mar 31, 2025
46c19ab
Fix operating system type
jozef-sabo Apr 2, 2025
56589c3
Add logging possibility using arguments
jozef-sabo Apr 7, 2025
84f8ca0
Add logging initializer
jozef-sabo Apr 7, 2025
f68511a
Add logging in __main__.py
jozef-sabo Apr 7, 2025
ccdc34f
Add logging in utils.py
jozef-sabo Apr 7, 2025
fde06b8
Add logging and error handling in eset_parser.py
jozef-sabo Apr 7, 2025
cde5dc7
Fix ruff error
jozef-sabo Apr 7, 2025
50083fe
Replace logging for each function by using decorator
jozef-sabo Apr 7, 2025
23a7772
Fix ruff format checks
jozef-sabo Apr 7, 2025
fa75b88
Fix ruff format checks and return values
jozef-sabo Apr 8, 2025
55d3879
Move datetime import from TYPE_CHECKING
jozef-sabo Apr 13, 2025
cfba55d
Add Reader and Parser to utils.py
jozef-sabo Apr 13, 2025
7febc6c
Move kaitai struct processing and file opening from ESET
jozef-sabo Apr 13, 2025
80c107b
Fix logging function in ESET
jozef-sabo Apr 13, 2025
82ef3d1
Add entry_stat to Parser
jozef-sabo Apr 13, 2025
fc9bdb6
Use new stat in ESET
jozef-sabo Apr 13, 2025
74f4f6d
Add new logging techniques to Avast
jozef-sabo Apr 13, 2025
ba8c90e
Add new logging techniques to AVG
jozef-sabo Apr 13, 2025
cb6ba8c
Add new logging techniques to Avira
jozef-sabo Apr 13, 2025
ed959d2
Add new logging techniques to Forticlient
jozef-sabo Apr 13, 2025
d4f10c8
Add new logging techniques to Gdata
jozef-sabo Apr 13, 2025
4cd9696
Add new logging techniques to Kaspersky Anti-Virus
jozef-sabo Apr 13, 2025
d1065aa
Add new logging techniques to MalwareBytes
jozef-sabo Apr 13, 2025
7f7f56f
Add new logging techniques to McAfee
jozef-sabo Apr 13, 2025
31aa1d9
Add new logging techniques to Windows Defender
jozef-sabo Apr 13, 2025
20026e5
Fix mypy and ruff errors
jozef-sabo Apr 13, 2025
cc5d4a6
Fix naming convention in Avira Antivirus
jozef-sabo Apr 13, 2025
1422b60
Rebase: Include changes from main and fix
jozef-sabo Apr 13, 2025
26878f2
Merge branch 'main' into fix-exception-handling
jozef-sabo Apr 13, 2025
36d96ec
Fix ruff checks
jozef-sabo Apr 13, 2025
60c07a9
Move logging decorator to utils.py
jozef-sabo Apr 22, 2025
c628d2c
Move logging decorator to utils and use local logger for all the AVs
jozef-sabo Apr 22, 2025
e17c9b5
Use local logger in __main__.py
jozef-sabo Apr 22, 2025
60e891c
Do not use global variable in __main__.py
jozef-sabo Apr 22, 2025
0b1738b
Fix types in utils.py
jozef-sabo Apr 22, 2025
cae9d7e
Fix types in utils.py 2
jozef-sabo Apr 22, 2025
568b60a
Fix imports in __init__.py
jozef-sabo Apr 22, 2025
4596952
Fix logging for unlogged elements
jozef-sabo Apr 23, 2025
883479e
Fix spacing for the debugger
jozef-sabo Apr 23, 2025
79ceb98
Move unlogged objects to a separate class alongside with their imports
jozef-sabo Apr 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions maldump/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import argparse
import csv
import ctypes
import getpass
import io
import logging
import os
import sys
import tarfile
Expand All @@ -20,14 +22,19 @@
from maldump.structures import Quarantine

__version__ = "0.5.0"
logger = logging.getLogger(__name__)


def main() -> None:
init()
args = parse_cli()
init_logging(args.log_level)

# Admin privileges are required for optimal function (windows only)
if sys.platform == "win32" and not ctypes.windll.shell32.IsUserAnAdmin():
logger.critical(
"The program executed on Windows machine without proper privileges"
)
print("Please try again with admin privileges")
sys.exit(1)

Expand All @@ -37,9 +44,15 @@ def main() -> None:
# Switch to root partition
os.chdir(args.root_dir)

logger.debug(
'Working in directory "%s", files would be stored into "%s"', os.getcwd(), dest
)

# Get a list of all installed avs
avs = AVManager.detect()

logger.debug("Detected AVs: %s", [av.name for av in avs])

if args.quar:
export_files(avs, dest)
elif args.meta:
Expand Down Expand Up @@ -156,6 +169,13 @@ def parse_cli() -> argparse.Namespace:
parser.add_argument(
"-a", "--all", action="store_true", help="equivalent of running both -q and -m"
)
parser.add_argument(
"-t",
"--log-level",
choices=["critical", "fatal", "error", "warn", "warning", "info", "debug"],
default="warning",
help="log level",
)
parser.add_argument(
"-v", "--version", action="version", version="%(prog)s " + __version__
)
Expand All @@ -170,5 +190,21 @@ def parse_cli() -> argparse.Namespace:
return parser.parse_args()


def init_logging(log_level: str) -> None:
numeric_level = getattr(logging, log_level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError("Invalid log level: " + log_level) # noqa: TRY004
logging.basicConfig(
handlers=[
# logging.FileHandler("syslog.log", mode="w", encoding="utf-8"),
logging.StreamHandler(sys.stderr)
],
level=numeric_level,
format="%(asctime)s:%(levelname)s:%(name)s:%(module)s:%(message)s",
)
logger.debug("Logging started, logger initialized successfully")
logger.info("Logging as user %s", getpass.getuser())


if __name__ == "__main__":
main()
34 changes: 34 additions & 0 deletions maldump/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from enum import Enum
from typing import Any
from xml.etree.ElementTree import Element


class OperatingSystem(Enum):
Expand All @@ -9,3 +11,35 @@ class OperatingSystem(Enum):

class ThreatMetadata(str, Enum):
UNKNOWN_THREAT = "Unknown-no-metadata"


class UnloggedObjects:
@staticmethod
def __contains__(item: Any) -> bool:
from maldump.parsers.avast_parser import AvastParser
from maldump.parsers.avg_parser import AVGParser
from maldump.parsers.eset_parser import EsetParser
from maldump.parsers.forticlient_parser import ForticlientParser
from maldump.parsers.kaitai.forticlient_parser import (
ForticlientParser as ForticlientKaitaiParser,
)
from maldump.parsers.kaspersky_parser import KasperskyParser
from maldump.parsers.malwarebytes_parser import MalwarebytesParser
from maldump.parsers.mcafee_parser import McafeeParser
from maldump.parsers.windef_parser import WindowsDefenderParser

unlogged = {
bytes,
EsetParser,
AvastParser,
AVGParser,
ForticlientParser,
KasperskyParser,
MalwarebytesParser,
McafeeParser,
WindowsDefenderParser,
ForticlientKaitaiParser.Timestamp,
Element,
}

return item in unlogged
94 changes: 76 additions & 18 deletions maldump/parsers/avast_parser.py

Large diffs are not rendered by default.

96 changes: 77 additions & 19 deletions maldump/parsers/avg_parser.py

Large diffs are not rendered by default.

27 changes: 19 additions & 8 deletions maldump/parsers/avira_parser.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,36 @@
from __future__ import annotations

from datetime import datetime as dt
import logging

from maldump.parsers.kaitai.avira_parser import AviraParser as KaitaiParser
from maldump.structures import Parser, QuarEntry
from maldump.utils import Parser as parse

logger = logging.getLogger(__name__)


class AviraParser(Parser):
def parse_from_log(self, _=None) -> dict[str, QuarEntry]:
def parse_from_log(self, data=None):
pass

def parse_from_fs(self, _=None) -> dict[str, QuarEntry]:
logger.info("Parsing from filesystem in %s", self.name)
quarfiles = {}
for metafile in self.location.glob("*.qua"):
kt = KaitaiParser.from_file(metafile)
for idx, metafile in enumerate(self.location.glob("*.qua")):
logger.debug('Parsing entry, idx %s, path "%s"', idx, metafile)

kt = parse(self).kaitai(KaitaiParser, metafile)
if kt is None:
logger.debug('Skipping entry idx %s, path "%s"', idx, metafile)
continue

q = QuarEntry()
q.timestamp = dt.fromtimestamp(kt.qua_time)

q.timestamp = parse(self).timestamp(kt.qua_time)
q.threat = kt.mal_type
q.path = kt.filename[4:]
q.malfile = kt.mal_file
quarfiles[str(metafile)] = q
kt.close()

return quarfiles

def parse_from_fs(self, data=None):
pass
66 changes: 50 additions & 16 deletions maldump/parsers/eset_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from __future__ import annotations

import logging
import re
import typing
from pathlib import Path
Expand All @@ -24,10 +25,15 @@
from maldump.parsers.kaitai.eset_virlog_parser import EsetVirlogParser
from maldump.structures import Parser, QuarEntry
from maldump.utils import DatetimeConverter as DTC
from maldump.utils import Logger as log
from maldump.utils import Parser as parse
from maldump.utils import Reader as read

if typing.TYPE_CHECKING:
from datetime import datetime

logger = logging.getLogger(__name__)

__author__ = "Ladislav Baco"
__copyright__ = "Copyright (C) 2017"
__credits__ = "Ladislav Baco"
Expand All @@ -37,6 +43,7 @@
__status__ = "Development"


@log.log(lgr=logger)
def parseRecord(record: dict):
return {
"timestamp": record.get("timestamp"),
Expand All @@ -51,6 +58,7 @@ def parseRecord(record: dict):
}


@log.log(lgr=logger)
def convertToDict(parser: EsetVirlogParser):
return [
{
Expand All @@ -64,9 +72,20 @@ def convertToDict(parser: EsetVirlogParser):
]


@log.log(lgr=logger)
def mainParsing(virlog_path):
parser = EsetVirlogParser.from_file(filename=virlog_path)
threats = convertToDict(parser)
kt = parse(EsetParser).kaitai(EsetVirlogParser, virlog_path)
if kt is None:
logger.warning("Skipping virlog.dat parsing")
return []
kt.close()

threats = convertToDict(kt)

parsedRecords = []
for idx, record in enumerate(threats):
logger.debug("Parsing raw record %s/%s", idx + 1, len(threats))
parsedRecords.append(parseRecord(record))

return [parseRecord(record) for record in threats]

Expand All @@ -80,37 +99,44 @@ def __init__(self):
)
self.regex_entry = re.compile(r"([0-9a-fA-F]+)\.NQF$")

@log.log(lgr=logger)
def _decrypt(self, data: bytes) -> bytes:
return bytes([((b - 84) % 256) ^ 0xA5 for b in data])

@log.log(lgr=logger)
def _get_malfile(self, username: str, sha1: str) -> bytes:
quarfile = self.quarpath.format(username=username)
quarfile = Path(quarfile) / (sha1.upper() + ".NQF")
try:
with open(quarfile, "rb") as f:
data = f.read()
decrypted_data = self._decrypt(data)
except OSError:
# logging
print("Eset Error: could not read file", quarfile)

return decrypted_data
data = read.contents(quarfile, filetype="malware")
if data is None:
return b""

return self._decrypt(data)

@log.log(lgr=logger)
def _get_metadata(self, path: Path, objhash: str) -> KaitaiParserMetadata | None:
# metadata file has .NDF extension
metadata_path = path / (objhash + ".NDF")
if not metadata_path.is_file():
logger.debug("Metadata file not found")
return None

kt = parse(self).kaitai(KaitaiParserMetadata, metadata_path)
if kt is None:
return None

kt = KaitaiParserMetadata.from_file(metadata_path)
kt.close()
return kt

def parse_from_log(self, _=None) -> dict[tuple[str, datetime], QuarEntry]:
logger.info("Parsing from log in %s", self.name)
quarfiles: dict[tuple[str, datetime], QuarEntry] = {}

for metadata in mainParsing(self.location):
for idx, metadata in enumerate(mainParsing(self.location)):
logger.debug("Parsing entry, idx %s", idx)
if metadata["user"] == "SYSTEM":
logger.debug("Entry's (idx %s) user is SYSTEM, skipping", idx)
continue
q = QuarEntry()
q.timestamp = metadata["timestamp"]
Expand All @@ -124,26 +150,34 @@ def parse_from_log(self, _=None) -> dict[tuple[str, datetime], QuarEntry]:
def parse_from_fs(
self, data: dict[tuple[str, datetime], QuarEntry] | None = None
) -> dict[tuple[str, datetime], QuarEntry]:
logger.info("Parsing from filesystem in %s", self.name)
quarfiles = {}

actual_path = Path("Users/")
for entry in actual_path.glob(
"*/AppData/Local/ESET/ESET Security/Quarantine/*.NQF"
for idx, entry in enumerate(
actual_path.glob("*/AppData/Local/ESET/ESET Security/Quarantine/*.NQF")
):
logger.debug('Parsing entry, idx %s, path "%s"', idx, entry)
res_path = re.match(self.regex_entry, entry.name)
res_user = re.match(self.regex_user, str(entry))

if not res_path:
logger.debug(
"Entry's (idx %s) filename of incorrect format, skipping", idx
)
continue

user = res_user.group(1)
objhash = res_path.group(1)

if (objhash.lower(), user) in data:
logger.debug("Entry (idx %s) already found, skipping", idx)
continue

entry_stat = entry.stat()

entry_stat = parse(self).entry_stat(entry)
if entry_stat is None:
logger.debug('Skipping entry idx %s, path "%s"', idx, entry)
continue
timestamp = DTC.get_dt_from_stat(entry_stat)
path = str(entry)
sha1 = None
Expand Down
18 changes: 16 additions & 2 deletions maldump/parsers/forticlient_parser.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
from __future__ import annotations

import logging
from datetime import datetime as dt

from maldump.parsers.kaitai.forticlient_parser import ForticlientParser as KaitaiParser
from maldump.structures import Parser, QuarEntry
from maldump.utils import Logger as log
from maldump.utils import Parser as parse

logger = logging.getLogger(__name__)


class ForticlientParser(Parser):
@log.log(lgr=logger)
def _normalize_path(self, path):
if path[2:4] == "?\\":
path = path[4:]
return path

@log.log(lgr=logger)
def _get_time(self, ts):
return dt(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second)

def parse_from_log(self, data=None):
pass

def parse_from_fs(self, _=None) -> dict[str, QuarEntry]:
logger.info("Parsing from log in %s", self.name)
quarfiles = {}

for metafile in self.location.glob("*[!.meta]"):
kt = KaitaiParser.from_file(metafile)
for idx, metafile in enumerate(self.location.glob("*[!.meta]")):
logger.debug('Parsing entry, idx %s, path "%s"', idx, metafile)

kt = parse(self).kaitai(KaitaiParser, metafile)
if kt is None:
logger.debug('Skipping entry idx %s, path "%s"', idx, metafile)
continue

q = QuarEntry()
q.timestamp = self._get_time(kt.timestamp)
q.threat = kt.mal_type
Expand Down
Loading