112 changes: 65 additions & 47 deletions maldump/__main__.py
@@ -40,96 +40,109 @@ def main() -> None:

# Save the destination directory
dest: Path = args.dest.resolve()
root_dir = args.root_dir
quar_entries: list[QuarEntry] = []


if not args.velociraptor:
quar_entries.extend(run_in_one_root(
root_dir,
args.detect_avs,
))
else:
for directory in os.listdir(root_dir):
new_root_dir = os.path.join(root_dir, directory, 'uploads', 'auto', 'C%3A')
if not os.path.isdir(new_root_dir):
continue

quar_entries.extend(run_in_one_root(
new_root_dir,
args.detect_avs
))

if args.quar or args.all:
export_files(quar_entires, dest)

if args.meta or args.all:
export_meta(quar_entires, dest)

list_files(quar_entires)



def run_in_one_root(root_dir, detect_avs) -> list[QuarEntry]:
# Switch to root partition
os.chdir(args.root_dir)
os.chdir(root_dir)

logger.debug(
'Working in directory "%s", files would be stored into "%s"', os.getcwd(), dest
'Working in directory "%s"', os.getcwd()
)

# Get a list of all supported or all installed avs
avs = AVManager.detect() if args.detect_avs else AVManager.retrieve()
avs = AVManager.detect() if detect_avs else AVManager.retrieve()

logger.debug("Detected AVs: %s", [av.name for av in avs])

if args.quar:
export_files(avs, dest)
elif args.meta:
export_meta(avs, dest)
elif args.all:
export_files(avs, dest)
export_meta(avs, dest)
else:
list_files(avs)
quar_entries: list[QuarEntry] = []
for av in avs:
quar_entries.extend(av.export())

return quar_entries


def export_files(
avs: list[Quarantine], dest: Path, out_file: str = "quarantine.tar"
quar_entries: list[QuarEntry], dest: Path, out_file: str = "quarantine.tar"
) -> None:
total = 0
for av in avs:
entries = av.export()
if (len(entries)) > 0:
tar_path = dest.joinpath(out_file)
tar = tarfile.open(tar_path, total and "a" or "w")
total += len(entries)
for entry in entries:
tarinfo = tarfile.TarInfo(av.name + "/" + entry.md5)
tarinfo.size = len(entry.malfile)
tar.addfile(tarinfo, io.BytesIO(entry.malfile))
tar.close()
if len(quar_entries) > 0:
tar_path = dest.joinpath(out_file)
tar = tarfile.open(tar_path, total and "a" or "w")
total += len(quar_entries)
for entry in quar_entries:
tarinfo = tarfile.TarInfo(entry.antivirus + "/" + entry.md5)
tarinfo.size = len(entry.malfile)
tar.addfile(tarinfo, io.BytesIO(entry.malfile))
tar.close()
if total > 0:
print(f"Exported {total} object(s) into '{out_file}'")
else:
print("No quarantined files found!")


def export_meta(
avs: list[Quarantine], dest: Path, meta_file: str = "quarantine.csv"
quar_entries: list[QuarEntry], dest: Path, meta_file: str = "quarantine.csv"
) -> None:
entries = []
for av in avs:
for e in av.export():
d = vars(e)
d.update(antivirus=av.name)
entries.append(d)
if len(entries) > 0:
if len(quar_entries) > 0:
csv_path = dest.joinpath(meta_file)
with open(csv_path, "w", encoding="utf-8", newline="") as f:
fields = [
"timestamp",
"antivirus",
"threat",
"path",
"orig_path",
"size",
"md5",
"sha1",
"sha256",
]
writer = csv.DictWriter(f, fields, extrasaction="ignore")
writer.writeheader()
writer.writerows(entries)
print(f"Written {len(entries)} row(s) into file '{meta_file}'")
writer.writerows(vars(e) for e in quar_entries)
print(f"Written {len(quar_entries)} row(s) into file '{meta_file}'")
else:
print(
f"The file '{meta_file}' wasn't created as there is nothing in quarantine"
)


def list_files(avs: list[Quarantine]) -> None:
quarantined_file_exists = False
for i, av in enumerate(avs):
entries = av.export()
if len(entries) > 0:
quarantined_file_exists = True
if i != 0:
print()
print(Fore.YELLOW + "---", av.name, "---" + Style.RESET_ALL)
for e in entries:
print(e.path)
if not quarantined_file_exists:
def list_files(quar_entries: list[QuarEntry]) -> None:
if not quar_entries:
print("No quarantined files found!")
return

for e in quar_entries:
print(e.path)


def parse_cli() -> argparse.Namespace:
@@ -179,9 +192,14 @@ def parse_cli() -> argparse.Namespace:
"-t",
"--log-level",
choices=["critical", "fatal", "error", "warn", "warning", "info", "debug"],
default="warning",
default="error",
help="log level",
)
parser.add_argument(
"--velociraptor",
action="store_true",
help="load quarantine from velociraptor dump",
)
parser.add_argument(
"-v", "--version", action="version", version="%(prog)s " + __version__
)
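Note on the `--velociraptor` branch above: `main()` now iterates over one subdirectory per collected host and treats `<host>/uploads/auto/C%3A` (the URL-encoded `C:` drive) as a filesystem root. A minimal sketch of that traversal, assuming the Velociraptor dump layout used in the diff:

```python
from pathlib import Path

def iter_velociraptor_roots(root_dir: str) -> list[Path]:
    """Collect every extracted C: drive root inside a Velociraptor dump.
    Assumes the layout above: <dump>/<host>/uploads/auto/C%3A."""
    roots = []
    for host_dir in Path(root_dir).iterdir():
        candidate = host_dir / "uploads" / "auto" / "C%3A"  # URL-encoded "C:"
        if candidate.is_dir():
            roots.append(candidate)
    return roots
```

Each root is then handed to `run_in_one_root()`, which `os.chdir()`s into it, so quarantine entries from every host accumulate into a single list before export.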
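The exporters change shape accordingly: `export_files()` and `export_meta()` now take the flat `list[QuarEntry]`, so `av.export()` runs exactly once, inside `run_in_one_root()`. Two details follow from that. First, the tar member name can no longer reference `av.name`; the line above uses `entry.antivirus` on the assumption that `QuarEntry` carries an `antivirus` attribute (the CSV export already writes that column from `vars(e)`). Second, with the per-AV loop gone, `total` is always 0 when `total and "a" or "w"` is evaluated, so the archive is effectively always opened in `"w"` mode. A condensed sketch of the single-pass export under those assumptions:

```python
import io
import tarfile
from pathlib import Path

def write_quarantine_tar(entries, dest: Path, out_file: str = "quarantine.tar") -> None:
    """Single-pass tar export; assumes each entry exposes .antivirus,
    .md5 and .malfile (the quarantined payload as raw bytes)."""
    if not entries:
        print("No quarantined files found!")
        return
    # One archive written in one pass -- `"a" if total else "w"` reduces to "w"
    with tarfile.open(dest / out_file, "w") as tar:
        for entry in entries:
            info = tarfile.TarInfo(f"{entry.antivirus}/{entry.md5}")
            info.size = len(entry.malfile)
            tar.addfile(info, io.BytesIO(entry.malfile))
    print(f"Exported {len(entries)} object(s) into '{out_file}'")
```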
6 changes: 3 additions & 3 deletions maldump/parsers/avast_parser.py
@@ -54,7 +54,7 @@ def _initDB(self) -> bool:
)
self.root = ET.parse(self.location / "index.xml").getroot()
except (ParseError, OSError) as e:
logger.exception("Cannot open and parse index.xml", exc_info=e)
logger.info("Cannot open and parse index.xml", exc_info=e)
return False

# Decrypt vault.db and prepare db connection
@@ -65,7 +65,7 @@ def _initDB(self) -> bool:
with open(self.tmpfile, "wb") as f:
f.write(self._decryptVault("$AV_ASW/$VAULT/vault.db"))
except OSError as e:
logger.exception("Cannot open nor write temporary file", exc_info=e)
logger.warning("Cannot open nor write temporary file", exc_info=e)
return False

try:
@@ -74,7 +74,7 @@ def _initDB(self) -> bool:
)
self.db = sqlite3.connect(self.tmpfile)
except sqlite3.Error as e:
logger.exception("Cannot connect to SQLite3 chest database", exc_info=e)
logger.warning("Cannot connect to SQLite3 chest database", exc_info=e)
print("Avast DB Error: " + str(e))
return False

6 changes: 3 additions & 3 deletions maldump/parsers/avg_parser.py
@@ -54,7 +54,7 @@ def _initDB(self) -> bool:
)
self.root = ET.parse(self.location / "index.xml").getroot()
except (ParseError, OSError) as e:
logger.exception("Cannot open and parse index.xml", exc_info=e)
logger.info("Cannot open and parse index.xml", exc_info=e)
return False

# Decrypt vault.db and prepare db connection
@@ -65,7 +65,7 @@ def _initDB(self) -> bool:
with open(self.tmpfile, "wb") as f:
f.write(self._decryptVault("$AV_AVG/$VAULT/vault.db"))
except OSError as e:
logger.exception("Cannot open nor write temporary file", exc_info=e)
logger.warning("Cannot open nor write temporary file", exc_info=e)
return False

try:
@@ -74,7 +74,7 @@ def _initDB(self) -> bool:
)
self.db = sqlite3.connect(self.tmpfile)
except sqlite3.Error as e:
logger.exception("Cannot connect to SQLite3 chest database", exc_info=e)
logger.warning("Cannot connect to SQLite3 chest database", exc_info=e)
print("Avast DB Error: " + str(e))
return False

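Both vault parsers (Avast and AVG) demote these messages from `logger.exception`, which always logs at ERROR with a traceback, to `info`/`warning`. This matters now that the CLI default `--log-level` is `error`: a missing `index.xml` usually just means the product isn't installed on the imaged system, so it shouldn't surface as an error by default. Passing `exc_info=e` keeps the traceback attached at the lower severity; a minimal sketch of the pattern:

```python
import logging

logger = logging.getLogger("maldump.parsers")

def read_index(path: str) -> bytes | None:
    """An absent quarantine index is an expected condition: log it at INFO,
    with exc_info so the traceback still shows up under -t debug."""
    try:
        with open(path, "rb") as f:
            return f.read()
    except OSError as e:
        logger.info("Cannot open and parse index.xml", exc_info=e)
        return None
```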
20 changes: 13 additions & 7 deletions maldump/parsers/eset_parser.py
@@ -18,6 +18,7 @@
import logging
import re
import typing
import os
from pathlib import Path

from maldump.constants import ThreatMetadata
@@ -90,7 +91,7 @@ def convertToDict(parser: EsetVirlogParser):
{
**{
y.name.name: y.arg if hasattr(y, "arg") else None
for y in x.record.data_fields
for y in x.record.data_fields if hasattr(y.name, "name")
},
"timestamp": x.record.win_timestamp.date_time,
}
@@ -100,13 +101,20 @@ def mainParsing(virlog_path):

@log.log(lgr=logger)
def mainParsing(virlog_path):
if not virlog_path.is_file():
logger.debug("virlog.dat file not found")
return []
kt = parse(EsetParser).kaitai(EsetVirlogParser, virlog_path)
if kt is None:
logger.warning("Skipping virlog.dat parsing")
logger.warning("Skipping virlog.dat parsing at %s", os.path.abspath(virlog_path))
return []
kt.close()

threats = convertToDict(kt)
try:
threats = convertToDict(kt)
except Exception as e:
logger.warning("Cannot parse virlog.dat at %s", os.path.abspath(virlog_path), exc_info=e)
return []

parsedRecords = []
for idx, record in enumerate(threats):
@@ -161,9 +169,6 @@ def parse_from_log(self, _=None) -> dict[tuple[str, datetime], QuarEntry]:

for idx, metadata in enumerate(mainParsing(self.location)):
logger.debug("Parsing entry, idx %s", idx)
if metadata["user"] == "SYSTEM":
logger.debug("Entry's (idx %s) user is SYSTEM, skipping", idx)
continue
q = QuarEntry(self)
q.timestamp = metadata["timestamp"]
q.threat = metadata["infiltration"]
@@ -207,7 +212,7 @@ def parse_from_fs(
logger.debug('Skipping entry idx %s, path "%s"', idx, entry)
continue
timestamp = DTC.get_dt_from_stat(entry_stat)
path = str(entry)
path = orig_path = str(os.path.abspath(entry))
sha1 = None
size = entry_stat.st_size
threat = ThreatMetadata.UNKNOWN_THREAT
@@ -223,6 +228,7 @@
q = QuarEntry(self)
q.timestamp = timestamp
q.path = path
q.orig_path = orig_path
q.sha1 = sha1
q.size = size
q.threat = threat
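The ESET changes are all defensive: a missing `virlog.dat` returns early, a failing `convertToDict()` is caught per file, and the dict comprehension now skips data fields whose name enum did not resolve, so an unknown field ID in a newer virlog format no longer raises `AttributeError`. A self-contained illustration of that last guard; the `SimpleNamespace` objects are hypothetical stand-ins for Kaitai's parsed field wrappers:

```python
from types import SimpleNamespace

# A resolved field has .name.name (the enum member's label);
# an unresolved one carries only the raw integer ID.
resolved = SimpleNamespace(name=SimpleNamespace(name="infiltration"), arg="Win32/Agent.ABC")
unresolved = SimpleNamespace(name=0x7F)  # unknown field ID -> no .name.name

record = {
    f.name.name: getattr(f, "arg", None)  # same as: f.arg if hasattr(f, "arg") else None
    for f in (resolved, unresolved)
    if hasattr(f.name, "name")            # drop fields whose enum didn't resolve
}
print(record)  # {'infiltration': 'Win32/Agent.ABC'}
```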
38 changes: 25 additions & 13 deletions maldump/parsers/kaitai/eset_ndf_parser.ksy
@@ -3,6 +3,7 @@ meta:
title: ESET Antivirus quarantine metadata file parser (NDF)
file-extension: NDF
endian: le

seq:
- id: magic
size: 0x08
@@ -31,19 +32,8 @@ types:
seq:
- id: mal_path
type: widestr
- id: date_block_size
type: u4
- id: date_block_header
size: 0x04
contents: [ 0x4e, 0x49, 0x57, 0x49 ] # NIWI
- id: datetime_quar_enc_start
type: windate
- id: datetime_first_utc
type: windate
- id: datetime_quar_enc_stop
type: windate
- id: unknown_size
type: u4
- id: date_block
type: dateblock
- id: datetime_latest_occurence
type: unixdate
- id: filler1
@@ -65,6 +55,28 @@ types:
- id: mal_path2
type: widestr

dateblock:
seq:
- id: date_block_size
type: u4
- id: date_block_contents
type: dateblock_contents
if: date_block_size != 0

dateblock_contents:
seq:
- id: date_block_header
size: 0x04
contents: [ 0x4e, 0x49, 0x57, 0x49 ] # NIWI
- id: datetime_quar_enc_start
type: windate
- id: datetime_first_utc
type: windate
- id: datetime_quar_enc_stop
type: windate
- id: unknown_size
type: u4

windate:
seq:
- id: date_time
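The `.ksy` refactor extracts the NIWI date block into its own `dateblock` type whose contents are parsed only `if: date_block_size != 0`, so NDF files with an empty date block no longer trip the fixed `contents` magic check. In the Python class kaitai-struct compiles from a spec, a field guarded by `if:` is simply absent (`None`) when the condition is false. A consumption sketch under stated assumptions: the module and class names are guesses, and `rec` stands in for whichever record type holds the `date_block` field; only the field ids are taken from the spec:

```python
from eset_ndf_parser import EsetNdfParser  # compiled module/class names assumed

ndf = EsetNdfParser.from_file("0123456789ABCDEF.NDF")  # hypothetical NDF file
rec = ndf  # stand-in for the record object exposing the fields above

if rec.date_block.date_block_size != 0:  # mirrors the `if:` key in the spec
    dates = rec.date_block.date_block_contents
    print(dates.datetime_quar_enc_start.date_time)
    print(dates.datetime_first_utc.date_time)
else:
    print("record carries no NIWI date block")  # contents were never parsed
```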