ayaka.state
"""State tree nodes (``AyakaState``) and the triggers registered on them."""
import inspect
from typing import TYPE_CHECKING, List, Literal, Union, Awaitable, Callable
from typing_extensions import Self

from .config import ayaka_root_config
from .constant import _enter_exit_during
from .depend import AyakaDepend, AyakaInput


if TYPE_CHECKING:
    from .ayaka import AyakaApp


def add_flag():
    """Increase the ``_enter_exit_during`` counter by one."""
    _enter_exit_during.set(_enter_exit_during.get() + 1)


def sub_flag():
    """Decrease the ``_enter_exit_during`` counter by one."""
    _enter_exit_during.set(_enter_exit_during.get() - 1)


class AyakaState:
    """A node in a tree of named states.

    Every node stores its own ``key``, its ``parent`` and ``keys``, the list
    of keys on the path from the root down to this node.  Children are
    created on first access through ``node[key]`` or ``node.key``.  A node
    also holds the enter/exit callbacks and the triggers registered on it.

    Nodes compare by path: ``a >= b`` means ``a`` is ``b`` or lies below it,
    and two nodes are equal when their ``keys`` are equal.
    """

    def __init__(self, key="root", parent: Self = None):
        """Create a node named ``key`` below ``parent`` (``None`` for a root).

        The node is not appended to ``parent.children`` here; ``__getitem__``
        does that when it creates a child.
        """
        self.key = key
        self.parent = parent
        # The path is the parent's path extended by this node's own key.
        self.keys = [*parent.keys, key] if parent else [key]
        self.children: List[Self] = []

        self.enter_funcs = []
        self.exit_funcs = []
        self.triggers: List[AyakaTrigger] = []

    def __getitem__(self, k):
        """Return the child named ``k``, creating it if it does not exist.

        When ``k`` is a slice, return a detached node whose ``keys`` is that
        slice of this node's path; it is not linked into the tree.
        """
        if isinstance(k, slice):
            s = AyakaState("")
            s.keys = self.keys[k]
            return s

        for node in self.children:
            if node.key == k:
                return node
        node = self.__class__(k, self)
        self.children.append(node)
        return node

    def __getattr__(self, k: str):
        """Attribute-style child access: ``state.foo`` is ``state["foo"]``.

        Only reached when normal attribute lookup fails.
        """
        # Dunder probes (copy, pickle, hasattr checks) must fail normally
        # instead of silently creating children named e.g. "__setstate__".
        # The ``children`` check stops infinite recursion on an instance
        # whose ``__init__`` has not run yet.
        if (k.startswith("__") and k.endswith("__")) or "children" not in self.__dict__:
            raise AttributeError(
                f"{self.__class__.__name__!r} object has no attribute {k!r}")
        return self[k]

    def __iter__(self):
        """Iterate over the direct children."""
        return iter(self.children)

    def __str__(self) -> str:
        return ".".join(self.keys)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def join(self, *keys: str) -> Self:
        """Walk down from this node through ``keys``, creating nodes as needed."""
        node = self
        for key in keys:
            node = node[key]
        return node

    def belong(self, node: Self):
        """Return True if ``node``'s path is a prefix of (or equal to) this path."""
        # A shorter ``self.keys`` yields a shorter slice, which never matches.
        return self.keys[:len(node.keys)] == node.keys

    def __ge__(self, node: Self):
        if not isinstance(node, AyakaState):
            return NotImplemented
        return self.belong(node)

    def __le__(self, node: Self):
        if not isinstance(node, AyakaState):
            return NotImplemented
        return node.belong(self)

    def __gt__(self, node: Self):
        if not isinstance(node, AyakaState):
            return NotImplemented
        return self >= node and len(self.keys) > len(node.keys)

    def __lt__(self, node: Self):
        if not isinstance(node, AyakaState):
            return NotImplemented
        return self <= node and len(self.keys) < len(node.keys)

    def __eq__(self, node: Self):
        if not isinstance(node, AyakaState):
            return NotImplemented
        # Prefix relation plus equal length is exactly path equality.
        return self.keys == node.keys

    def __hash__(self):
        # Defining __eq__ disables the inherited hash; hash by path so that
        # equal nodes hash equally and nodes can be used in sets and dicts.
        return hash(tuple(self.keys))

    def dict(self):
        """Return this subtree as nested dicts (name, triggers, children)."""
        data = [child.dict() for child in self.children]
        return {
            "name": self.key,
            "triggers": self.triggers,
            "children": data,
        }

    async def enter(self):
        """Await every registered enter callback, in registration order."""
        if ayaka_root_config.debug:
            print(">>>", self.key)
        add_flag()
        try:
            for func in self.enter_funcs:
                await func()
        finally:
            # Always undo add_flag(), even when a callback raises.
            sub_flag()

    async def exit(self):
        """Await every registered exit callback, in registration order."""
        if ayaka_root_config.debug:
            print("<<<", self.key)
        add_flag()
        try:
            for func in self.exit_funcs:
                await func()
        finally:
            # Always undo add_flag(), even when a callback raises.
            sub_flag()

    def on_enter(self):
        """Return a decorator that registers a callback for ``enter``."""
        def decorator(func):
            self.enter_funcs.append(func)
            return func
        return decorator

    def on_exit(self):
        """Return a decorator that registers a callback for ``exit``."""
        def decorator(func):
            self.exit_funcs.append(func)
            return func
        return decorator

    def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
        """Return a decorator that registers ``func`` as a trigger on this state.

        ``cmds``, ``deep``, ``app`` and ``block`` are stored on the created
        ``AyakaTrigger``; they are not interpreted here.  The decorated
        function is returned unchanged.
        """
        def decorator(func):
            t = AyakaTrigger(func, cmds, deep, app, block, self)
            self.triggers.append(t)
            return func
        return decorator

    def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True):
        """Same as ``on_cmd`` with an empty command list."""
        return self.on_cmd([], app, deep, block)


class AyakaTrigger:
    """A callback bound to a state, together with its generated help text."""

    def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: "AyakaState"):
        """Store the trigger settings, detect an input model, register help.

        The first parameter of ``func`` annotated with an ``AyakaInput``
        subclass becomes ``self.model``.  The help line is appended to
        ``app.state_helps`` (keyed by the state path without its first key)
        or, for a state with a single key, to ``app.idle_helps``.
        """
        self.func = func
        self.cmds = cmds
        self.deep = deep
        self.app = app
        self.block = block
        self.state = state

        # No parsing model by default.
        model = None
        for param in inspect.signature(func).parameters.values():
            cls = param.annotation
            try:
                is_input = inspect.isclass(cls) and issubclass(cls, AyakaInput)
            except TypeError:
                # Some generic aliases pass isclass() on older Pythons but
                # are still rejected by issubclass().
                is_input = False
            if is_input:
                model = cls
                break
        self.model = model

        # Build the help text.
        doc = "" if not func.__doc__ else f"| {func.__doc__}"
        cmd_str = '/'.join(cmds) if cmds else "<任意文字>"

        if not model:
            help_text = f"- {cmd_str} {doc}"
        else:
            data = model.help()
            keys_str = " ".join(f"<{k}>" for k in data.keys())
            data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
            help_text = f"- {cmd_str} {keys_str} {doc}\n{data_str}"

        self.help = help_text
        if len(state.keys) > 1:
            s = str(state[1:])
            if s not in self.app.state_helps:
                self.app.state_helps[s] = []
            self.app.state_helps[s].append(help_text)
        else:
            self.app.idle_helps.append(help_text)

        if ayaka_root_config.debug:
            print(repr(self))

    async def run(self):
        """Resolve the dependencies of ``self.func`` and await it.

        For every parameter annotated with an ``AyakaDepend`` subclass the
        value is obtained from ``cls._create_by_app(self.app)``.  If any of
        them comes back falsy, the function is not called.  Parameters with
        other annotations are not supplied.
        """
        params = {}
        sig = inspect.signature(self.func)

        for k, v in sig.parameters.items():
            cls = v.annotation
            try:
                is_depend = inspect.isclass(cls) and issubclass(cls, AyakaDepend)
            except TypeError:
                # Not usable as a class (see __init__); nothing to inject.
                is_depend = False
            if not is_depend:
                continue
            d = await cls._create_by_app(self.app)
            if not d:
                return
            params[k] = d

        await self.func(**params)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def __str__(self) -> str:
        data = {
            "func": self.func.__name__,
            "app_name": self.app.name,
            "cmds": self.cmds,
            "deep": self.deep,
            "block": self.block
        }
        return " ".join(f"{k}={v}" for k, v in data.items())


root_state = AyakaState()
def
add_flag():
def
sub_flag():
class
AyakaState:
23class AyakaState: 24 def __init__(self, key="root", parent: Self = None): 25 self.key = key 26 self.parent = parent 27 if not parent: 28 self.keys = [key] 29 else: 30 self.keys = [*parent.keys, key] 31 self.children: List[Self] = [] 32 33 self.enter_funcs = [] 34 self.exit_funcs = [] 35 self.triggers: List[AyakaTrigger] = [] 36 37 def __getitem__(self, k): 38 if isinstance(k, slice): 39 s = AyakaState("") 40 s.keys = self.keys[k] 41 return s 42 43 for node in self.children: 44 if node.key == k: 45 return node 46 node = self.__class__(k, self) 47 self.children.append(node) 48 return node 49 50 def __getattr__(self, k: str): 51 return self[k] 52 53 def __iter__(self): 54 return iter(self.children) 55 56 def __str__(self) -> str: 57 return ".".join(self.keys) 58 59 def __repr__(self) -> str: 60 return f"{self.__class__.__name__}({self})" 61 62 def join(self, *keys: str) -> Self: 63 node = self 64 for key in keys: 65 node = node[key] 66 return node 67 68 def belong(self, node: Self): 69 if len(self.keys) < len(node.keys): 70 return False 71 72 for i in range(len(node.keys)): 73 if node.keys[i] != self.keys[i]: 74 return False 75 76 return True 77 78 def __ge__(self, node: Self): 79 return self.belong(node) 80 81 def __le__(self, node: Self): 82 return node.belong(self) 83 84 def __gt__(self, node: Self): 85 return self >= node and len(self.keys) > len(node.keys) 86 87 def __lt__(self, node: Self): 88 return self <= node and len(self.keys) < len(node.keys) 89 90 def __eq__(self, node: Self): 91 return self <= node and len(self.keys) == len(node.keys) 92 93 def dict(self): 94 data = [child.dict() for child in self.children] 95 return { 96 "name": self.key, 97 "triggers": self.triggers, 98 "children": data, 99 } 100 101 async def enter(self): 102 if ayaka_root_config.debug: 103 print(">>>", self.key) 104 add_flag() 105 for func in self.enter_funcs: 106 await func() 107 sub_flag() 108 109 async def exit(self): 110 if ayaka_root_config.debug: 111 print("<<<", self.key) 
112 add_flag() 113 for func in self.exit_funcs: 114 await func() 115 sub_flag() 116 117 def on_enter(self): 118 def decorator(func): 119 self.enter_funcs.append(func) 120 return func 121 return decorator 122 123 def on_exit(self): 124 def decorator(func): 125 self.exit_funcs.append(func) 126 return func 127 return decorator 128 129 def on_cmd(self, cmds: List[str], app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True): 130 def decorator(func): 131 t = AyakaTrigger(func, cmds, deep, app, block, self) 132 self.triggers.append(t) 133 return func 134 return decorator 135 136 def on_text(self, app: "AyakaApp", deep: Union[int, Literal["all"]] = 0, block=True): 137 return self.on_cmd([], app, deep, block)
AyakaState(key='root', parent: typing_extensions.Self = None)
24 def __init__(self, key="root", parent: Self = None): 25 self.key = key 26 self.parent = parent 27 if not parent: 28 self.keys = [key] 29 else: 30 self.keys = [*parent.keys, key] 31 self.children: List[Self] = [] 32 33 self.enter_funcs = [] 34 self.exit_funcs = [] 35 self.triggers: List[AyakaTrigger] = []
def
on_cmd( self, cmds: List[str], app: ayaka.ayaka.AyakaApp, deep: Union[int, Literal['all']] = 0, block=True):
class
AyakaTrigger:
class AyakaTrigger:
    """A callback bound to a state, together with its generated help text."""

    def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: "AyakaState"):
        """Store the trigger settings, detect an input model, register help.

        The first parameter of ``func`` annotated with an ``AyakaInput``
        subclass becomes ``self.model``.  The help line is appended to
        ``app.state_helps`` (keyed by the state path without its first key)
        or, for a state with a single key, to ``app.idle_helps``.
        """
        self.func = func
        self.cmds = cmds
        self.deep = deep
        self.app = app
        self.block = block
        self.state = state

        # No parsing model by default.
        model = None
        for param in inspect.signature(func).parameters.values():
            cls = param.annotation
            try:
                is_input = inspect.isclass(cls) and issubclass(cls, AyakaInput)
            except TypeError:
                # Some generic aliases pass isclass() on older Pythons but
                # are still rejected by issubclass().
                is_input = False
            if is_input:
                model = cls
                break
        self.model = model

        # Build the help text.
        doc = "" if not func.__doc__ else f"| {func.__doc__}"
        cmd_str = '/'.join(cmds) if cmds else "<任意文字>"

        if not model:
            help_text = f"- {cmd_str} {doc}"
        else:
            data = model.help()
            keys_str = " ".join(f"<{k}>" for k in data.keys())
            data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
            help_text = f"- {cmd_str} {keys_str} {doc}\n{data_str}"

        self.help = help_text
        if len(state.keys) > 1:
            s = str(state[1:])
            if s not in self.app.state_helps:
                self.app.state_helps[s] = []
            self.app.state_helps[s].append(help_text)
        else:
            self.app.idle_helps.append(help_text)

        if ayaka_root_config.debug:
            print(repr(self))

    async def run(self):
        """Resolve the dependencies of ``self.func`` and await it.

        For every parameter annotated with an ``AyakaDepend`` subclass the
        value is obtained from ``cls._create_by_app(self.app)``.  If any of
        them comes back falsy, the function is not called.  Parameters with
        other annotations are not supplied.
        """
        params = {}
        sig = inspect.signature(self.func)

        for k, v in sig.parameters.items():
            cls = v.annotation
            try:
                is_depend = inspect.isclass(cls) and issubclass(cls, AyakaDepend)
            except TypeError:
                # Not usable as a class (string or generic annotation);
                # nothing to inject for this parameter.
                is_depend = False
            if not is_depend:
                continue
            d = await cls._create_by_app(self.app)
            if not d:
                return
            params[k] = d

        await self.func(**params)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self})"

    def __str__(self) -> str:
        data = {
            "func": self.func.__name__,
            "app_name": self.app.name,
            "cmds": self.cmds,
            "deep": self.deep,
            "block": self.block
        }
        return " ".join(f"{k}={v}" for k, v in data.items())
AyakaTrigger( func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal['all']], app: ayaka.ayaka.AyakaApp, block: bool, state: ayaka.state.AyakaState)
def __init__(self, func: Callable[..., Awaitable], cmds: List[str], deep: Union[int, Literal["all"]], app: "AyakaApp", block: bool, state: "AyakaState"):
    """Store the trigger settings, detect an input model, register help.

    The first parameter of ``func`` annotated with an ``AyakaInput``
    subclass becomes ``self.model``.  The help line is appended to
    ``app.state_helps`` (keyed by the state path without its first key)
    or, for a state with a single key, to ``app.idle_helps``.
    """
    self.func = func
    self.cmds = cmds
    self.deep = deep
    self.app = app
    self.block = block
    self.state = state

    # No parsing model by default.
    model = None
    for param in inspect.signature(func).parameters.values():
        cls = param.annotation
        try:
            is_input = inspect.isclass(cls) and issubclass(cls, AyakaInput)
        except TypeError:
            # Some generic aliases pass isclass() on older Pythons but
            # are still rejected by issubclass().
            is_input = False
        if is_input:
            model = cls
            break
    self.model = model

    # Build the help text.
    doc = "" if not func.__doc__ else f"| {func.__doc__}"
    cmd_str = '/'.join(cmds) if cmds else "<任意文字>"

    if not model:
        help_text = f"- {cmd_str} {doc}"
    else:
        data = model.help()
        keys_str = " ".join(f"<{k}>" for k in data.keys())
        data_str = "\n".join(f"    <{k}> {v}" for k, v in data.items())
        help_text = f"- {cmd_str} {keys_str} {doc}\n{data_str}"

    self.help = help_text
    if len(state.keys) > 1:
        s = str(state[1:])
        if s not in self.app.state_helps:
            self.app.state_helps[s] = []
        self.app.state_helps[s].append(help_text)
    else:
        self.app.idle_helps.append(help_text)

    if ayaka_root_config.debug:
        print(repr(self))
async def
run(self):
async def run(self):
    """Resolve the dependencies of ``self.func`` and await it.

    For every parameter annotated with an ``AyakaDepend`` subclass the
    value is obtained from ``cls._create_by_app(self.app)``.  If any of
    them comes back falsy, the function is not called.  Parameters with
    other annotations are not supplied.
    """
    params = {}
    sig = inspect.signature(self.func)

    for k, v in sig.parameters.items():
        cls = v.annotation
        try:
            is_depend = inspect.isclass(cls) and issubclass(cls, AyakaDepend)
        except TypeError:
            # Some generic aliases pass isclass() on older Pythons but are
            # still rejected by issubclass(); treat them as "no dependency".
            is_depend = False
        if not is_depend:
            # Strings, typing generics and unannotated parameters are skipped
            # instead of crashing issubclass().
            continue
        d = await cls._create_by_app(self.app)
        if not d:
            return
        params[k] = d

    await self.func(**params)