initial commit

This commit is contained in:
2025-02-17 20:28:17 +01:00
commit e685c7b930
53 changed files with 3285 additions and 0 deletions

View File

@@ -0,0 +1,92 @@
from .baseapi import BaseAPI
import pathlib
import subprocess
import requests
import typing_extensions
from typing import List, Optional, Dict, Any, Union
from komgapi.errors import KomgaError, LoginError, ResultErrror
from komgapi.schemas import * # Progress, Series
class UserController(BaseAPI):
def __init__(self, username, password, url, timeout=20, api_version="2") -> None:
super().__init__(username, password, url, timeout, api_version="2")
def getUsers(self) -> List[User]:
url = self.url + "users"
data = self.getRequest(url)
return [User(**user) for user in data]
def createUser(self, user: CreateUser) -> User:
url = self.url + "users"
data = self.postRequest(url, user.model_dump())
return User(**data)
def deleteUser(self, user_id: str) -> None:
url = self.url + f"users/{user_id}"
self.deleteRequest(url)
def updateUser(self, user_id: str, user: User) -> None:
userData = user.model_dump()
# remove id, email and password
userData.pop("id")
userData.pop("email")
userData.pop("password")
url = self.url + f"users/{user_id}"
data = self.patchRequest(url, user)
def getLatestAuthenticationActivity(self, user_id, api_key):
raise NotImplementedError
def changePassword(self, user_id: str, password: str) -> None:
url = self.url + f"users/{user_id}/password"
self.patchRequest(url, {"password": password})
def getUserAuthenticationActivity(
self,
unpaged: bool = None,
page: int = 0,
size: int = 20,
sort: List[str] = None,
) -> Authentication:
url = self.url + "users/authentification-activity"
params = self.setParams(locals())
data = self.getRequest(url, params)
return Authentication(**data)
def getMe(self) -> User | Violation:
url = self.url + "users/me"
data = self.getRequest(url)
return User(**data) if "id" in data else Violation(**data)
def getMyAPIKeys(self) -> List[APIKey] | Violation:
url = self.url + "users/me/api-keys"
data = self.getRequest(url)
return [APIKey(**data) for key in data] if "id" in data else []
def createAPIKey(self, key: str) -> APIKey:
url = self.url + "users/me/api-keys"
data = self.postRequest(url, f'"comment":{key}')
return APIKey(**data)
def deleteAPIKey(self, key_id: str) -> None:
url = self.url + f"users/me/api-keys/{key_id}"
self.deleteRequest(url)
def getMyAuthenticationActivity(
self,
unpaged: bool = None,
page: int = 0,
size: int = 20,
sort: List[str] = None,
) -> Authentication:
raise NotImplementedError
url = self.url + "users/me/authentication-activity"
params = self.setParams(locals())
data = self.getRequest(url, params)
print(data)
return UserAuthActivity(**data)
def changeMyPassword(self, password: str) -> None:
url = self.url + "users/me/password"
self.patchRequest(url, {"password": password})