ayaka.depend.sql
"""Thin helpers around one module-level sqlite3 connection to ``ayaka.db``."""
import sqlite3
from typing import TYPE_CHECKING, List, Type

from loguru import logger
from ..config import ayaka_data_path, ayaka_root_config

if TYPE_CHECKING:
    from .db import AyakaDB

# Marker dicts; create_table() looks for the "primary" and "json" keys inside
# a property's "extra" mapping (presumably these dicts are what ends up there).
PrimaryKey = {"primary": True}
JsonKey = {"json": True}

path = ayaka_data_path / "ayaka.db"
journal_path = ayaka_data_path / "ayaka.db-journal"
db = sqlite3.connect(path)


def execute(query, values=None):
    """Run one SQL statement on the shared connection.

    Args:
        query: SQL text, optionally containing ``?`` placeholders.
        values: Parameters bound to the placeholders, or None when the
            statement takes none.
    """
    if ayaka_root_config.debug:
        print(query)
        if values:
            print(values)

    if values is None:
        cursor = db.execute(query)
    else:
        cursor = db.execute(query, values)
    cursor.close()


def executemany(query, values=None):
    """Run one SQL statement once per parameter row on the shared connection.

    Args:
        query: SQL text containing ``?`` placeholders.
        values: Iterable of parameter rows. None is treated as "no rows",
            so nothing is executed.
    """
    if ayaka_root_config.debug:
        print(query)
        if values:
            print(values)

    # Connection.executemany() requires the parameter argument; calling it
    # with the query alone raises TypeError, so None maps to an empty batch.
    cursor = db.executemany(query, [] if values is None else values)
    cursor.close()


def fetchall(query: str):
    """Run a query and return every result row.

    Args:
        query: SQL text without placeholders.

    Returns:
        The list of rows produced by ``cursor.fetchall()``.
    """
    cursor = db.execute(query)
    try:
        values = cursor.fetchall()
    finally:
        # Release the cursor even when fetching fails.
        cursor.close()

    if ayaka_root_config.debug:
        print(query)
        print(values)
    return values


def create_table(name: str, cls: Type["AyakaDB"]):
    """Create table ``name`` from ``cls.props()`` if it does not exist yet.

    Each property becomes one column. A property whose ``extra`` mapping has
    a truthy ``"json"`` entry is stored as ``text``; otherwise the property's
    own ``"type"`` is used as the column type. Properties whose ``extra``
    mapping has a truthy ``"primary"`` entry form the primary key.

    Args:
        name: Table name.
        cls: Model class exposing ``props()``.
    """
    columns = []
    primary_keys = []

    for key, prop in cls.props().items():
        extra: dict = prop.get("extra", {})
        if extra.get("primary"):
            primary_keys.append(key)
        sql_type = "text" if extra.get("json") else prop["type"]
        # Quote the column name, as the PRIMARY KEY clause already does, so
        # that a field named like an SQL keyword still yields a valid column.
        columns.append(f'"{key}" {sql_type}')

    if primary_keys:
        primary_keys_str = ",".join(f'"{key}"' for key in primary_keys)
        columns.append(f"PRIMARY KEY({primary_keys_str})")

    columns_str = ",\n".join(columns)
    query = f'create table if not exists "{name}" ({columns_str})'

    execute(query)


def insert_or_replace(name: str, data: "AyakaDB", action: str):
    """Write one row into table ``name``.

    Args:
        name: Table name.
        data: Object whose ``dict()`` supplies column names and values.
        action: Leading SQL verb placed before ``into``
            (presumably ``insert`` or ``replace`` -- confirm against callers).
    """
    # Serialize once so column names and values come from the same mapping.
    row = data.dict()
    keys_str = ",".join(f'"{key}"' for key in row)
    values_str = ",".join("?" * len(row))
    query = f'{action} into "{name}" ({keys_str}) values ({values_str})'
    execute(query, list(row.values()))


def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str):
    """Write several rows into table ``name`` with one ``executemany`` call.

    Args:
        name: Table name.
        datas: Objects to store; the column list is taken from the first one.
            An empty list is a no-op.
        action: Leading SQL verb placed before ``into``
            (presumably ``insert`` or ``replace`` -- confirm against callers).
    """
    if not datas:
        # Nothing to write, and there is no first item to take columns from.
        return

    # Use dict() for the values, exactly as insert_or_replace() does, so both
    # paths store the same representation of a row.
    # NOTE(review): presumably dict() is where JSON columns get encoded --
    # confirm against AyakaDB.
    rows = [data.dict() for data in datas]
    keys = list(rows[0])
    values = [[row[key] for key in keys] for row in rows]

    keys_str = ",".join(f'"{key}"' for key in keys)
    values_str = ",".join("?" * len(keys))
    query = f'{action} into "{name}" ({keys_str}) values ({values_str})'
    executemany(query, values)


def select_many(name: str, cls: Type["AyakaDB"], where: str = "1"):
    """Load the rows of table ``name`` that match ``where`` as ``cls`` objects.

    Args:
        name: Table name.
        cls: Model class exposing ``props()`` and ``_create_by_db_data()``.
        where: Raw SQL condition inserted verbatim into the query; the
            default ``"1"`` selects every row. It is not escaped here.

    Returns:
        A list with one ``cls._create_by_db_data(...)`` result per row.
    """
    keys = list(cls.props().keys())

    # ``*`` would also work, but naming the columns explicitly is safer for
    # compatibility with later schema updates.
    # The names stay unquoted on purpose: SQLite may read a double-quoted
    # name that matches no column as a string literal instead of failing.
    keys_str = ",".join(keys)
    query = f'select {keys_str} from "{name}" where {where}'
    rows = fetchall(query)

    # Pair every row with the column names to build a dict per record.
    return [cls._create_by_db_data(dict(zip(keys, row))) for row in rows]


def drop_table(name: str):
    """Drop table ``name`` if it exists."""
    query = f'drop table if exists "{name}"'
    execute(query)


def commit():
    """Commit the shared connection, but only when the journal file exists."""
    # NOTE(review): presumably the journal file is only present while there
    # are uncommitted writes -- confirm against the journal mode in use.
    if journal_path.exists():
        if ayaka_root_config.debug:
            print("commit")
        logger.debug("更新数据库")
        db.commit()


def wrap(v):
    """Render ``v`` as text for embedding into a hand-built SQL condition.

    Strings are enclosed in double quotes, with embedded double quotes
    doubled so they cannot terminate the token early; any other value is
    converted with ``str()``.

    Args:
        v: Value to render.

    Returns:
        The rendered text.
    """
    if isinstance(v, str):
        # NOTE(review): SQLite resolves a double-quoted token to a column
        # first when one with that name exists; the delimiter is kept as-is
        # for compatibility with existing callers.
        escaped = v.replace('"', '""')
        return f'"{escaped}"'
    return str(v)
def
execute(query, values=None):
def
executemany(query, values=None):
def
fetchall(query: str):
def create_table(name: str, cls: Type["AyakaDB"]):
    """Create table ``name`` from ``cls.props()`` if it does not exist yet.

    Each property becomes one column. A property whose ``extra`` mapping has
    a truthy ``"json"`` entry is stored as ``text``; otherwise the property's
    own ``"type"`` is used as the column type. Properties whose ``extra``
    mapping has a truthy ``"primary"`` entry form the primary key.

    Args:
        name: Table name.
        cls: Model class exposing ``props()``.
    """
    columns = []
    primary_keys = []

    for key, prop in cls.props().items():
        extra: dict = prop.get("extra", {})
        if extra.get("primary"):
            primary_keys.append(key)
        sql_type = "text" if extra.get("json") else prop["type"]
        # Quote the column name, as the PRIMARY KEY clause already does, so
        # that a field named like an SQL keyword still yields a valid column.
        columns.append(f'"{key}" {sql_type}')

    if primary_keys:
        primary_keys_str = ",".join(f'"{key}"' for key in primary_keys)
        columns.append(f"PRIMARY KEY({primary_keys_str})")

    columns_str = ",\n".join(columns)
    query = f'create table if not exists "{name}" ({columns_str})'

    execute(query)
def insert_or_replace(name: str, data: "AyakaDB", action: str):
    """Write one row into table ``name``.

    Args:
        name: Table name.
        data: Object whose ``dict()`` supplies column names and values.
        action: Leading SQL verb placed before ``into``
            (presumably ``insert`` or ``replace`` -- confirm against callers).
    """
    # Serialize once so column names and values come from the same mapping.
    row = data.dict()
    # Quoted column names keep the statement valid for keyword-like fields.
    keys_str = ",".join(f'"{key}"' for key in row)
    # One plain ``?`` placeholder per column.
    values_str = ",".join("?" * len(row))
    query = f'{action} into "{name}" ({keys_str}) values ({values_str})'
    execute(query, list(row.values()))
def insert_or_replace_many(name: str, datas: List["AyakaDB"], action: str):
    """Write several rows into table ``name`` with one ``executemany`` call.

    Args:
        name: Table name.
        datas: Objects to store; the column list is taken from the first one.
            An empty list is a no-op.
        action: Leading SQL verb placed before ``into``
            (presumably ``insert`` or ``replace`` -- confirm against callers).
    """
    if not datas:
        # Nothing to write, and there is no first item to take columns from.
        return

    # Take the values from dict(), the same source the column names come
    # from, rather than from raw attributes.
    # NOTE(review): presumably dict() is where JSON columns get encoded --
    # confirm against AyakaDB.
    rows = [data.dict() for data in datas]
    keys = list(rows[0])
    values = [[row[key] for key in keys] for row in rows]

    # Quoted column names keep the statement valid for keyword-like fields.
    keys_str = ",".join(f'"{key}"' for key in keys)
    values_str = ",".join("?" * len(keys))
    query = f'{action} into "{name}" ({keys_str}) values ({values_str})'
    executemany(query, values)
def select_many(name: str, cls: Type["AyakaDB"], where: str = "1"):
    """Load the rows of table ``name`` that match ``where`` as ``cls`` objects.

    Args:
        name: Table name.
        cls: Model class exposing ``props()`` and ``_create_by_db_data()``.
        where: Raw SQL condition inserted verbatim into the query; the
            default ``"1"`` selects every row.

    Returns:
        A list with one ``cls._create_by_db_data(...)`` result per row.
    """
    columns = list(cls.props().keys())

    # ``*`` would also work, but naming the columns explicitly is safer for
    # compatibility with later schema updates.
    column_list = ",".join(columns)
    rows = fetchall(f'select {column_list} from "{name}" where {where}')

    # Pair every row with the column names to build a dict per record.
    records = []
    for row in rows:
        records.append(cls._create_by_db_data(dict(zip(columns, row))))
    return records
def
drop_table(name: str):
def
commit():
def
wrap(v):