ayaka.ayaka
ayaka核心
1'''ayaka核心''' 2import inspect 3from math import ceil 4from pathlib import Path 5from loguru import logger 6from typing import List, Dict, Union 7 8from .ayaka_parser import parser 9from .config import INIT_STATE, ayaka_root_config, create_ayaka_plugin_config_base 10from .constant import _bot, _event, _group, _arg, _args, _message, _cmd, app_list, private_listener_dict, get_bot 11from .deal import deal_event 12from .group import get_group 13from .storage import AyakaStorage 14from .driver import on_message, MessageSegment 15from .on import AyakaOn, AyakaTimer, AyakaTrigger 16 17 18class AyakaApp: 19 def __repr__(self) -> str: 20 return f"AyakaApp({self.name}, {self.state})" 21 22 def __init__(self, name: str) -> None: 23 self.path = Path(inspect.stack()[1].filename) 24 25 for app in app_list: 26 if app.name == name: 27 raise Exception(f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 28 29 self.name = name 30 self.state = INIT_STATE 31 self.triggers: List[AyakaTrigger] = [] 32 self.timers: List[AyakaTimer] = [] 33 self._help: Dict[str, List[str]] = {} 34 self.on = AyakaOn(self) 35 self.storage = AyakaStorage(self) 36 self.parser = parser 37 self.BaseConfig = create_ayaka_plugin_config_base(name) 38 self.ayaka_root_config = ayaka_root_config 39 40 app_list.append(self) 41 if ayaka_root_config.debug: 42 print(self) 43 44 @property 45 def super_triggers(self): 46 return [t for t in self.triggers if t.super] 47 48 @property 49 def state_triggers(self): 50 return [t for t in self.triggers if not t.super and t.state is not None] 51 52 @property 53 def no_state_triggers(self): 54 return [t for t in self.triggers if not t.super and t.state is None] 55 56 @property 57 def intro(self): 58 '''获取介绍,也就是init状态下的帮助''' 59 helps = self._help.get(INIT_STATE, ["没有找到帮助"]) 60 return "\n".join(helps) 61 62 def get_helps(self, state: str): 63 helps = self._help.get(state) 64 if not helps: 65 return [] 66 return [f"[{state}]"] + helps 67 68 @property 69 def help(self): 70 '''获取当前状态下的帮助,没有找到则返回介绍''' 71 if self.group.running_app_name == self.name: 72 helps = [] 73 state = self.state 74 helps.extend(self.get_helps(state)) 75 76 while "." in state: 77 state = state.rsplit(".", 1)[0] 78 helps.extend(self.get_helps(state)) 79 80 helps.extend(self.get_helps("*")) 81 82 if helps: 83 return "\n".join(helps) 84 85 return self.intro 86 87 @property 88 def all_help(self): 89 '''获取介绍以及全部状态下的帮助''' 90 info = self.intro 91 for k, v in self._help.items(): 92 v = "\n".join(v) 93 if k != INIT_STATE: 94 info += f"\n[{k}]\n{v}" 95 return info 96 97 @help.setter 98 def help(self, help: Union[str, Dict[str, str]]): 99 '''设置帮助,若help为str,则设置为介绍,若help为dict,则设置为对应状态的帮助''' 100 if isinstance(help, dict): 101 help = {k: [v.strip()] for k, v in help.items()} 102 self._help.update(help) 103 else: 104 self._help[INIT_STATE] = [help.strip()] 105 106 @property 107 def valid(self): 108 '''*timer触发时不可用* 109 110 当前app是否被当前群组启用 111 ''' 112 return self.group.get_app(self.name) 113 114 @property 115 def cache(self): 116 '''*timer触发时不可用* 117 118 当前群组、当前app的独立数据空间 119 ''' 120 return self.group.cache_dict.get(self.name) 121 122 @property 123 def user_name(self): 124 '''*timer触发时不可用* 125 126 当前消息的发送人的群名片或昵称 127 ''' 128 s = self.event.sender 129 name = s.card or s.nickname 130 return name 131 132 @property 133 def user_id(self): 134 '''*timer触发时不可用* 135 136 当前消息的发送人的uid 137 ''' 138 return self.event.user_id 139 140 @property 141 def bot(self): 142 '''*timer触发时不可用* 143 144 当前bot 145 ''' 146 return _bot.get() 147 148 @property 149 def event(self): 150 '''*timer触发时不可用* 151 152 当前消息 153 ''' 154 return _event.get() 155 156 @property 157 def group_id(self): 158 '''*timer触发时不可用* 159 160 当前群组的id 161 162 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id 163 ''' 164 return self.group.group_id 165 166 @property 167 def bot_id(self): 168 '''*timer触发时不可用* 169 170 当前bot的id 171 ''' 172 return self.group.bot_id 173 174 @property 175 def group(self): 176 '''*timer触发时不可用* 177 178 当前群组 179 180 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A 181 ''' 182 return _group.get() 183 184 @property 185 def arg(self): 186 '''*timer触发时不可用* 187 188 当前消息在移除了命令后的剩余部分 189 ''' 190 return _arg.get() 191 192 @property 193 def args(self): 194 '''*timer触发时不可用* 195 196 当前消息在移除了命令后,剩余部分按照空格分割后的数组 197 198 注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素 199 ''' 200 return _args.get() 201 202 @property 203 def cmd(self): 204 '''*timer触发时不可用* 205 206 当前消息的命令头 207 ''' 208 return _cmd.get() 209 210 @property 211 def message(self): 212 '''*timer触发时不可用* 213 214 当前消息 215 ''' 216 return _message.get() 217 218 async def start(self, state=INIT_STATE): 219 '''*timer触发时不可用* 220 221 启动应用,并发送提示''' 222 name = self.group.running_app_name 223 if name and name != self.name: 224 await self.send("打开应用失败") 225 return False 226 self.group.running_app = self 227 self.state = state 228 await self.send(f"已打开应用 [{self.name}]") 229 return True 230 231 async def close(self): 232 '''*timer触发时不可用* 233 234 关闭应用,并发送提示''' 235 name = self.group.running_app_name 236 if name: 237 self.group.running_app = None 238 await self.send(f"已关闭应用 [{name}]") 239 else: 240 await self.send(f"没有应用在运行") 241 242 def set_state(self, state=INIT_STATE): 243 self.state = state 244 245 def add_listener(self, user_id: int): 246 '''为该群组添加对指定私聊的监听''' 247 private_listener_dict[user_id].append(self.group_id) 248 249 def remove_listener(self, user_id: int = 0): 250 '''默认移除该群组对其他私聊的所有监听''' 251 id = self.group_id 252 253 if user_id == 0: 254 for ids in private_listener_dict.values(): 255 if id in ids: 256 ids.remove(id) 257 return 258 259 if id in private_listener_dict[user_id]: 260 private_listener_dict[user_id].remove(self.group_id) 261 262 async def send(self, message): 263 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 264 # 这里不使用event,因为一些event可能来自其他设备的监听传递 265 await self.bot.send_group_msg(group_id=self.group_id, message=message) 266 267 def pack_messages(self, bot_id, messages): 268 '''转换为cqhttp node格式''' 269 data: List[MessageSegment] = [] 270 for m in messages: 271 if isinstance(m, MessageSegment) and m.type == "node": 272 data.append(m) 273 else: 274 m = MessageSegment.node_custom( 275 user_id=bot_id, 276 nickname="Ayaka Bot", 277 content=str(m) 278 ) 279 data.append(m) 280 return data 281 282 async def send_many(self, messages): 283 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 284 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 285 div_len = 80 286 div_cnt = ceil(len(messages) / div_len) 287 for i in range(div_cnt): 288 msgs = self.pack_messages( 289 self.bot_id, 290 messages[i*div_len: (i+1)*div_len] 291 ) 292 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs) 293 294 def t_check(self, bot_id: int, group_id: int): 295 # 未连接 296 bot = get_bot(bot_id) 297 if not bot: 298 logger.warning(f"BOT({bot_id}) 未连接") 299 return 300 301 # 已禁用 302 group = get_group(bot_id, group_id) 303 app = group.get_app(self.name) 304 if not app: 305 logger.warning(f"群聊({group_id}) 已禁用 {self.name}") 306 return 307 308 return bot 309 310 async def t_send(self, bot_id: int, group_id: int, message): 311 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 312 bot = self.t_check(bot_id, group_id) 313 if not bot: 314 return 315 316 await bot.send_group_msg(group_id=group_id, message=message) 317 318 async def t_send_many(self, bot_id: int, group_id: int, messages): 319 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 320 bot = self.t_check(bot_id, group_id) 321 if not bot: 322 return 323 324 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 325 div_len = 80 326 div_cnt = ceil(len(messages) / div_len) 327 for i in range(div_cnt): 328 msgs = self.pack_messages( 329 bot_id, 330 messages[i*div_len: (i+1)*div_len] 331 ) 332 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs) 333 334 335on_message(priority=20, block=False, handlers=[deal_event])
class
AyakaApp:
19class AyakaApp: 20 def __repr__(self) -> str: 21 return f"AyakaApp({self.name}, {self.state})" 22 23 def __init__(self, name: str) -> None: 24 self.path = Path(inspect.stack()[1].filename) 25 26 for app in app_list: 27 if app.name == name: 28 raise Exception(f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 29 30 self.name = name 31 self.state = INIT_STATE 32 self.triggers: List[AyakaTrigger] = [] 33 self.timers: List[AyakaTimer] = [] 34 self._help: Dict[str, List[str]] = {} 35 self.on = AyakaOn(self) 36 self.storage = AyakaStorage(self) 37 self.parser = parser 38 self.BaseConfig = create_ayaka_plugin_config_base(name) 39 self.ayaka_root_config = ayaka_root_config 40 41 app_list.append(self) 42 if ayaka_root_config.debug: 43 print(self) 44 45 @property 46 def super_triggers(self): 47 return [t for t in self.triggers if t.super] 48 49 @property 50 def state_triggers(self): 51 return [t for t in self.triggers if not t.super and t.state is not None] 52 53 @property 54 def no_state_triggers(self): 55 return [t for t in self.triggers if not t.super and t.state is None] 56 57 @property 58 def intro(self): 59 '''获取介绍,也就是init状态下的帮助''' 60 helps = self._help.get(INIT_STATE, ["没有找到帮助"]) 61 return "\n".join(helps) 62 63 def get_helps(self, state: str): 64 helps = self._help.get(state) 65 if not helps: 66 return [] 67 return [f"[{state}]"] + helps 68 69 @property 70 def help(self): 71 '''获取当前状态下的帮助,没有找到则返回介绍''' 72 if self.group.running_app_name == self.name: 73 helps = [] 74 state = self.state 75 helps.extend(self.get_helps(state)) 76 77 while "." in state: 78 state = state.rsplit(".", 1)[0] 79 helps.extend(self.get_helps(state)) 80 81 helps.extend(self.get_helps("*")) 82 83 if helps: 84 return "\n".join(helps) 85 86 return self.intro 87 88 @property 89 def all_help(self): 90 '''获取介绍以及全部状态下的帮助''' 91 info = self.intro 92 for k, v in self._help.items(): 93 v = "\n".join(v) 94 if k != INIT_STATE: 95 info += f"\n[{k}]\n{v}" 96 return info 97 98 @help.setter 99 def help(self, help: Union[str, Dict[str, str]]): 100 '''设置帮助,若help为str,则设置为介绍,若help为dict,则设置为对应状态的帮助''' 101 if isinstance(help, dict): 102 help = {k: [v.strip()] for k, v in help.items()} 103 self._help.update(help) 104 else: 105 self._help[INIT_STATE] = [help.strip()] 106 107 @property 108 def valid(self): 109 '''*timer触发时不可用* 110 111 当前app是否被当前群组启用 112 ''' 113 return self.group.get_app(self.name) 114 115 @property 116 def cache(self): 117 '''*timer触发时不可用* 118 119 当前群组、当前app的独立数据空间 120 ''' 121 return self.group.cache_dict.get(self.name) 122 123 @property 124 def user_name(self): 125 '''*timer触发时不可用* 126 127 当前消息的发送人的群名片或昵称 128 ''' 129 s = self.event.sender 130 name = s.card or s.nickname 131 return name 132 133 @property 134 def user_id(self): 135 '''*timer触发时不可用* 136 137 当前消息的发送人的uid 138 ''' 139 return self.event.user_id 140 141 @property 142 def bot(self): 143 '''*timer触发时不可用* 144 145 当前bot 146 ''' 147 return _bot.get() 148 149 @property 150 def event(self): 151 '''*timer触发时不可用* 152 153 当前消息 154 ''' 155 return _event.get() 156 157 @property 158 def group_id(self): 159 '''*timer触发时不可用* 160 161 当前群组的id 162 163 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A的id 164 ''' 165 return self.group.group_id 166 167 @property 168 def bot_id(self): 169 '''*timer触发时不可用* 170 171 当前bot的id 172 ''' 173 return self.group.bot_id 174 175 @property 176 def group(self): 177 '''*timer触发时不可用* 178 179 当前群组 180 181 注:若群聊A正监听私聊B,当私聊B发送消息触发插件回调时,该属性仍可正确返回群聊A 182 ''' 183 return _group.get() 184 185 @property 186 def arg(self): 187 '''*timer触发时不可用* 188 189 当前消息在移除了命令后的剩余部分 190 ''' 191 return _arg.get() 192 193 @property 194 def args(self): 195 '''*timer触发时不可用* 196 197 当前消息在移除了命令后,剩余部分按照空格分割后的数组 198 199 注:除了文字消息外,其他消息类型将自动分割,例如一串qq表情会被分割为多个元素 200 ''' 201 return _args.get() 202 203 @property 204 def cmd(self): 205 '''*timer触发时不可用* 206 207 当前消息的命令头 208 ''' 209 return _cmd.get() 210 211 @property 212 def message(self): 213 '''*timer触发时不可用* 214 215 当前消息 216 ''' 217 return _message.get() 218 219 async def start(self, state=INIT_STATE): 220 '''*timer触发时不可用* 221 222 启动应用,并发送提示''' 223 name = self.group.running_app_name 224 if name and name != self.name: 225 await self.send("打开应用失败") 226 return False 227 self.group.running_app = self 228 self.state = state 229 await self.send(f"已打开应用 [{self.name}]") 230 return True 231 232 async def close(self): 233 '''*timer触发时不可用* 234 235 关闭应用,并发送提示''' 236 name = self.group.running_app_name 237 if name: 238 self.group.running_app = None 239 await self.send(f"已关闭应用 [{name}]") 240 else: 241 await self.send(f"没有应用在运行") 242 243 def set_state(self, state=INIT_STATE): 244 self.state = state 245 246 def add_listener(self, user_id: int): 247 '''为该群组添加对指定私聊的监听''' 248 private_listener_dict[user_id].append(self.group_id) 249 250 def remove_listener(self, user_id: int = 0): 251 '''默认移除该群组对其他私聊的所有监听''' 252 id = self.group_id 253 254 if user_id == 0: 255 for ids in private_listener_dict.values(): 256 if id in ids: 257 ids.remove(id) 258 return 259 260 if id in private_listener_dict[user_id]: 261 private_listener_dict[user_id].remove(self.group_id) 262 263 async def send(self, message): 264 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 265 # 这里不使用event,因为一些event可能来自其他设备的监听传递 266 await self.bot.send_group_msg(group_id=self.group_id, message=message) 267 268 def pack_messages(self, bot_id, messages): 269 '''转换为cqhttp node格式''' 270 data: List[MessageSegment] = [] 271 for m in messages: 272 if isinstance(m, MessageSegment) and m.type == "node": 273 data.append(m) 274 else: 275 m = MessageSegment.node_custom( 276 user_id=bot_id, 277 nickname="Ayaka Bot", 278 content=str(m) 279 ) 280 data.append(m) 281 return data 282 283 async def send_many(self, messages): 284 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 285 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 286 div_len = 80 287 div_cnt = ceil(len(messages) / div_len) 288 for i in range(div_cnt): 289 msgs = self.pack_messages( 290 self.bot_id, 291 messages[i*div_len: (i+1)*div_len] 292 ) 293 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs) 294 295 def t_check(self, bot_id: int, group_id: int): 296 # 未连接 297 bot = get_bot(bot_id) 298 if not bot: 299 logger.warning(f"BOT({bot_id}) 未连接") 300 return 301 302 # 已禁用 303 group = get_group(bot_id, group_id) 304 app = group.get_app(self.name) 305 if not app: 306 logger.warning(f"群聊({group_id}) 已禁用 {self.name}") 307 return 308 309 return bot 310 311 async def t_send(self, bot_id: int, group_id: int, message): 312 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 313 bot = self.t_check(bot_id, group_id) 314 if not bot: 315 return 316 317 await bot.send_group_msg(group_id=group_id, message=message) 318 319 async def t_send_many(self, bot_id: int, group_id: int, messages): 320 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 321 bot = self.t_check(bot_id, group_id) 322 if not bot: 323 return 324 325 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 326 div_len = 80 327 div_cnt = ceil(len(messages) / div_len) 328 for i in range(div_cnt): 329 msgs = self.pack_messages( 330 bot_id, 331 messages[i*div_len: (i+1)*div_len] 332 ) 333 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
AyakaApp(name: str)
23 def __init__(self, name: str) -> None: 24 self.path = Path(inspect.stack()[1].filename) 25 26 for app in app_list: 27 if app.name == name: 28 raise Exception(f"应用{app.name} 重复注册,已忽略注册时间更晚的应用!\n{app.path}(最早注册)\n{self.path}(被忽略)") 29 30 self.name = name 31 self.state = INIT_STATE 32 self.triggers: List[AyakaTrigger] = [] 33 self.timers: List[AyakaTimer] = [] 34 self._help: Dict[str, List[str]] = {} 35 self.on = AyakaOn(self) 36 self.storage = AyakaStorage(self) 37 self.parser = parser 38 self.BaseConfig = create_ayaka_plugin_config_base(name) 39 self.ayaka_root_config = ayaka_root_config 40 41 app_list.append(self) 42 if ayaka_root_config.debug: 43 print(self)
async def
start(self, state='init'):
219 async def start(self, state=INIT_STATE): 220 '''*timer触发时不可用* 221 222 启动应用,并发送提示''' 223 name = self.group.running_app_name 224 if name and name != self.name: 225 await self.send("打开应用失败") 226 return False 227 self.group.running_app = self 228 self.state = state 229 await self.send(f"已打开应用 [{self.name}]") 230 return True
timer触发时不可用
启动应用,并发送提示
async def
close(self):
232 async def close(self): 233 '''*timer触发时不可用* 234 235 关闭应用,并发送提示''' 236 name = self.group.running_app_name 237 if name: 238 self.group.running_app = None 239 await self.send(f"已关闭应用 [{name}]") 240 else: 241 await self.send(f"没有应用在运行")
timer触发时不可用
关闭应用,并发送提示
def
add_listener(self, user_id: int):
246 def add_listener(self, user_id: int): 247 '''为该群组添加对指定私聊的监听''' 248 private_listener_dict[user_id].append(self.group_id)
为该群组添加对指定私聊的监听
def
remove_listener(self, user_id: int = 0):
250 def remove_listener(self, user_id: int = 0): 251 '''默认移除该群组对其他私聊的所有监听''' 252 id = self.group_id 253 254 if user_id == 0: 255 for ids in private_listener_dict.values(): 256 if id in ids: 257 ids.remove(id) 258 return 259 260 if id in private_listener_dict[user_id]: 261 private_listener_dict[user_id].remove(self.group_id)
默认移除该群组对其他私聊的所有监听
async def
send(self, message):
263 async def send(self, message): 264 '''发送消息,消息的类型可以是 Message | MessageSegment | str''' 265 # 这里不使用event,因为一些event可能来自其他设备的监听传递 266 await self.bot.send_group_msg(group_id=self.group_id, message=message)
发送消息,消息的类型可以是 Message | MessageSegment | str
def
pack_messages(self, bot_id, messages):
268 def pack_messages(self, bot_id, messages): 269 '''转换为cqhttp node格式''' 270 data: List[MessageSegment] = [] 271 for m in messages: 272 if isinstance(m, MessageSegment) and m.type == "node": 273 data.append(m) 274 else: 275 m = MessageSegment.node_custom( 276 user_id=bot_id, 277 nickname="Ayaka Bot", 278 content=str(m) 279 ) 280 data.append(m) 281 return data
转换为cqhttp node格式
async def
send_many(self, messages):
283 async def send_many(self, messages): 284 '''发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]''' 285 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 286 div_len = 80 287 div_cnt = ceil(len(messages) / div_len) 288 for i in range(div_cnt): 289 msgs = self.pack_messages( 290 self.bot_id, 291 messages[i*div_len: (i+1)*div_len] 292 ) 293 await self.bot.call_api("send_group_forward_msg", group_id=self.group_id, messages=msgs)
发送合并转发消息,消息的类型可以是 List[Message | MessageSegment | str]
def
t_check(self, bot_id: int, group_id: int):
295 def t_check(self, bot_id: int, group_id: int): 296 # 未连接 297 bot = get_bot(bot_id) 298 if not bot: 299 logger.warning(f"BOT({bot_id}) 未连接") 300 return 301 302 # 已禁用 303 group = get_group(bot_id, group_id) 304 app = group.get_app(self.name) 305 if not app: 306 logger.warning(f"群聊({group_id}) 已禁用 {self.name}") 307 return 308 309 return bot
async def
t_send(self, bot_id: int, group_id: int, message):
311 async def t_send(self, bot_id: int, group_id: int, message): 312 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 313 bot = self.t_check(bot_id, group_id) 314 if not bot: 315 return 316 317 await bot.send_group_msg(group_id=group_id, message=message)
timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用
async def
t_send_many(self, bot_id: int, group_id: int, messages):
319 async def t_send_many(self, bot_id: int, group_id: int, messages): 320 '''timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用''' 321 bot = self.t_check(bot_id, group_id) 322 if not bot: 323 return 324 325 # 分割长消息组(不可超过100条)谨慎起见,使用80作为单元长度 326 div_len = 80 327 div_cnt = ceil(len(messages) / div_len) 328 for i in range(div_cnt): 329 msgs = self.pack_messages( 330 bot_id, 331 messages[i*div_len: (i+1)*div_len] 332 ) 333 await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msgs)
timer触发回调时,想要发送消息必须使用该方法,一些上下文亦无法使用