Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions llm_web_kit/config/cfg_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@

import commentjson as json

from llm_web_kit.exception.exception import ModelResourceException


def load_config() -> dict:
"""_summary_
"""Load the configuration file for the web kit. First try to read the
configuration file from the environment variable LLM_WEB_KIT_CFG_PATH. If
the environment variable is not set, use the default configuration file
path ~/.llm-web-kit.jsonc. If the configuration file does not exist, raise
an exception.

Args:
config_file (_type_): _description_
Raises:
ModelResourceException: LLM_WEB_KIT_CFG_PATH points to a non-exist file
ModelResourceException: cfg_path does not exist

Returns:
_type_: _description_
config(dict): The configuration dictionary
"""
# 首先从环境变量LLM_WEB_KIT_CFG_PATH 读取配置文件的位置
# 如果没有配置,就使用默认的配置文件位置
Expand All @@ -19,12 +26,15 @@ def load_config() -> dict:
if env_cfg_path:
cfg_path = env_cfg_path
if not os.path.exists(cfg_path):
raise FileNotFoundError(f'environment variable LLM_WEB_KIT_CFG_PATH points to a non-exist file: {cfg_path}')
raise ModelResourceException(
f'environment variable LLM_WEB_KIT_CFG_PATH points to a non-exist file: {cfg_path}'
)
else:
cfg_path = os.path.expanduser('~/.llm-web-kit.jsonc')
if not os.path.exists(cfg_path):
raise FileNotFoundError(
f'{cfg_path} does not exist, please create one or set environment variable LLM_WEB_KIT_CFG_PATH to a valid file path')
raise ModelResourceException(
f'{cfg_path} does not exist, please create one or set environment variable LLM_WEB_KIT_CFG_PATH to a valid file path'
)

# 读取配置文件
with open(cfg_path, 'r', encoding='utf-8') as f:
Expand Down
79 changes: 67 additions & 12 deletions llm_web_kit/model/resource_utils/boto3_ext.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
import re
from typing import Dict, List, Union
from typing import Dict, List, Tuple, Union

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

from llm_web_kit.config.cfg_reader import load_config
from llm_web_kit.exception.exception import ModelResourceException

__re_s3_path = re.compile('^s3://([^/]+)(?:/(.*))?$')


def is_s3_path(path: str) -> bool:
    """Check whether a path is an s3 path.

    Args:
        path (str): path to check

    Returns:
        bool: True if ``path`` is a string starting with ``s3://``; False
            otherwise, including when ``path`` is not a string at all
    """
    # Non-string input (None, a list of paths, ...) is simply "not an s3
    # path". Answering False instead of failing with AttributeError lets the
    # caller report the problem with its own, more meaningful exception.
    return isinstance(path, str) and path.startswith('s3://')


def is_s3_404_error(e: Exception):
def is_s3_404_error(e: Exception) -> bool:
"""check if an exception is 404 error.

Args:
e (Exception): exception

Returns:
bool: is 404 error or not
"""
if not isinstance(e, ClientError):
return False
flag_1 = e.response.get('Error', {}).get('Code') in ['404', 'NoSuchKey']
Expand All @@ -23,22 +40,35 @@ def is_s3_404_error(e: Exception):
return any([flag_1, flag_2, flag_3])


def split_s3_path(path: str):
"""split bucket and key from path."""
def split_s3_path(path: str) -> Tuple[str, str]:
    """Break an s3 path into its bucket name and object key.

    Args:
        path (str): s3 path, e.g. ``s3://bucket/some/key``

    Returns:
        Tuple[str, str]: ``(bucket, key)``. The key is an empty string when
            the path names only a bucket. ``('', '')`` is returned when the
            path has the ``s3://`` prefix but does not match the expected
            ``s3://<bucket>[/<key>]`` pattern.

    Raises:
        ModelResourceException: if path is not s3 path
    """
    if not is_s3_path(path):
        raise ModelResourceException(f'{path} is not a s3 path')

    matched = __re_s3_path.match(path)
    if matched is None:
        return '', ''

    bucket, key = matched.groups()
    # The key group is optional in the pattern; normalise "absent" to ''.
    return bucket, key or ''


def get_s3_config(path: str):
def get_s3_config(path: str) -> Dict:
"""Get s3 config for a given path by its bucket name from the config file.

Args:
path (str): s3 path

Raises:
ValueError: if bucket not found in config
ModelResourceException: if bucket not in config
ModelResourceException: if path is not s3 path

Returns:
dict: s3 config
Expand All @@ -48,10 +78,23 @@ def get_s3_config(path: str):
if bucket in config_dict['s3']:
return config_dict['s3'][bucket]
else:
raise ValueError(f'bucket {bucket} not in config')
raise ModelResourceException(f'bucket {bucket} not in config')


def get_s3_client(path: Union[str, List[str]]) -> boto3.client:
"""Get s3 client for a given path.

Args:
path (Union[str, List[str]]): s3 path

Returns:
boto3.client: s3 client

Raises:
ModelResourceException: if bucket not in config
ModelResourceException: if path is not s3 path
"""

def get_s3_client(path: Union[str, List[str]]):
s3_config = get_s3_config(path)
try:
return boto3.client(
Expand All @@ -61,10 +104,7 @@ def get_s3_client(path: Union[str, List[str]]):
endpoint_url=s3_config['endpoint'],
config=Config(
s3={'addressing_style': s3_config.get('addressing_style', 'path')},
retries={
'max_attempts': 8,
'mode': 'standard'
},
retries={'max_attempts': 8, 'mode': 'standard'},
connect_timeout=600,
read_timeout=600,
),
Expand All @@ -84,6 +124,21 @@ def get_s3_client(path: Union[str, List[str]]):


def head_s3_object(client, path: str, raise_404=False) -> Union[Dict, None]:
"""Get s3 object metadata.

Args:
client (boto3.client): the s3 client
path (str): the s3 path
raise_404 (bool, optional): raise 404 error or not. Defaults to False.

Returns:
Union[Dict, None]: s3 object metadata or None if not found

Raises:
ClientError: if raise_404 is True and object not
ModelResourceException: if path is not s3 path
ModelResourceException: if bucket not in config
"""
bucket, key = split_s3_path(path)
try:
resp = client.head_object(Bucket=bucket, Key=key)
Expand Down
97 changes: 25 additions & 72 deletions llm_web_kit/model/resource_utils/download_assets.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,19 @@
import errno
import hashlib
import os
import shutil
import tempfile
import time
from typing import Iterable, Optional

import requests
from tqdm import tqdm

from llm_web_kit.config.cfg_reader import load_config
from llm_web_kit.exception.exception import ModelInputException
from llm_web_kit.exception.exception import ModelResourceException
from llm_web_kit.libs.logger import mylogger as logger
from llm_web_kit.model.resource_utils.boto3_ext import (get_s3_client,
is_s3_path,
split_s3_path)


def try_remove(path: str):
    """Delete the file at ``path`` on a best-effort basis.

    Any exception raised by the removal (missing file, permission problem,
    invalid argument, ...) is deliberately swallowed; the function always
    returns ``None``.
    """
    try:
        os.remove(path)
    except Exception:
        # Best effort by design: the caller does not care why it failed.
        return None
    return None
from llm_web_kit.model.resource_utils.utils import FileLockContext, try_remove


def decide_cache_dir():
Expand Down Expand Up @@ -114,59 +105,12 @@ def __del__(self):
self.response.close()


class FileLock:
    """Context manager implementing a lock backed by a lock file.

    The lock is acquired by atomically creating ``lock_path`` and released by
    deleting it. The file stores the owner's PID and the creation timestamp so
    that waiters can detect and discard a stale lock.

    Args:
        lock_path (str): path of the lock file to create.
        timeout (float): number of seconds used both as the maximum time to
            wait for the lock and as the age after which an existing lock
            file is considered stale and removed. Defaults to 300.

    Raises:
        TimeoutError: from ``__enter__`` if the lock could not be acquired
            within ``timeout`` seconds.
        OSError: from ``__enter__`` if creating the lock file fails for any
            reason other than the file already existing.
    """

    def __init__(self, lock_path: str, timeout: float = 300):
        self.lock_path = lock_path
        self.timeout = timeout
        # Kept for backward compatibility; the descriptor is never left open
        # past __enter__, so this stays None.
        self._fd = None

    def _remove_lock_file(self):
        """Delete the lock file, ignoring the case where it cannot be removed."""
        try:
            os.remove(self.lock_path)
        except OSError:
            pass

    def _write_owner_info(self, fd: int):
        """Write PID and timestamp to the freshly created lock file and close it."""
        try:
            # os.fdopen takes ownership of fd: leaving the with-block closes
            # it, so the descriptor must NOT be closed again later.
            with os.fdopen(fd, 'w') as f:
                f.write(f'{os.getpid()}\n{time.time()}')
        except Exception:
            # A lock file without a parsable timestamp would never be
            # recognised as stale, so do not leave one behind.
            self._remove_lock_file()
            raise

    def __enter__(self):
        start_time = time.monotonic()
        while True:
            try:
                # Atomically create the lock file (the O_EXCL flag is the key).
                fd = os.open(
                    self.lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644
                )
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise
            else:
                self._write_owner_info(fd)
                return self

            # Somebody else holds the lock: check whether it has expired.
            try:
                with open(self.lock_path, 'r') as f:
                    _pid, timestamp = f.read().split('\n')[:2]
                # Wall-clock time is required here because the timestamp was
                # written by another process.
                if time.time() - float(timestamp) > self.timeout:
                    os.remove(self.lock_path)
            except (FileNotFoundError, ValueError):
                # The file vanished or is not (yet) fully written; retry.
                pass

            if time.monotonic() - start_time > self.timeout:
                raise TimeoutError(f'Could not acquire lock after {self.timeout}s')
            time.sleep(0.1)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The descriptor was already closed in __enter__; closing it again
        # here could close an unrelated descriptor that reused the same
        # number. Releasing the lock only requires deleting the lock file.
        self._remove_lock_file()


def verify_file_checksum(
file_path: str, md5_sum: Optional[str] = None, sha256_sum: Optional[str] = None
) -> bool:
"""校验文件哈希值."""
if not sum([bool(md5_sum), bool(sha256_sum)]) == 1:
raise ModelInputException(
raise ModelResourceException(
'Exactly one of md5_sum or sha256_sum must be provided'
)

Expand Down Expand Up @@ -210,15 +154,15 @@ def download_to_temp(conn, progress_bar) -> str:
def move_to_target(tmp_path: str, target_path: str, expected_size: int):
"""移动文件并验证."""
if os.path.getsize(tmp_path) != expected_size:
raise ValueError(
raise ModelResourceException(
f'File size mismatch: {os.path.getsize(tmp_path)} vs {expected_size}'
)

os.makedirs(os.path.dirname(target_path), exist_ok=True)
shutil.move(tmp_path, target_path) # 原子操作替换

if not os.path.exists(target_path):
raise RuntimeError(f'Move failed: {tmp_path} -> {target_path}')
raise ModelResourceException(f'Move failed: {tmp_path} -> {target_path}')


def download_auto_file(
Expand Down Expand Up @@ -254,20 +198,29 @@ def download_auto_file(
"""线程安全的文件下载函数"""
lock_path = f'{target_path}.lock'

with FileLock(lock_path, timeout=lock_timeout):
# 二次检查(其他进程可能已经完成下载)
if os.path.exists(target_path):
if verify_file_checksum(target_path, md5_sum, sha256_sum):
logger.info(f'File already exists with valid checksum: {target_path}')
return target_path

if not exist_ok:
raise FileExistsError(
f'File exists with invalid checksum: {target_path}'
)
def check_callback():
return verify_file_checksum(target_path, md5_sum, sha256_sum)

if os.path.exists(target_path):
if not exist_ok:
raise ModelResourceException(
f'File exists with invalid checksum: {target_path}'
)

if verify_file_checksum(target_path, md5_sum, sha256_sum):
logger.info(f'File already exists with valid checksum: {target_path}')
return target_path
else:
logger.warning(f'Removing invalid file: {target_path}')
try_remove(target_path)

with FileLockContext(lock_path, check_callback, timeout=lock_timeout) as lock:
if lock is True:
logger.info(
f'File already exists with valid checksum: {target_path} while waiting'
)
return target_path

# 创建连接
conn_cls = S3Connection if is_s3_path(resource_path) else HttpConnection
conn = conn_cls(resource_path)
Expand Down
11 changes: 8 additions & 3 deletions llm_web_kit/model/resource_utils/singleton_resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from llm_web_kit.exception.exception import ModelResourceException


class SingletonResourceManager:

def __init__(self):
Expand All @@ -8,17 +11,19 @@ def has_name(self, name):

def set_resource(self, name: str, resource):
if not isinstance(name, str):
raise TypeError('name should be a string')
raise ModelResourceException(
f'Name should be a string, but got {type(name)}'
)
if name in self.resources:
raise AssertionError(f'Resource {name} already exists')
raise ModelResourceException(f'Resource {name} already exists')

self.resources[name] = resource

def get_resource(self, name):
if name in self.resources:
return self.resources[name]
else:
raise Exception(f'Resource {name} does not exist')
raise ModelResourceException(f'Resource {name} does not exist')

def release_resource(self, name):
if name in self.resources:
Expand Down
Loading