ayaka.state

  1import inspect
  2from typing import TYPE_CHECKING, List, Literal, Union, Awaitable, Callable
  3from typing_extensions import Self
  4
  5from .config import ayaka_root_config
  6from .constant import _enter_exit_during
  7from .depend import AyakaDepend, AyakaInput
  8
  9
 10if TYPE_CHECKING:
 11    from .ayaka import AyakaApp
 12
 13
 14def add_flag():
 15    _enter_exit_during.set(_enter_exit_during.get()+1)
 16
 17
 18def sub_flag():
 19    _enter_exit_during.set(_enter_exit_during.get()-1)
 20
 21
 22class AyakaState:
 23    def __init__(self, key="root", parent: Self = None):
 24        self.key = key
 25        self.parent = parent
 26        if not parent:
 27            self.keys = [key]
 28        else:
 29            self.keys = [*parent.keys, key]
 30        self.children: List[Self] = []
 31
 32        self.enter_funcs = []
 33        self.exit_funcs = []
 34        self.triggers: List[AyakaTrigger] = []
 35
 36    def __getitem__(self, k):
 37        if isinstance(k, slice):
 38            s = AyakaState("")
 39            s.keys = self.keys[k]
 40            if s.keys:
 41                s.key = s.keys[-1]
 42            return s
 43        
 44        if isinstance(k, int):
 45            s = AyakaState(self.keys[k])
 46            return s
 47
 48        for node in self.children:
 49            if node.key == k:
 50                return node
 51        node = self.__class__(k, self)
 52        self.children.append(node)
 53        return node
 54
 55    def __getattr__(self, k: str):
 56        return self[k]
 57
 58    def __iter__(self):
 59        return iter(self.children)
 60
 61    def __str__(self) -> str:
 62        return ".".join(self.keys)
 63
 64    def __repr__(self) -> str:
 65        return f"{self.__class__.__name__}({self})"
 66
 67    def join(self, *keys: str) -> Self:
 68        node = self
 69        for key in keys:
 70            node = node[key]
 71        return node
 72
 73    def belong(self, node: Self):
 74        if len(self.keys) < len(node.keys):
 75            return False
 76
 77        for i in range(len(node.keys)):
 78            if node.keys[i] != self.keys[i]:
 79                return False
 80
 81        return True
 82
 83    def __ge__(self, node: Self):
 84        return self.belong(node)
 85
 86    def __le__(self, node: Self):
 87        return node.belong(self)
 88
 89    def __gt__(self, node: Self):
 90        return self >= node and len(self.keys) > len(node.keys)
 91
 92    def __lt__(self, node: Self):
 93        return self <= node and len(self.keys) < len(node.keys)
 94
 95    def __eq__(self, node: Self):
 96        return self <= node and len(self.keys) == len(node.keys)
 97
 98    def dict(self):
 99        data = [child.dict() for child in self.children]
100        return {
101            "name": self.key,
102            "triggers": self.triggers,
103            "children": data,
104        }
105
106    async def enter(self):
107        if ayaka_root_config.debug:
108            print(">>>", self.key)
109        add_flag()
110        for func in self.enter_funcs:
111            await func()
112        sub_flag()
113
114    async def exit(self):
115        if ayaka_root_config.debug:
116            print("<<<", self.key)
117        add_flag()
118        for func in self.exit_funcs:
119            await func()
120        sub_flag()
121
122    def on_enter(self):
123        def decorator(func):
124            self.enter_funcs.append(func)
125            return func
126        return decorator
127
128    def on_exit(self):
129        def decorator(func):
130            self.exit_funcs.append(func)
131            return func
132        return decorator
133
134    def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
135        def decorator(func):
136            t = AyakaTrigger(func, cmds, deep, app, block, self)
137            self.triggers.append(t)
138            return func
139        return decorator
140
141    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
142        return self.on_cmd([], app, deep, block)
143
144
145class AyakaTrigger:
146    def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
147        self.func = func
148        self.cmds = cmds
149        self.deep = deep
150        self.app = app
151        self.block = block
152        self.state = state
153
154        # 默认没有解析模型
155        model = None
156        sig = inspect.signature(func)
157        for k, v in sig.parameters.items():
158            cls = v.annotation
159            if issubclass(cls, AyakaInput):
160                model = cls
161                break
162        self.model = model
163
164        # 生成帮助
165        doc = "" if not func.__doc__ else f"| {func.__doc__}"
166        cmd_str = '/'.join(cmds) if cmds else "<任意文字>"
167
168        if not model:
169            help = f"- {cmd_str} {doc}"
170        else:
171            data = model.help()
172            keys_str = " ".join(f"<{k}>" for k in data.keys())
173            data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
174            help = f"- {cmd_str} {keys_str} {doc}\n{data_str}"
175
176        self.help = help
177        if len(state.keys) > 1:
178            s = str(state[1:])
179            if s not in self.app.state_helps:
180                self.app.state_helps[s] = []
181            self.app.state_helps[s].append(help)
182        else:
183            self.app.idle_helps.append(help)
184
185        if ayaka_root_config.debug:
186            print(repr(self))
187
188    async def run(self):
189        params = {}
190        sig = inspect.signature(self.func)
191
192        for k, v in sig.parameters.items():
193            cls = v.annotation
194            if issubclass(cls, AyakaDepend):
195                d = await cls._create_by_app(self.app)
196                if not d:
197                    return
198                params[k] = d
199
200        await self.func(**params)
201
202    def __repr__(self) -> str:
203        return f"{self.__class__.__name__}({self})"
204
205    def __str__(self) -> str:
206        data = {
207            "func": self.func.__name__,
208            "app_name": self.app.name,
209            "cmds": self.cmds,
210            "deep": self.deep,
211            "block": self.block
212        }
213        return " ".join(f"{k}={v}" for k, v in data.items())
214
215
216root_state = AyakaState()
def add_flag():
15def add_flag():
16    _enter_exit_during.set(_enter_exit_during.get()+1)
def sub_flag():
19def sub_flag():
20    _enter_exit_during.set(_enter_exit_during.get()-1)
class AyakaState:
 23class AyakaState:
 24    def __init__(self, key="root", parent: Self = None):
 25        self.key = key
 26        self.parent = parent
 27        if not parent:
 28            self.keys = [key]
 29        else:
 30            self.keys = [*parent.keys, key]
 31        self.children: List[Self] = []
 32
 33        self.enter_funcs = []
 34        self.exit_funcs = []
 35        self.triggers: List[AyakaTrigger] = []
 36
 37    def __getitem__(self, k):
 38        if isinstance(k, slice):
 39            s = AyakaState("")
 40            s.keys = self.keys[k]
 41            if s.keys:
 42                s.key = s.keys[-1]
 43            return s
 44        
 45        if isinstance(k, int):
 46            s = AyakaState(self.keys[k])
 47            return s
 48
 49        for node in self.children:
 50            if node.key == k:
 51                return node
 52        node = self.__class__(k, self)
 53        self.children.append(node)
 54        return node
 55
 56    def __getattr__(self, k: str):
 57        return self[k]
 58
 59    def __iter__(self):
 60        return iter(self.children)
 61
 62    def __str__(self) -> str:
 63        return ".".join(self.keys)
 64
 65    def __repr__(self) -> str:
 66        return f"{self.__class__.__name__}({self})"
 67
 68    def join(self, *keys: str) -> Self:
 69        node = self
 70        for key in keys:
 71            node = node[key]
 72        return node
 73
 74    def belong(self, node: Self):
 75        if len(self.keys) < len(node.keys):
 76            return False
 77
 78        for i in range(len(node.keys)):
 79            if node.keys[i] != self.keys[i]:
 80                return False
 81
 82        return True
 83
 84    def __ge__(self, node: Self):
 85        return self.belong(node)
 86
 87    def __le__(self, node: Self):
 88        return node.belong(self)
 89
 90    def __gt__(self, node: Self):
 91        return self >= node and len(self.keys) > len(node.keys)
 92
 93    def __lt__(self, node: Self):
 94        return self <= node and len(self.keys) < len(node.keys)
 95
 96    def __eq__(self, node: Self):
 97        return self <= node and len(self.keys) == len(node.keys)
 98
 99    def dict(self):
100        data = [child.dict() for child in self.children]
101        return {
102            "name": self.key,
103            "triggers": self.triggers,
104            "children": data,
105        }
106
107    async def enter(self):
108        if ayaka_root_config.debug:
109            print(">>>", self.key)
110        add_flag()
111        for func in self.enter_funcs:
112            await func()
113        sub_flag()
114
115    async def exit(self):
116        if ayaka_root_config.debug:
117            print("<<<", self.key)
118        add_flag()
119        for func in self.exit_funcs:
120            await func()
121        sub_flag()
122
123    def on_enter(self):
124        def decorator(func):
125            self.enter_funcs.append(func)
126            return func
127        return decorator
128
129    def on_exit(self):
130        def decorator(func):
131            self.exit_funcs.append(func)
132            return func
133        return decorator
134
135    def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
136        def decorator(func):
137            t = AyakaTrigger(func, cmds, deep, app, block, self)
138            self.triggers.append(t)
139            return func
140        return decorator
141
142    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
143        return self.on_cmd([], app, deep, block)
AyakaState(key='root', parent: typing_extensions.Self = None)
24    def __init__(self, key="root", parent: Self = None):
25        self.key = key
26        self.parent = parent
27        if not parent:
28            self.keys = [key]
29        else:
30            self.keys = [*parent.keys, key]
31        self.children: List[Self] = []
32
33        self.enter_funcs = []
34        self.exit_funcs = []
35        self.triggers: List[AyakaTrigger] = []
def join(self, *keys: str) -> typing_extensions.Self:
68    def join(self, *keys: str) -> Self:
69        node = self
70        for key in keys:
71            node = node[key]
72        return node
def belong(self, node: typing_extensions.Self):
74    def belong(self, node: Self):
75        if len(self.keys) < len(node.keys):
76            return False
77
78        for i in range(len(node.keys)):
79            if node.keys[i] != self.keys[i]:
80                return False
81
82        return True
def dict(self):
 99    def dict(self):
100        data = [child.dict() for child in self.children]
101        return {
102            "name": self.key,
103            "triggers": self.triggers,
104            "children": data,
105        }
async def enter(self):
107    async def enter(self):
108        if ayaka_root_config.debug:
109            print(">>>", self.key)
110        add_flag()
111        for func in self.enter_funcs:
112            await func()
113        sub_flag()
async def exit(self):
115    async def exit(self):
116        if ayaka_root_config.debug:
117            print("<<<", self.key)
118        add_flag()
119        for func in self.exit_funcs:
120            await func()
121        sub_flag()
def on_enter(self):
123    def on_enter(self):
124        def decorator(func):
125            self.enter_funcs.append(func)
126            return func
127        return decorator
def on_exit(self):
129    def on_exit(self):
130        def decorator(func):
131            self.exit_funcs.append(func)
132            return func
133        return decorator
def on_cmd( self, cmds: List[str], app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
135    def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
136        def decorator(func):
137            t = AyakaTrigger(func, cmds, deep, app, block, self)
138            self.triggers.append(t)
139            return func
140        return decorator
def on_text( self, app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
142    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
143        return self.on_cmd([], app, deep, block)
class AyakaTrigger:
146class AyakaTrigger:
147    def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
148        self.func = func
149        self.cmds = cmds
150        self.deep = deep
151        self.app = app
152        self.block = block
153        self.state = state
154
155        # 默认没有解析模型
156        model = None
157        sig = inspect.signature(func)
158        for k, v in sig.parameters.items():
159            cls = v.annotation
160            if issubclass(cls, AyakaInput):
161                model = cls
162                break
163        self.model = model
164
165        # 生成帮助
166        doc = "" if not func.__doc__ else f"| {func.__doc__}"
167        cmd_str = '/'.join(cmds) if cmds else "<任意文字>"
168
169        if not model:
170            help = f"- {cmd_str} {doc}"
171        else:
172            data = model.help()
173            keys_str = " ".join(f"<{k}>" for k in data.keys())
174            data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
175            help = f"- {cmd_str} {keys_str} {doc}\n{data_str}"
176
177        self.help = help
178        if len(state.keys) > 1:
179            s = str(state[1:])
180            if s not in self.app.state_helps:
181                self.app.state_helps[s] = []
182            self.app.state_helps[s].append(help)
183        else:
184            self.app.idle_helps.append(help)
185
186        if ayaka_root_config.debug:
187            print(repr(self))
188
189    async def run(self):
190        params = {}
191        sig = inspect.signature(self.func)
192
193        for k, v in sig.parameters.items():
194            cls = v.annotation
195            if issubclass(cls, AyakaDepend):
196                d = await cls._create_by_app(self.app)
197                if not d:
198                    return
199                params[k] = d
200
201        await self.func(**params)
202
203    def __repr__(self) -> str:
204        return f"{self.__class__.__name__}({self})"
205
206    def __str__(self) -> str:
207        data = {
208            "func": self.func.__name__,
209            "app_name": self.app.name,
210            "cmds": self.cmds,
211            "deep": self.deep,
212            "block": self.block
213        }
214        return " ".join(f"{k}={v}" for k, v in data.items())
AyakaTrigger( func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal['all']], app: ayaka.ayaka.AyakaApp, block: bool, state: ayaka.state.AyakaState)
147    def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
148        self.func = func
149        self.cmds = cmds
150        self.deep = deep
151        self.app = app
152        self.block = block
153        self.state = state
154
155        # 默认没有解析模型
156        model = None
157        sig = inspect.signature(func)
158        for k, v in sig.parameters.items():
159            cls = v.annotation
160            if issubclass(cls, AyakaInput):
161                model = cls
162                break
163        self.model = model
164
165        # 生成帮助
166        doc = "" if not func.__doc__ else f"| {func.__doc__}"
167        cmd_str = '/'.join(cmds) if cmds else "<任意文字>"
168
169        if not model:
170            help = f"- {cmd_str} {doc}"
171        else:
172            data = model.help()
173            keys_str = " ".join(f"<{k}>" for k in data.keys())
174            data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
175            help = f"- {cmd_str} {keys_str} {doc}\n{data_str}"
176
177        self.help = help
178        if len(state.keys) > 1:
179            s = str(state[1:])
180            if s not in self.app.state_helps:
181                self.app.state_helps[s] = []
182            self.app.state_helps[s].append(help)
183        else:
184            self.app.idle_helps.append(help)
185
186        if ayaka_root_config.debug:
187            print(repr(self))
async def run(self):
189    async def run(self):
190        params = {}
191        sig = inspect.signature(self.func)
192
193        for k, v in sig.parameters.items():
194            cls = v.annotation
195            if issubclass(cls, AyakaDepend):
196                d = await cls._create_by_app(self.app)
197                if not d:
198                    return
199                params[k] = d
200
201        await self.func(**params)