from typing import Any from sqlalchemy import create_engine, Column, String, Integer, Date, text from sqlalchemy.orm import sessionmaker, declarative_base from sqlalchemy.exc import SQLAlchemyError from komcache.schemas.sqlite import CREATE_SQLITE_TABLES from komcache.schemas.mariadb import CREATE_MARIADB_TABLES from komconfig import KomConfig import loguru log = loguru.logger log.remove() log.add("logs/cache.log", level="INFO", rotation="15MB", retention="1 week") log.add("logs/cli.log", rotation="15MB", retention="1 week") # type:ignore config = KomConfig() Base = declarative_base() class KomGrabber(Base): __tablename__ = "komgrabber" id = Column(Integer, primary_key=True) name = Column(String, unique=True, nullable=False) series_id = Column(String, nullable=False) status = Column(String, nullable=False) created_at = Column(Date, nullable=False) updated_at = Column(Date, nullable=False) last_checked = Column(Date, nullable=False) complete = Column(Integer, nullable=False) class KomCache: def __init__(self, db_path: str = ""): # Default to empty string if not provided self.db_path = db_path or config.cache.path log.debug(f"Cache path: {self.db_path}") if config.cache.mode == "local": self.db_path = db_path or config.cache.path log.debug(f"Cache path: {self.db_path}") self.engine = create_engine(f"sqlite:///{self.db_path}") elif config.cache.mode == "remote": db_url = ( config.cache.url ) # e.g., "mysql+pymysql://user:pass@host:3306/dbname" log.debug(f"Using remote DB URL: {db_url}") self.engine = create_engine(db_url) self.Session = sessionmaker(bind=self.engine) # if tables do not exist, create them if config.cache.mode == "local": if not self.query( "SELECT name FROM sqlite_master WHERE type='table' AND name='komgrabber'" ): self.create_table() elif config.cache.mode == "remote": if not self.query("SHOW TABLES LIKE 'komgrabber'"): self.create_table() def create_table(self): """Ensure all tables are created in the database.""" if config.cache.mode == "local": log.debug("Creating SQLite tables") with self.engine.begin() as connection: log.debug(f"DB engine URL: {self.engine.url}") for table_sql in CREATE_SQLITE_TABLES: connection.execute(text(table_sql)) elif config.cache.mode == "remote": log.debug("Creating MariaDB tables") with self.engine.begin() as connection: log.debug(f"DB engine URL: {self.engine.url}") for table_sql in CREATE_MARIADB_TABLES: log.debug(f"Executing table creation SQL: {table_sql}") try: connection.execute(text(table_sql)) except Exception as e: log.exception( f"Failed to execute table creation SQL:\n{table_sql}" ) log.exception(f"Error: {e}") # create the KomGrabber table using SQLAlchemy ORM Base.metadata.create_all(self.engine) def delete_table(self, table_name: str) -> bool: try: with self.engine.connect() as connection: connection.execute(text(f"DROP TABLE IF EXISTS {table_name}")) return True except SQLAlchemyError as e: log.error(f"Error deleting table {table_name}: {e}") return False def query(self, query: str, args: dict[str, Any] = None): if args is None: args = {} try: session = self.Session() result = session.execute(text(query), args).fetchall() session.close() return result except SQLAlchemyError as e: log.error(f"Error executing query: {e}") return [] def insert(self, query: str, args: dict[str, Any]) -> bool: try: session = self.Session() session.execute(text(query), args) session.commit() session.close() return True except SQLAlchemyError as e: log.error(f"Error inserting data: {e}") return False def update(self, query: str, args: dict[str, Any]) -> bool: try: session = self.Session() session.execute(text(query), args) session.commit() session.close() return True except SQLAlchemyError as e: log.error(f"Error updating data: {e}") return False def get_last_update_date(self, series_name: str) -> str: try: session = self.Session() result = ( session.query(KomGrabber.last_update_date) .filter_by(series_name=series_name) .first() ) session.close() return result[0] if result else "" except SQLAlchemyError as e: log.error(f"Error fetching last update date: {e}") return "" def fetch_one(self, query: str, args: dict[str, Any] = None): if args is None: args = {} try: session = self.Session() result = session.execute(text(query), args).fetchone() session.close() return result except SQLAlchemyError as e: log.error(f"Error executing query: {e}") return None def fetch_all(self, query: str, args: dict[str, Any] | None = None): if args is None: args = {} try: session = self.Session() result = session.execute(text(query), args).fetchall() session.close() return result except SQLAlchemyError as e: log.error(f"Error executing query: {e}") return [] if __name__ == "__main__": cache = KomCache() cache.create_table()