Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 168 additions & 117 deletions admin/handler.go

Large diffs are not rendered by default.

176 changes: 138 additions & 38 deletions auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ type Account struct {

}

const (
defaultBackgroundRefreshInterval = 2 * time.Minute
defaultUsageProbeMaxAge = 10 * time.Minute
defaultRecoveryProbeInterval = 30 * time.Minute
)

// SchedulerBreakdown 调度评分拆解
type SchedulerBreakdown struct {
UnauthorizedPenalty float64
Expand Down Expand Up @@ -705,27 +711,31 @@ func (a *Account) GetLastUsedAt() time.Time {

// Store 多账号管理器(数据库 + Token 缓存)
type Store struct {
mu sync.RWMutex
accounts []*Account
globalProxy string
maxConcurrency int64 // 每账号最大并发数
testConcurrency int64 // 批量测试并发数
testModel atomic.Value // 测试连接使用的模型(string)
db *database.DB
tokenCache cache.TokenCache
usageProbeMu sync.RWMutex
usageProbe func(context.Context, *Account) error
usageProbeBatch atomic.Bool
recoveryProbeBatch atomic.Bool
autoCleanUnauthorized atomic.Bool
autoCleanRateLimited atomic.Bool
autoCleanFullUsage atomic.Bool
autoCleanError atomic.Bool
autoCleanExpired atomic.Bool
autoCleanupBatch atomic.Bool
maxRetries int64 // 请求失败最大重试次数(换号重试)
stopCh chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
accounts []*Account
globalProxy string
maxConcurrency int64 // 每账号最大并发数
testConcurrency int64 // 批量测试并发数
testModel atomic.Value // 测试连接使用的模型(string)
db *database.DB
tokenCache cache.TokenCache
usageProbeMu sync.RWMutex
usageProbe func(context.Context, *Account) error
usageProbeBatch atomic.Bool
recoveryProbeBatch atomic.Bool
autoCleanUnauthorized atomic.Bool
autoCleanRateLimited atomic.Bool
autoCleanFullUsage atomic.Bool
autoCleanError atomic.Bool
autoCleanExpired atomic.Bool
autoCleanupBatch atomic.Bool
maxRetries int64 // 请求失败最大重试次数(换号重试)
backgroundRefreshInterval int64 // 后台刷新/探针巡检间隔(ns)
usageProbeMaxAge int64 // 用量探针快照最大缓存时长(ns)
recoveryProbeInterval int64 // 恢复探测最小间隔(ns)
backgroundRefreshWakeCh chan struct{}
stopCh chan struct{}
wg sync.WaitGroup

// 代理池
proxyPool []string // 已启用的代理 URL 列表
Expand Down Expand Up @@ -775,23 +785,30 @@ func truthyEnv(v string) bool {
func NewStore(db *database.DB, tc cache.TokenCache, settings *database.SystemSettings) *Store {
if settings == nil {
settings = &database.SystemSettings{
MaxConcurrency: 2,
TestConcurrency: 50,
TestModel: "gpt-5.4",
ProxyURL: "",
MaxConcurrency: 2,
TestConcurrency: 50,
TestModel: "gpt-5.4",
BackgroundRefreshIntervalMinutes: 2,
UsageProbeMaxAgeMinutes: 10,
RecoveryProbeIntervalMinutes: 30,
ProxyURL: "",
}
}
s := &Store{
globalProxy: settings.ProxyURL,
maxConcurrency: int64(settings.MaxConcurrency),
testConcurrency: int64(settings.TestConcurrency),
db: db,
tokenCache: tc,
stopCh: make(chan struct{}),
proxyPoolEnabled: settings.ProxyPoolEnabled,
sessionBindings: make(map[string]sessionAffinity),
globalProxy: settings.ProxyURL,
maxConcurrency: int64(settings.MaxConcurrency),
testConcurrency: int64(settings.TestConcurrency),
db: db,
tokenCache: tc,
backgroundRefreshWakeCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
proxyPoolEnabled: settings.ProxyPoolEnabled,
sessionBindings: make(map[string]sessionAffinity),
}
s.testModel.Store(settings.TestModel)
s.SetBackgroundRefreshInterval(time.Duration(settings.BackgroundRefreshIntervalMinutes) * time.Minute)
s.SetUsageProbeMaxAge(time.Duration(settings.UsageProbeMaxAgeMinutes) * time.Minute)
s.SetRecoveryProbeInterval(time.Duration(settings.RecoveryProbeIntervalMinutes) * time.Minute)
s.autoCleanUnauthorized.Store(settings.AutoCleanUnauthorized)
s.autoCleanRateLimited.Store(settings.AutoCleanRateLimited)
s.autoCleanFullUsage.Store(settings.AutoCleanFullUsage)
Expand Down Expand Up @@ -1015,6 +1032,61 @@ func (s *Store) SetAutoCleanExpired(enabled bool) {
s.autoCleanExpired.Store(enabled)
}

// SetBackgroundRefreshInterval 设置后台刷新/探针巡检间隔。
func (s *Store) SetBackgroundRefreshInterval(d time.Duration) {
if d <= 0 {
d = defaultBackgroundRefreshInterval
}
atomic.StoreInt64(&s.backgroundRefreshInterval, int64(d))
select {
case s.backgroundRefreshWakeCh <- struct{}{}:
default:
}
}

// GetBackgroundRefreshInterval 获取后台刷新/探针巡检间隔。
func (s *Store) GetBackgroundRefreshInterval() time.Duration {
d := time.Duration(atomic.LoadInt64(&s.backgroundRefreshInterval))
if d <= 0 {
return defaultBackgroundRefreshInterval
}
return d
}

// SetUsageProbeMaxAge 设置用量探针最大缓存时长。
func (s *Store) SetUsageProbeMaxAge(d time.Duration) {
if d <= 0 {
d = defaultUsageProbeMaxAge
}
atomic.StoreInt64(&s.usageProbeMaxAge, int64(d))
}

// GetUsageProbeMaxAge 获取用量探针最大缓存时长。
func (s *Store) GetUsageProbeMaxAge() time.Duration {
d := time.Duration(atomic.LoadInt64(&s.usageProbeMaxAge))
if d <= 0 {
return defaultUsageProbeMaxAge
}
return d
}

// SetRecoveryProbeInterval 设置恢复探测最小间隔。
func (s *Store) SetRecoveryProbeInterval(d time.Duration) {
if d <= 0 {
d = defaultRecoveryProbeInterval
}
atomic.StoreInt64(&s.recoveryProbeInterval, int64(d))
}

// GetRecoveryProbeInterval 获取恢复探测最小间隔。
func (s *Store) GetRecoveryProbeInterval() time.Duration {
d := time.Duration(atomic.LoadInt64(&s.recoveryProbeInterval))
if d <= 0 {
return defaultRecoveryProbeInterval
}
return d
}

// CleanExpiredNow 立即执行一次过期清理,返回清理数量
func (s *Store) CleanExpiredNow() int {
return s.CleanExpiredAccounts(context.Background(), 30*time.Minute)
Expand Down Expand Up @@ -1149,24 +1221,37 @@ func (s *Store) StartBackgroundRefresh() {
s.wg.Add(1)
go func() {
defer s.wg.Done()
refreshTicker := time.NewTicker(2 * time.Minute)
refreshTimer := time.NewTimer(s.GetBackgroundRefreshInterval())
autoCleanupTicker := time.NewTicker(30 * time.Second)
fullUsageCleanupTicker := time.NewTicker(5 * time.Minute)
expiredCleanupTicker := time.NewTicker(15 * time.Minute)
// 添加定时重建 FastScheduler 以优化性能
rebuildSchedulerTicker := time.NewTicker(10 * time.Minute)
defer refreshTicker.Stop()
defer refreshTimer.Stop()
defer autoCleanupTicker.Stop()
defer fullUsageCleanupTicker.Stop()
defer expiredCleanupTicker.Stop()
defer rebuildSchedulerTicker.Stop()

resetRefreshTimer := func() {
if !refreshTimer.Stop() {
select {
case <-refreshTimer.C:
default:
}
}
refreshTimer.Reset(s.GetBackgroundRefreshInterval())
}

for {
select {
case <-refreshTicker.C:
case <-refreshTimer.C:
s.parallelRefreshAll(context.Background())
s.TriggerUsageProbeAsync()
s.TriggerRecoveryProbeAsync()
refreshTimer.Reset(s.GetBackgroundRefreshInterval())
case <-s.backgroundRefreshWakeCh:
resetRefreshTimer()
case <-autoCleanupTicker.C:
s.TriggerAutoCleanupAsync()
case <-fullUsageCleanupTicker.C:
Expand Down Expand Up @@ -1518,6 +1603,21 @@ func (s *Store) GetTestConcurrency() int {
return int(atomic.LoadInt64(&s.testConcurrency))
}

// GetBackgroundRefreshIntervalMinutes 获取后台巡检间隔(分钟)。
func (s *Store) GetBackgroundRefreshIntervalMinutes() int {
return int(s.GetBackgroundRefreshInterval() / time.Minute)
}

// GetUsageProbeMaxAgeMinutes 获取用量探针最大缓存时长(分钟)。
func (s *Store) GetUsageProbeMaxAgeMinutes() int {
return int(s.GetUsageProbeMaxAge() / time.Minute)
}

// GetRecoveryProbeIntervalMinutes 获取恢复探测最小间隔(分钟)。
func (s *Store) GetRecoveryProbeIntervalMinutes() int {
return int(s.GetRecoveryProbeInterval() / time.Minute)
}

// SetModelMapping 动态更新模型映射 JSON
func (s *Store) SetModelMapping(mapping string) {
s.modelMapping.Store(mapping)
Expand Down Expand Up @@ -2008,7 +2108,7 @@ func (s *Store) parallelProbeUsage(ctx context.Context) {
var wg sync.WaitGroup

for _, acc := range accounts {
if !acc.NeedsUsageProbe(10 * time.Minute) {
if !acc.NeedsUsageProbe(s.GetUsageProbeMaxAge()) {
continue
}
if !acc.TryBeginUsageProbe() {
Expand Down Expand Up @@ -2050,7 +2150,7 @@ func (s *Store) parallelRecoveryProbe(ctx context.Context) {
var wg sync.WaitGroup

for _, acc := range accounts {
if !acc.NeedsRecoveryProbe(30 * time.Minute) {
if !acc.NeedsRecoveryProbe(s.GetRecoveryProbeInterval()) {
continue
}
if !acc.TryBeginRecoveryProbe() {
Expand Down
Loading
Loading