ayaka.state

  1import inspect
  2from typing import TYPE_CHECKING, List, Literal, Union, Awaitable, Callable
  3from typing_extensions import Self
  4
  5from .config import ayaka_root_config
  6from .constant import _enter_exit_during
  7from .depend import AyakaDepend, AyakaInput
  8
  9
 10if TYPE_CHECKING:
 11    from .ayaka import AyakaApp
 12
 13
 14def add_flag():
 15    _enter_exit_during.set(_enter_exit_during.get()+1)
 16
 17
 18def sub_flag():
 19    _enter_exit_during.set(_enter_exit_during.get()-1)
 20
 21
 22class AyakaState:
 23    def __init__(self, key="root", parent: Self = None):
 24        self.key = key
 25        self.parent = parent
 26        if not parent:
 27            self.keys = [key]
 28        else:
 29            self.keys = [*parent.keys, key]
 30        self.children: List[Self] = []
 31
 32        self.enter_funcs = []
 33        self.exit_funcs = []
 34        self.triggers: List[AyakaTrigger] = []
 35
 36    def __getitem__(self, k):
 37        if isinstance(k, slice):
 38            s = AyakaState("")
 39            s.keys = self.keys[k]
 40            return s
 41
 42        for node in self.children:
 43            if node.key == k:
 44                return node
 45        node = self.__class__(k, self)
 46        self.children.append(node)
 47        return node
 48
 49    def __getattr__(self, k: str):
 50        return self[k]
 51
 52    def __iter__(self):
 53        return iter(self.children)
 54
 55    def __str__(self) -> str:
 56        return ".".join(self.keys)
 57
 58    def __repr__(self) -> str:
 59        return f"{self.__class__.__name__}({self})"
 60
 61    def join(self, *keys: str) -> Self:
 62        node = self
 63        for key in keys:
 64            node = node[key]
 65        return node
 66
 67    def belong(self, node: Self):
 68        if len(self.keys) < len(node.keys):
 69            return False
 70
 71        for i in range(len(node.keys)):
 72            if node.keys[i] != self.keys[i]:
 73                return False
 74
 75        return True
 76
 77    def __ge__(self, node: Self):
 78        return self.belong(node)
 79
 80    def __le__(self, node: Self):
 81        return node.belong(self)
 82
 83    def __gt__(self, node: Self):
 84        return self >= node and len(self.keys) > len(node.keys)
 85
 86    def __lt__(self, node: Self):
 87        return self <= node and len(self.keys) < len(node.keys)
 88
 89    def __eq__(self, node: Self):
 90        return self <= node and len(self.keys) == len(node.keys)
 91
 92    def dict(self):
 93        data = [child.dict() for child in self.children]
 94        return {
 95            "name": self.key,
 96            "triggers": self.triggers,
 97            "children": data,
 98        }
 99
100    async def enter(self):
101        if ayaka_root_config.debug:
102            print(">>>", self.key)
103        add_flag()
104        for func in self.enter_funcs:
105            await func()
106        sub_flag()
107
108    async def exit(self):
109        if ayaka_root_config.debug:
110            print("<<<", self.key)
111        add_flag()
112        for func in self.exit_funcs:
113            await func()
114        sub_flag()
115
116    def on_enter(self):
117        def decorator(func):
118            self.enter_funcs.append(func)
119            return func
120        return decorator
121
122    def on_exit(self):
123        def decorator(func):
124            self.exit_funcs.append(func)
125            return func
126        return decorator
127
128    def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
129        def decorator(func):
130            t = AyakaTrigger(func, cmds, deep, app, block, self)
131            self.triggers.append(t)
132            return func
133        return decorator
134
135    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
136        return self.on_cmd([], app, deep, block)
137
138
class AyakaTrigger:
    """A callback registered on a state for a list of commands.

    Creating a trigger also builds its help text and appends that text to the
    owning app's help tables.
    """

    def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
        """Store the trigger settings and register its help text on ``app``.

        Args:
            func: Async callback invoked by :meth:`run`.
            cmds: Command strings; an empty list is shown as the
                "any text" placeholder in the help line.
            deep: Stored as-is; not interpreted in this class.
            app: App whose ``state_helps`` / ``idle_helps`` receive the help.
            block: Stored as-is; not interpreted in this class.
            state: State the trigger is attached to.
        """
        self.func = func
        self.cmds = cmds
        self.deep = deep
        self.app = app
        self.block = block
        self.state = state

        # No parsing model by default; the first parameter annotated with an
        # AyakaInput subclass supplies one.
        model = None
        for param in inspect.signature(func).parameters.values():
            if self._is_subclass(param.annotation, AyakaInput):
                model = param.annotation
                break
        self.model = model

        # Build the help text.
        doc = "" if not func.__doc__ else f"| {func.__doc__}"
        cmd_str = '/'.join(cmds) if cmds else "<任意文字>"

        if model is None:
            help_text = f"- {cmd_str} {doc}"
        else:
            data = model.help()
            keys_str = " ".join(f"<{k}>" for k in data.keys())
            data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
            help_text = f"- {cmd_str} {keys_str} {doc}\n{data_str}"

        self.help = help_text
        if len(state.keys) > 1:
            # Help for a non-root state is filed under its path without the
            # leading root key.
            s = str(state[1:])
            if s not in self.app.state_helps:
                self.app.state_helps[s] = []
            self.app.state_helps[s].append(help_text)
        else:
            self.app.idle_helps.append(help_text)

        if ayaka_root_config.debug:
            print(repr(self))

    @staticmethod
    def _is_subclass(annotation, base) -> bool:
        """Return True if ``annotation`` is a class derived from ``base``.

        Annotations can be strings or typing generics, for which
        ``issubclass`` raises ``TypeError``; those simply do not match.
        """
        try:
            return issubclass(annotation, base)
        except TypeError:
            return False

    async def run(self):
        """Resolve the callback's dependencies and await the callback.

        Every parameter annotated with an ``AyakaDepend`` subclass is built
        through ``cls._create_by_app(self.app)``.  If any of them comes back
        falsy, the callback is not invoked.  Only those parameters are passed
        to the callback.
        """
        params = {}
        sig = inspect.signature(self.func)

        for k, v in sig.parameters.items():
            cls = v.annotation
            if self._is_subclass(cls, AyakaDepend):
                d = await cls._create_by_app(self.app)
                if not d:
                    return
                params[k] = d

        await self.func(**params)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def __str__(self) -> str:
        """Return the main settings as space-separated ``key=value`` pairs."""
        data = {
            "func": self.func.__name__,
            "app_name": self.app.name,
            "cmds": self.cmds,
            "deep": self.deep,
            "block": self.block
        }
        return " ".join(f"{k}={v}" for k, v in data.items())
208
209
# Module-level root node of the state tree (key "root", no parent).
root_state = AyakaState(key="root")
def add_flag():
15def add_flag():
16    _enter_exit_during.set(_enter_exit_during.get()+1)
def sub_flag():
19def sub_flag():
20    _enter_exit_during.set(_enter_exit_during.get()-1)
class AyakaState:
 23class AyakaState:
 24    def __init__(self, key="root", parent: Self = None):
 25        self.key = key
 26        self.parent = parent
 27        if not parent:
 28            self.keys = [key]
 29        else:
 30            self.keys = [*parent.keys, key]
 31        self.children: List[Self] = []
 32
 33        self.enter_funcs = []
 34        self.exit_funcs = []
 35        self.triggers: List[AyakaTrigger] = []
 36
 37    def __getitem__(self, k):
 38        if isinstance(k, slice):
 39            s = AyakaState("")
 40            s.keys = self.keys[k]
 41            return s
 42
 43        for node in self.children:
 44            if node.key == k:
 45                return node
 46        node = self.__class__(k, self)
 47        self.children.append(node)
 48        return node
 49
 50    def __getattr__(self, k: str):
 51        return self[k]
 52
 53    def __iter__(self):
 54        return iter(self.children)
 55
 56    def __str__(self) -> str:
 57        return ".".join(self.keys)
 58
 59    def __repr__(self) -> str:
 60        return f"{self.__class__.__name__}({self})"
 61
 62    def join(self, *keys: str) -> Self:
 63        node = self
 64        for key in keys:
 65            node = node[key]
 66        return node
 67
 68    def belong(self, node: Self):
 69        if len(self.keys) < len(node.keys):
 70            return False
 71
 72        for i in range(len(node.keys)):
 73            if node.keys[i] != self.keys[i]:
 74                return False
 75
 76        return True
 77
 78    def __ge__(self, node: Self):
 79        return self.belong(node)
 80
 81    def __le__(self, node: Self):
 82        return node.belong(self)
 83
 84    def __gt__(self, node: Self):
 85        return self >= node and len(self.keys) > len(node.keys)
 86
 87    def __lt__(self, node: Self):
 88        return self <= node and len(self.keys) < len(node.keys)
 89
 90    def __eq__(self, node: Self):
 91        return self <= node and len(self.keys) == len(node.keys)
 92
 93    def dict(self):
 94        data = [child.dict() for child in self.children]
 95        return {
 96            "name": self.key,
 97            "triggers": self.triggers,
 98            "children": data,
 99        }
100
101    async def enter(self):
102        if ayaka_root_config.debug:
103            print(">>>", self.key)
104        add_flag()
105        for func in self.enter_funcs:
106            await func()
107        sub_flag()
108
109    async def exit(self):
110        if ayaka_root_config.debug:
111            print("<<<", self.key)
112        add_flag()
113        for func in self.exit_funcs:
114            await func()
115        sub_flag()
116
117    def on_enter(self):
118        def decorator(func):
119            self.enter_funcs.append(func)
120            return func
121        return decorator
122
123    def on_exit(self):
124        def decorator(func):
125            self.exit_funcs.append(func)
126            return func
127        return decorator
128
129    def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
130        def decorator(func):
131            t = AyakaTrigger(func, cmds, deep, app, block, self)
132            self.triggers.append(t)
133            return func
134        return decorator
135
136    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
137        return self.on_cmd([], app, deep, block)
AyakaState(key='root', parent: typing_extensions.Self = None)
24    def __init__(self, key="root", parent: Self = None):
25        self.key = key
26        self.parent = parent
27        if not parent:
28            self.keys = [key]
29        else:
30            self.keys = [*parent.keys, key]
31        self.children: List[Self] = []
32
33        self.enter_funcs = []
34        self.exit_funcs = []
35        self.triggers: List[AyakaTrigger] = []
def join(self, *keys: str) -> typing_extensions.Self:
62    def join(self, *keys: str) -> Self:
63        node = self
64        for key in keys:
65            node = node[key]
66        return node
def belong(self, node: typing_extensions.Self):
68    def belong(self, node: Self):
69        if len(self.keys) < len(node.keys):
70            return False
71
72        for i in range(len(node.keys)):
73            if node.keys[i] != self.keys[i]:
74                return False
75
76        return True
def dict(self):
93    def dict(self):
94        data = [child.dict() for child in self.children]
95        return {
96            "name": self.key,
97            "triggers": self.triggers,
98            "children": data,
99        }
async def enter(self):
101    async def enter(self):
102        if ayaka_root_config.debug:
103            print(">>>", self.key)
104        add_flag()
105        for func in self.enter_funcs:
106            await func()
107        sub_flag()
async def exit(self):
109    async def exit(self):
110        if ayaka_root_config.debug:
111            print("<<<", self.key)
112        add_flag()
113        for func in self.exit_funcs:
114            await func()
115        sub_flag()
def on_enter(self):
117    def on_enter(self):
118        def decorator(func):
119            self.enter_funcs.append(func)
120            return func
121        return decorator
def on_exit(self):
123    def on_exit(self):
124        def decorator(func):
125            self.exit_funcs.append(func)
126            return func
127        return decorator
def on_cmd( self, cmds: List[str], app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
129    def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
130        def decorator(func):
131            t = AyakaTrigger(func, cmds, deep, app, block, self)
132            self.triggers.append(t)
133            return func
134        return decorator
def on_text( self, app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
136    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
137        return self.on_cmd([], app, deep, block)
class AyakaTrigger:
140class AyakaTrigger:
141    def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
142        self.func = func
143        self.cmds = cmds
144        self.deep = deep
145        self.app = app
146        self.block = block
147        self.state = state
148
149        # 默认没有解析模型
150        model = None
151        sig = inspect.signature(func)
152        for k, v in sig.parameters.items():
153            cls = v.annotation
154            if issubclass(cls, AyakaInput):
155                model = cls
156                break
157        self.model = model
158
159        # 生成帮助
160        doc = "" if not func.__doc__ else f"| {func.__doc__}"
161        cmd_str = '/'.join(cmds) if cmds else "<任意文字>"
162
163        if not model:
164            help = f"- {cmd_str} {doc}"
165        else:
166            data = model.help()
167            keys_str = " ".join(f"<{k}>" for k in data.keys())
168            data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
169            help = f"- {cmd_str} {keys_str} {doc}\n{data_str}"
170
171        self.help = help
172        if len(state.keys) > 1:
173            s = str(state[1:])
174            if s not in self.app.state_helps:
175                self.app.state_helps[s] = []
176            self.app.state_helps[s].append(help)
177        else:
178            self.app.idle_helps.append(help)
179
180        if ayaka_root_config.debug:
181            print(repr(self))
182
183    async def run(self):
184        params = {}
185        sig = inspect.signature(self.func)
186
187        for k, v in sig.parameters.items():
188            cls = v.annotation
189            if issubclass(cls, AyakaDepend):
190                d = await cls._create_by_app(self.app)
191                if not d:
192                    return
193                params[k] = d
194
195        await self.func(**params)
196
197    def __repr__(self) -> str:
198        return f"{self.__class__.__name__}({self})"
199
200    def __str__(self) -> str:
201        data = {
202            "func": self.func.__name__,
203            "app_name": self.app.name,
204            "cmds": self.cmds,
205            "deep": self.deep,
206            "block": self.block
207        }
208        return " ".join(f"{k}={v}" for k, v in data.items())
AyakaTrigger( func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal['all']], app: ayaka.ayaka.AyakaApp, block: bool, state: ayaka.state.AyakaState)
141    def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: AyakaState):
142        self.func = func
143        self.cmds = cmds
144        self.deep = deep
145        self.app = app
146        self.block = block
147        self.state = state
148
149        # 默认没有解析模型
150        model = None
151        sig = inspect.signature(func)
152        for k, v in sig.parameters.items():
153            cls = v.annotation
154            if issubclass(cls, AyakaInput):
155                model = cls
156                break
157        self.model = model
158
159        # 生成帮助
160        doc = "" if not func.__doc__ else f"| {func.__doc__}"
161        cmd_str = '/'.join(cmds) if cmds else "<任意文字>"
162
163        if not model:
164            help = f"- {cmd_str} {doc}"
165        else:
166            data = model.help()
167            keys_str = " ".join(f"<{k}>" for k in data.keys())
168            data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
169            help = f"- {cmd_str} {keys_str} {doc}\n{data_str}"
170
171        self.help = help
172        if len(state.keys) > 1:
173            s = str(state[1:])
174            if s not in self.app.state_helps:
175                self.app.state_helps[s] = []
176            self.app.state_helps[s].append(help)
177        else:
178            self.app.idle_helps.append(help)
179
180        if ayaka_root_config.debug:
181            print(repr(self))
async def run(self):
183    async def run(self):
184        params = {}
185        sig = inspect.signature(self.func)
186
187        for k, v in sig.parameters.items():
188            cls = v.annotation
189            if issubclass(cls, AyakaDepend):
190                d = await cls._create_by_app(self.app)
191                if not d:
192                    return
193                params[k] = d
194
195        await self.func(**params)