ayaka.depend.sql
1import os 2import sqlite3 3from typing import TYPE_CHECKING, List, Type 4 5from loguru import logger 6from ..config import ayaka_data_path, ayaka_root_config 7 8if TYPE_CHECKING: 9 from .db import AyakaDB 10 11PrimaryKey = {"primary": True} 12JsonKey = {"json": True} 13 14path = ayaka_data_path / "ayaka.db" 15journal_path = ayaka_data_path / "ayaka.db-journal" 16old_journal_path = ayaka_data_path / "ayaka.db-journal-old" 17db = sqlite3.connect(path) 18 19 20def execute(query, values=None): 21 if ayaka_root_config.debug: 22 print(query) 23 if values: 24 print(values) 25 if values is None: 26 cursor = db.execute(query) 27 else: 28 cursor = db.execute(query, values) 29 cursor.close() 30 31 32def executemany(query, values=None): 33 if ayaka_root_config.debug: 34 print(query) 35 if values: 36 print(values) 37 if values is None: 38 cursor = db.executemany(query) 39 else: 40 cursor = db.executemany(query, values) 41 cursor.close() 42 43 44def fetchall(query: str): 45 cursor = db.execute(query) 46 values = cursor.fetchall() 47 cursor.close() 48 if ayaka_root_config.debug: 49 print(query) 50 print(values) 51 return values 52 53 54def create_table(name: str, cls: Type["AyakaDB"]): 55 props = cls.props() 56 args = [] 57 primarys = [] 58 59 for k, v in props.items(): 60 extra: dict = v.get("extra", {}) 61 if extra.get("primary"): 62 primarys.append(k) 63 if extra.get("json"): 64 args.append(f"{k} text") 65 else: 66 args.append(f"{k} {v['type']}") 67 68 if primarys: 69 primarys_str = ",".join(f"\"{k}\"" for k in primarys) 70 args.append(f"PRIMARY KEY({primarys_str})") 71 72 args_str = ",\n".join(args) 73 query = f"create table if not exists \"{name}\" ({args_str})" 74 75 execute(query) 76 77 78def insert_or_replace(name: str, data: "AyakaDB", action: str): 79 keys = list(data.dict().keys()) 80 values = list(data.dict().values()) 81 keys_str = ",".join(keys) 82 values_str = ("(?),"*len(keys))[:-1] 83 query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})" 84 execute(query, values) 85 86 87def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str): 88 data = datas[0] 89 keys = list(data.dict().keys()) 90 values = [[getattr(data, k) for k in keys] for data in datas] 91 keys_str = ",".join(keys) 92 values_str = ("(?),"*len(keys))[:-1] 93 query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})" 94 executemany(query, values) 95 96 97def select_many(name: str, cls: Type["AyakaDB"], where: str = ""): 98 props = cls.props() 99 keys = list(props.keys()) 100 if where: 101 where = f"where {where}" 102 103 keys_str = ",".join(keys) 104 query = f"select {keys_str} from \"{name}\" {where}" 105 values = fetchall(query) 106 107 # 组装为字典 108 datas = [ 109 cls._create_by_db_data({k: v for k, v in zip(keys, vs)}) 110 for vs in values 111 ] 112 return datas 113 114 115def drop_table(name: str): 116 query = f"drop table if exists \"{name}\"" 117 execute(query) 118 119 120def commit(): 121 if old_journal_path.exists(): 122 os.remove(old_journal_path) 123 logger.debug("已删除旧db-journal-old文件") 124 if journal_path.exists(): 125 if ayaka_root_config.debug: 126 print("commit") 127 logger.debug("更新数据库") 128 db.commit() 129 130 # 提交失败,直接删除 131 if journal_path.exists(): 132 logger.debug("更新数据库失败,已将journal文件更名为db-journal-old") 133 journal_path.rename(old_journal_path) 134 135 136def wrap(v): 137 if isinstance(v, str): 138 return f"\"{v}\"" 139 return str(v)
def
execute(query, values=None):
def
executemany(query, values=None):
def
fetchall(query: str):
55def create_table(name: str, cls: Type["AyakaDB"]): 56 props = cls.props() 57 args = [] 58 primarys = [] 59 60 for k, v in props.items(): 61 extra: dict = v.get("extra", {}) 62 if extra.get("primary"): 63 primarys.append(k) 64 if extra.get("json"): 65 args.append(f"{k} text") 66 else: 67 args.append(f"{k} {v['type']}") 68 69 if primarys: 70 primarys_str = ",".join(f"\"{k}\"" for k in primarys) 71 args.append(f"PRIMARY KEY({primarys_str})") 72 73 args_str = ",\n".join(args) 74 query = f"create table if not exists \"{name}\" ({args_str})" 75 76 execute(query)
79def insert_or_replace(name: str, data: "AyakaDB", action: str): 80 keys = list(data.dict().keys()) 81 values = list(data.dict().values()) 82 keys_str = ",".join(keys) 83 values_str = ("(?),"*len(keys))[:-1] 84 query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})" 85 execute(query, values)
88def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str): 89 data = datas[0] 90 keys = list(data.dict().keys()) 91 values = [[getattr(data, k) for k in keys] for data in datas] 92 keys_str = ",".join(keys) 93 values_str = ("(?),"*len(keys))[:-1] 94 query = f"{action} into \"{name}\" ({keys_str}) values ({values_str})" 95 executemany(query, values)
98def select_many(name: str, cls: Type["AyakaDB"], where: str = ""): 99 props = cls.props() 100 keys = list(props.keys()) 101 if where: 102 where = f"where {where}" 103 104 keys_str = ",".join(keys) 105 query = f"select {keys_str} from \"{name}\" {where}" 106 values = fetchall(query) 107 108 # 组装为字典 109 datas = [ 110 cls._create_by_db_data({k: v for k, v in zip(keys, vs)}) 111 for vs in values 112 ] 113 return datas
def
drop_table(name: str):
def
commit():
121def commit(): 122 if old_journal_path.exists(): 123 os.remove(old_journal_path) 124 logger.debug("已删除旧db-journal-old文件") 125 if journal_path.exists(): 126 if ayaka_root_config.debug: 127 print("commit") 128 logger.debug("更新数据库") 129 db.commit() 130 131 # 提交失败,直接删除 132 if journal_path.exists(): 133 logger.debug("更新数据库失败,已将journal文件更名为db-journal-old") 134 journal_path.rename(old_journal_path)
def
wrap(v):