ayaka.ayaka

ayaka核心

  1'''ayaka核心'''
  2import asyncio
  3import datetime
  4import inspect
  5import json
  6from math import ceil
  7from pathlib import Path
  8from loguru import logger
  9from typing import List, Dict, Literal, Union
 10from .depend import AyakaCache
 11from .config import ayaka_root_config, ayaka_data_path
 12from .constant import _bot, _event, _group, _arg, _args, _message, _cmd, _enter_exit_during, app_list, group_list, bot_list, private_listener_dict
 13from .driver import on_message, get_driver, Message, MessageSegment, Bot, MessageEvent, GroupMessageEvent
 14from .state import AyakaState, AyakaTrigger, root_state
 15from .on import AyakaOn, AyakaTimer
 16
 17
 18class AyakaGroup:
 19    def __repr__(self) -> str:
 20        return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})"
 21
 22    def __init__(self, bot_id: int, group_id: int) -> None:
 23        self.bot_id = bot_id
 24        self.group_id = group_id
 25        self.state = root_state
 26
 27        # 添加app,并分配独立数据空间
 28        self.apps: List["AyakaApp"] = []
 29        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
 30        for app in app_list:
 31            # if app.name not in forbid_names:
 32            self.apps.append(app)
 33            self.cache_dict[app.name] = {}
 34
 35        group_list.append(self)
 36
 37        if ayaka_root_config.debug:
 38            print(self)
 39
 40    async def back(self):
 41        if self.state.parent:
 42            await self.state.exit()
 43            self.state = self.state.parent
 44            return self.state
 45
 46    async def goto(self, state: AyakaState):
 47        if _enter_exit_during.get() > 0:
 48            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
 49
 50        keys = state.keys
 51
 52        # 找到第一个不同的结点
 53        n0 = len(keys)
 54        n1 = len(self.state.keys)
 55        n = min(n0, n1)
 56        for i in range(n):
 57            if keys[i] != self.state.keys[i]:
 58                break
 59        else:
 60            i += 1
 61
 62        # 回退
 63        for j in range(i, n1):
 64            await self.back()
 65        keys = keys[i:]
 66
 67        # 重新出发
 68        for key in keys:
 69            self.state = self.state[key]
 70            await self.state.enter()
 71        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
 72        return self.state
 73
 74    def get_app(self, name: str):
 75        '''根据app名获取该group所启用的app,不存在则返回None'''
 76        for app in self.apps:
 77            if app.name == name:
 78                return app
 79
 80
 81class AyakaApp:
 82    def __repr__(self) -> str:
 83        return f"{self.__class__.__name__}({self.name})"
 84
 85    def __init__(self, name: str) -> None:
 86        self.path = Path(inspect.stack()[1].filename)
 87
 88        for app in app_list:
 89            if app.name == name:
 90                raise Exception(
 91                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
 92
 93        self.name = name
 94        self.ayaka_root_config = ayaka_root_config
 95        self.funcs = []
 96        self.on = AyakaOn(self)
 97        self.timers: List[AyakaTimer] = []
 98        self.root_state = root_state
 99        self._intro = "没有介绍"
100        self.state_helps: Dict[str, List[str]] = {}
101        self.idle_helps: List[str] = []
102
103        logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"")
104        app_list.append(self)
105        if ayaka_root_config.debug:
106            print(self)
107
108    @property
109    def intro(self):
110        help = self._intro
111        for h in self.idle_helps:
112            help += "\n" + h
113        return help
114
115    @property
116    def all_help(self):
117        help = self.intro
118        for s, hs in self.state_helps.items():
119            help += f"\n[{s}]"
120            for h in hs:
121                help += "\n" + h
122        return help
123
124    @property
125    def help(self):
126        '''获取当前状态下的帮助,没有找到则返回介绍'''
127        total_triggers = get_cascade_triggers(self.state)
128
129        helps = []
130        cmds = []
131        for ts in total_triggers:
132            flag = 1
133            for t in ts:
134                if t.app == self:
135                    for c in t.cmds:
136                        if c in cmds:
137                            break
138                    else:
139                        if flag:
140                            helps.append(f"[{t.state[1:]}]")
141                            flag = 0
142                        cmds.extend(t.cmds)
143                        helps.append(t.help)
144
145        if not helps:
146            return self.intro
147        return "\n".join(helps)
148
149    @help.setter
150    def help(self, help: str):
151        self._intro = help
152
153    @property
154    def user_name(self):
155        '''*timer触发时不可用*
156
157        当前消息的发送人的群名片或昵称
158        '''
159        s = self.event.sender
160        name = s.card or s.nickname
161        return name
162
163    @property
164    def user_id(self):
165        '''*timer触发时不可用*
166
167        当前消息的发送人的uid
168        '''
169        return self.event.user_id
170
171    @property
172    def bot(self):
173        '''*timer触发时不可用*
174
175        当前bot
176        '''
177        return _bot.get()
178
179    @property
180    def event(self):
181        '''*timer触发时不可用*
182
183        当前消息
184        '''
185        return _event.get()
186
187    @property
188    def cache(self):
189        '''*timer触发时不可用*
190
191        当前群组的缓存空间'''
192        return self.group.cache_dict[self.name]
193
194    @property
195    def group_id(self):
196        '''*timer触发时不可用*
197
198        当前群组的id
199
200        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id
201        '''
202        return self.group.group_id
203
204    @property
205    def bot_id(self):
206        '''*timer触发时不可用*
207
208        当前bot的id
209        '''
210        return self.group.bot_id
211
212    @property
213    def group(self):
214        '''*timer触发时不可用*
215
216        当前群组
217
218        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A
219        '''
220        return _group.get()
221
222    @property
223    def arg(self):
224        '''*timer触发时不可用*
225
226        当前消息在移除了命令后的剩余部分
227        '''
228        return _arg.get()
229
230    @property
231    def args(self):
232        '''*timer触发时不可用*
233
234        当前消息在移除了命令后,剩余部分按照空格分割后的数组
235
236        注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素
237        '''
238        return _args.get()
239
240    @property
241    def cmd(self):
242        '''*timer触发时不可用*
243
244        当前消息的命令头
245        '''
246        return _cmd.get()
247
248    @property
249    def message(self):
250        '''*timer触发时不可用*
251
252        当前消息
253        '''
254        return _message.get()
255
256    @property
257    def state(self):
258        return self.group.state
259
260    def get_state(self, *keys: str):
261        '''
262            假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项`
263
264            >>> get_state(key1, key2) -> [root.test].key1.key2
265
266            特别的,keys可以为空,例如:
267
268            >>> get_state() -> [root.test]
269        '''
270        keys = [self.name, *keys]
271        return root_state.join(*keys)
272
273    async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str):
274        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
275        return await self.goto(state, *keys)
276
277    async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str):
278        # keys为兼容旧API(0.5.2及以前
279        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
280        if isinstance(state, str):
281            if "." in state:
282                state = self.get_state(*state.split("."))
283            else:
284                state = self.get_state(state, *keys)
285        elif isinstance(state, list):
286            state = self.get_state(*state)
287        return await self.group.goto(state)
288
289    async def back(self):
290        '''回退当前群组的状态'''
291        return await self.group.back()
292
293    def _add_func(self, func):
294        '''如果不存在就加入self.funcs'''
295        if func not in self.funcs:
296            self.funcs.append(func)
297
298    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
299        '''注册深度监听'''
300        def decorator(func):
301            func.deep = deep
302            self._add_func(func)
303            return func
304        return decorator
305
306    def on_no_block(self, block: bool = False):
307        '''注册非阻断'''
308        def decorator(func):
309            func.block = block
310            self._add_func(func)
311            return func
312        return decorator
313
314    def on_cmd(self, *cmds: str):
315        '''注册命令触发,不填写命令则视为文本消息'''
316        def decorator(func):
317            func.cmds = cmds
318            self._add_func(func)
319            return func
320        return decorator
321
322    def on_text(self):
323        '''注册消息触发'''
324        def decorator(func):
325            self._add_func(func)
326            return func
327        return decorator
328
329    def on_state(self, *states: Union[AyakaState, str, List[str]]):
330        '''注册有状态响应,不填写states则为root.插件名状态'''
331        _states = []
332        if not states:
333            states = [[]]
334        for s in states:
335            if isinstance(s, str):
336                s = self.get_state(s)
337            elif isinstance(s, list):
338                s = self.get_state(*s)
339            _states.append(s)
340
341        def decorator(func):
342            func.states = _states
343            self._add_func(func)
344            return func
345        return decorator
346
347    def on_idle(self):
348        '''注册根结点回调'''
349        return self.on_state(self.root_state)
350
351    def set_start_cmds(self, *cmds: str):
352        '''设置应用启动命令,当然,你也可以通过app.on_cmd自定义启动方式'''
353        @self.on_idle()
354        @self.on_cmd(*cmds)
355        async def start():
356            '''打开应用'''
357            await self.start()
358
359    def set_close_cmds(self, *cmds: str):
360        '''设置应用关闭命令,当然,你也可以通过app.on_cmd自定义关闭方式'''
361        @self.on_state()
362        @self.on_deep_all()
363        @self.on_cmd(*cmds)
364        async def close():
365            '''关闭应用'''
366            await self.close()
367
368    async def start(self, state: str = None):
369        '''*timer触发时不可用*
370
371        启动应用,并发送提示
372
373        state参数为兼容旧API'''
374        if not state:
375            state = self.get_state()
376            await self.goto(state)
377        else:
378            states = state.split(".")
379            await self.goto(*states)
380        await self.send(f"已打开应用 [{self.name}]")
381
382    async def close(self):
383        '''*timer触发时不可用*
384
385        关闭应用,并发送提示'''
386        await self.goto(root_state)
387        await self.send(f"已关闭应用 [{self.name}]")
388
389    def add_listener(self, user_id: int):
390        '''为该群组添加对指定私聊的监听'''
391        private_listener_dict[user_id].append(self.group_id)
392
393    def remove_listener(self, user_id: int = 0):
394        '''默认移除该群组对其他私聊的所有监听'''
395        id = self.group_id
396
397        if user_id == 0:
398            for ids in private_listener_dict.values():
399                if id in ids:
400                    ids.remove(id)
401            return
402
403        if id in private_listener_dict[user_id]:
404            private_listener_dict[user_id].remove(self.group_id)
405
406    async def send(self, message):
407        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
408        # 这里不使用event,因为一些event可能来自其他设备的监听传递
409        await self.bot.send_group_msg(group_id=self.group_id, message=message)
410
411    def pack_messages(self, bot_id, messages):
412        '''转换为cqhttp node格式'''
413        data: List[MessageSegment] = []
414        for m in messages:
415            if isinstance(m, MessageSegment) and m.type == "node":
416                data.append(m)
417            else:
418                m = MessageSegment.node_custom(
419                    user_id=bot_id,
420                    nickname="Ayaka Bot",
421                    content=str(m)
422                )
423                data.append(m)
424        return data
425
426    async def send_many(self, messages):
427        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
428        # 分割长消息组(不可超过100条
429        div_len = 100
430        div_cnt = ceil(len(messages) / div_len)
431        for i in range(div_cnt):
432            msgs = self.pack_messages(
433                self.bot_id,
434                messages[i*div_len: (i+1)*div_len]
435            )
436            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
437
438    def t_check(self, bot_id: int):
439        # 未连接
440        bot = get_bot(bot_id)
441        if not bot:
442            logger.warning(f"BOT({bot_id}) 未连接")
443            return
444
445        return bot
446
447    async def t_send(self, bot_id: int, group_id: int, message):
448        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
449        bot = self.t_check(bot_id)
450        if not bot:
451            return
452
453        await bot.send_group_msg(group_id=group_id, message=message)
454
455    async def t_send_many(self, bot_id: int, group_id: int, messages):
456        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
457        bot = self.t_check(bot_id)
458        if not bot:
459            return
460
461        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
462        div_len = 80
463        div_cnt = ceil(len(messages) / div_len)
464        for i in range(div_cnt):
465            msgs = self.pack_messages(
466                bot_id,
467                messages[i*div_len: (i+1)*div_len]
468            )
469            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
470
471
472def get_bot(bot_id: int):
473    '''获取已连接的bot'''
474    bot_id = str(bot_id)
475    for bot in bot_list:
476        if bot.self_id == bot_id:
477            return bot
478
479
480def get_group(bot_id: int, group_id: int):
481    '''获取对应的AyakaGroup对象,自动增加'''
482    for group in group_list:
483        if group.bot_id == bot_id and group.group_id == group_id:
484            break
485    else:
486        group = AyakaGroup(bot_id, group_id)
487    return group
488
489
490def get_app(app_name: str):
491    for app in app_list:
492        if app.name == app_name:
493            return app
494
495
496async def deal_event(bot: Bot, event: MessageEvent):
497    '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组'''
498    if ayaka_root_config.exclude_old_msg:
499        time_i = int(datetime.datetime.now().timestamp())
500        if event.time < time_i - 60:
501            return
502
503    _bot.set(bot)
504    _event.set(event)
505
506    bot_id = int(bot.self_id)
507
508    if isinstance(event, GroupMessageEvent):
509        group_id = event.group_id
510        await deal_group(bot_id, group_id)
511
512    else:
513        id = event.user_id
514        group_ids = private_listener_dict.get(id, [])
515        ts = [asyncio.create_task(deal_group(bot_id, group_id))
516              for group_id in group_ids]
517        await asyncio.gather(*ts)
518
519
520async def deal_group(bot_id: int, group_id: int):
521    prefix = ayaka_root_config.prefix
522
523    # 群组
524    group = get_group(bot_id, group_id)
525    _group.set(group)
526
527    # 消息
528    message = _event.get().message
529    _message.set(message)
530
531    # 从子状态开始向上查找可用的触发
532    state = group.state
533    cascade_triggers = get_cascade_triggers(state, 0)
534
535    # 命令
536    # 消息前缀文本
537    first = get_first(message)
538    if first.startswith(prefix):
539        first = first[len(prefix):]
540        for ts in cascade_triggers:
541            if await deal_cmd_triggers(ts, message, first, state):
542                return
543
544    # 命令退化成消息
545    for ts in cascade_triggers:
546        if await deal_text_triggers(ts, message, state):
547            return
548
549
550async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState):
551    sep = ayaka_root_config.separate
552
553    # 命令
554    temp = [t for t in triggers if t.cmds]
555    # 根据命令长度排序,长命令优先级更高
556    cmd_ts = [(c, t) for t in temp for c in t.cmds]
557    cmd_ts.sort(key=lambda x: len(x[0]), reverse=1)
558
559    for c, t in cmd_ts:
560        if first.startswith(c):
561            # 设置上下文
562            # 设置命令
563            _cmd.set(c)
564            # 设置参数
565            left = first[len(c):].lstrip(sep)
566            if left:
567                arg = Message([MessageSegment.text(left), *message[1:]])
568            else:
569                arg = Message(message[1:])
570            _arg.set(arg)
571            _args.set(divide_message(arg))
572
573            # 触发
574            log_trigger(c, t.app.name, state, t.func.__name__)
575            await t.run()
576
577            # 阻断后续
578            if t.block:
579                return True
580
581
582async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState):
583    # 消息
584    text_ts = [t for t in triggers if not t.cmds]
585
586    # 设置上下文
587    # 设置命令
588    _cmd.set("")
589    # 设置参数
590    _arg.set(message)
591    _args.set(divide_message(message))
592
593    for t in text_ts:
594        # 触发
595        log_trigger("", t.app.name, state, t.func.__name__)
596        await t.run()
597
598        # 阻断后续
599        if t.block:
600            return True
601
602
603def get_cascade_triggers(state: AyakaState, deep: int = 0):
604    # 根据深度筛选funcs
605    ts = [
606        t for t in state.triggers
607        if t.deep == "all" or t.deep >= deep
608    ]
609    cascade_triggers = [ts]
610
611    # 获取父状态的方法
612    if state.parent:
613        cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1))
614
615    # 排除空项
616    cascade_triggers = [ts for ts in cascade_triggers if ts]
617    return cascade_triggers
618
619
620def log_trigger(cmd, app_name, state, func_name):
621    '''日志记录'''
622    items = []
623    items.append(f"状态:<c>{state}</c>")
624    items.append(f"应用:<y>{app_name}</y>")
625    if cmd:
626        items.append(f"命令:<y>{cmd}</y>")
627    else:
628        items.append("命令:<g>无</g>")
629    items.append(f"回调:<c>{func_name}</c>")
630    info = " | ".join(items)
631    logger.opt(colors=True).debug(info)
632
633
634def get_first(message: Message):
635    first = ""
636    for m in message:
637        if m.type == "text":
638            first += str(m)
639        else:
640            break
641    return first
642
643
644def divide_message(message: Message) -> List[MessageSegment]:
645    args = []
646    sep = ayaka_root_config.separate
647
648    for m in message:
649        if m.is_text():
650            ss = str(m).split(sep)
651            args.extend(MessageSegment.text(s) for s in ss if s)
652        else:
653            args.append(m)
654
655    return args
656
657
658def regist_func(app: AyakaApp, func):
659    '''注册回调'''
660    # 默认是无状态应用,从root开始触发
661    states: List[AyakaState] = getattr(func, "states", [root_state])
662    # 默认是消息响应
663    cmds: List[str] = getattr(func, "cmds", [])
664    # 默认监听深度为0
665    deep: int = getattr(func, "deep", 0)
666    # 默认阻断
667    block: bool = getattr(func, "block", True)
668
669    # 注册
670    for s in states:
671        s.on_cmd(cmds, app, deep, block)(func)
672
673    return func
674
675
676driver = get_driver()
677
678
679@driver.on_startup
680async def startup():
681    # 注册所有回调
682    for app in app_list:
683        for func in app.funcs:
684            regist_func(app, func)
685
686    if ayaka_root_config.debug:
687        s = json.dumps(
688            root_state.dict(), ensure_ascii=0,
689            indent=4, default=repr
690        )
691        path = ayaka_data_path / "all_state.json"
692        with path.open("w+", encoding="utf8") as f:
693            f.write(s)
694
695on_message(priority=20, block=False, handlers=[deal_event])
class AyakaGroup:
19class AyakaGroup:
20    def __repr__(self) -> str:
21        return f"{self.__class__.__name__}({self.bot_id}, {self.group_id}, {self.apps})"
22
23    def __init__(self, bot_id: int, group_id: int) -> None:
24        self.bot_id = bot_id
25        self.group_id = group_id
26        self.state = root_state
27
28        # 添加app,并分配独立数据空间
29        self.apps: List["AyakaApp"] = []
30        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
31        for app in app_list:
32            # if app.name not in forbid_names:
33            self.apps.append(app)
34            self.cache_dict[app.name] = {}
35
36        group_list.append(self)
37
38        if ayaka_root_config.debug:
39            print(self)
40
41    async def back(self):
42        if self.state.parent:
43            await self.state.exit()
44            self.state = self.state.parent
45            return self.state
46
47    async def goto(self, state: AyakaState):
48        if _enter_exit_during.get() > 0:
49            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
50
51        keys = state.keys
52
53        # 找到第一个不同的结点
54        n0 = len(keys)
55        n1 = len(self.state.keys)
56        n = min(n0, n1)
57        for i in range(n):
58            if keys[i] != self.state.keys[i]:
59                break
60        else:
61            i += 1
62
63        # 回退
64        for j in range(i, n1):
65            await self.back()
66        keys = keys[i:]
67
68        # 重新出发
69        for key in keys:
70            self.state = self.state[key]
71            await self.state.enter()
72        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
73        return self.state
74
75    def get_app(self, name: str):
76        '''根据app名获取该group所启用的app,不存在则返回None'''
77        for app in self.apps:
78            if app.name == name:
79                return app
AyakaGroup(bot_id: int, group_id: int)
23    def __init__(self, bot_id: int, group_id: int) -> None:
24        self.bot_id = bot_id
25        self.group_id = group_id
26        self.state = root_state
27
28        # 添加app,并分配独立数据空间
29        self.apps: List["AyakaApp"] = []
30        self.cache_dict: Dict[str, Dict[str, AyakaCache]] = {}
31        for app in app_list:
32            # if app.name not in forbid_names:
33            self.apps.append(app)
34            self.cache_dict[app.name] = {}
35
36        group_list.append(self)
37
38        if ayaka_root_config.debug:
39            print(self)
async def back(self):
41    async def back(self):
42        if self.state.parent:
43            await self.state.exit()
44            self.state = self.state.parent
45            return self.state
async def goto(self, state: ayaka.state.AyakaState):
47    async def goto(self, state: AyakaState):
48        if _enter_exit_during.get() > 0:
49            logger.warning("你正在AyakaState的enter/exit方法中进行状态转移,这可能会导致无法预料的错误")
50
51        keys = state.keys
52
53        # 找到第一个不同的结点
54        n0 = len(keys)
55        n1 = len(self.state.keys)
56        n = min(n0, n1)
57        for i in range(n):
58            if keys[i] != self.state.keys[i]:
59                break
60        else:
61            i += 1
62
63        # 回退
64        for j in range(i, n1):
65            await self.back()
66        keys = keys[i:]
67
68        # 重新出发
69        for key in keys:
70            self.state = self.state[key]
71            await self.state.enter()
72        logger.opt(colors=True).debug(f"状态:<c>{self.state}</c>")
73        return self.state
def get_app(self, name: str):
75    def get_app(self, name: str):
76        '''根据app名获取该group所启用的app,不存在则返回None'''
77        for app in self.apps:
78            if app.name == name:
79                return app

根据app名获取该group所启用的app,不存在则返回None

class AyakaApp:
 82class AyakaApp:
 83    def __repr__(self) -> str:
 84        return f"{self.__class__.__name__}({self.name})"
 85
 86    def __init__(self, name: str) -> None:
 87        self.path = Path(inspect.stack()[1].filename)
 88
 89        for app in app_list:
 90            if app.name == name:
 91                raise Exception(
 92                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
 93
 94        self.name = name
 95        self.ayaka_root_config = ayaka_root_config
 96        self.funcs = []
 97        self.on = AyakaOn(self)
 98        self.timers: List[AyakaTimer] = []
 99        self.root_state = root_state
100        self._intro = "没有介绍"
101        self.state_helps: Dict[str, List[str]] = {}
102        self.idle_helps: List[str] = []
103
104        logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"")
105        app_list.append(self)
106        if ayaka_root_config.debug:
107            print(self)
108
109    @property
110    def intro(self):
111        help = self._intro
112        for h in self.idle_helps:
113            help += "\n" + h
114        return help
115
116    @property
117    def all_help(self):
118        help = self.intro
119        for s, hs in self.state_helps.items():
120            help += f"\n[{s}]"
121            for h in hs:
122                help += "\n" + h
123        return help
124
125    @property
126    def help(self):
127        '''获取当前状态下的帮助,没有找到则返回介绍'''
128        total_triggers = get_cascade_triggers(self.state)
129
130        helps = []
131        cmds = []
132        for ts in total_triggers:
133            flag = 1
134            for t in ts:
135                if t.app == self:
136                    for c in t.cmds:
137                        if c in cmds:
138                            break
139                    else:
140                        if flag:
141                            helps.append(f"[{t.state[1:]}]")
142                            flag = 0
143                        cmds.extend(t.cmds)
144                        helps.append(t.help)
145
146        if not helps:
147            return self.intro
148        return "\n".join(helps)
149
150    @help.setter
151    def help(self, help: str):
152        self._intro = help
153
154    @property
155    def user_name(self):
156        '''*timer触发时不可用*
157
158        当前消息的发送人的群名片或昵称
159        '''
160        s = self.event.sender
161        name = s.card or s.nickname
162        return name
163
164    @property
165    def user_id(self):
166        '''*timer触发时不可用*
167
168        当前消息的发送人的uid
169        '''
170        return self.event.user_id
171
172    @property
173    def bot(self):
174        '''*timer触发时不可用*
175
176        当前bot
177        '''
178        return _bot.get()
179
180    @property
181    def event(self):
182        '''*timer触发时不可用*
183
184        当前消息
185        '''
186        return _event.get()
187
188    @property
189    def cache(self):
190        '''*timer触发时不可用*
191
192        当前群组的缓存空间'''
193        return self.group.cache_dict[self.name]
194
195    @property
196    def group_id(self):
197        '''*timer触发时不可用*
198
199        当前群组的id
200
201        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id
202        '''
203        return self.group.group_id
204
205    @property
206    def bot_id(self):
207        '''*timer触发时不可用*
208
209        当前bot的id
210        '''
211        return self.group.bot_id
212
213    @property
214    def group(self):
215        '''*timer触发时不可用*
216
217        当前群组
218
219        注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A
220        '''
221        return _group.get()
222
223    @property
224    def arg(self):
225        '''*timer触发时不可用*
226
227        当前消息在移除了命令后的剩余部分
228        '''
229        return _arg.get()
230
231    @property
232    def args(self):
233        '''*timer触发时不可用*
234
235        当前消息在移除了命令后,剩余部分按照空格分割后的数组
236
237        注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素
238        '''
239        return _args.get()
240
241    @property
242    def cmd(self):
243        '''*timer触发时不可用*
244
245        当前消息的命令头
246        '''
247        return _cmd.get()
248
249    @property
250    def message(self):
251        '''*timer触发时不可用*
252
253        当前消息
254        '''
255        return _message.get()
256
257    @property
258    def state(self):
259        return self.group.state
260
261    def get_state(self, *keys: str):
262        '''
263            假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项`
264
265            >>> get_state(key1, key2) -> [root.test].key1.key2
266
267            特别的,keys可以为空,例如:
268
269            >>> get_state() -> [root.test]
270        '''
271        keys = [self.name, *keys]
272        return root_state.join(*keys)
273
274    async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str):
275        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
276        return await self.goto(state, *keys)
277
278    async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str):
279        # keys为兼容旧API(0.5.2及以前
280        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
281        if isinstance(state, str):
282            if "." in state:
283                state = self.get_state(*state.split("."))
284            else:
285                state = self.get_state(state, *keys)
286        elif isinstance(state, list):
287            state = self.get_state(*state)
288        return await self.group.goto(state)
289
290    async def back(self):
291        '''回退当前群组的状态'''
292        return await self.group.back()
293
294    def _add_func(self, func):
295        '''如果不存在就加入self.funcs'''
296        if func not in self.funcs:
297            self.funcs.append(func)
298
299    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
300        '''注册深度监听'''
301        def decorator(func):
302            func.deep = deep
303            self._add_func(func)
304            return func
305        return decorator
306
307    def on_no_block(self, block: bool = False):
308        '''注册非阻断'''
309        def decorator(func):
310            func.block = block
311            self._add_func(func)
312            return func
313        return decorator
314
315    def on_cmd(self, *cmds: str):
316        '''注册命令触发,不填写命令则视为文本消息'''
317        def decorator(func):
318            func.cmds = cmds
319            self._add_func(func)
320            return func
321        return decorator
322
323    def on_text(self):
324        '''注册消息触发'''
325        def decorator(func):
326            self._add_func(func)
327            return func
328        return decorator
329
330    def on_state(self, *states: Union[AyakaState, str, List[str]]):
331        '''注册有状态响应,不填写states则为root.插件名状态'''
332        _states = []
333        if not states:
334            states = [[]]
335        for s in states:
336            if isinstance(s, str):
337                s = self.get_state(s)
338            elif isinstance(s, list):
339                s = self.get_state(*s)
340            _states.append(s)
341
342        def decorator(func):
343            func.states = _states
344            self._add_func(func)
345            return func
346        return decorator
347
348    def on_idle(self):
349        '''注册根结点回调'''
350        return self.on_state(self.root_state)
351
352    def set_start_cmds(self, *cmds: str):
353        '''设置应用启动命令,当然,你也可以通过app.on_cmd自定义启动方式'''
354        @self.on_idle()
355        @self.on_cmd(*cmds)
356        async def start():
357            '''打开应用'''
358            await self.start()
359
360    def set_close_cmds(self, *cmds: str):
361        '''设置应用关闭命令,当然,你也可以通过app.on_cmd自定义关闭方式'''
362        @self.on_state()
363        @self.on_deep_all()
364        @self.on_cmd(*cmds)
365        async def close():
366            '''关闭应用'''
367            await self.close()
368
369    async def start(self, state: str = None):
370        '''*timer触发时不可用*
371
372        启动应用,并发送提示
373
374        state参数为兼容旧API'''
375        if not state:
376            state = self.get_state()
377            await self.goto(state)
378        else:
379            states = state.split(".")
380            await self.goto(*states)
381        await self.send(f"已打开应用 [{self.name}]")
382
383    async def close(self):
384        '''*timer触发时不可用*
385
386        关闭应用,并发送提示'''
387        await self.goto(root_state)
388        await self.send(f"已关闭应用 [{self.name}]")
389
390    def add_listener(self, user_id: int):
391        '''为该群组添加对指定私聊的监听'''
392        private_listener_dict[user_id].append(self.group_id)
393
394    def remove_listener(self, user_id: int = 0):
395        '''默认移除该群组对其他私聊的所有监听'''
396        id = self.group_id
397
398        if user_id == 0:
399            for ids in private_listener_dict.values():
400                if id in ids:
401                    ids.remove(id)
402            return
403
404        if id in private_listener_dict[user_id]:
405            private_listener_dict[user_id].remove(self.group_id)
406
407    async def send(self, message):
408        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
409        # 这里不使用event,因为一些event可能来自其他设备的监听传递
410        await self.bot.send_group_msg(group_id=self.group_id, message=message)
411
412    def pack_messages(self, bot_id, messages):
413        '''转换为cqhttp node格式'''
414        data: List[MessageSegment] = []
415        for m in messages:
416            if isinstance(m, MessageSegment) and m.type == "node":
417                data.append(m)
418            else:
419                m = MessageSegment.node_custom(
420                    user_id=bot_id,
421                    nickname="Ayaka Bot",
422                    content=str(m)
423                )
424                data.append(m)
425        return data
426
427    async def send_many(self, messages):
428        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
429        # 分割长消息组(不可超过100条
430        div_len = 100
431        div_cnt = ceil(len(messages) / div_len)
432        for i in range(div_cnt):
433            msgs = self.pack_messages(
434                self.bot_id,
435                messages[i*div_len: (i+1)*div_len]
436            )
437            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
438
439    def t_check(self, bot_id: int):
440        # 未连接
441        bot = get_bot(bot_id)
442        if not bot:
443            logger.warning(f"BOT({bot_id}) 未连接")
444            return
445
446        return bot
447
448    async def t_send(self, bot_id: int, group_id: int, message):
449        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
450        bot = self.t_check(bot_id)
451        if not bot:
452            return
453
454        await bot.send_group_msg(group_id=group_id, message=message)
455
456    async def t_send_many(self, bot_id: int, group_id: int, messages):
457        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
458        bot = self.t_check(bot_id)
459        if not bot:
460            return
461
462        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
463        div_len = 80
464        div_cnt = ceil(len(messages) / div_len)
465        for i in range(div_cnt):
466            msgs = self.pack_messages(
467                bot_id,
468                messages[i*div_len: (i+1)*div_len]
469            )
470            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
AyakaApp(name: str)
 86    def __init__(self, name: str) -> None:
 87        self.path = Path(inspect.stack()[1].filename)
 88
 89        for app in app_list:
 90            if app.name == name:
 91                raise Exception(
 92                    f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)")
 93
 94        self.name = name
 95        self.ayaka_root_config = ayaka_root_config
 96        self.funcs = []
 97        self.on = AyakaOn(self)
 98        self.timers: List[AyakaTimer] = []
 99        self.root_state = root_state
100        self._intro = "没有介绍"
101        self.state_helps: Dict[str, List[str]] = {}
102        self.idle_helps: List[str] = []
103
104        logger.opt(colors=True).success(f"应用加载成功 \"<c>{name}</c>\"")
105        app_list.append(self)
106        if ayaka_root_config.debug:
107            print(self)
help

获取当前状态下的帮助,没有找到则返回介绍

user_name

timer触发时不可用

当前消息的发送人的群名片或昵称

user_id

timer触发时不可用

当前消息的发送人的uid

bot

timer触发时不可用

当前bot

event

timer触发时不可用

当前消息

cache

timer触发时不可用

当前群组的缓存空间

group_id

timer触发时不可用

当前群组的id

注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id

bot_id

timer触发时不可用

当前bot的id

group

timer触发时不可用

当前群组

注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A

arg

timer触发时不可用

当前消息在移除了命令后的剩余部分

args

timer触发时不可用

当前消息在移除了命令后,剩余部分按照空格分割后的数组

注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素

cmd

timer触发时不可用

当前消息的命令头

message

timer触发时不可用

当前消息

def get_state(self, *keys: str):
261    def get_state(self, *keys: str):
262        '''
263            假设当前状态为 `root.test.a` 即 `根.插件名.一级菜单项`
264
265            >>> get_state(key1, key2) -> [root.test].key1.key2
266
267            特别的,keys可以为空,例如:
268
269            >>> get_state() -> [root.test]
270        '''
271        keys = [self.name, *keys]
272        return root_state.join(*keys)

假设当前状态为 root.test.a根.插件名.一级菜单项

>>> get_state(key1, key2) -> [root.test].key1.key2

特别的,keys可以为空,例如:

>>> get_state() -> [root.test]
async def set_state( self, state: Union[ayaka.state.AyakaState, str, List[str]], *keys: str):
274    async def set_state(self, state: Union[AyakaState, str, List[str]], *keys: str):
275        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
276        return await self.goto(state, *keys)

变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割

async def goto( self, state: Union[ayaka.state.AyakaState, str, List[str]], *keys: str):
278    async def goto(self, state: Union[AyakaState, str, List[str]], *keys: str):
279        # keys为兼容旧API(0.5.2及以前
280        '''变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割'''
281        if isinstance(state, str):
282            if "." in state:
283                state = self.get_state(*state.split("."))
284            else:
285                state = self.get_state(state, *keys)
286        elif isinstance(state, list):
287            state = self.get_state(*state)
288        return await self.group.goto(state)

变更当前群组的状态,state可以是AyakaState、字符串或字符串列表,若字符串内包含.符号,还会自动对其进行分割

async def back(self):
290    async def back(self):
291        '''回退当前群组的状态'''
292        return await self.group.back()

回退当前群组的状态

def on_deep_all(self, deep: Union[int, Literal['all']] = 'all'):
299    def on_deep_all(self, deep: Union[int, Literal["all"]] = "all"):
300        '''注册深度监听'''
301        def decorator(func):
302            func.deep = deep
303            self._add_func(func)
304            return func
305        return decorator

注册深度监听

def on_no_block(self, block: bool = False):
307    def on_no_block(self, block: bool = False):
308        '''注册非阻断'''
309        def decorator(func):
310            func.block = block
311            self._add_func(func)
312            return func
313        return decorator

注册非阻断

def on_cmd(self, *cmds: str):
315    def on_cmd(self, *cmds: str):
316        '''注册命令触发,不填写命令则视为文本消息'''
317        def decorator(func):
318            func.cmds = cmds
319            self._add_func(func)
320            return func
321        return decorator

注册命令触发,不填写命令则视为文本消息

def on_text(self):
323    def on_text(self):
324        '''注册消息触发'''
325        def decorator(func):
326            self._add_func(func)
327            return func
328        return decorator

注册消息触发

def on_state(self, *states: Union[ayaka.state.AyakaState, str, List[str]]):
330    def on_state(self, *states: Union[AyakaState, str, List[str]]):
331        '''注册有状态响应,不填写states则为root.插件名状态'''
332        _states = []
333        if not states:
334            states = [[]]
335        for s in states:
336            if isinstance(s, str):
337                s = self.get_state(s)
338            elif isinstance(s, list):
339                s = self.get_state(*s)
340            _states.append(s)
341
342        def decorator(func):
343            func.states = _states
344            self._add_func(func)
345            return func
346        return decorator

注册有状态响应,不填写states则为root.插件名状态

def on_idle(self):
348    def on_idle(self):
349        '''注册根结点回调'''
350        return self.on_state(self.root_state)

注册根结点回调

def set_start_cmds(self, *cmds: str):
352    def set_start_cmds(self, *cmds: str):
353        '''设置应用启动命令,当然,你也可以通过app.on_cmd自定义启动方式'''
354        @self.on_idle()
355        @self.on_cmd(*cmds)
356        async def start():
357            '''打开应用'''
358            await self.start()

设置应用启动命令,当然,你也可以通过app.on_cmd自定义启动方式

def set_close_cmds(self, *cmds: str):
360    def set_close_cmds(self, *cmds: str):
361        '''设置应用关闭命令,当然,你也可以通过app.on_cmd自定义关闭方式'''
362        @self.on_state()
363        @self.on_deep_all()
364        @self.on_cmd(*cmds)
365        async def close():
366            '''关闭应用'''
367            await self.close()

设置应用关闭命令,当然,你也可以通过app.on_cmd自定义关闭方式

async def start(self, state: str = None):
369    async def start(self, state: str = None):
370        '''*timer触发时不可用*
371
372        启动应用,并发送提示
373
374        state参数为兼容旧API'''
375        if not state:
376            state = self.get_state()
377            await self.goto(state)
378        else:
379            states = state.split(".")
380            await self.goto(*states)
381        await self.send(f"已打开应用 [{self.name}]")

timer触发时不可用

启动应用,并发送提示

state参数为兼容旧API

async def close(self):
383    async def close(self):
384        '''*timer触发时不可用*
385
386        关闭应用,并发送提示'''
387        await self.goto(root_state)
388        await self.send(f"已关闭应用 [{self.name}]")

timer触发时不可用

关闭应用,并发送提示

def add_listener(self, user_id: int):
390    def add_listener(self, user_id: int):
391        '''为该群组添加对指定私聊的监听'''
392        private_listener_dict[user_id].append(self.group_id)

为该群组添加对指定私聊的监听

def remove_listener(self, user_id: int = 0):
394    def remove_listener(self, user_id: int = 0):
395        '''默认移除该群组对其他私聊的所有监听'''
396        id = self.group_id
397
398        if user_id == 0:
399            for ids in private_listener_dict.values():
400                if id in ids:
401                    ids.remove(id)
402            return
403
404        if id in private_listener_dict[user_id]:
405            private_listener_dict[user_id].remove(self.group_id)

默认移除该群组对其他私聊的所有监听

async def send(self, message):
407    async def send(self, message):
408        '''发送消息,消息的类型可以是 Message | MessageSegment | str'''
409        # 这里不使用event,因为一些event可能来自其他设备的监听传递
410        await self.bot.send_group_msg(group_id=self.group_id, message=message)

发送消息,消息的类型可以是 Message | MessageSegment | str

def pack_messages(self, bot_id, messages):
412    def pack_messages(self, bot_id, messages):
413        '''转换为cqhttp node格式'''
414        data: List[MessageSegment] = []
415        for m in messages:
416            if isinstance(m, MessageSegment) and m.type == "node":
417                data.append(m)
418            else:
419                m = MessageSegment.node_custom(
420                    user_id=bot_id,
421                    nickname="Ayaka Bot",
422                    content=str(m)
423                )
424                data.append(m)
425        return data

转换为cqhttp node格式

async def send_many(self, messages):
427    async def send_many(self, messages):
428        '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]'''
429        # 分割长消息组(不可超过100条
430        div_len = 100
431        div_cnt = ceil(len(messages) / div_len)
432        for i in range(div_cnt):
433            msgs = self.pack_messages(
434                self.bot_id,
435                messages[i*div_len: (i+1)*div_len]
436            )
437            await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)

发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]

def t_check(self, bot_id: int):
439    def t_check(self, bot_id: int):
440        # 未连接
441        bot = get_bot(bot_id)
442        if not bot:
443            logger.warning(f"BOT({bot_id}) 未连接")
444            return
445
446        return bot
async def t_send(self, bot_id: int, group_id: int, message):
448    async def t_send(self, bot_id: int, group_id: int, message):
449        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
450        bot = self.t_check(bot_id)
451        if not bot:
452            return
453
454        await bot.send_group_msg(group_id=group_id, message=message)

timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用

async def t_send_many(self, bot_id: int, group_id: int, messages):
456    async def t_send_many(self, bot_id: int, group_id: int, messages):
457        '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用'''
458        bot = self.t_check(bot_id)
459        if not bot:
460            return
461
462        # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度
463        div_len = 80
464        div_cnt = ceil(len(messages) / div_len)
465        for i in range(div_cnt):
466            msgs = self.pack_messages(
467                bot_id,
468                messages[i*div_len: (i+1)*div_len]
469            )
470            await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)

timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用

def get_bot(bot_id: int):
473def get_bot(bot_id: int):
474    '''获取已连接的bot'''
475    bot_id = str(bot_id)
476    for bot in bot_list:
477        if bot.self_id == bot_id:
478            return bot

获取已连接的bot

def get_group(bot_id: int, group_id: int):
481def get_group(bot_id: int, group_id: int):
482    '''获取对应的AyakaGroup对象,自动增加'''
483    for group in group_list:
484        if group.bot_id == bot_id and group.group_id == group_id:
485            break
486    else:
487        group = AyakaGroup(bot_id, group_id)
488    return group

获取对应的AyakaGroup对象,自动增加

def get_app(app_name: str):
491def get_app(app_name: str):
492    for app in app_list:
493        if app.name == app_name:
494            return app
async def deal_event( bot: nonebot.adapters.onebot.v11.bot.Bot, event: nonebot.adapters.onebot.v11.event.MessageEvent):
497async def deal_event(bot: Bot, event: MessageEvent):
498    '''处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组'''
499    if ayaka_root_config.exclude_old_msg:
500        time_i = int(datetime.datetime.now().timestamp())
501        if event.time < time_i - 60:
502            return
503
504    _bot.set(bot)
505    _event.set(event)
506
507    bot_id = int(bot.self_id)
508
509    if isinstance(event, GroupMessageEvent):
510        group_id = event.group_id
511        await deal_group(bot_id, group_id)
512
513    else:
514        id = event.user_id
515        group_ids = private_listener_dict.get(id, [])
516        ts = [asyncio.create_task(deal_group(bot_id, group_id))
517              for group_id in group_ids]
518        await asyncio.gather(*ts)

处理收到的消息,将其分割为cmd和args,设置上下文相关变量的值,并将消息传递给对应的群组

async def deal_group(bot_id: int, group_id: int):
521async def deal_group(bot_id: int, group_id: int):
522    prefix = ayaka_root_config.prefix
523
524    # 群组
525    group = get_group(bot_id, group_id)
526    _group.set(group)
527
528    # 消息
529    message = _event.get().message
530    _message.set(message)
531
532    # 从子状态开始向上查找可用的触发
533    state = group.state
534    cascade_triggers = get_cascade_triggers(state, 0)
535
536    # 命令
537    # 消息前缀文本
538    first = get_first(message)
539    if first.startswith(prefix):
540        first = first[len(prefix):]
541        for ts in cascade_triggers:
542            if await deal_cmd_triggers(ts, message, first, state):
543                return
544
545    # 命令退化成消息
546    for ts in cascade_triggers:
547        if await deal_text_triggers(ts, message, state):
548            return
async def deal_cmd_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, first: str, state: ayaka.state.AyakaState):
551async def deal_cmd_triggers(triggers: List[AyakaTrigger], message: Message, first: str, state: AyakaState):
552    sep = ayaka_root_config.separate
553
554    # 命令
555    temp = [t for t in triggers if t.cmds]
556    # 根据命令长度排序,长命令优先级更高
557    cmd_ts = [(c, t) for t in temp for c in t.cmds]
558    cmd_ts.sort(key=lambda x: len(x[0]), reverse=1)
559
560    for c, t in cmd_ts:
561        if first.startswith(c):
562            # 设置上下文
563            # 设置命令
564            _cmd.set(c)
565            # 设置参数
566            left = first[len(c):].lstrip(sep)
567            if left:
568                arg = Message([MessageSegment.text(left), *message[1:]])
569            else:
570                arg = Message(message[1:])
571            _arg.set(arg)
572            _args.set(divide_message(arg))
573
574            # 触发
575            log_trigger(c, t.app.name, state, t.func.__name__)
576            await t.run()
577
578            # 阻断后续
579            if t.block:
580                return True
async def deal_text_triggers( triggers: List[ayaka.state.AyakaTrigger], message: nonebot.adapters.onebot.v11.message.Message, state: ayaka.state.AyakaState):
583async def deal_text_triggers(triggers: List[AyakaTrigger], message: Message, state: AyakaState):
584    # 消息
585    text_ts = [t for t in triggers if not t.cmds]
586
587    # 设置上下文
588    # 设置命令
589    _cmd.set("")
590    # 设置参数
591    _arg.set(message)
592    _args.set(divide_message(message))
593
594    for t in text_ts:
595        # 触发
596        log_trigger("", t.app.name, state, t.func.__name__)
597        await t.run()
598
599        # 阻断后续
600        if t.block:
601            return True
def get_cascade_triggers(state: ayaka.state.AyakaState, deep: int = 0):
604def get_cascade_triggers(state: AyakaState, deep: int = 0):
605    # 根据深度筛选funcs
606    ts = [
607        t for t in state.triggers
608        if t.deep == "all" or t.deep >= deep
609    ]
610    cascade_triggers = [ts]
611
612    # 获取父状态的方法
613    if state.parent:
614        cascade_triggers.extend(get_cascade_triggers(state.parent, deep+1))
615
616    # 排除空项
617    cascade_triggers = [ts for ts in cascade_triggers if ts]
618    return cascade_triggers
def log_trigger(cmd, app_name, state, func_name):
621def log_trigger(cmd, app_name, state, func_name):
622    '''日志记录'''
623    items = []
624    items.append(f"状态:<c>{state}</c>")
625    items.append(f"应用:<y>{app_name}</y>")
626    if cmd:
627        items.append(f"命令:<y>{cmd}</y>")
628    else:
629        items.append("命令:<g>无</g>")
630    items.append(f"回调:<c>{func_name}</c>")
631    info = " | ".join(items)
632    logger.opt(colors=True).debug(info)

日志记录

def get_first(message: nonebot.adapters.onebot.v11.message.Message):
635def get_first(message: Message):
636    first = ""
637    for m in message:
638        if m.type == "text":
639            first += str(m)
640        else:
641            break
642    return first
def divide_message( message: nonebot.adapters.onebot.v11.message.Message) -> List[nonebot.adapters.onebot.v11.message.MessageSegment]:
645def divide_message(message: Message) -> List[MessageSegment]:
646    args = []
647    sep = ayaka_root_config.separate
648
649    for m in message:
650        if m.is_text():
651            ss = str(m).split(sep)
652            args.extend(MessageSegment.text(s) for s in ss if s)
653        else:
654            args.append(m)
655
656    return args
def regist_func(app: ayaka.ayaka.AyakaApp, func):
659def regist_func(app: AyakaApp, func):
660    '''注册回调'''
661    # 默认是无状态应用,从root开始触发
662    states: List[AyakaState] = getattr(func, "states", [root_state])
663    # 默认是消息响应
664    cmds: List[str] = getattr(func, "cmds", [])
665    # 默认监听深度为0
666    deep: int = getattr(func, "deep", 0)
667    # 默认阻断
668    block: bool = getattr(func, "block", True)
669
670    # 注册
671    for s in states:
672        s.on_cmd(cmds, app, deep, block)(func)
673
674    return func

注册回调

@driver.on_startup
async def startup():
680@driver.on_startup
681async def startup():
682    # 注册所有回调
683    for app in app_list:
684        for func in app.funcs:
685            regist_func(app, func)
686
687    if ayaka_root_config.debug:
688        s = json.dumps(
689            root_state.dict(), ensure_ascii=0,
690            indent=4, default=repr
691        )
692        path = ayaka_data_path / "all_state.json"
693        with path.open("w+", encoding="utf8") as f:
694            f.write(s)