ayaka.constant
所有上下文变量
1'''所有上下文变量''' 2from contextvars import ContextVar 3from collections import defaultdict 4from re import Match 5from typing import List, TYPE_CHECKING, Dict 6from .driver import Message, MessageSegment, Bot, MessageEvent, get_driver 7from .utils import singleton 8 9if TYPE_CHECKING: 10 from .ayaka import AyakaApp, AyakaGroup 11 12 13_bot: ContextVar[Bot] = ContextVar("_bot") 14_event: ContextVar[MessageEvent] = ContextVar("_event") 15_group: ContextVar["AyakaGroup"] = ContextVar("_group") 16_arg: ContextVar[Message] = ContextVar("_arg") 17_args: ContextVar[List[MessageSegment]] = ContextVar("_args") 18_message: ContextVar[Message] = ContextVar("_message") 19_cmd: ContextVar[str] = ContextVar("_cmd", default="") 20_cmd_regex: ContextVar[Match] = ContextVar("_cmd_regex", default=None) 21_enter_exit_during: ContextVar[int] = ContextVar( 22 "_enter_exit_during", default=0) 23 24app_list: List["AyakaApp"] = [] 25group_list: List["AyakaGroup"] = [] 26bot_list: List[Bot] = [] 27 28# 监听私聊 29private_listener_dict: Dict[int, List[int]] = defaultdict(list) 30 31 32driver = get_driver() 33 34 35@singleton 36def start_timers(): 37 '''开启插件中的定时模块,仅执行一次''' 38 for app in app_list: 39 for t in app.timers: 40 t.start() 41 42 43@driver.on_bot_connect 44async def bot_connect(bot: Bot): 45 bot_list.append(bot) 46 start_timers() 47 48 49@driver.on_bot_disconnect 50async def bot_disconnect(bot: Bot): 51 bot_list.remove(bot)
def
start_timers(*args, **kwargs):
6 def getinstance(*args, **kwargs): 7 nonlocal instance 8 if instance is None: 9 instance = cls(*args, **kwargs) 10 return instance
开启插件中的定时模块,仅执行一次
@driver.on_bot_connect
async def
bot_connect(bot: nonebot.adapters.onebot.v11.bot.Bot):
@driver.on_bot_disconnect
async def
bot_disconnect(bot: nonebot.adapters.onebot.v11.bot.Bot):