diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index bf62c19..6be960f 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
- rev: v0.14.10
+ rev: v0.15.12
hooks:
- id: ruff
args: [--fix]
diff --git a/DNAUID/dna_ann/__init__.py b/DNAUID/dna_ann/__init__.py
index 37b217a..90cfa4c 100644
--- a/DNAUID/dna_ann/__init__.py
+++ b/DNAUID/dna_ann/__init__.py
@@ -1,6 +1,7 @@
+from __future__ import annotations
+
import random
import asyncio
-from typing import List
from gsuid_core.sv import SV
from gsuid_core.aps import scheduler
@@ -9,79 +10,74 @@
from gsuid_core.models import Event
from gsuid_core.subscribe import gs_subscribe
+from .utils import resolve_index, fetch_ann_list, build_index_map
+from .ann_card import draw_ann_list_img, draw_ann_detail_img
from ..utils.dna_api import dna_api
-from ..dna_ann.ann_card import ann_detail_card
from ..utils.msgs.notify import send_dna_notify
from ..dna_config.dna_config import DNAConfig
sv_ann = SV("DNA公告")
sv_ann_sub = SV("订阅DNA公告", pm=3)
-task_name_ann = "订阅DNA公告"
-ann_minute_check: int = DNAConfig.get_config("AnnMinuteCheck").data or 10
+TASK_NAME_ANN = "订阅DNA公告"
+ANN_MIN_CHECK: int = DNAConfig.get_config("AnnMinuteCheck").data or 10
@sv_ann.on_command("公告")
async def ann_dna(bot: Bot, ev: Event):
- ann_id = ev.text
- if not ann_id:
- ann_list = await dna_api.get_ann_list()
- if not ann_list:
- return await bot.send("获取公告列表失败")
- anns = "\n".join([f"{x['postId']}" for x in ann_list])
- return await send_dna_notify(bot, ev, f"公告列表: \n{anns}")
+ text = ev.text.strip().replace("#", "")
+
+ if not text:
+ result = await draw_ann_list_img()
+ if isinstance(result, str):
+ return await send_dna_notify(bot, ev, result)
+ return await bot.send(result) # type: ignore
- ann_id = ann_id.replace("#", "")
- if not ann_id.isdigit():
- raise Exception("公告ID不正确")
+ posts = await fetch_ann_list(prefer_cache=True)
+ if not posts:
+ return await send_dna_notify(bot, ev, "获取公告列表失败")
- img = await ann_detail_card(ann_id)
- return await bot.send(img) # type: ignore
+ post_id = resolve_index(text, build_index_map(posts))
+ if post_id is None:
+ return await send_dna_notify(bot, ev, "公告序号不正确,发送 公告 查看可用列表")
+
+ result = await draw_ann_detail_img(post_id)
+ if isinstance(result, str):
+ return await send_dna_notify(bot, ev, result)
+ return await bot.send(result) # type: ignore
@sv_ann_sub.on_fullmatch("订阅公告")
async def sub_ann_dna(bot: Bot, ev: Event):
if not ev.group_id:
- return await bot.send("请在群聊中订阅")
+ return await send_dna_notify(bot, ev, "请在群聊中订阅")
if not DNAConfig.get_config("DNAAnnOpen").data:
- return await bot.send("二重螺旋公告推送功能已关闭")
-
- data = await gs_subscribe.get_subscribe(task_name_ann)
- if data:
- for subscribe in data:
- if subscribe.group_id == ev.group_id:
- return await bot.send("已经订阅了二重螺旋公告!")
+ return await send_dna_notify(bot, ev, "二重螺旋公告推送功能已关闭")
- await gs_subscribe.add_subscribe(
- "session",
- task_name=task_name_ann,
- event=ev,
- extra_message="",
- )
+ data = await gs_subscribe.get_subscribe(TASK_NAME_ANN)
+ if data and any(sub.group_id == ev.group_id for sub in data):
+ return await send_dna_notify(bot, ev, "已经订阅了二重螺旋公告!")
- logger.info(data)
- await bot.send("成功订阅二重螺旋公告!")
+ await gs_subscribe.add_subscribe("session", TASK_NAME_ANN, ev, extra_message="")
+ await send_dna_notify(bot, ev, "成功订阅二重螺旋公告!")
@sv_ann_sub.on_fullmatch(("取消订阅公告", "取消公告", "退订公告"))
async def unsub_ann_dna(bot: Bot, ev: Event):
if not ev.group_id:
- return await bot.send("请在群聊中取消订阅")
+ return await send_dna_notify(bot, ev, "请在群聊中取消订阅")
- data = await gs_subscribe.get_subscribe(task_name_ann)
- if data:
- for subscribe in data:
- if subscribe.group_id == ev.group_id:
- await gs_subscribe.delete_subscribe("session", task_name_ann, ev)
- return await bot.send("成功取消订阅二重螺旋公告!")
- else:
- if not DNAConfig.get_config("DNAAnnOpen").data:
- return await bot.send("二重螺旋公告推送功能已关闭")
+ data = await gs_subscribe.get_subscribe(TASK_NAME_ANN)
+ if data and any(sub.group_id == ev.group_id for sub in data):
+ await gs_subscribe.delete_subscribe("session", TASK_NAME_ANN, ev)
+ return await send_dna_notify(bot, ev, "成功取消订阅二重螺旋公告!")
- return await bot.send("未曾订阅二重螺旋公告!")
+ if not DNAConfig.get_config("DNAAnnOpen").data:
+ return await send_dna_notify(bot, ev, "二重螺旋公告推送功能已关闭")
+ return await send_dna_notify(bot, ev, "未曾订阅二重螺旋公告!")
-@scheduler.scheduled_job("interval", minutes=ann_minute_check)
+@scheduler.scheduled_job("interval", minutes=ANN_MIN_CHECK)
async def check_dna_ann():
if not DNAConfig.get_config("DNAAnnOpen").data:
return
@@ -90,42 +86,39 @@ async def check_dna_ann():
async def check_dna_ann_state():
logger.info("[二重螺旋公告] 定时任务: 二重螺旋公告查询..")
- datas = await gs_subscribe.get_subscribe(task_name_ann)
- if not datas:
+ subs = await gs_subscribe.get_subscribe(TASK_NAME_ANN)
+ if not subs:
logger.info("[二重螺旋公告] 暂无群订阅")
return
- ids: List[int] = DNAConfig.get_config("DNAAnnIds").data or []
new_ann_list = await dna_api.get_ann_list()
if not new_ann_list:
return
- new_ann_ids = [int(x["postId"]) for x in new_ann_list]
- if not ids:
- DNAConfig.set_config("DNAAnnIds", new_ann_ids)
+ known_ids: list[int] = DNAConfig.get_config("DNAAnnIds").data or []
+ fresh_ids = [int(post["postId"]) for post in new_ann_list]
+
+ if not known_ids:
+ DNAConfig.set_config("DNAAnnIds", fresh_ids)
logger.info("[二重螺旋公告] 初始成功, 将在下个轮询中更新.")
return
- new_ann_need_send = []
- for ann_id in new_ann_ids:
- if ann_id not in ids:
- new_ann_need_send.append(ann_id)
-
- if not new_ann_need_send:
+ pending = [post_id for post_id in fresh_ids if post_id not in known_ids]
+ if not pending:
logger.info("[二重螺旋公告] 没有最新公告")
return
- logger.info(f"[二重螺旋公告] 更新公告id: {new_ann_need_send}")
- save_ids = sorted(ids, reverse=True)[:50] + new_ann_ids
- DNAConfig.set_config("DNAAnnIds", list(set(save_ids)))
+ logger.info(f"[二重螺旋公告] 更新公告id: {pending}")
+ merged = sorted(set(known_ids) | set(fresh_ids), reverse=True)[:50]
+ DNAConfig.set_config("DNAAnnIds", merged)
- for ann_id in new_ann_need_send:
+ for post_id in pending:
try:
- img = await ann_detail_card(ann_id, is_check_time=True)
+ img = await draw_ann_detail_img(post_id, is_check_time=True)
if isinstance(img, str):
continue
- for subscribe in datas:
- await subscribe.send(img) # type: ignore
+ for sub in subs:
+ await sub.send(img) # type: ignore
await asyncio.sleep(random.uniform(1, 3))
except Exception as e:
logger.exception(e)
diff --git a/DNAUID/dna_ann/_image.py b/DNAUID/dna_ann/_image.py
new file mode 100644
index 0000000..a4eb5bb
--- /dev/null
+++ b/DNAUID/dna_ann/_image.py
@@ -0,0 +1,124 @@
+from __future__ import annotations
+
+import hashlib
+from pathlib import Path
+from urllib.parse import quote_plus
+
+from PIL import Image, ImageOps, ImageDraw, ImageFont
+
+from gsuid_core.utils.download_resource.download_file import download
+
+from ..utils.resource.RESOURCE_PATH import ANN_CARD_PATH
+
+Color = tuple[int, int, int] | tuple[int, int, int, int]
+Size = tuple[int, int]
+
+DEFAULT_LINE_GAP = 8
+DEFAULT_ELLIPSIS = "..."
+
+QR_CACHE_PATH = ANN_CARD_PATH / "qr"
+PREVIEW_CACHE_PATH = ANN_CARD_PATH / "preview"
+DETAIL_CACHE_PATH = ANN_CARD_PATH / "detail"
+
+
+def cache_name(*parts: object, ext: str = "png") -> str:
+ raw = "|".join(str(part) for part in parts)
+ return f"{hashlib.sha1(raw.encode('utf-8')).hexdigest()}.{ext}"
+
+
+async def fetch_image(path: Path, pic_url: str, *, name: str | None = None) -> Image.Image:
+ path.mkdir(parents=True, exist_ok=True)
+ file_name = name or pic_url.split("/")[-1]
+ target = path / file_name
+ if not target.exists():
+ await download(pic_url, path, file_name, tag="[DNA]")
+ return Image.open(target).convert("RGBA")
+
+
+async def load_qr_code(url: str, size: int = 220) -> Image.Image | None:
+ qr_url = f"https://api.qrserver.com/v1/create-qr-code/?size={size}x{size}&data={quote_plus(url)}"
+ try:
+ image = await fetch_image(QR_CACHE_PATH, qr_url, name=cache_name("qr", url, size))
+ except OSError:
+ return None
+ return image.convert("RGB").resize((size, size), Image.Resampling.LANCZOS)
+
+
+def shrink_to_width(image: Image.Image, max_width: int) -> Image.Image:
+ if image.width <= max_width:
+ return image
+ ratio = max_width / image.width
+ return image.resize((int(max_width), int(image.height * ratio)), Image.Resampling.LANCZOS)
+
+
+def fit_image(image: Image.Image, size: Size) -> Image.Image:
+ return ImageOps.fit(image.convert("RGB"), size, method=Image.Resampling.LANCZOS)
+
+
+def rounded_mask(size: Size, radius: int) -> Image.Image:
+ mask = Image.new("L", size, 0)
+ ImageDraw.Draw(mask).rounded_rectangle((0, 0, size[0], size[1]), radius=radius, fill=255)
+ return mask
+
+
+def line_height(font: ImageFont.FreeTypeFont) -> int:
+ return sum(font.getmetrics())
+
+
+def wrap_text(
+ draw: ImageDraw.ImageDraw,
+ text: str,
+ font: ImageFont.FreeTypeFont,
+ max_width: int,
+ max_lines: int | None = None,
+ ellipsis: str = DEFAULT_ELLIPSIS,
+) -> list[str]:
+ lines: list[str] = []
+ raw_lines = text.splitlines() if text else [""]
+ for raw_line in raw_lines:
+ current = ""
+ for char in raw_line:
+ trial = f"{current}{char}"
+ width = draw.textbbox((0, 0), trial, font=font)[2]
+ if current and width > max_width:
+ lines.append(current)
+ current = char
+ else:
+ current = trial
+ lines.append(current if current else " ")
+
+ if max_lines and len(lines) > max_lines:
+ lines = lines[:max_lines]
+ lines[-1] = lines[-1].rstrip(" .") + ellipsis
+ return lines
+
+
+def draw_text_block(
+ draw: ImageDraw.ImageDraw,
+ xy: tuple[int, int],
+ text: str,
+ font: ImageFont.FreeTypeFont,
+ fill: Color,
+ max_width: int,
+ *,
+ line_gap: int = DEFAULT_LINE_GAP,
+ max_lines: int | None = None,
+) -> int:
+ x, y = xy
+ lines = wrap_text(draw, text, font, max_width, max_lines)
+ text_height = line_height(font)
+ for index, line in enumerate(lines):
+ draw.text((x, y), line, font=font, fill=fill)
+ y += text_height
+ if index != len(lines) - 1:
+ y += line_gap
+ return y
+
+
+def round_avatar(avatar: Image.Image, size: int) -> Image.Image:
+ canvas = Image.new("RGBA", (size, size), (0, 0, 0, 0))
+ head = avatar.convert("RGBA").resize((size, size), Image.Resampling.LANCZOS)
+ mask = Image.new("L", (size, size), 0)
+ ImageDraw.Draw(mask).ellipse((0, 0, size, size), fill=255)
+ canvas.paste(head, (0, 0), mask)
+ return canvas
diff --git a/DNAUID/dna_ann/ann_card.py b/DNAUID/dna_ann/ann_card.py
index 6773240..b4a23cc 100644
--- a/DNAUID/dna_ann/ann_card.py
+++ b/DNAUID/dna_ann/ann_card.py
@@ -1,248 +1,332 @@
-import re
-import html
+from __future__ import annotations
+
import time
-from typing import List, Union
-from datetime import datetime
+from pathlib import Path
-from PIL import Image, ImageOps, ImageDraw
+from PIL import Image, ImageDraw
from gsuid_core.logger import logger
from gsuid_core.utils.image.convert import convert_img
-from gsuid_core.utils.image.image_tools import easy_paste
-
-from ..utils import dna_api, get_datetime
-from ..utils.image import download_pic_from_url
-from ..utils.fonts.dna_fonts import unicode_font_26
-from ..utils.resource.RESOURCE_PATH import ANN_CARD_PATH
-
-
-async def ann_batch_card(post_content: List, drow_height: float) -> bytes:
- im = Image.new("RGB", (1080, drow_height), "#f9f6f2") # type: ignore
- draw = ImageDraw.Draw(im)
- x, y = 0, 0
-
- for temp in post_content:
- if temp["contentType"] == 1:
- content = temp["content"]
- drow_duanluo, _, drow_line_height, _ = split_text(content)
- for duanluo, line_count in drow_duanluo:
- if duanluo.strip():
- draw.text((x, y), duanluo, fill=(0, 0, 0), font=unicode_font_26)
- y += drow_line_height * line_count + 30
- elif (
- temp["contentType"] == 2 and "url" in temp and temp["url"].lower().endswith(("jpg", "png", "jpeg", "webp"))
- ):
- img = await download_pic_from_url(ANN_CARD_PATH, temp["url"])
- img_x = 0
- if img.width > im.width:
- ratio = im.width / img.width
- img = img.resize((int(img.width * ratio), int(img.height * ratio)))
- else:
- img_x = (im.width - img.width) // 2
- easy_paste(im, img, (img_x, y))
- y += img.size[1] + 40
- elif (
- temp["contentType"] == 5
- and "contentVideo" in temp
- and "coverUrl" in temp["contentVideo"]
- and temp["contentVideo"]["coverUrl"].lower().endswith(("jpg", "png", "jpeg", "webp"))
- ):
- try:
- video_temp = temp["contentVideo"]
- img = await download_pic_from_url(ANN_CARD_PATH, video_temp["coverUrl"])
- img_x = 0
- if img.width > im.width:
- ratio = im.width / img.width
- img = img.resize((int(img.width * ratio), int(img.height * ratio)))
- else:
- img_x = (im.width - img.width) // 2
- easy_paste(im, img, (img_x, y))
- y += img.size[1] + 40
- except Exception:
- pass
-
- if hasattr(unicode_font_26, "getbbox"):
- bbox = unicode_font_26.getbbox("囗")
- padding = (
- int(bbox[2] - bbox[0]),
- int(bbox[3] - bbox[1]),
- int(bbox[2] - bbox[0]),
- int(bbox[3] - bbox[1]),
+
+from .utils import (
+ LIST_DISPLAY_LIMIT,
+ pick_time,
+ get_post_url,
+ pick_preview,
+ pick_subject,
+ extract_blocks,
+ fetch_ann_list,
+ format_post_time,
+ post_time_to_timestamp,
+)
+from ..utils import dna_api
+from ._image import (
+ DETAIL_CACHE_PATH,
+ PREVIEW_CACHE_PATH,
+ fit_image,
+ wrap_text,
+ cache_name,
+ fetch_image,
+ line_height,
+ load_qr_code,
+ round_avatar,
+ rounded_mask,
+ draw_text_block,
+ shrink_to_width,
+)
+from ..utils.image import (
+ COLOR_GOLDENROD,
+ COLOR_FIRE_BRICK,
+ COLOR_PALE_GOLDENROD,
+ get_dna_bg,
+)
+from ..dna_config.prefix import DNA_PREFIX
+from ..utils.fonts.dna_fonts import (
+ unicode_font_18,
+ unicode_font_22,
+ unicode_font_24,
+ unicode_font_26,
+ unicode_font_28,
+ unicode_font_60,
+)
+
+WIDTH = 1080
+PADDING = 40
+GRID_GAP = 24
+CARD_RADIUS = 22
+PAGE_LIMIT = 6000
+
+GRID_COLS = 3
+
+# DNA 深色面板配色
+COLOR_PANEL_DARK = (22, 18, 36, 220)
+COLOR_PANEL_BORDER = (90, 70, 130, 180)
+COLOR_TITLE_LIGHT = (245, 240, 225)
+COLOR_TEXT_LIGHT = (210, 205, 220)
+COLOR_MUTED_LIGHT = (150, 145, 175)
+COLOR_DIVIDER_DARK = (95, 80, 130, 180)
+
+_OFFICIAL_AVATAR = Path(__file__).parent / "texture2d" / "dna_official_avatar.jpeg"
+
+
+async def _load_preview(url: str, width: int, height: int) -> Image.Image | None:
+ if not url:
+ return None
+ try:
+ image = await fetch_image(PREVIEW_CACHE_PATH, url, name=cache_name("preview", url))
+ except OSError:
+ return None
+ return fit_image(image, (width, height))
+
+
+async def _load_detail_image(url: str, max_width: int) -> Image.Image:
+ try:
+ image = await fetch_image(DETAIL_CACHE_PATH, url, name=cache_name("detail", url))
+ except OSError:
+ return Image.new("RGB", (max_width, 320), (40, 30, 60))
+ return shrink_to_width(image.convert("RGB"), max_width)
+
+
+def _load_avatar(size: int) -> Image.Image:
+ if _OFFICIAL_AVATAR.exists():
+ return round_avatar(Image.open(_OFFICIAL_AVATAR).convert("RGBA"), size)
+ return round_avatar(Image.new("RGB", (size, size), COLOR_FIRE_BRICK), size)
+
+
+def _draw_dark_panel(canvas: Image.Image, box: tuple[int, int, int, int], *, radius: int = CARD_RADIUS) -> None:
+ layer = Image.new("RGBA", canvas.size, (0, 0, 0, 0))
+ draw = ImageDraw.Draw(layer)
+ draw.rounded_rectangle(box, radius=radius, fill=COLOR_PANEL_DARK)
+ draw.rounded_rectangle(box, radius=radius, outline=COLOR_PANEL_BORDER, width=2)
+ canvas.alpha_composite(layer)
+
+
+async def draw_ann_list_img() -> bytes | str:
+ posts = await fetch_ann_list(prefer_cache=True)
+ if not posts:
+ return "获取公告列表失败"
+
+ visible = posts[:LIST_DISPLAY_LIMIT]
+ rows = (len(visible) + GRID_COLS - 1) // GRID_COLS
+
+ grid_width = WIDTH - PADDING * 2
+ card_width = (grid_width - GRID_GAP * (GRID_COLS - 1)) // GRID_COLS
+ image_height = 156
+ card_height = 308
+
+ title_band_height = 168
+ title_to_grid_gap = 32
+ footer_card_height = 96
+ grid_to_footer_gap = 36
+ canvas_height = (
+ title_band_height
+ + title_to_grid_gap
+ + rows * card_height
+ + max(0, rows - 1) * GRID_GAP
+ + grid_to_footer_gap
+ + footer_card_height
+ + PADDING
+ )
+
+ canvas = get_dna_bg(WIDTH, canvas_height).convert("RGBA")
+
+ # 顶部加深一层让标题更显眼
+ title_overlay = Image.new("RGBA", (WIDTH, title_band_height), (10, 8, 20, 80))
+ canvas.alpha_composite(title_overlay, (0, 0))
+
+ draw = ImageDraw.Draw(canvas)
+ draw.text(
+ (PADDING, title_band_height // 2),
+ "二重螺旋公告",
+ font=unicode_font_60,
+ fill=COLOR_PALE_GOLDENROD,
+ anchor="lm",
+ )
+ # 标题左侧装饰条
+ draw.rounded_rectangle(
+ (PADDING - 12, title_band_height // 2 - 32, PADDING - 4, title_band_height // 2 + 32),
+ radius=4,
+ fill=COLOR_FIRE_BRICK,
+ )
+
+ grid_top = title_band_height + title_to_grid_gap
+ for idx, post in enumerate(visible, start=1):
+ col = (idx - 1) % GRID_COLS
+ row = (idx - 1) // GRID_COLS
+ left = PADDING + col * (card_width + GRID_GAP)
+ top = grid_top + row * (card_height + GRID_GAP)
+ right = left + card_width
+ bottom = top + card_height
+
+ _draw_dark_panel(canvas, (left, top, right, bottom))
+ draw = ImageDraw.Draw(canvas)
+
+ preview = await _load_preview(pick_preview(post), card_width, image_height)
+ if preview is not None:
+ canvas.paste(preview, (left, top), rounded_mask((card_width, image_height), CARD_RADIUS))
+
+ badge_w, badge_h = 76, 40
+ badge_left = left + 14
+ badge_top = top + 14
+ draw.rounded_rectangle(
+ (badge_left, badge_top, badge_left + badge_w, badge_top + badge_h),
+ radius=12,
+ fill=COLOR_FIRE_BRICK,
+ )
+ draw.text(
+ (badge_left + badge_w // 2, badge_top + badge_h // 2),
+ f"#{idx}",
+ font=unicode_font_24,
+ fill=COLOR_PALE_GOLDENROD,
+ anchor="mm",
+ )
+
+ text_left = left + 18
+ text_top = top + image_height + 16
+ text_right = right - 18
+ subject_bottom = draw_text_block(
+ draw,
+ (text_left, text_top),
+ pick_subject(post),
+ unicode_font_22,
+ COLOR_TITLE_LIGHT,
+ text_right - text_left,
+ line_gap=4,
+ max_lines=3,
)
- else:
- w, h = unicode_font_26.getsize("囗") # type: ignore
- padding = (w, h, w, h)
- return await convert_img(ImageOps.expand(im, padding, "#f9f6f2"))
+ time_text = pick_time(post)
+ if time_text:
+ draw.text(
+ (text_left, subject_bottom + 12),
+ time_text,
+ font=unicode_font_18,
+ fill=COLOR_MUTED_LIGHT,
+ )
+
+ footer_top = grid_top + rows * card_height + max(0, rows - 1) * GRID_GAP + grid_to_footer_gap
+ _draw_dark_panel(canvas, (PADDING, footer_top, WIDTH - PADDING, footer_top + footer_card_height))
+ draw = ImageDraw.Draw(canvas)
+ draw.text(
+ (PADDING + 28, footer_top + footer_card_height // 2),
+ f"发送 {DNA_PREFIX}公告 + 序号 查看详情,例如:{DNA_PREFIX}公告 1",
+ font=unicode_font_22,
+ fill=COLOR_TEXT_LIGHT,
+ anchor="lm",
+ )
+
+ return await convert_img(canvas)
-async def ann_detail_card(post_id: Union[int, str], is_check_time=False) -> Union[bytes, str, List[bytes]]:
+async def draw_ann_detail_img(post_id: int | str, *, is_check_time: bool = False) -> bytes | str | list[bytes]:
post_id = str(post_id)
- ann_list = await dna_api.get_ann_list(True)
- if not ann_list:
- raise Exception("获取游戏公告失败,请检查接口是否正常")
- content = [x for x in ann_list if x["postId"] == post_id]
- if not content:
+ posts = await fetch_ann_list(prefer_cache=True)
+ matched = next((p for p in posts if str(p.get("postId")) == post_id), None)
+ if matched is None:
return "未找到该公告"
res = await dna_api.get_post_detail(post_id)
- if not res.is_success or not res.data or not isinstance(res.data, dict):
+ if not res.is_success or not isinstance(res.data, dict):
return "未找到该公告"
- post_data = res.data
- post_detail = post_data["postDetail"]
-
+ detail = res.data.get("postDetail") or {}
if is_check_time:
- post_time = format_post_time(post_detail["postTime"])
- now_time = int(time.time())
- logger.debug(f"公告id: {post_id}, post_time: {post_time}, now_time: {now_time}, delta: {now_time - post_time}")
- if post_time and post_time < now_time - 86400:
+ post_time = post_time_to_timestamp(detail.get("postTime"))
+ now = int(time.time())
+ logger.debug(f"[DNA公告] {post_id} post_time={post_time} now={now} delta={now - post_time}")
+ if post_time and post_time < now - 86400:
return "该公告已过期"
- post_content = post_detail["postContent"]
- if not post_content:
+ blocks = extract_blocks(detail.get("postContent") or [])
+ if not blocks:
return "未找到该公告"
- drow_height = 0
- index_start = 0
- index_end = 0
- imgs = []
- for index, temp in enumerate(post_content):
- content_type = temp["contentType"]
- if content_type == 1:
- # 文案
- content = temp["content"]
- (
- x_drow_duanluo,
- x_drow_note_height,
- x_drow_line_height,
- x_drow_height,
- ) = split_text(content)
- if content.strip():
- drow_height += x_drow_height + 30
- elif content_type == 2 and "url" in temp and temp["url"].lower().endswith(("jpg", "png", "jpeg", "webp")):
- # 图片
- img = await download_pic_from_url(ANN_CARD_PATH, temp["url"])
- img_height = img.size[1]
- if img.width > 1080:
- ratio = 1080 / img.width
- img_height = int(img.height * ratio)
- drow_height += img_height + 40
- elif (
- content_type == 5
- and "contentVideo" in temp
- and "coverUrl" in temp["contentVideo"]
- and temp["contentVideo"]["coverUrl"].lower().endswith(("jpg", "png", "jpeg", "webp", "webp"))
- ):
- try:
- # 视频图片
- video_temp = temp["contentVideo"]
- img = await download_pic_from_url(ANN_CARD_PATH, video_temp["coverUrl"])
- img_height = img.size[1]
- if img.width > 1080:
- ratio = 1080 / img.width
- img_height = int(img.height * ratio)
- drow_height += img_height + 40
- except Exception:
- pass
-
- index_end = index + 1
- if drow_height > 5000:
- img = await ann_batch_card(post_content[index_start:index_end], drow_height)
- index_start = index_end
- index_end = index + 1
- drow_height = 0
- imgs.append(img)
-
- else:
- if drow_height > 0 and index_end > index_start:
- img = await ann_batch_card(post_content[index_start:index_end], drow_height)
- imgs.append(img)
-
- return imgs
-
-
-def split_text(content: str):
- # 常见 HTML 实体与换行的预处理
- content = _normalize_content(content)
- # 按规定宽度分组
- max_line_height, total_lines = 0, 0
- allText = []
- for text in content.split("\n"):
- duanluo, line_height, line_count = get_duanluo(text)
- max_line_height = max(line_height, max_line_height)
- total_lines += line_count
- allText.append((duanluo, line_count))
- line_height = max_line_height
- total_height = total_lines * line_height
- drow_height = total_lines * line_height
- return allText, total_height, line_height, drow_height
-
-
-def get_duanluo(text: str):
- txt = Image.new("RGBA", (600, 800), (255, 255, 255, 0))
- draw = ImageDraw.Draw(txt)
- # 所有文字的段落
- duanluo = ""
- max_width = 1050
- # 宽度总和
- sum_width = 0
- # 几行
- line_count = 1
- # 行高
- line_height = 0
- for char in text:
- left, top, right, bottom = draw.textbbox((0, 0), char, unicode_font_26)
- width, height = (right - left, bottom - top)
- sum_width += width
- if sum_width > max_width: # 超过预设宽度就修改段落 以及当前行数
- line_count += 1
- sum_width = 0
- duanluo += "\n"
- duanluo += char
- line_height = max(height, line_height)
- if not duanluo.endswith("\n"):
- duanluo += "\n"
- return duanluo, line_height, line_count
-
-
-def _normalize_content(content: str) -> str:
- try:
- text = html.unescape(content)
- except Exception:
- text = content
- text = re.sub(r"
", "\n", text, flags=re.IGNORECASE)
- text = text.replace("\u00a0", " ")
+ subject = detail.get("postTitle") or pick_subject(matched)
+ time_text = format_post_time(detail.get("postTime") or matched.get("postTime"))
+ qr_image = await load_qr_code(get_post_url(post_id))
+ avatar_image = _load_avatar(120)
- return text
+ pages: list[Image.Image] = []
+ page, draw, y = _start_page(0)
+ qr_bottom = y
+ page.paste(avatar_image, (PADDING, y), avatar_image)
+ draw.text((PADDING + 140, y + 32), "二重螺旋官方", font=unicode_font_26, fill=COLOR_TITLE_LIGHT)
+ draw.text((PADDING + 140, y + 74), "官方资讯发布", font=unicode_font_22, fill=COLOR_MUTED_LIGHT)
+ y += 146
-def format_post_time(post_time: str) -> int:
- try:
- full_time = f"{get_datetime().year}-{post_time}"
- timestamp = datetime.strptime(full_time, "%Y-%m-%d").timestamp()
- return int(timestamp)
- except ValueError:
- pass
+ qr_left = WIDTH - PADDING
+ if qr_image:
+ qr_size = 164
+ qr_box_w = qr_size + 20
+ qr_box_h = qr_size + 20
+ qr_left = WIDTH - PADDING - qr_box_w
+ qr_top = PADDING
+ qr_bottom = qr_top + qr_box_h
+ qr_panel = Image.new("RGBA", (qr_box_w, qr_box_h), (245, 240, 225, 245))
+ page.alpha_composite(qr_panel, (qr_left, qr_top))
+ page.paste(qr_image.resize((qr_size, qr_size), Image.Resampling.LANCZOS), (qr_left + 10, qr_top + 10))
- # 17小时前 正则转化为timestamp
- try:
- match = re.search(r"(\d+)小时前", post_time)
- if match:
- hours = int(match.group(1))
- return int(time.time()) - hours * 3600
- except Exception:
- pass
+ title_width = (qr_left - 24) - PADDING if qr_image else WIDTH - PADDING * 2
+ y = draw_text_block(draw, (PADDING, y), subject, unicode_font_60, COLOR_PALE_GOLDENROD, title_width, line_gap=10)
+ y += 18
+ if time_text:
+ draw.text((PADDING, y), f"时间:{time_text}", font=unicode_font_22, fill=COLOR_MUTED_LIGHT)
+ y += 36
- try:
- timestamp = datetime.strptime(post_time, "%Y-%m-%d").timestamp()
- return int(timestamp)
- except ValueError:
- pass
+ y = max(y, qr_bottom + 20)
+ draw.line((PADDING, y, WIDTH - PADDING, y), fill=COLOR_DIVIDER_DARK, width=2)
+ y += 26
+
+ content_width = WIDTH - PADDING * 2
+ body_line_height = line_height(unicode_font_28)
+
+ for kind, value in blocks:
+ if kind == "text":
+ lines = wrap_text(draw, value, unicode_font_28, content_width)
+ for line in lines:
+ if y + body_line_height + PADDING > PAGE_LIMIT:
+ pages.append(_finalize_page(page, y))
+ page, draw, y = _start_page(len(pages))
+ fill = COLOR_GOLDENROD if line.startswith((">>>", "▸", "◆", "★")) else COLOR_TEXT_LIGHT
+ draw.text((PADDING, y), line, font=unicode_font_28, fill=fill)
+ y += body_line_height + 10
+ y += 6
+ continue
+
+ image = await _load_detail_image(value, content_width)
+ image_x = (WIDTH - image.width) // 2
+ crop_top = 0
+ while crop_top < image.height:
+ remain = PAGE_LIMIT - y - PADDING
+ if remain < 200:
+ pages.append(_finalize_page(page, y))
+ page, draw, y = _start_page(len(pages))
+ remain = PAGE_LIMIT - y - PADDING
+
+ crop_height = int(min(remain, image.height - crop_top))
+ part = image.crop((0, crop_top, image.width, crop_top + crop_height))
+ page.paste(part, (image_x, y))
+ y += crop_height + 16
+ crop_top += crop_height
+ if crop_top < image.height:
+ pages.append(_finalize_page(page, y))
+ page, draw, y = _start_page(len(pages))
+
+ pages.append(_finalize_page(page, y))
+
+ rendered = [await convert_img(img) for img in pages]
+ return rendered[0] if len(rendered) == 1 else rendered
+
+
+def _start_page(page_index: int) -> tuple[Image.Image, ImageDraw.ImageDraw, int]:
+ page = get_dna_bg(WIDTH, PAGE_LIMIT + 200).convert("RGBA")
+ draw = ImageDraw.Draw(page)
+ y = PADDING
+ if page_index > 0:
+ draw.text((PADDING, y), f"继续阅读 · 第 {page_index + 1} 页", font=unicode_font_22, fill=COLOR_MUTED_LIGHT)
+ y += 44
+ return page, draw, y
- try:
- timestamp = datetime.strptime(post_time, "%Y-%m-%d %H:%M").timestamp()
- return int(timestamp)
- except ValueError:
- pass
- return 0
+def _finalize_page(page: Image.Image, bottom: int) -> Image.Image:
+ return page.crop((0, 0, WIDTH, bottom + PADDING))
diff --git a/DNAUID/dna_ann/texture2d/dna_official_avatar.jpeg b/DNAUID/dna_ann/texture2d/dna_official_avatar.jpeg
new file mode 100644
index 0000000..d498db1
Binary files /dev/null and b/DNAUID/dna_ann/texture2d/dna_official_avatar.jpeg differ
diff --git a/DNAUID/dna_ann/utils.py b/DNAUID/dna_ann/utils.py
new file mode 100644
index 0000000..ab48c14
--- /dev/null
+++ b/DNAUID/dna_ann/utils.py
@@ -0,0 +1,159 @@
+from __future__ import annotations
+
+import re
+import html
+import time
+from typing import Any
+from datetime import datetime
+from collections.abc import Iterable
+
+from ..utils import dna_api
+
+POST_DETAIL_URL_TPL = "https://dnabbs.yingxiong.com/pc/detail/{post_id}"
+
+LIST_DISPLAY_LIMIT = 20
+
+_HTML_BREAK_RE = re.compile(r"
", flags=re.IGNORECASE)
+_HTML_TAG_RE = re.compile(r"<[^>]+>")
+_BLANK_LINE_RE = re.compile(r"\n{3,}")
+_RELATIVE_TIME_PARTS = ("小时前", "分钟前", "刚刚")
+_TIME_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d")
+_IMAGE_EXTS = ("jpg", "jpeg", "png", "webp")
+
+
+async def fetch_ann_list(*, prefer_cache: bool = True) -> list[dict[str, Any]]:
+ return await dna_api.get_ann_list(is_cache=prefer_cache) or []
+
+
+def build_index_map(posts: Iterable[dict[str, Any]]) -> dict[int, dict[str, Any]]:
+ return {idx: post for idx, post in enumerate(posts, start=1)}
+
+
+def resolve_index(token: str, index_map: dict[int, dict[str, Any]]) -> str | None:
+ cleaned = token.strip().replace("#", "")
+ if not cleaned.isdigit():
+ return None
+ post = index_map.get(int(cleaned))
+ return str(post["postId"]) if post else None
+
+
+def get_post_url(post_id: str) -> str:
+ return POST_DETAIL_URL_TPL.format(post_id=post_id)
+
+
+def format_post_time(raw: Any) -> str:
+ if raw in (None, ""):
+ return ""
+ if isinstance(raw, (int, float)):
+ return datetime.fromtimestamp(int(raw)).strftime("%Y-%m-%d %H:%M")
+
+ text = str(raw).strip()
+ if any(part in text for part in _RELATIVE_TIME_PARTS):
+ return text
+
+ for fmt in _TIME_FORMATS:
+ try:
+ return datetime.strptime(text, fmt).strftime("%Y-%m-%d")
+ except ValueError:
+ continue
+
+ try:
+ return datetime.strptime(f"{datetime.now().year}-{text}", "%Y-%m-%d").strftime("%Y-%m-%d")
+ except ValueError:
+ return text
+
+
+def post_time_to_timestamp(raw: Any) -> int:
+ if raw in (None, ""):
+ return 0
+ if isinstance(raw, (int, float)):
+ return int(raw)
+
+ text = str(raw).strip()
+ match = re.search(r"(\d+)\s*小时前", text)
+ if match:
+ return int(time.time()) - int(match.group(1)) * 3600
+ match = re.search(r"(\d+)\s*分钟前", text)
+ if match:
+ return int(time.time()) - int(match.group(1)) * 60
+
+ for fmt in _TIME_FORMATS:
+ try:
+ return int(datetime.strptime(text, fmt).timestamp())
+ except ValueError:
+ continue
+
+ try:
+ return int(datetime.strptime(f"{datetime.now().year}-{text}", "%Y-%m-%d").timestamp())
+ except ValueError:
+ return 0
+
+
+def normalize_text(text: str) -> str:
+ raw = html.unescape(text)
+ raw = _HTML_BREAK_RE.sub("\n", raw)
+ raw = _HTML_TAG_RE.sub("", raw)
+ raw = raw.replace("\xa0", " ").replace("\r\n", "\n").replace("\r", "\n")
+ raw = _BLANK_LINE_RE.sub("\n\n", raw)
+ return raw.strip()
+
+
+def extract_blocks(post_content: list[dict[str, Any]]) -> list[tuple[str, str]]:
+ blocks: list[tuple[str, str]] = []
+ for item in post_content or []:
+ kind = item.get("contentType")
+ if kind == 1:
+ text = normalize_text(item.get("content") or "")
+ for line in text.splitlines():
+ stripped = line.strip()
+ if stripped:
+ blocks.append(("text", stripped))
+ elif kind == 2:
+ url = (item.get("url") or "").strip()
+ if url.lower().endswith(_IMAGE_EXTS):
+ blocks.append(("image", url))
+ elif kind == 5:
+ video = item.get("contentVideo") or {}
+ cover = (video.get("coverUrl") or "").strip()
+ if cover.lower().endswith(_IMAGE_EXTS):
+ blocks.append(("image", cover))
+ return blocks
+
+
+def pick_preview(post: dict[str, Any]) -> str:
+ cover = (post.get("postCover") or "").strip()
+ if cover.lower().endswith(_IMAGE_EXTS):
+ return cover
+ video = post.get("videoContent") or {}
+ if isinstance(video, dict):
+ video_cover = (video.get("coverUrl") or "").strip()
+ if video_cover.lower().endswith(_IMAGE_EXTS):
+ return video_cover
+ images = post.get("imgContent") or []
+ if isinstance(images, list):
+ for entry in images:
+ if isinstance(entry, dict):
+ url = (entry.get("url") or "").strip()
+ if url.lower().endswith(_IMAGE_EXTS):
+ return url
+ return ""
+
+
+def pick_subject(post: dict[str, Any]) -> str:
+ title = (post.get("postTitle") or "").strip()
+ if title:
+ return title
+ content = post.get("postContent")
+ if isinstance(content, str):
+ text = normalize_text(content)
+ if text:
+ return text.splitlines()[0][:40]
+ return f"#{post.get('postId', '')}"
+
+
+def pick_time(post: dict[str, Any]) -> str:
+ show = (post.get("showTime") or "").strip()
+ if show:
+ return show
+ raw = post.get("postTime") or post.get("createTime")
+ return format_post_time(raw) if raw else ""