from __future__ import annotations import json import re from dataclasses import dataclass from dataclasses import field as dataclass_field from typing import Any, List from src.logic.dataclass import BookData import loguru import sys log = loguru.logger log.remove() log.add(sys.stdout) log.add("logs/application.log", rotation="1 MB", retention="10 days") ###Pydatnic models @dataclass class Item: superlocation: str | None = dataclass_field(default_factory=str) status: str | None = dataclass_field(default_factory=str) availability: str | None = dataclass_field(default_factory=str) notes: str | None = dataclass_field(default_factory=str) limitation: str | None = dataclass_field(default_factory=str) duedate: str | None = dataclass_field(default_factory=str) id: str | None = dataclass_field(default_factory=str) item_id: str | None = dataclass_field(default_factory=str) ilslink: str | None = dataclass_field(default_factory=str) number: int | None = dataclass_field(default_factory=int) barcode: str | None = dataclass_field(default_factory=str) reserve: str | None = dataclass_field(default_factory=str) callnumber: str | None = dataclass_field(default_factory=str) department: str | None = dataclass_field(default_factory=str) locationhref: str | None = dataclass_field(default_factory=str) location: str | None = dataclass_field(default_factory=str) def from_dict(self, data: dict): """Import data from dict""" data = data["items"] for entry in data: for key, value in entry.items(): setattr(self, key, value) return self @dataclass class RDS_AVAIL_DATA: """Class to store RDS availability data""" library_sigil: str = dataclass_field(default_factory=str) items: List[Item] = dataclass_field(default_factory=list) def import_from_dict(self, data: str): """Import data from dict""" edata = json.loads(data) # library sigil is first key self.library_sigil = str(list(edata.keys())[0]) # get data from first key edata = edata[self.library_sigil] for location in edata: item = Item(superlocation=location).from_dict(edata[location]) self.items.append(item) return self @dataclass class RDS_DATA: """Class to store RDS data""" RDS_SIGNATURE: str = dataclass_field(default_factory=str) RDS_STATUS: str = dataclass_field(default_factory=str) RDS_LOCATION: str = dataclass_field(default_factory=str) RDS_URL: Any = dataclass_field(default_factory=str) RDS_HINT: Any = dataclass_field(default_factory=str) RDS_COMMENT: Any = dataclass_field(default_factory=str) RDS_HOLDING: Any = dataclass_field(default_factory=str) RDS_HOLDING_LEAK: Any = dataclass_field(default_factory=str) RDS_INTERN: Any = dataclass_field(default_factory=str) RDS_PROVENIENCE: Any = dataclass_field(default_factory=str) RDS_LOCAL_NOTATION: str = dataclass_field(default_factory=str) RDS_LEA: Any = dataclass_field(default_factory=str) def import_from_dict(self, data: dict) -> RDS_DATA: """Import data from dict""" for key, value in data.items(): setattr(self, key, value) return self @dataclass class RDS_GENERIC_DATA: LibrarySigil: str = dataclass_field(default_factory=str) RDS_DATA: List[RDS_DATA] = dataclass_field(default_factory=list) def import_from_dict(self, data: str) -> RDS_GENERIC_DATA: """Import data from dict""" edata = json.loads(data) # library sigil is first key self.LibrarySigil = str(list(edata.keys())[0]) # get data from first key edata = edata[self.LibrarySigil] for entry in edata: rds_data = RDS_DATA() # Create a new RDS_DATA instance # Populate the RDS_DATA instance from the entry # This assumes that the entry is a dictionary that matches the structure of the RDS_DATA class rds_data.import_from_dict(entry) self.RDS_DATA.append(rds_data) # Add the RDS_DATA instance to the list return self class BaseStruct: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) class ARRAYData: def __init__(self, signature=None) -> None: self.signature = None pass def transform(self, data: str) -> BookData: def _get_line(source: str, search: str) -> str: try: data = ( source.split(search)[1] .split("\n")[0] .strip() .replace("=>", "") .strip() ) return data except Exception: # # log.debug(f"ARRAYData.transform failed, {source}, {search}") log.exception(f"ARRAYData.transform failed, no string {search}") return "" def _get_list_entry(source: str, search: str, entry: str) -> str: try: source = source.replace("\t", "").replace("\r", "") source = source.split(search)[1].split(")")[0] return _get_line(source, entry).replace("=>", "").strip() except: return "" def _get_isbn(source: str) -> list: try: isbn = source.split("[isbn]")[1].split(")")[0].strip() isbn = isbn.split("(")[1] isbns = isbn.split("=>") ret = [] for _ in isbns: # remove _ from list isb = _.split("\n")[0].strip() if isb == "": continue ret.append(isb) if isb not in ret else None return ret except: isbn = [] return isbn def _get_signature(data): try: sig_data = ( data.split("[loksatz]")[1] .split("[0] => ")[1] .split("\n")[0] .strip() ) signature_data = eval(sig_data) return signature_data["signatur"] except Exception: return None def _get_author(data): try: array = data.split("[au_display_short]")[1].split(")\n")[0].strip() except Exception: return "" entries = array.split("\n") authors = [] hg_present = False verf_present = False lines = [] for entry in entries: if "=>" in entry: line = entry.split("=>")[1].strip() if "[HerausgeberIn]" in line: hg_present = True if "[VerfasserIn]" in line: verf_present = True lines.append(line) for line in lines: if hg_present and verf_present: if "[HerausgeberIn]" in line: authors.append(line.split("[")[0].strip()) elif verf_present: if "[VerfasserIn]" in line: authors.append(line.split("[")[0].strip()) else: pass return ";".join(authors) def _get_title(data): titledata = None title = "" if "[ti_long]" in data: titledata = data.split("[ti_long]")[1].split(")\n")[0].strip() title = titledata.split("=>")[1].strip().split("/")[0].strip() if "[ti_long_f]" in data: titledata = data.split("[ti_long_f]")[1].split(")\n")[0].strip() title = titledata.split("=>")[1].strip().split("/")[0].strip() return title def _get_adis_idn(data, signature): loksatz_match = re.search( r"\[loksatz\] => Array\s*\((.*?)\)", data, re.DOTALL ) if loksatz_match: loksatz_content = loksatz_match.group(1) # Step 2: Extract JSON objects within the loksatz section json_objects = re.findall(r"{.*?}", loksatz_content, re.DOTALL) # Print each JSON object for obj in json_objects: data = eval(obj) if data["signatur"] == signature: return data["adis_idn"] def _get_in_apparat(data): loksatz_match = re.search( r"\[loksatz\] => Array\s*\((.*?)\)", data, re.DOTALL ) if loksatz_match: loksatz_content = loksatz_match.group(1) # Step 2: Extract JSON objects within the loksatz section json_objects = re.findall(r"{.*?}", loksatz_content, re.DOTALL) # Print each JSON object for obj in json_objects: data = eval(obj) if data["ausleihcode"] == "R" and data["standort"] == "40": return True else: return False ppn = _get_line(data, "[kid]") title = _get_title(data).strip() author = _get_author(data) edition = _get_list_entry(data, "[ausgabe]", "[0]").replace(",", "") link = f"https://rds.ibs-bw.de/phfreiburg/link?kid={_get_line(data, '[kid]')}" isbn = _get_isbn(data) # [self._get_list_entry(data,"[isbn]","[0]"),self._get_list_entry(data,"[is]","[1]")], language = _get_list_entry(data, "[la_facet]", "[0]") publisher = _get_list_entry(data, "[pu]", "[0]") year = _get_list_entry(data, "[py_display]", "[0]") pages = _get_list_entry(data, "[umfang]", "[0]").split(":")[0].strip() signature = ( self.signature if self.signature is not None else _get_signature(data) ) place = _get_list_entry(data, "[pp]", "[0]") adis_idn = _get_adis_idn(data, signature=signature) in_apparat = _get_in_apparat(data) return BookData( ppn=ppn, title=title, author=author, edition=edition, link=link, isbn=isbn, language=language, publisher=publisher, year=year, pages=pages, signature=signature, place=place, adis_idn=adis_idn, in_apparat=in_apparat, ) class COinSData: def __init__(self) -> None: pass def transform(self, data: str) -> BookData: def _get_line(source: str, search: str) -> str: try: data = source.split(f"{search}=")[1] # .split("")[0].strip() return data.split("rft")[0].strip() if "rft" in data else data except: return "" return BookData( ppn=_get_line(data, "rft_id").split("=")[1], title=_get_line(data, "rft.btitle"), author=f"{_get_line(data, 'rft.aulast')}, {_get_line(data, 'rft.aufirst')}", edition=_get_line(data, "rft.edition"), link=_get_line(data, "rft_id"), isbn=_get_line(data, "rft.isbn"), publisher=_get_line(data, "rft.pub"), year=_get_line(data, "rft.date"), pages=_get_line(data, "rft.tpages").split(":")[0].strip(), ) class RISData: def __init__(self) -> None: pass def transform(self, data: str) -> BookData: def _get_line(source: str, search: str) -> str: try: data = source.split(f"{search} - ")[1] # .split("")[0].strip() return data.split("\n")[0].strip() if "\n" in data else data except: return "" return BookData( ppn=_get_line(data, "DP").split("=")[1], title=_get_line(data, "TI"), signature=_get_line(data, "CN"), edition=_get_line(data, "ET").replace(",", ""), link=_get_line(data, "DP"), isbn=_get_line(data, "SN").split(","), author=_get_line(data, "AU").split("[")[0].strip(), language=_get_line(data, "LA"), publisher=_get_line(data, "PB"), year=_get_line(data, "PY"), pages=_get_line(data, "SP"), ) class BibTeXData: def __init__(self): pass def transform(self, data: str) -> BookData: def _get_line(source: str, search: str) -> str: try: return ( data.split(search)[1] .split("\n")[0] .strip() .split("=")[1] .strip() .replace("{", "") .replace("}", "") .replace(",", "") .replace("[", "") .replace("];", "") ) except: return "" return BookData( ppn=None, title=_get_line(data, "title"), signature=_get_line(data, "bestand"), edition=_get_line(data, "edition"), isbn=_get_line(data, "isbn"), author=";".join(_get_line(data, "author").split(" and ")), language=_get_line(data, "language"), publisher=_get_line(data, "publisher"), year=_get_line(data, "year"), pages=_get_line(data, "pages"), ) class RDSData: def __init__(self): self.retlist = [] def transform(self, data: str): # rds_availability = RDS_AVAIL_DATA() # rds_data = RDS_GENERIC_DATA() def __get_raw_data(data: str) -> list: # create base data to be turned into pydantic classes data = data.split("RDS ----------------------------------")[1] edata = data.strip() edata = edata.split("\n", 9)[9] edata = edata.split("\n")[1:] entry_1 = edata[0] edata = edata[1:] entry_2 = "".join(edata) edata = [] edata.append(entry_1) edata.append(entry_2) return edata ret_data = __get_raw_data(data) # assign data[1] to RDS_AVAIL_DATA # assign data[0] to RDS_DATA self.rds_data = RDS_GENERIC_DATA().import_from_dict(ret_data[1]) self.rds_availability = RDS_AVAIL_DATA().import_from_dict(ret_data[0]) self.retlist.append(self.rds_availability) self.retlist.append(self.rds_data) return self def return_data(self, option=None): if option == "rds_availability": return self.retlist[0] elif option == "rds_data": return self.retlist[1] else: return {"rds_availability": self.retlist[0], "rds_data": self.retlist[1]} class DictToTable: def __init__(self): self.work_author = None self.section_author = None self.year = None self.edition = None self.work_title = None self.chapter_title = None self.location = None self.publisher = None self.signature = None self.type = None self.pages = None self.issue = None self.isbn = None def makeResult(self): data = { "work_author": self.work_author, "section_author": self.section_author, "year": self.year, "edition": self.edition, "work_title": self.work_title, "chapter_title": self.chapter_title, "location": self.location, "publisher": self.publisher, "signature": self.signature, "issue": self.issue, "pages": self.pages, "isbn": self.isbn, "type": self.type, } data = {k: v for k, v in data.items() if v is not None} return data def reset(self): for key in self.__dict__: setattr(self, key, None) def transform(self, data: dict): mode = data["mode"] self.reset() if mode == "book": return self.book_assign(data) elif mode == "hg": return self.hg_assign(data) elif mode == "zs": return self.zs_assign(data) else: return None def book_assign(self, data): self.type = "book" self.work_author = data["book_author"] self.signature = data["book_signature"] self.location = data["book_place"] self.year = data["book_year"] self.work_title = data["book_title"] self.edition = data["book_edition"] self.pages = data["book_pages"] self.publisher = data["book_publisher"] self.isbn = data["book_isbn"] return self.makeResult() def hg_assign(self, data): self.type = "hg" self.section_author = data["hg_author"] self.work_author = data["hg_editor"] self.year = data["hg_year"] self.work_title = data["hg_title"] self.publisher = data["hg_publisher"] self.location = data["hg_place"] self.edition = data["hg_edition"] self.chapter_title = data["hg_chaptertitle"] self.pages = data["hg_pages"] self.signature = data["hg_signature"] self.isbn = data["hg_isbn"] return self.makeResult() def zs_assign(self, data): self.type = "zs" self.section_author = data["zs_author"] self.chapter_title = data["zs_chapter_title"] self.location = data["zs_place"] self.issue = data["zs_issue"] self.pages = data["zs_pages"] self.publisher = data["zs_publisher"] self.isbn = data["zs_isbn"] self.year = data["zs_year"] self.signature = data["zs_signature"] self.work_title = data["zs_title"] return self.makeResult() if __name__ == "__main__": with open("daiadata", "r") as f: data = f.read() ret = RDSData().transform(data) data = ret.return_data("rds_availability") # log.debug(data)