Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions GenerateReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,17 @@ def main() -> None:
logger.info("Updating base package list via CheckDependency.py")
run_py(CHECK_DEPENDENCY_SCRIPT)

# Read NotUsed.txt
# Read list of packages marked as not used
try:
with open("./src/NotUsed.txt", "r") as f:
NotUsedPackages = set(line.strip().lower() for line in f if line.strip())
logger.info(f"Loaded {len(NotUsedPackages)} packages from NotUsed.txt")
with open(NOTUSED_PACKAGES, "r") as f:
NotUsedPackages = {
line.strip().lower() for line in f if line.strip()
}
logger.info(
f"Loaded {len(NotUsedPackages)} packages from {NOTUSED_PACKAGES}"
)
except FileNotFoundError:
logger.warning("NotUsed.txt not found.")
logger.warning(f"NotUsed file not found: {NOTUSED_PACKAGES}")
NotUsedPackages = set()

# Load the list of packages that needs to be processed
Expand Down Expand Up @@ -332,9 +336,12 @@ def main() -> None:
}
custodian_raw_df['Custodian'] = custodian_raw_df['Custodian'].astype(str).map(custodian_map_rev)

now = datetime.now().strftime("%Y%m-%d-%H%M")
monthly_file_path = os.path.join(monthly_report_dir, f"MonthlyReport-{now}.xlsx")
YearMonth = datetime.now().strftime("%Y%m")
now = now_sg()
timestamp = now.strftime("%Y%m-%d-%H%M")
monthly_file_path = os.path.join(
monthly_report_dir, f"MonthlyReport-{timestamp}.xlsx"
)
YearMonth = now.strftime("%Y%m")

# Write to Monthly Report
try:
Expand Down
Binary file not shown.
868 changes: 868 additions & 0 deletions WeeklyReport/2025-06-23/WeeklyReport_20250626_101759.csv

Large diffs are not rendered by default.

43,339 changes: 43,339 additions & 0 deletions WeeklyReport/2025-06-23/WeeklyReport_20250626_101759.html

Large diffs are not rendered by default.

13,300 changes: 13,300 additions & 0 deletions WeeklyReport/2025-06-23/WeeklyReport_20250626_101759.json

Large diffs are not rendered by default.

55 changes: 36 additions & 19 deletions utils/VersionSuggester.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,44 +74,61 @@ def suggest_upgrade_version(all_versions: list, current_version: str) -> str:
logger.error(f"Suggest upgrade error for {current_version}: {e}")
return "unknown"

async def suggest_safe_minor_upgrade(pkg: str, current_version: str, all_versions: list) -> str:
async def suggest_safe_minor_upgrade(
pkg: str, current_version: str, all_versions: list
) -> str:
"""Return the safest upgrade version within the same major release.

If all minor versions are vulnerable, try the next major release and
return its newest non-vulnerable version. ``Up-to-date`` is returned
when no higher secure version is found.
"""
Suggest the highest minor upgrade version that is not vulnerable.

Args:
pkg (str): Package name
current_version (str): Current installed version
all_versions (list): All available versions (str)

Returns:
str: Safe upgrade version or 'Up-to-date' or 'unknown'
"""
try:
cur_ver = version.parse(current_version)
minor_safe_versions = []

minor_candidates: list[tuple[version.Version, str]] = []
higher_major: list[tuple[version.Version, str]] = []

for v in all_versions:
try:
pv = version.parse(v)
if pv.major == cur_ver.major and pv >= cur_ver:
minor_safe_versions.append((pv, v)) # tuple of (parsed, raw)
except InvalidVersion:
continue

# Sort in descending order to get latest first
minor_safe_versions.sort(reverse=True, key=lambda x: x[0])
if pv < cur_ver:
continue

if pv.major == cur_ver.major:
minor_candidates.append((pv, v))
elif pv.major > cur_ver.major:
higher_major.append((pv, v))

# newest first within current major
minor_candidates.sort(reverse=True, key=lambda x: x[0])

sem = asyncio.Semaphore(5)
async with aiohttp.ClientSession() as session:
for _, ver_str in minor_safe_versions:
for _, ver_str in minor_candidates:
_, status, _ = await fetch_osv(session, pkg, ver_str, sem)

if status == 'No':
if status == "No":
return ver_str

if higher_major:
next_major = min(pv.major for pv, _ in higher_major)
next_major_versions = [
(pv, v) for pv, v in higher_major if pv.major == next_major
]
next_major_versions.sort(reverse=True, key=lambda x: x[0])

for _, ver_str in next_major_versions:
_, status, _ = await fetch_osv(session, pkg, ver_str, sem)
if status == "No":
return ver_str

return "Up-to-date"

except Exception as e:
except Exception as e: # pragma: no cover - network/parse issues
logger.warning(f"Error in suggest_safe_minor_upgrade: {e}")
return "unknown"

Expand Down
80 changes: 80 additions & 0 deletions utils/WhlRepacker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import base64
import csv
import hashlib
import os
import shutil
import zipfile
from pathlib import Path


def repack_single_wheel(wheel_path: Path, output_dir: Path):
    """Repack a single wheel so its METADATA declares Metadata-Version: 2.3.

    The wheel is unpacked into a temporary directory next to the output,
    the ``Metadata-Version`` header is rewritten, the ``RECORD`` entry for
    METADATA is refreshed (installers verify the sha256/size listed there,
    so leaving the old hash would produce an invalid wheel), and the tree
    is zipped back up as ``<original stem>.whl`` inside *output_dir*.

    Args:
        wheel_path: Path to the source ``.whl`` file.
        output_dir: Directory that receives the repacked wheel.
    """
    temp_dir = output_dir / (wheel_path.stem + '_unpacked')

    try:
        # Unpack the wheel (a wheel is just a zip archive)
        with zipfile.ZipFile(wheel_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)

        # Locate the METADATA file; restrict the search to the
        # *.dist-info directory so an unrelated file named METADATA
        # elsewhere in the package cannot be picked up by mistake.
        metadata_file = None
        for root, _, files in os.walk(temp_dir):
            if 'METADATA' in files and Path(root).name.endswith('.dist-info'):
                metadata_file = Path(root) / 'METADATA'
                break

        if metadata_file is None:
            print(f"[ERROR] METADATA not found in: {wheel_path.name}")
            return

        # Rewrite the Metadata-Version header, leaving every other line intact
        with open(metadata_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        with open(metadata_file, 'w', encoding='utf-8') as f:
            for line in lines:
                if line.startswith("Metadata-Version:"):
                    f.write("Metadata-Version: 2.3\n")
                else:
                    f.write(line)

        # Refresh the RECORD entry for METADATA: RECORD stores
        # "path,sha256=<urlsafe-b64 digest>,size" per the wheel spec, and
        # pip verifies it at install time.
        record_file = metadata_file.parent / 'RECORD'
        if record_file.exists():
            digest = hashlib.sha256(metadata_file.read_bytes()).digest()
            hash_str = "sha256=" + base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
            size = str(metadata_file.stat().st_size)
            rel_meta = metadata_file.relative_to(temp_dir).as_posix()

            with open(record_file, 'r', newline='', encoding='utf-8') as f:
                rows = [
                    [rel_meta, hash_str, size] if row and row[0] == rel_meta else row
                    for row in csv.reader(f)
                ]
            with open(record_file, 'w', newline='', encoding='utf-8') as f:
                csv.writer(f, lineterminator='\n').writerows(rows)

        # Repack the wheel under its original filename
        new_wheel_path = output_dir / (wheel_path.stem + '.whl')
        with zipfile.ZipFile(new_wheel_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(temp_dir):
                for file in files:
                    file_path = Path(root) / file
                    zipf.write(file_path, file_path.relative_to(temp_dir))

        print(f"[DONE] Repacked: {new_wheel_path.name}")
    finally:
        # Always remove the scratch directory, even when something above raised
        if temp_dir.exists():
            shutil.rmtree(temp_dir)

def process_all_wheels(input_dir: str, output_dir: str = None):
    """Repack every .whl file sitting directly inside *input_dir*.

    Repacked wheels are written to *output_dir*; when it is omitted the
    wheels are rewritten in place. Raises ValueError if *input_dir* is
    not an existing directory.
    """
    source = Path(input_dir)
    destination = Path(output_dir) if output_dir else source

    if not source.is_dir():
        raise ValueError("Input path must be a directory.")

    wheels = list(source.glob("*.whl"))
    if not wheels:
        print("No .whl files found.")
        return

    print(f"Found {len(wheels)} .whl file(s). Starting conversion...")

    for wheel in wheels:
        repack_single_wheel(wheel, destination)

if __name__ == "__main__":
    import sys

    # CLI entry point: first argument is the source directory, the
    # optional second one is where repacked wheels should be written.
    if len(sys.argv) < 2:
        print("Usage: python WhlRepacker.py <input_dir> [<output_dir>]")
    else:
        src = sys.argv[1]
        dst = sys.argv[2] if len(sys.argv) > 2 else None
        process_all_wheels(src, dst)
Loading