Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions backend/core/registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]:
"""
# 创建数据库表
await create_table()
# 连接 redis
await redis_client.open()
# 初始化 limiter
await FastAPILimiter.init(
redis=redis_client,
Expand Down
33 changes: 15 additions & 18 deletions backend/plugin/tools.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import inspect
import json
import os
Expand All @@ -11,7 +10,6 @@
from functools import lru_cache
from typing import Any

import nest_asyncio
import rtoml

from fastapi import APIRouter, Depends, Request
Expand All @@ -22,8 +20,9 @@
from backend.common.log import log
from backend.core.conf import settings
from backend.core.path_conf import PLUGIN_DIR
from backend.database.redis import redis_client
from backend.database.redis import RedisCli, redis_client
from backend.plugin.errors import PluginConfigError, PluginInjectError
from backend.utils._asyncio import run_await
from backend.utils.import_parse import import_module_cached


Expand Down Expand Up @@ -84,20 +83,18 @@ def parse_plugin_config() -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
extra_plugins = []
app_plugins = []

# 事件循环嵌套: https://pypi.org/project/nest-asyncio/
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.get_event_loop()
else:
nest_asyncio.apply()

loop.create_task(redis_client.delete_prefix(f'{settings.PLUGIN_REDIS_PREFIX}:info'))
plugin_status = loop.run_until_complete(redis_client.hgetall(f'{settings.PLUGIN_REDIS_PREFIX}:status')) # type: ignore
plugins = get_plugins()

# 使用独立单例,避免与主线程冲突
current_redis_client = RedisCli()
run_await(current_redis_client.open)()

run_await(current_redis_client.delete_prefix)(f'{settings.PLUGIN_REDIS_PREFIX}:info', exclude=plugins)
plugin_status = run_await(current_redis_client.hgetall)(f'{settings.PLUGIN_REDIS_PREFIX}:status')
if not plugin_status:
plugin_status = {}

for plugin in get_plugins():
for plugin in plugins:
data = load_plugin_config(plugin)

plugin_info = data.get('plugin')
Expand All @@ -123,13 +120,13 @@ def parse_plugin_config() -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
data['plugin']['name'] = plugin

# 缓存插件信息
loop.create_task(
redis_client.set(f'{settings.PLUGIN_REDIS_PREFIX}:info:{plugin}', json.dumps(data, ensure_ascii=False))
run_await(current_redis_client.set)(
f'{settings.PLUGIN_REDIS_PREFIX}:info:{plugin}', json.dumps(data, ensure_ascii=False)
)

# 缓存插件状态
loop.create_task(redis_client.hset(f'{settings.PLUGIN_REDIS_PREFIX}:status', mapping=plugin_status)) # type: ignore
loop.create_task(redis_client.delete(f'{settings.PLUGIN_REDIS_PREFIX}:changed'))
run_await(current_redis_client.hset)(f'{settings.PLUGIN_REDIS_PREFIX}:status', mapping=plugin_status)
run_await(current_redis_client.delete)(f'{settings.PLUGIN_REDIS_PREFIX}:changed')

return extra_plugins, app_plugins

Expand Down
69 changes: 69 additions & 0 deletions backend/utils/_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import atexit
import threading
import weakref

from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')


class _TaskRunner:
"""A task runner that runs an asyncio event loop on a background thread."""

def __init__(self):
self.__loop: asyncio.AbstractEventLoop | None = None
self.__thread: threading.Thread | None = None
self.__lock = threading.Lock()
atexit.register(self.close)

def close(self):
"""关闭事件循环"""
if self.__loop:
self.__loop.stop()

def _target(self):
"""后台线程目标"""
loop = self.__loop
try:
loop.run_forever()
finally:
loop.close()

def run(self, coro):
"""在后台线程上同步运行协程"""
with self.__lock:
name = f'{threading.current_thread().name} - runner'
if self.__loop is None:
self.__loop = asyncio.new_event_loop()
self.__thread = threading.Thread(target=self._target, daemon=True, name=name)
self.__thread.start()
fut = asyncio.run_coroutine_threadsafe(coro, self.__loop)
return fut.result(None)


_runner_map = weakref.WeakValueDictionary()


def run_await(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
"""将协程包装在一个函数中,该函数会阻塞,直到它执行完为止"""

def wrapped(*args, **kwargs):
name = threading.current_thread().name
inner = coro(*args, **kwargs)
try:
# 如果当前此线程中正在运行循环
# 使用任务运行程序
asyncio.get_running_loop()
if name not in _runner_map:
_runner_map[name] = _TaskRunner()
return _runner_map[name].run(inner)
except RuntimeError:
# 如果没有,请创建一个新的事件循环
loop = asyncio.get_event_loop()
return loop.run_until_complete(inner)

wrapped.__doc__ = coro.__doc__
return wrapped
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ dependencies = [
"jinja2>=3.1.4",
"loguru>=0.7.3",
"msgspec>=0.19.0",
"nest-asyncio>=1.6.0",
"path==17.0.0",
"psutil>=6.0.0",
"pwdlib>=0.2.1",
Expand Down
2 changes: 0 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ mdurl==0.1.2
# via markdown-it-py
msgspec==0.19.0
# via fastapi-best-architecture
nest-asyncio==1.6.0
# via fastapi-best-architecture
nodeenv==1.9.1
# via pre-commit
packaging==24.2
Expand Down
11 changes: 0 additions & 11 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.