ayaka.ayaka

ayaka核心

  1'''ayaka核心'''
  2import asyncio
  3import datetime
  4import inspect
  5import json
  6from math import ceil
  7from pathlib import Path
  8from loguru import logger
  9from typing import List, Dict, Literal, Union
 10from .depend import AyakaCache
 11from .config import ayaka_root_config, ayaka_data_path
 12from .constant import _bot, _event, _group, _arg, _args, _message, _cmd, _enter_exit_during, app_list, group_list, bot_list, private_listener_dict
 13from .driver import on_message, get_driver, Message, MessageSegment, Bot, MessageEvent, GroupMessageEvent
 14from .state import AyakaState, AyakaTrigger, root_state
 15from .on import AyakaOn, AyakaTimer
 16
 17
 18class AyakaGroup:
 19    def __repr__(self) -> str:
 20        return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})"
 21
 22    def __init__(self, bot_id: int, group_id: int) -> None:
 23        self.bot_id = bot_id
 24        self.group_id = group_id
 25        self.state = root_state
 26
 27        # 添加app,并分配独立数据空间
 28        self.apps: List["AyakaApp"] = []
 29        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
 30        for app in app_list:
 31            # if app.name not in forbid_names:
 32            self.apps.append(app)
 33            self.cache_dict[app.name] = {}
 34
 35        group_list.append(self)
 36
 37        if ayaka_root_config.debug:
 38            print(self)
 39
 40    async def back(self):
 41        if self.state.parent:
 42            await self.state.exit()
 43            self.state = self.state.parent
 44            return self.state
 45
 46    async def goto(self, state: AyakaState):
 47        if _enter_exit_during.get() > 0:
 48            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
 49
 50        keys = state.keys
 51
 52        # 找到第一个不同的结点
 53        n0 = len(keys)
 54        n1 = len(self.state.keys)
 55        n = min(n0, n1)
 56        for i in range(n):
 57            if keys[i] != self.state.keys[i]:
 58                break
 59        else:
 60            i += 1
 61
 62        # 回退
 63        for j in range(i, n1):
 64            await self.back()
 65        keys = keys[i:]
 66
 67        # 重新出发
 68        for key in keys:
 69            self.state = self.state[key]
 70            await self.state.enter()
 71        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
 72        return self.state
 73
 74    def get_app(self, name: str):
 75        '''根据app名获取该group所启用的app,不存在则返回None'''
 76        for app in self.apps:
 77            if app.name == name:
 78                return app
 79
 80
 81class AyakaApp:
 82    def __repr__(self) -> str:
 83        return f"{self.__class__.__name__}({self.name})"
 84
 85    def __init__(self, name: str) -> None:
 86        self.path = Path(inspect.stack()[1].filename)
 87
 88        for app in app_list:
 89            if app.name == name:
 90                raise Exception(
 91                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
 92
 93        self.name = name
 94        self.ayaka_root_config = ayaka_root_config
 95        self.funcs = []
 96        self.on = AyakaOn(self)
 97        self.timers: List[AyakaTimer] = []
 98        self.root_state = root_state
 99        self._intro = "没有介绍"
100        self.state_helps: Dict[str, List[str]] = {}
101        self.idle_helps: List[str] = []
102
103        logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"")
104        app_list.append(self)
105        if ayaka_root_config.debug:
106            print(self)
107
108    @property
109    def intro(self):
110        help = self._intro
111        for h in self.idle_helps:
112            help += "\n" + h
113        return help
114
115    @property
116    def all_help(self):
117        help = self.intro
118        for s, hs in self.state_helps.items():
119            help += f"\n[{s}]"
120            for h in hs:
121                help += "\n" + h
122        return help
123
124    @property
125    def help(self):
126        '''获取当前状态下的帮助,没有找到则返回介绍'''
127        total_triggers = get_cascade_triggers(self.state)
128
129        helps = []
130        cmds = []
131        for ts in total_triggers:
132            flag = 1
133            for t in ts:
134                if t.app == self:
135                    for c in t.cmds:
136                        if c in cmds:
137                            break
138                    else:
139                        if flag:
140                            helps.append(f"[{t.state[1:]}]")
141                            flag = 0
142                        cmds.extend(t.cmds)
143                        helps.append(t.help)
144
145        if not helps:
146            return self.intro
147        return "\n".join(helps)
148
149    @help.setter
150    def help(self, help: str):
151        self._intro = help
152
153    @property
154    def user_name(self):
155        '''*timer触发时不可用*
156
157        当前消息的发送人的群名片或昵称
158        '''
159        s = self.event.sender
160        name = s.card or s.nickname
161        return name
162
163    @property
164    def user_id(self):
165        '''*timer触发时不可用*
166
167        当前消息的发送人的uid
168        '''
169        return self.event.user_id
170
171    @property
172    def bot(self):
173        '''*timer触发时不可用*
174
175        当前bot
176        '''
177        return _bot.get()
178
179    @property
180    def event(self):
181        '''*timer触发时不可用*
182
183        当前消息
184        '''
185        return _event.get()
186
187    @property
188    def cache(self):
189        '''*timer触发时不可用*
190
191        当前群组的缓存空间'''
192        return self.group.cache_dict[self.name]
193
194    @property
195    def group_id(self):
196        '''*timer触发时不可用*
197
198        当前群组的id
199
200        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id
201        '''
202        return self.group.group_id
203
204    @property
205    def bot_id(self):
206        '''*timer触发时不可用*
207
208        当前bot的id
209        '''
210        return self.group.bot_id
211
212    @property
213    def group(self):
214        '''*timer触发时不可用*
215
216        当前群组
217
218        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A
219        '''
220        return _group.get()
221
222    @property
223    def arg(self):
224        '''*timer触发时不可用*
225
226        当前消息在移除了命令后的剩余部分
227        '''
228        return _arg.get()
229
230    @property
231    def args(self):
232        '''*timer触发时不可用*
233
234        当前消息在移除了命令后,剩余部分按照空格分割后的数组
235
236        注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素
237        '''
238        return _args.get()
239
240    @property
241    def cmd(self):
242        '''*timer触发时不可用*
243
244        当前消息的命令头
245        '''
246        return _cmd.get()
247
248    @property
249    def message(self):
250        '''*timer触发时不可用*
251
252        当前消息
253        '''
254        return _message.get()
255
256    @property
257    def state(self):
258        return self.group.state
259
260    def get_state(self, *keys: str):
261        '''
262            假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项`
263
264            >>> get_state(key1, key2) -> [root.test].key1.key2
265
266            特别的,keys可以为空,例如:
267
268            >>> get_state() -> [root.test]
269        '''
270        keys = [self.name, *keys]
271        return root_state.join(*keys)
272
273    async def set_state(self, state_or_key: Union[AyakaState, str], *keys: str):
274        return await self.goto(state_or_key, *keys)
275
276    async def goto(self, state_or_key: Union[AyakaState, str], *keys: str):
277        '''当state_or_key为字符串时,调用get_state进行初始化(state_or_key.keys1.keys2),keys可选填'''
278        if isinstance(state_or_key, AyakaState):
279            state = state_or_key
280        else:
281            state = self.get_state(*state_or_key.split("."), *keys)
282        return await self.group.goto(state)
283
284    async def back(self):
285        return await self.group.back()
286
287    def _add_func(self, func):
288        if func not in self.funcs:
289            self.funcs.append(func)
290
291    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
292        '''注册深度监听'''
293        def decorator(func):
294            func.deep = deep
295            self._add_func(func)
296            return func
297        return decorator
298
299    def on_no_block(self, block: bool = False):
300        '''注册非阻断'''
301        def decorator(func):
302            func.block = block
303            self._add_func(func)
304            return func
305        return decorator
306
307    def on_cmd(self, *cmds: str):
308        '''注册命令触发,不填写命令则视为文本消息'''
309        def decorator(func):
310            func.cmds = cmds
311            self._add_func(func)
312            return func
313        return decorator
314
315    def on_state(self, *states: Union[AyakaState, str, List[str]]):
316        '''注册有状态响应,不填写states则为root.插件名状态'''
317        _states = []
318        if not states:
319            states = [[]]
320        for s in states:
321            if isinstance(s, str):
322                s = self.get_state(s)
323            elif isinstance(s, list):
324                s = self.get_state(*s)
325            _states.append(s)
326
327        def decorator(func):
328            func.states = _states
329            self._add_func(func)
330            return func
331        return decorator
332
333    def set_start_cmds(self, *cmds: str):
334        '''设置应用启动命令,当然,你也可以通过on_cmd自定义启动方式'''
335        async def func():
336            '''打开应用'''
337            await self.start()
338        func.cmds = cmds
339        self.funcs.append(func)
340
341    async def start(self, state: str = None):
342        '''*timer触发时不可用*
343
344        启动应用,并发送提示
345
346        state参数为兼容旧API'''
347        if not state:
348            state = self.get_state()
349            await self.goto(state)
350        else:
351            states = state.split(".")
352            await self.goto(*states)
353        await self.send(f"已打开应用 [{self.name}]")
354
355    async def close(self):
356        '''*timer触发时不可用*
357
358        关闭应用,并发送提示'''
359        await self.goto(root_state)
360        await self.send(f"已关闭应用 [{self.name}]")
361
362    def add_listener(self, user_id: int):
363        '''为该群组添加对指定私聊的监听'''
364        private_listener_dict[user_id].append(self.group_id)
365
366    def remove_listener(self, user_id: int = 0):
367        '''默认移除该群组对其他私聊的所有监听'''
368        id = self.group_id
369
370        if user_id == 0:
371            for ids in private_listener_dict.values():
372                if id in ids:
373                    ids.remove(id)
374            return
375
376        if id in private_listener_dict[user_id]:
377            private_listener_dict[user_id].remove(self.group_id)
378
379    async def send(self, message):
380        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
381        # 这里不使用event,因为一些event可能来自其他设备的监听传递
382        await self.bot.send_group_msg(group_id=self.group_id, message=message)
383
384    def pack_messages(self, bot_id, messages):
385        '''转换为cqhttp node格式'''
386        data: List[MessageSegment] = []
387        for m in messages:
388            if isinstance(m, MessageSegment) and m.type == "node":
389                data.append(m)
390            else:
391                m = MessageSegment.node_custom(
392                    user_id=bot_id,
393                    nickname="Ayaka Bot",
394                    content=str(m)
395                )
396                data.append(m)
397        return data
398
399    async def send_many(self, messages):
400        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
401        # 分割长消息组(不可超过100条
402        div_len = 100
403        div_cnt = ceil(len(messages) / div_len)
404        for i in range(div_cnt):
405            msgs = self.pack_messages(
406                self.bot_id,
407                messages[i*div_len: (i+1)*div_len]
408            )
409            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
410
411    def t_check(self, bot_id: int):
412        # 未连接
413        bot = get_bot(bot_id)
414        if not bot:
415            logger.warning(f"BOT({bot_id}) 未连接")
416            return
417
418        return bot
419
420    async def t_send(self, bot_id: int, group_id: int, message):
421        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
422        bot = self.t_check(bot_id)
423        if not bot:
424            return
425
426        await bot.send_group_msg(group_id=group_id, message=message)
427
428    async def t_send_many(self, bot_id: int, group_id: int, messages):
429        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
430        bot = self.t_check(bot_id)
431        if not bot:
432            return
433
434        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
435        div_len = 80
436        div_cnt = ceil(len(messages) / div_len)
437        for i in range(div_cnt):
438            msgs = self.pack_messages(
439                bot_id,
440                messages[i*div_len: (i+1)*div_len]
441            )
442            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
443
444
445def get_bot(bot_id: int):
446    '''获取已连接的bot'''
447    bot_id = str(bot_id)
448    for bot in bot_list:
449        if bot.self_id == bot_id:
450            return bot
451
452
453def get_group(bot_id: int, group_id: int):
454    '''获取对应的AyakaGroup对象,自动增加'''
455    for group in group_list:
456        if group.bot_id == bot_id and group.group_id == group_id:
457            break
458    else:
459        group = AyakaGroup(bot_id, group_id)
460    return group
461
462
463def get_app(app_name: str):
464    for app in app_list:
465        if app.name == app_name:
466            return app
467
468
469async def deal_event(bot: Bot, event: MessageEvent):
470    '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组'''
471    if ayaka_root_config.exclude_old_msg:
472        time_i = int(datetime.datetime.now().timestamp())
473        if event.time < time_i - 60:
474            return
475
476    _bot.set(bot)
477    _event.set(event)
478
479    bot_id = int(bot.self_id)
480
481    if isinstance(event, GroupMessageEvent):
482        group_id = event.group_id
483        await deal_group(bot_id, group_id)
484
485    else:
486        id = event.user_id
487        group_ids = private_listener_dict.get(id, [])
488        ts = [asyncio.create_task(deal_group(bot_id, group_id))
489              for group_id in group_ids]
490        await asyncio.gather(*ts)
491
492
493async def deal_group(bot_id: int, group_id: int):
494    prefix = ayaka_root_config.prefix
495
496    # 群组
497    group = get_group(bot_id, group_id)
498    _group.set(group)
499
500    # 消息
501    message = _event.get().message
502    _message.set(message)
503
504    # 从子状态开始向上查找可用的触发
505    state = group.state
506    cascade_triggers = get_cascade_triggers(state, 0)
507
508    # 命令
509    # 消息前缀文本
510    first = get_first(message)
511    if first.startswith(prefix):
512        first = first[len(prefix):]
513        for ts in cascade_triggers:
514            if await deal_cmd_triggers(ts, message, first, state):
515                return
516
517    # 命令退化成消息
518    for ts in cascade_triggers:
519        if await deal_text_triggers(ts, message, state):
520            return
521
522
523async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState):
524    sep = ayaka_root_config.separate
525
526    # 命令
527    temp = [t for t in triggers if t.cmds]
528    # 根据命令长度排序,长命令优先级更高
529    cmd_ts = [(c, t) for t in temp for c in t.cmds]
530    cmd_ts.sort(key=lambda x: len(x[0]), reverse=1)
531
532    for c, t in cmd_ts:
533        if first.startswith(c):
534            # 设置上下文
535            # 设置命令
536            _cmd.set(c)
537            # 设置参数
538            left = first[len(c):].lstrip(sep)
539            if left:
540                arg = Message([MessageSegment.text(left), *message[1:]])
541            else:
542                arg = Message(message[1:])
543            _arg.set(arg)
544            _args.set(divide_message(arg))
545
546            # 触发
547            log_trigger(c, t.app.name, state, t.func.__name__)
548            await t.run()
549
550            # 阻断后续
551            if t.block:
552                return True
553
554
555async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState):
556    # 消息
557    text_ts = [t for t in triggers if not t.cmds]
558
559    # 设置上下文
560    # 设置命令
561    _cmd.set("")
562    # 设置参数
563    _arg.set(message)
564    _args.set(divide_message(message))
565
566    for t in text_ts:
567        # 触发
568        log_trigger("", t.app.name, state, t.func.__name__)
569        await t.run()
570
571        # 阻断后续
572        if t.block:
573            return True
574
575
576def get_cascade_triggers(state: AyakaState, deep: int = 0):
577    # 根据深度筛选funcs
578    ts = [
579        t for t in state.triggers
580        if t.deep == "all" or t.deep >= deep
581    ]
582    cascade_triggers = [ts]
583
584    # 获取父状态的方法
585    if state.parent:
586        cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1))
587
588    # 排除空项
589    cascade_triggers = [ts for ts in cascade_triggers if ts]
590    return cascade_triggers
591
592
593def log_trigger(cmd, app_name, state, func_name):
594    '''日志记录'''
595    items = []
596    items.append(f"状态:<c>{state}</c>")
597    items.append(f"应用:<y>{app_name}</y>")
598    if cmd:
599        items.append(f"命令:<y>{cmd}</y>")
600    else:
601        items.append("命令:<g>无</g>")
602    items.append(f"回调:<c>{func_name}</c>")
603    info = " | ".join(items)
604    logger.opt(colors=True).debug(info)
605
606
607def get_first(message: Message):
608    first = ""
609    for m in message:
610        if m.type == "text":
611            first += str(m)
612        else:
613            break
614    return first
615
616
617def divide_message(message: Message) -> List[MessageSegment]:
618    args = []
619    sep = ayaka_root_config.separate
620
621    for m in message:
622        if m.is_text():
623            ss = str(m).split(sep)
624            args.extend(MessageSegment.text(s) for s in ss if s)
625        else:
626            args.append(m)
627
628    return args
629
630
631def regist_func(app: AyakaApp, func):
632    '''注册回调'''
633    # 默认是无状态应用,从root开始触发
634    states: List[AyakaState] = getattr(func, "states", [root_state])
635    # 默认是消息响应
636    cmds: List[str] = getattr(func, "cmds", [])
637    # 默认监听深度为0
638    deep: int = getattr(func, "deep", 0)
639    # 默认阻断
640    block: bool = getattr(func, "block", True)
641
642    # 注册
643    for s in states:
644        s.on_cmd(cmds, app, deep, block)(func)
645
646    return func
647
648
649driver = get_driver()
650
651
652@driver.on_startup
653async def startup():
654    # 注册所有回调
655    for app in app_list:
656        for func in app.funcs:
657            regist_func(app, func)
658
659    if ayaka_root_config.debug:
660        s = json.dumps(
661            root_state.dict(), ensure_ascii=0,
662            indent=4, default=repr
663        )
664        path = ayaka_data_path / "all_state.json"
665        with path.open("w+", encoding="utf8") as f:
666            f.write(s)
667
668on_message(priority=20, block=False, handlers=[deal_event])
class AyakaGroup:
19class AyakaGroup:
20    def __repr__(self) -> str:
21        return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})"
22
23    def __init__(self, bot_id: int, group_id: int) -> None:
24        self.bot_id = bot_id
25        self.group_id = group_id
26        self.state = root_state
27
28        # 添加app,并分配独立数据空间
29        self.apps: List["AyakaApp"] = []
30        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
31        for app in app_list:
32            # if app.name not in forbid_names:
33            self.apps.append(app)
34            self.cache_dict[app.name] = {}
35
36        group_list.append(self)
37
38        if ayaka_root_config.debug:
39            print(self)
40
41    async def back(self):
42        if self.state.parent:
43            await self.state.exit()
44            self.state = self.state.parent
45            return self.state
46
47    async def goto(self, state: AyakaState):
48        if _enter_exit_during.get() > 0:
49            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
50
51        keys = state.keys
52
53        # 找到第一个不同的结点
54        n0 = len(keys)
55        n1 = len(self.state.keys)
56        n = min(n0, n1)
57        for i in range(n):
58            if keys[i] != self.state.keys[i]:
59                break
60        else:
61            i += 1
62
63        # 回退
64        for j in range(i, n1):
65            await self.back()
66        keys = keys[i:]
67
68        # 重新出发
69        for key in keys:
70            self.state = self.state[key]
71            await self.state.enter()
72        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
73        return self.state
74
75    def get_app(self, name: str):
76        '''根据app名获取该group所启用的app,不存在则返回None'''
77        for app in self.apps:
78            if app.name == name:
79                return app
AyakaGroup(bot_id: int, group_id: int)
23    def __init__(self, bot_id: int, group_id: int) -> None:
24        self.bot_id = bot_id
25        self.group_id = group_id
26        self.state = root_state
27
28        # 添加app,并分配独立数据空间
29        self.apps: List["AyakaApp"] = []
30        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
31        for app in app_list:
32            # if app.name not in forbid_names:
33            self.apps.append(app)
34            self.cache_dict[app.name] = {}
35
36        group_list.append(self)
37
38        if ayaka_root_config.debug:
39            print(self)
async def back(self):
41    async def back(self):
42        if self.state.parent:
43            await self.state.exit()
44            self.state = self.state.parent
45            return self.state
async def goto(self, state: ayaka.state.AyakaState):
47    async def goto(self, state: AyakaState):
48        if _enter_exit_during.get() > 0:
49            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
50
51        keys = state.keys
52
53        # 找到第一个不同的结点
54        n0 = len(keys)
55        n1 = len(self.state.keys)
56        n = min(n0, n1)
57        for i in range(n):
58            if keys[i] != self.state.keys[i]:
59                break
60        else:
61            i += 1
62
63        # 回退
64        for j in range(i, n1):
65            await self.back()
66        keys = keys[i:]
67
68        # 重新出发
69        for key in keys:
70            self.state = self.state[key]
71            await self.state.enter()
72        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
73        return self.state
def get_app(self, name: str):
75    def get_app(self, name: str):
76        '''根据app名获取该group所启用的app,不存在则返回None'''
77        for app in self.apps:
78            if app.name == name:
79                return app

根据app名获取该group所启用的app,不存在则返回None

class AyakaApp:
 82class AyakaApp:
 83    def __repr__(self) -> str:
 84        return f"{self.__class__.__name__}({self.name})"
 85
 86    def __init__(self, name: str) -> None:
 87        self.path = Path(inspect.stack()[1].filename)
 88
 89        for app in app_list:
 90            if app.name == name:
 91                raise Exception(
 92                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
 93
 94        self.name = name
 95        self.ayaka_root_config = ayaka_root_config
 96        self.funcs = []
 97        self.on = AyakaOn(self)
 98        self.timers: List[AyakaTimer] = []
 99        self.root_state = root_state
100        self._intro = "没有介绍"
101        self.state_helps: Dict[str, List[str]] = {}
102        self.idle_helps: List[str] = []
103
104        logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"")
105        app_list.append(self)
106        if ayaka_root_config.debug:
107            print(self)
108
109    @property
110    def intro(self):
111        help = self._intro
112        for h in self.idle_helps:
113            help += "\n" + h
114        return help
115
116    @property
117    def all_help(self):
118        help = self.intro
119        for s, hs in self.state_helps.items():
120            help += f"\n[{s}]"
121            for h in hs:
122                help += "\n" + h
123        return help
124
125    @property
126    def help(self):
127        '''获取当前状态下的帮助,没有找到则返回介绍'''
128        total_triggers = get_cascade_triggers(self.state)
129
130        helps = []
131        cmds = []
132        for ts in total_triggers:
133            flag = 1
134            for t in ts:
135                if t.app == self:
136                    for c in t.cmds:
137                        if c in cmds:
138                            break
139                    else:
140                        if flag:
141                            helps.append(f"[{t.state[1:]}]")
142                            flag = 0
143                        cmds.extend(t.cmds)
144                        helps.append(t.help)
145
146        if not helps:
147            return self.intro
148        return "\n".join(helps)
149
150    @help.setter
151    def help(self, help: str):
152        self._intro = help
153
154    @property
155    def user_name(self):
156        '''*timer触发时不可用*
157
158        当前消息的发送人的群名片或昵称
159        '''
160        s = self.event.sender
161        name = s.card or s.nickname
162        return name
163
164    @property
165    def user_id(self):
166        '''*timer触发时不可用*
167
168        当前消息的发送人的uid
169        '''
170        return self.event.user_id
171
172    @property
173    def bot(self):
174        '''*timer触发时不可用*
175
176        当前bot
177        '''
178        return _bot.get()
179
180    @property
181    def event(self):
182        '''*timer触发时不可用*
183
184        当前消息
185        '''
186        return _event.get()
187
188    @property
189    def cache(self):
190        '''*timer触发时不可用*
191
192        当前群组的缓存空间'''
193        return self.group.cache_dict[self.name]
194
195    @property
196    def group_id(self):
197        '''*timer触发时不可用*
198
199        当前群组的id
200
201        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id
202        '''
203        return self.group.group_id
204
205    @property
206    def bot_id(self):
207        '''*timer触发时不可用*
208
209        当前bot的id
210        '''
211        return self.group.bot_id
212
213    @property
214    def group(self):
215        '''*timer触发时不可用*
216
217        当前群组
218
219        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A
220        '''
221        return _group.get()
222
223    @property
224    def arg(self):
225        '''*timer触发时不可用*
226
227        当前消息在移除了命令后的剩余部分
228        '''
229        return _arg.get()
230
231    @property
232    def args(self):
233        '''*timer触发时不可用*
234
235        当前消息在移除了命令后,剩余部分按照空格分割后的数组
236
237        注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素
238        '''
239        return _args.get()
240
241    @property
242    def cmd(self):
243        '''*timer触发时不可用*
244
245        当前消息的命令头
246        '''
247        return _cmd.get()
248
249    @property
250    def message(self):
251        '''*timer触发时不可用*
252
253        当前消息
254        '''
255        return _message.get()
256
257    @property
258    def state(self):
259        return self.group.state
260
261    def get_state(self, *keys: str):
262        '''
263            假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项`
264
265            >>> get_state(key1, key2) -> [root.test].key1.key2
266
267            特别的,keys可以为空,例如:
268
269            >>> get_state() -> [root.test]
270        '''
271        keys = [self.name, *keys]
272        return root_state.join(*keys)
273
274    async def set_state(self, state_or_key: Union[AyakaState, str], *keys: str):
275        return await self.goto(state_or_key, *keys)
276
277    async def goto(self, state_or_key: Union[AyakaState, str], *keys: str):
278        '''当state_or_key为字符串时,调用get_state进行初始化(state_or_key.keys1.keys2),keys可选填'''
279        if isinstance(state_or_key, AyakaState):
280            state = state_or_key
281        else:
282            state = self.get_state(*state_or_key.split("."), *keys)
283        return await self.group.goto(state)
284
285    async def back(self):
286        return await self.group.back()
287
288    def _add_func(self, func):
289        if func not in self.funcs:
290            self.funcs.append(func)
291
292    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
293        '''注册深度监听'''
294        def decorator(func):
295            func.deep = deep
296            self._add_func(func)
297            return func
298        return decorator
299
300    def on_no_block(self, block: bool = False):
301        '''注册非阻断'''
302        def decorator(func):
303            func.block = block
304            self._add_func(func)
305            return func
306        return decorator
307
308    def on_cmd(self, *cmds: str):
309        '''注册命令触发,不填写命令则视为文本消息'''
310        def decorator(func):
311            func.cmds = cmds
312            self._add_func(func)
313            return func
314        return decorator
315
316    def on_state(self, *states: Union[AyakaState, str, List[str]]):
317        '''注册有状态响应,不填写states则为root.插件名状态'''
318        _states = []
319        if not states:
320            states = [[]]
321        for s in states:
322            if isinstance(s, str):
323                s = self.get_state(s)
324            elif isinstance(s, list):
325                s = self.get_state(*s)
326            _states.append(s)
327
328        def decorator(func):
329            func.states = _states
330            self._add_func(func)
331            return func
332        return decorator
333
334    def set_start_cmds(self, *cmds: str):
335        '''设置应用启动命令,当然,你也可以通过on_cmd自定义启动方式'''
336        async def func():
337            '''打开应用'''
338            await self.start()
339        func.cmds = cmds
340        self.funcs.append(func)
341
342    async def start(self, state: str = None):
343        '''*timer触发时不可用*
344
345        启动应用,并发送提示
346
347        state参数为兼容旧API'''
348        if not state:
349            state = self.get_state()
350            await self.goto(state)
351        else:
352            states = state.split(".")
353            await self.goto(*states)
354        await self.send(f"已打开应用 [{self.name}]")
355
356    async def close(self):
357        '''*timer触发时不可用*
358
359        关闭应用,并发送提示'''
360        await self.goto(root_state)
361        await self.send(f"已关闭应用 [{self.name}]")
362
363    def add_listener(self, user_id: int):
364        '''为该群组添加对指定私聊的监听'''
365        private_listener_dict[user_id].append(self.group_id)
366
367    def remove_listener(self, user_id: int = 0):
368        '''默认移除该群组对其他私聊的所有监听'''
369        id = self.group_id
370
371        if user_id == 0:
372            for ids in private_listener_dict.values():
373                if id in ids:
374                    ids.remove(id)
375            return
376
377        if id in private_listener_dict[user_id]:
378            private_listener_dict[user_id].remove(self.group_id)
379
380    async def send(self, message):
381        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
382        # 这里不使用event,因为一些event可能来自其他设备的监听传递
383        await self.bot.send_group_msg(group_id=self.group_id, message=message)
384
385    def pack_messages(self, bot_id, messages):
386        '''转换为cqhttp node格式'''
387        data: List[MessageSegment] = []
388        for m in messages:
389            if isinstance(m, MessageSegment) and m.type == "node":
390                data.append(m)
391            else:
392                m = MessageSegment.node_custom(
393                    user_id=bot_id,
394                    nickname="Ayaka Bot",
395                    content=str(m)
396                )
397                data.append(m)
398        return data
399
400    async def send_many(self, messages):
401        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
402        # 分割长消息组(不可超过100条
403        div_len = 100
404        div_cnt = ceil(len(messages) / div_len)
405        for i in range(div_cnt):
406            msgs = self.pack_messages(
407                self.bot_id,
408                messages[i*div_len: (i+1)*div_len]
409            )
410            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
411
412    def t_check(self, bot_id: int):
413        # 未连接
414        bot = get_bot(bot_id)
415        if not bot:
416            logger.warning(f"BOT({bot_id}) 未连接")
417            return
418
419        return bot
420
421    async def t_send(self, bot_id: int, group_id: int, message):
422        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
423        bot = self.t_check(bot_id)
424        if not bot:
425            return
426
427        await bot.send_group_msg(group_id=group_id, message=message)
428
429    async def t_send_many(self, bot_id: int, group_id: int, messages):
430        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
431        bot = self.t_check(bot_id)
432        if not bot:
433            return
434
435        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
436        div_len = 80
437        div_cnt = ceil(len(messages) / div_len)
438        for i in range(div_cnt):
439            msgs = self.pack_messages(
440                bot_id,
441                messages[i*div_len: (i+1)*div_len]
442            )
443            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
AyakaApp(name: str)
 86    def __init__(self, name: str) -> None:
 87        self.path = Path(inspect.stack()[1].filename)
 88
 89        for app in app_list:
 90            if app.name == name:
 91                raise Exception(
 92                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
 93
 94        self.name = name
 95        self.ayaka_root_config = ayaka_root_config
 96        self.funcs = []
 97        self.on = AyakaOn(self)
 98        self.timers: List[AyakaTimer] = []
 99        self.root_state = root_state
100        self._intro = "没有介绍"
101        self.state_helps: Dict[str, List[str]] = {}
102        self.idle_helps: List[str] = []
103
104        logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"")
105        app_list.append(self)
106        if ayaka_root_config.debug:
107            print(self)
help

获取当前状态下的帮助,没有找到则返回介绍

user_name

timer触发时不可用

当前消息的发送人的群名片或昵称

user_id

timer触发时不可用

当前消息的发送人的uid

bot

timer触发时不可用

当前bot

event

timer触发时不可用

当前消息

cache

timer触发时不可用

当前群组的缓存空间

group_id

timer触发时不可用

当前群组的id

注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id

bot_id

timer触发时不可用

当前bot的id

group

timer触发时不可用

当前群组

注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A

arg

timer触发时不可用

当前消息在移除了命令后的剩余部分

args

timer触发时不可用

当前消息在移除了命令后,剩余部分按照空格分割后的数组

注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素

cmd

timer触发时不可用

当前消息的命令头

message

timer触发时不可用

当前消息

def get_state(self, *keys: str):
261    def get_state(self, *keys: str):
262        '''
263            假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项`
264
265            >>> get_state(key1, key2) -> [root.test].key1.key2
266
267            特别的,keys可以为空,例如:
268
269            >>> get_state() -> [root.test]
270        '''
271        keys = [self.name, *keys]
272        return root_state.join(*keys)

假设当前状态为 root.test.a根.插件名.一级菜单项

>>> get_state(key1, key2) -> [root.test].key1.key2

特别的,keys可以为空,例如:

>>> get_state() -> [root.test]
async def set_state(self, state_or_key: Union[ayaka.state.AyakaState, str], *keys: str):
274    async def set_state(self, state_or_key: Union[AyakaState, str], *keys: str):
275        return await self.goto(state_or_key, *keys)
async def goto(self, state_or_key: Union[ayaka.state.AyakaState, str], *keys: str):
277    async def goto(self, state_or_key: Union[AyakaState, str], *keys: str):
278        '''当state_or_key为字符串时,调用get_state进行初始化(state_or_key.keys1.keys2),keys可选填'''
279        if isinstance(state_or_key, AyakaState):
280            state = state_or_key
281        else:
282            state = self.get_state(*state_or_key.split("."), *keys)
283        return await self.group.goto(state)

当state_or_key为字符串时,调用get_state进行初始化(state_or_key.keys1.keys2),keys可选填

async def back(self):
285    async def back(self):
286        return await self.group.back()
def on_deep_all(self, deep: Union[int, Literal['all']] = 'all'):
292    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
293        '''注册深度监听'''
294        def decorator(func):
295            func.deep = deep
296            self._add_func(func)
297            return func
298        return decorator

注册深度监听

def on_no_block(self, block: bool = False):
300    def on_no_block(self, block: bool = False):
301        '''注册非阻断'''
302        def decorator(func):
303            func.block = block
304            self._add_func(func)
305            return func
306        return decorator

注册非阻断

def on_cmd(self, *cmds: str):
308    def on_cmd(self, *cmds: str):
309        '''注册命令触发,不填写命令则视为文本消息'''
310        def decorator(func):
311            func.cmds = cmds
312            self._add_func(func)
313            return func
314        return decorator

注册命令触发,不填写命令则视为文本消息

def on_state(self, *states: Union[ayaka.state.AyakaState, str, List[str]]):
316    def on_state(self, *states: Union[AyakaState, str, List[str]]):
317        '''注册有状态响应,不填写states则为root.插件名状态'''
318        _states = []
319        if not states:
320            states = [[]]
321        for s in states:
322            if isinstance(s, str):
323                s = self.get_state(s)
324            elif isinstance(s, list):
325                s = self.get_state(*s)
326            _states.append(s)
327
328        def decorator(func):
329            func.states = _states
330            self._add_func(func)
331            return func
332        return decorator

注册有状态响应,不填写states则为root.插件名状态

def set_start_cmds(self, *cmds: str):
334    def set_start_cmds(self, *cmds: str):
335        '''设置应用启动命令,当然,你也可以通过on_cmd自定义启动方式'''
336        async def func():
337            '''打开应用'''
338            await self.start()
339        func.cmds = cmds
340        self.funcs.append(func)

设置应用启动命令,当然,你也可以通过on_cmd自定义启动方式

async def start(self, state: str = None):
342    async def start(self, state: str = None):
343        '''*timer触发时不可用*
344
345        启动应用,并发送提示
346
347        state参数为兼容旧API'''
348        if not state:
349            state = self.get_state()
350            await self.goto(state)
351        else:
352            states = state.split(".")
353            await self.goto(*states)
354        await self.send(f"已打开应用 [{self.name}]")

timer触发时不可用

启动应用,并发送提示

state参数为兼容旧API

async def close(self):
356    async def close(self):
357        '''*timer触发时不可用*
358
359        关闭应用,并发送提示'''
360        await self.goto(root_state)
361        await self.send(f"已关闭应用 [{self.name}]")

timer触发时不可用

关闭应用,并发送提示

def add_listener(self, user_id: int):
363    def add_listener(self, user_id: int):
364        '''为该群组添加对指定私聊的监听'''
365        private_listener_dict[user_id].append(self.group_id)

为该群组添加对指定私聊的监听

def remove_listener(self, user_id: int = 0):
367    def remove_listener(self, user_id: int = 0):
368        '''默认移除该群组对其他私聊的所有监听'''
369        id = self.group_id
370
371        if user_id == 0:
372            for ids in private_listener_dict.values():
373                if id in ids:
374                    ids.remove(id)
375            return
376
377        if id in private_listener_dict[user_id]:
378            private_listener_dict[user_id].remove(self.group_id)

默认移除该群组对其他私聊的所有监听

async def send(self, message):
380    async def send(self, message):
381        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
382        # 这里不使用event,因为一些event可能来自其他设备的监听传递
383        await self.bot.send_group_msg(group_id=self.group_id, message=message)

发送消息,消息的类型可以是 Message | MessageSegment | str

def pack_messages(self, bot_id, messages):
385    def pack_messages(self, bot_id, messages):
386        '''转换为cqhttp node格式'''
387        data: List[MessageSegment] = []
388        for m in messages:
389            if isinstance(m, MessageSegment) and m.type == "node":
390                data.append(m)
391            else:
392                m = MessageSegment.node_custom(
393                    user_id=bot_id,
394                    nickname="Ayaka Bot",
395                    content=str(m)
396                )
397                data.append(m)
398        return data

转换为cqhttp node格式

async def send_many(self, messages):
400    async def send_many(self, messages):
401        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
402        # 分割长消息组(不可超过100条
403        div_len = 100
404        div_cnt = ceil(len(messages) / div_len)
405        for i in range(div_cnt):
406            msgs = self.pack_messages(
407                self.bot_id,
408                messages[i*div_len: (i+1)*div_len]
409            )
410            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)

发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]

def t_check(self, bot_id: int):
412    def t_check(self, bot_id: int):
413        # 未连接
414        bot = get_bot(bot_id)
415        if not bot:
416            logger.warning(f"BOT({bot_id}) 未连接")
417            return
418
419        return bot
async def t_send(self, bot_id: int, group_id: int, message):
421    async def t_send(self, bot_id: int, group_id: int, message):
422        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
423        bot = self.t_check(bot_id)
424        if not bot:
425            return
426
427        await bot.send_group_msg(group_id=group_id, message=message)

timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用

async def t_send_many(self, bot_id: int, group_id: int, messages):
429    async def t_send_many(self, bot_id: int, group_id: int, messages):
430        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
431        bot = self.t_check(bot_id)
432        if not bot:
433            return
434
435        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
436        div_len = 80
437        div_cnt = ceil(len(messages) / div_len)
438        for i in range(div_cnt):
439            msgs = self.pack_messages(
440                bot_id,
441                messages[i*div_len: (i+1)*div_len]
442            )
443            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)

timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用

def get_bot(bot_id: int):
446def get_bot(bot_id: int):
447    '''获取已连接的bot'''
448    bot_id = str(bot_id)
449    for bot in bot_list:
450        if bot.self_id == bot_id:
451            return bot

获取已连接的bot

def get_group(bot_id: int, group_id: int):
454def get_group(bot_id: int, group_id: int):
455    '''获取对应的AyakaGroup对象,自动增加'''
456    for group in group_list:
457        if group.bot_id == bot_id and group.group_id == group_id:
458            break
459    else:
460        group = AyakaGroup(bot_id, group_id)
461    return group

获取对应的AyakaGroup对象,自动增加

def get_app(app_name: str):
464def get_app(app_name: str):
465    for app in app_list:
466        if app.name == app_name:
467            return app
async def deal_event( bot: nonebot.adapters.onebot.v11.bot.Bot, event: nonebot.adapters.onebot.v11.event.MessageEvent):
470async def deal_event(bot: Bot, event: MessageEvent):
471    '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组'''
472    if ayaka_root_config.exclude_old_msg:
473        time_i = int(datetime.datetime.now().timestamp())
474        if event.time < time_i - 60:
475            return
476
477    _bot.set(bot)
478    _event.set(event)
479
480    bot_id = int(bot.self_id)
481
482    if isinstance(event, GroupMessageEvent):
483        group_id = event.group_id
484        await deal_group(bot_id, group_id)
485
486    else:
487        id = event.user_id
488        group_ids = private_listener_dict.get(id, [])
489        ts = [asyncio.create_task(deal_group(bot_id, group_id))
490              for group_id in group_ids]
491        await asyncio.gather(*ts)

处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组

async def deal_group(bot_id: int, group_id: int):
494async def deal_group(bot_id: int, group_id: int):
495    prefix = ayaka_root_config.prefix
496
497    # 群组
498    group = get_group(bot_id, group_id)
499    _group.set(group)
500
501    # 消息
502    message = _event.get().message
503    _message.set(message)
504
505    # 从子状态开始向上查找可用的触发
506    state = group.state
507    cascade_triggers = get_cascade_triggers(state, 0)
508
509    # 命令
510    # 消息前缀文本
511    first = get_first(message)
512    if first.startswith(prefix):
513        first = first[len(prefix):]
514        for ts in cascade_triggers:
515            if await deal_cmd_triggers(ts, message, first, state):
516                return
517
518    # 命令退化成消息
519    for ts in cascade_triggers:
520        if await deal_text_triggers(ts, message, state):
521            return
async def deal_cmd_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, first: str, state: ayaka.state.AyakaState):
524async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState):
525    sep = ayaka_root_config.separate
526
527    # 命令
528    temp = [t for t in triggers if t.cmds]
529    # 根据命令长度排序,长命令优先级更高
530    cmd_ts = [(c, t) for t in temp for c in t.cmds]
531    cmd_ts.sort(key=lambda x: len(x[0]), reverse=1)
532
533    for c, t in cmd_ts:
534        if first.startswith(c):
535            # 设置上下文
536            # 设置命令
537            _cmd.set(c)
538            # 设置参数
539            left = first[len(c):].lstrip(sep)
540            if left:
541                arg = Message([MessageSegment.text(left), *message[1:]])
542            else:
543                arg = Message(message[1:])
544            _arg.set(arg)
545            _args.set(divide_message(arg))
546
547            # 触发
548            log_trigger(c, t.app.name, state, t.func.__name__)
549            await t.run()
550
551            # 阻断后续
552            if t.block:
553                return True
async def deal_text_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, state: ayaka.state.AyakaState):
556async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState):
557    # 消息
558    text_ts = [t for t in triggers if not t.cmds]
559
560    # 设置上下文
561    # 设置命令
562    _cmd.set("")
563    # 设置参数
564    _arg.set(message)
565    _args.set(divide_message(message))
566
567    for t in text_ts:
568        # 触发
569        log_trigger("", t.app.name, state, t.func.__name__)
570        await t.run()
571
572        # 阻断后续
573        if t.block:
574            return True
def get_cascade_triggers(state: ayaka.state.AyakaState, deep: int = 0):
577def get_cascade_triggers(state: AyakaState, deep: int = 0):
578    # 根据深度筛选funcs
579    ts = [
580        t for t in state.triggers
581        if t.deep == "all" or t.deep >= deep
582    ]
583    cascade_triggers = [ts]
584
585    # 获取父状态的方法
586    if state.parent:
587        cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1))
588
589    # 排除空项
590    cascade_triggers = [ts for ts in cascade_triggers if ts]
591    return cascade_triggers
def log_trigger(cmd, app_name, state, func_name):
594def log_trigger(cmd, app_name, state, func_name):
595    '''日志记录'''
596    items = []
597    items.append(f"状态:<c>{state}</c>")
598    items.append(f"应用:<y>{app_name}</y>")
599    if cmd:
600        items.append(f"命令:<y>{cmd}</y>")
601    else:
602        items.append("命令:<g>无</g>")
603    items.append(f"回调:<c>{func_name}</c>")
604    info = " | ".join(items)
605    logger.opt(colors=True).debug(info)

日志记录

def get_first(message: nonebot.adapters.onebot.v11.message.Message):
608def get_first(message: Message):
609    first = ""
610    for m in message:
611        if m.type == "text":
612            first += str(m)
613        else:
614            break
615    return first
def divide_message( message: nonebot.adapters.onebot.v11.message.Message) -> List[nonebot.adapters.onebot.v11.message.MessageSegment]:
618def divide_message(message: Message) -> List[MessageSegment]:
619    args = []
620    sep = ayaka_root_config.separate
621
622    for m in message:
623        if m.is_text():
624            ss = str(m).split(sep)
625            args.extend(MessageSegment.text(s) for s in ss if s)
626        else:
627            args.append(m)
628
629    return args
def regist_func(app: ayaka.ayaka.AyakaApp, func):
632def regist_func(app: AyakaApp, func):
633    '''注册回调'''
634    # 默认是无状态应用,从root开始触发
635    states: List[AyakaState] = getattr(func, "states", [root_state])
636    # 默认是消息响应
637    cmds: List[str] = getattr(func, "cmds", [])
638    # 默认监听深度为0
639    deep: int = getattr(func, "deep", 0)
640    # 默认阻断
641    block: bool = getattr(func, "block", True)
642
643    # 注册
644    for s in states:
645        s.on_cmd(cmds, app, deep, block)(func)
646
647    return func

注册回调

@driver.on_startup
async def startup():
653@driver.on_startup
654async def startup():
655    # 注册所有回调
656    for app in app_list:
657        for func in app.funcs:
658            regist_func(app, func)
659
660    if ayaka_root_config.debug:
661        s = json.dumps(
662            root_state.dict(), ensure_ascii=0,
663            indent=4, default=repr
664        )
665        path = ayaka_data_path / "all_state.json"
666        with path.open("w+", encoding="utf8") as f:
667            f.write(s)