Implement SQLAlchemy integration and create database schemas for local and remote modes; refactor KomCache class methods #3

Merged
WorldTeacher merged 1 commits from dev into main 2025-05-29 08:49:09 +01:00
4 changed files with 173 additions and 99 deletions

View File

@@ -9,6 +9,8 @@ authors = [
requires-python = ">=3.13"
dependencies = [
"komconfig",
"pymysql>=1.1.1",
"sqlalchemy[asyncio]>=2.0.41",
]
[build-system]

View File

@@ -1,7 +1,10 @@
import sqlite3
from komconfig import KomConfig, CONFIG_PATH
from pathlib import Path
import os
from typing import Any
from sqlalchemy import create_engine, Column, String, Integer, Date, text
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
from komcache.schemas.sqlite import CREATE_SQLITE_TABLES
from komcache.schemas.mariadb import CREATE_MARIADB_TABLES
from komconfig import KomConfig
import loguru
log = loguru.logger
@@ -9,130 +12,156 @@ log.remove()
log.add("logs/cache.log", level="INFO", rotation="15MB", retention="1 week")
log.add("logs/cli.log", rotation="15MB", retention="1 week") # type:ignore
config = KomConfig()
Base = declarative_base()
class KomGrabber(Base):
__tablename__ = "komgrabber"
id = Column(Integer, primary_key=True)
name = Column(String, unique=True, nullable=False)
series_id = Column(String, nullable=False)
status = Column(String, nullable=False)
created_at = Column(Date, nullable=False)
updated_at = Column(Date, nullable=False)
last_checked = Column(Date, nullable=False)
complete = Column(Integer, nullable=False)
class KomCache:
def __init__(self, db_path: str = CONFIG_PATH):
self.db_path = Path(db_path, "komcache.db")
if "~" in str(self.db_path):
self.db_path = os.path.expanduser(str(self.db_path))
self.conn = sqlite3.connect(self.db_path)
self.cursor = self.conn.cursor()
def __init__(self, db_path: str = ""): # Default to empty string if not provided
self.db_path = db_path or config.cache.path
log.debug(f"Cache path: {self.db_path}")
if config.cache.mode == "local":
self.db_path = db_path or config.cache.path
log.debug(f"Cache path: {self.db_path}")
self.engine = create_engine(f"sqlite:///{self.db_path}")
elif config.cache.mode == "remote":
db_url = (
config.cache.url
) # e.g., "mysql+pymysql://user:pass@host:3306/dbname"
log.debug(f"Using remote DB URL: {db_url}")
self.engine = create_engine(db_url)
def __enter__(self):
return self
self.Session = sessionmaker(bind=self.engine)
# if tables do not exist, create them
if config.cache.mode == "local":
if not self.query(
"SELECT name FROM sqlite_master WHERE type='table' AND name='komgrabber'"
):
self.create_table()
elif config.cache.mode == "remote":
if not self.query("SHOW TABLES LIKE 'komgrabber'"):
self.create_table()
def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.close()
pass
def create_table(self):
"""Ensure all tables are created in the database."""
if config.cache.mode == "local":
log.debug("Creating SQLite tables")
with self.engine.begin() as connection:
log.debug(f"DB engine URL: {self.engine.url}")
def create_table(self, table_query: str) -> None:
"""
Create a table in the database.
for table_sql in CREATE_SQLITE_TABLES:
connection.execute(text(table_sql))
elif config.cache.mode == "remote":
log.debug("Creating MariaDB tables")
with self.engine.begin() as connection:
log.debug(f"DB engine URL: {self.engine.url}")
Parameters
----------
table_query : str
the SQL query to create the table.
"""
self.cursor.execute(table_query)
self.conn.commit()
for table_sql in CREATE_MARIADB_TABLES:
log.debug(f"Executing table creation SQL: {table_sql}")
try:
connection.execute(text(table_sql))
except Exception as e:
log.exception(
f"Failed to execute table creation SQL:\n{table_sql}"
)
log.exception(f"Error: {e}")
# create the KomGrabber table using SQLAlchemy ORM
Base.metadata.create_all(self.engine)
def delete_table(self, table_name: str) -> bool:
try:
self.cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
self.conn.commit()
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
return True
except sqlite3.Error as e:
print(f"Error deleting table {table_name}: {e}")
except SQLAlchemyError as e:
log.error(f"Error deleting table {table_name}: {e}")
return False
def query_table(self, table: str, query: str) -> list:
def query(self, query: str, args: dict[str, Any] = None):
if args is None:
args = {}
try:
self.cursor.execute(f"SELECT {query} FROM {table}")
return self.cursor.fetchall()
except sqlite3.Error as e:
print(f"Error querying table {table}: {e}")
session = self.Session()
result = session.execute(text(query), args).fetchall()
session.close()
return result
except SQLAlchemyError as e:
log.error(f"Error executing query: {e}")
return []
def query(self, query, args=None):
if args is None:
args = []
def insert(self, query: str, args: dict[str, Any]) -> bool:
try:
self.cursor.execute(query, args)
data = self.cursor.fetchall()
if data:
return data
else:
return None
except sqlite3.Error as e:
print(f"Error executing query: {e}")
return []
def insert(self, query: str, args=None):
if args is None:
args = []
try:
self.cursor.execute(query, args)
self.conn.commit()
session = self.Session()
session.execute(text(query), args)
session.commit()
session.close()
return True
except sqlite3.Error as e:
print(f"Error inserting data: {e}")
except SQLAlchemyError as e:
log.error(f"Error inserting data: {e}")
return False
def update(self, query, args=None):
if args is None:
args = []
def update(self, query: str, args: dict[str, Any]) -> bool:
try:
self.cursor.execute(query, args)
log.debug("Query: {}, Args: {}".format(query, args))
self.conn.commit()
session = self.Session()
session.execute(text(query), args)
session.commit()
session.close()
return True
except sqlite3.Error as e:
print(f"Error updating data: {e}")
except SQLAlchemyError as e:
log.error(f"Error updating data: {e}")
return False
def get_last_update_date(self, series_name: str) -> str:
"""
Get the last update date for a series.
Parameters
----------
series_name : str
The name of the series.
Returns
-------
str
The last update date.
"""
query = "SELECT last_update_date FROM komgrabber WHERE series_name = ?"
result = self.query(query, (series_name,))
if result:
return result[0][0]
return ""
def fetch_one(self, query: str, args=None):
if args is None:
args = []
try:
self.cursor.execute(query, args)
return self.cursor.fetchone()
except sqlite3.Error as e:
print(f"Error fetching one: {e}")
session = self.Session()
result = (
session.query(KomGrabber.last_update_date)
.filter_by(series_name=series_name)
.first()
)
session.close()
return result[0] if result else ""
except SQLAlchemyError as e:
log.error(f"Error fetching last update date: {e}")
return ""
def fetch_one(self, query: str, args: dict[str, Any] = None):
if args is None:
args = {}
try:
session = self.Session()
result = session.execute(text(query), args).fetchone()
session.close()
return result
except SQLAlchemyError as e:
log.error(f"Error executing query: {e}")
return None
def fetch_all(self, query: str, args=None):
def fetch_all(self, query: str, args: dict[str, Any] | None = None):
if args is None:
args = []
args = {}
try:
self.cursor.execute(query, args)
return self.cursor.fetchall()
except sqlite3.Error as e:
print(f"Error fetching one: {e}")
return None
session = self.Session()
result = session.execute(text(query), args).fetchall()
session.close()
return result
except SQLAlchemyError as e:
log.error(f"Error executing query: {e}")
return []
if __name__ == "__main__":
# Example usage
with KomCache() as cache:
cache.delete_table("komgrabber")
cache = KomCache()
cache.create_table()

View File

@@ -0,0 +1,23 @@
CREATE_MARIADB_TABLES = ["""
CREATE TABLE IF NOT EXISTS manga_requests (
id INT AUTO_INCREMENT PRIMARY KEY,
manga_id INT,
grabbed TINYINT(1) DEFAULT 0
);""","""CREATE TABLE IF NOT EXISTS komgrabber (
id INT AUTO_INCREMENT PRIMARY KEY,
name TEXT NOT NULL,
series_id TEXT NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_checked TIMESTAMP DEFAULT NULL,
completed TINYINT(1) DEFAULT 0
);""","""CREATE TABLE IF NOT EXISTS komtagger (
id INT AUTO_INCREMENT PRIMARY KEY,
series_id TEXT NOT NULL,
title TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_checked TIMESTAMP DEFAULT NULL,
status TEXT NOT NULL
);
"""]

View File

@@ -0,0 +1,20 @@
CREATE_SQLITE_TABLES = ["""
CREATE TABLE IF NOT EXISTS manga_requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
manga_id INTEGER,
grabbed BOOLEAN DEFAULT 0);""","""CREATE TABLE IF NOT EXISTS komgrabber (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
series_id TEXT NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_checked TIMESTAMP DEFAULT 0,
completed BOOLEAN DEFAULT 0);""","""CREATE TABLE IF NOT EXISTS komtagger (
id INTEGER PRIMARY KEY AUTOINCREMENT,
series_id TEXT NOT NULL,
title TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_checked TIMESTAMP DEFAULT 0,
status TEXT NOT NULL);
"""]