Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.10
rev: v0.15.12
hooks:
- id: ruff
args: [--fix]
Expand Down
119 changes: 56 additions & 63 deletions DNAUID/dna_ann/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import random
import asyncio
from typing import List

from gsuid_core.sv import SV
from gsuid_core.aps import scheduler
Expand All @@ -9,79 +10,74 @@
from gsuid_core.models import Event
from gsuid_core.subscribe import gs_subscribe

from .utils import resolve_index, fetch_ann_list, build_index_map
from .ann_card import draw_ann_list_img, draw_ann_detail_img
from ..utils.dna_api import dna_api
from ..dna_ann.ann_card import ann_detail_card
from ..utils.msgs.notify import send_dna_notify
from ..dna_config.dna_config import DNAConfig

sv_ann = SV("DNA公告")
sv_ann_sub = SV("订阅DNA公告", pm=3)

task_name_ann = "订阅DNA公告"
ann_minute_check: int = DNAConfig.get_config("AnnMinuteCheck").data or 10
TASK_NAME_ANN = "订阅DNA公告"
ANN_MIN_CHECK: int = DNAConfig.get_config("AnnMinuteCheck").data or 10


@sv_ann.on_command("公告")
async def ann_dna(bot: Bot, ev: Event):
ann_id = ev.text
if not ann_id:
ann_list = await dna_api.get_ann_list()
if not ann_list:
return await bot.send("获取公告列表失败")
anns = "\n".join([f"{x['postId']}" for x in ann_list])
return await send_dna_notify(bot, ev, f"公告列表: \n{anns}")
text = ev.text.strip().replace("#", "")

if not text:
result = await draw_ann_list_img()
if isinstance(result, str):
return await send_dna_notify(bot, ev, result)
return await bot.send(result) # type: ignore

ann_id = ann_id.replace("#", "")
if not ann_id.isdigit():
raise Exception("公告ID不正确")
posts = await fetch_ann_list(prefer_cache=True)
if not posts:
return await send_dna_notify(bot, ev, "获取公告列表失败")

img = await ann_detail_card(ann_id)
return await bot.send(img) # type: ignore
post_id = resolve_index(text, build_index_map(posts))
if post_id is None:
return await send_dna_notify(bot, ev, "公告序号不正确,发送 公告 查看可用列表")

result = await draw_ann_detail_img(post_id)
if isinstance(result, str):
return await send_dna_notify(bot, ev, result)
return await bot.send(result) # type: ignore


@sv_ann_sub.on_fullmatch("订阅公告")
async def sub_ann_dna(bot: Bot, ev: Event):
if not ev.group_id:
return await bot.send("请在群聊中订阅")
return await send_dna_notify(bot, ev, "请在群聊中订阅")
if not DNAConfig.get_config("DNAAnnOpen").data:
return await bot.send("二重螺旋公告推送功能已关闭")

data = await gs_subscribe.get_subscribe(task_name_ann)
if data:
for subscribe in data:
if subscribe.group_id == ev.group_id:
return await bot.send("已经订阅了二重螺旋公告!")
return await send_dna_notify(bot, ev, "二重螺旋公告推送功能已关闭")

await gs_subscribe.add_subscribe(
"session",
task_name=task_name_ann,
event=ev,
extra_message="",
)
data = await gs_subscribe.get_subscribe(TASK_NAME_ANN)
if data and any(sub.group_id == ev.group_id for sub in data):
return await send_dna_notify(bot, ev, "已经订阅了二重螺旋公告!")

logger.info(data)
await bot.send("成功订阅二重螺旋公告!")
await gs_subscribe.add_subscribe("session", TASK_NAME_ANN, ev, extra_message="")
await send_dna_notify(bot, ev, "成功订阅二重螺旋公告")


@sv_ann_sub.on_fullmatch(("取消订阅公告", "取消公告", "退订公告"))
async def unsub_ann_dna(bot: Bot, ev: Event):
if not ev.group_id:
return await bot.send("请在群聊中取消订阅")
return await send_dna_notify(bot, ev, "请在群聊中取消订阅")

data = await gs_subscribe.get_subscribe(task_name_ann)
if data:
for subscribe in data:
if subscribe.group_id == ev.group_id:
await gs_subscribe.delete_subscribe("session", task_name_ann, ev)
return await bot.send("成功取消订阅二重螺旋公告!")
else:
if not DNAConfig.get_config("DNAAnnOpen").data:
return await bot.send("二重螺旋公告推送功能已关闭")
data = await gs_subscribe.get_subscribe(TASK_NAME_ANN)
if data and any(sub.group_id == ev.group_id for sub in data):
await gs_subscribe.delete_subscribe("session", TASK_NAME_ANN, ev)
return await send_dna_notify(bot, ev, "成功取消订阅二重螺旋公告!")

return await bot.send("未曾订阅二重螺旋公告!")
if not DNAConfig.get_config("DNAAnnOpen").data:
return await send_dna_notify(bot, ev, "二重螺旋公告推送功能已关闭")
return await send_dna_notify(bot, ev, "未曾订阅二重螺旋公告!")


@scheduler.scheduled_job("interval", minutes=ann_minute_check)
@scheduler.scheduled_job("interval", minutes=ANN_MIN_CHECK)
async def check_dna_ann():
if not DNAConfig.get_config("DNAAnnOpen").data:
return
Expand All @@ -90,42 +86,39 @@ async def check_dna_ann():

async def check_dna_ann_state():
logger.info("[二重螺旋公告] 定时任务: 二重螺旋公告查询..")
datas = await gs_subscribe.get_subscribe(task_name_ann)
if not datas:
subs = await gs_subscribe.get_subscribe(TASK_NAME_ANN)
if not subs:
logger.info("[二重螺旋公告] 暂无群订阅")
return

ids: List[int] = DNAConfig.get_config("DNAAnnIds").data or []
new_ann_list = await dna_api.get_ann_list()
if not new_ann_list:
return

new_ann_ids = [int(x["postId"]) for x in new_ann_list]
if not ids:
DNAConfig.set_config("DNAAnnIds", new_ann_ids)
known_ids: list[int] = DNAConfig.get_config("DNAAnnIds").data or []
fresh_ids = [int(post["postId"]) for post in new_ann_list]

if not known_ids:
DNAConfig.set_config("DNAAnnIds", fresh_ids)
logger.info("[二重螺旋公告] 初始成功, 将在下个轮询中更新.")
return

new_ann_need_send = []
for ann_id in new_ann_ids:
if ann_id not in ids:
new_ann_need_send.append(ann_id)

if not new_ann_need_send:
pending = [post_id for post_id in fresh_ids if post_id not in known_ids]
if not pending:
logger.info("[二重螺旋公告] 没有最新公告")
return

logger.info(f"[二重螺旋公告] 更新公告id: {new_ann_need_send}")
save_ids = sorted(ids, reverse=True)[:50] + new_ann_ids
DNAConfig.set_config("DNAAnnIds", list(set(save_ids)))
logger.info(f"[二重螺旋公告] 更新公告id: {pending}")
merged = sorted(set(known_ids) | set(fresh_ids), reverse=True)[:50]
DNAConfig.set_config("DNAAnnIds", merged)

for ann_id in new_ann_need_send:
for post_id in pending:
try:
img = await ann_detail_card(ann_id, is_check_time=True)
img = await draw_ann_detail_img(post_id, is_check_time=True)
if isinstance(img, str):
continue
for subscribe in datas:
await subscribe.send(img) # type: ignore
for sub in subs:
await sub.send(img) # type: ignore
await asyncio.sleep(random.uniform(1, 3))
except Exception as e:
logger.exception(e)
Expand Down
124 changes: 124 additions & 0 deletions DNAUID/dna_ann/_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from __future__ import annotations

import hashlib
from pathlib import Path
from urllib.parse import quote_plus

from PIL import Image, ImageOps, ImageDraw, ImageFont

from gsuid_core.utils.download_resource.download_file import download

from ..utils.resource.RESOURCE_PATH import ANN_CARD_PATH

Color = tuple[int, int, int] | tuple[int, int, int, int]
Size = tuple[int, int]

DEFAULT_LINE_GAP = 8
DEFAULT_ELLIPSIS = "..."

QR_CACHE_PATH = ANN_CARD_PATH / "qr"
PREVIEW_CACHE_PATH = ANN_CARD_PATH / "preview"
DETAIL_CACHE_PATH = ANN_CARD_PATH / "detail"


def cache_name(*parts: object, ext: str = "png") -> str:
raw = "|".join(str(part) for part in parts)
return f"{hashlib.sha1(raw.encode('utf-8')).hexdigest()}.{ext}"


async def fetch_image(path: Path, pic_url: str, *, name: str | None = None) -> Image.Image:
path.mkdir(parents=True, exist_ok=True)
file_name = name or pic_url.split("/")[-1]
target = path / file_name
if not target.exists():
await download(pic_url, path, file_name, tag="[DNA]")
return Image.open(target).convert("RGBA")


async def load_qr_code(url: str, size: int = 220) -> Image.Image | None:
qr_url = f"https://api.qrserver.com/v1/create-qr-code/?size={size}x{size}&data={quote_plus(url)}"
try:
image = await fetch_image(QR_CACHE_PATH, qr_url, name=cache_name("qr", url, size))
except OSError:
return None
return image.convert("RGB").resize((size, size), Image.Resampling.LANCZOS)


def shrink_to_width(image: Image.Image, max_width: int) -> Image.Image:
if image.width <= max_width:
return image
ratio = max_width / image.width
return image.resize((int(max_width), int(image.height * ratio)), Image.Resampling.LANCZOS)


def fit_image(image: Image.Image, size: Size) -> Image.Image:
return ImageOps.fit(image.convert("RGB"), size, method=Image.Resampling.LANCZOS)


def rounded_mask(size: Size, radius: int) -> Image.Image:
mask = Image.new("L", size, 0)
ImageDraw.Draw(mask).rounded_rectangle((0, 0, size[0], size[1]), radius=radius, fill=255)
return mask


def line_height(font: ImageFont.FreeTypeFont) -> int:
return sum(font.getmetrics())


def wrap_text(
draw: ImageDraw.ImageDraw,
text: str,
font: ImageFont.FreeTypeFont,
max_width: int,
max_lines: int | None = None,
ellipsis: str = DEFAULT_ELLIPSIS,
) -> list[str]:
lines: list[str] = []
raw_lines = text.splitlines() if text else [""]
for raw_line in raw_lines:
current = ""
for char in raw_line:
trial = f"{current}{char}"
width = draw.textbbox((0, 0), trial, font=font)[2]
if current and width > max_width:
lines.append(current)
current = char
else:
current = trial
lines.append(current if current else " ")

if max_lines and len(lines) > max_lines:
lines = lines[:max_lines]
lines[-1] = lines[-1].rstrip(" .") + ellipsis
return lines


def draw_text_block(
draw: ImageDraw.ImageDraw,
xy: tuple[int, int],
text: str,
font: ImageFont.FreeTypeFont,
fill: Color,
max_width: int,
*,
line_gap: int = DEFAULT_LINE_GAP,
max_lines: int | None = None,
) -> int:
x, y = xy
lines = wrap_text(draw, text, font, max_width, max_lines)
text_height = line_height(font)
for index, line in enumerate(lines):
draw.text((x, y), line, font=font, fill=fill)
y += text_height
if index != len(lines) - 1:
y += line_gap
return y


def round_avatar(avatar: Image.Image, size: int) -> Image.Image:
canvas = Image.new("RGBA", (size, size), (0, 0, 0, 0))
head = avatar.convert("RGBA").resize((size, size), Image.Resampling.LANCZOS)
mask = Image.new("L", (size, size), 0)
ImageDraw.Draw(mask).ellipse((0, 0, size, size), fill=255)
canvas.paste(head, (0, 0), mask)
return canvas
Loading