# Source: SemesterapparatsManager/src/transformers/transformers.py
# (file-viewer metadata: 516 lines, 18 KiB, Python)

from __future__ import annotations

import ast
import json
import re
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from typing import Any, List

from src.shared.logging import log
from src.core.models import BookData
# use centralized logging from src.shared.logging
### Pydantic-style data models (implemented as dataclasses)
@dataclass
class Item:
    """Flat record describing a single item entry.

    All string fields start out as ``""`` and ``number`` starts out as ``0``
    because each default comes from calling ``str`` / ``int``.
    """

    superlocation: str | None = dataclass_field(default_factory=str)
    status: str | None = dataclass_field(default_factory=str)
    availability: str | None = dataclass_field(default_factory=str)
    notes: str | None = dataclass_field(default_factory=str)
    limitation: str | None = dataclass_field(default_factory=str)
    duedate: str | None = dataclass_field(default_factory=str)
    id: str | None = dataclass_field(default_factory=str)
    item_id: str | None = dataclass_field(default_factory=str)
    ilslink: str | None = dataclass_field(default_factory=str)
    number: int | None = dataclass_field(default_factory=int)
    barcode: str | None = dataclass_field(default_factory=str)
    reserve: str | None = dataclass_field(default_factory=str)
    callnumber: str | None = dataclass_field(default_factory=str)
    department: str | None = dataclass_field(default_factory=str)
    locationhref: str | None = dataclass_field(default_factory=str)
    location: str | None = dataclass_field(default_factory=str)
    ktrl_nr: str | None = dataclass_field(default_factory=str)

    def from_dict(self, data: dict) -> Item:
        """Populate this instance from ``data["items"]`` and return it.

        ``data["items"]`` is iterated as a sequence of mappings. Every
        key/value pair is written onto the instance with ``setattr``, so when
        several entries carry the same key the value of the last entry wins.

        Raises:
            KeyError: if *data* has no ``"items"`` key.
        """
        # Walk all entries lazily, one (attribute, value) pair at a time.
        pairs = (pair for record in data["items"] for pair in record.items())
        for attribute, value in pairs:
            setattr(self, attribute, value)
        return self
@dataclass
class RDS_AVAIL_DATA:
    """Class to store RDS availability data."""

    library_sigil: str = dataclass_field(default_factory=str)
    items: List[Item] = dataclass_field(default_factory=list)

    def import_from_dict(self, data: str):
        """Fill this instance from a JSON document and return it.

        Despite the method name, *data* is a JSON **string**. Its first
        top-level key is stored as ``library_sigil``; the value under that key
        is iterated, and for every key (used as ``superlocation``) one ``Item``
        is built from the corresponding value and appended to ``items``.
        """
        decoded = json.loads(data)
        # The library sigil is the first key of the decoded document.
        first_key = list(decoded.keys())[0]
        self.library_sigil = str(first_key)
        by_location = decoded[self.library_sigil]
        for superlocation in by_location:
            entry = Item(superlocation=superlocation)
            self.items.append(entry.from_dict(by_location[superlocation]))
        return self
@dataclass
class RDS_DATA:
    """Class to store RDS data; every field defaults to an empty string."""

    RDS_SIGNATURE: str = dataclass_field(default_factory=str)
    RDS_STATUS: str = dataclass_field(default_factory=str)
    RDS_LOCATION: str = dataclass_field(default_factory=str)
    RDS_URL: Any = dataclass_field(default_factory=str)
    RDS_HINT: Any = dataclass_field(default_factory=str)
    RDS_COMMENT: Any = dataclass_field(default_factory=str)
    RDS_HOLDING: Any = dataclass_field(default_factory=str)
    RDS_HOLDING_LEAK: Any = dataclass_field(default_factory=str)
    RDS_INTERN: Any = dataclass_field(default_factory=str)
    RDS_PROVENIENCE: Any = dataclass_field(default_factory=str)
    RDS_LOCAL_NOTATION: str = dataclass_field(default_factory=str)
    RDS_LEA: Any = dataclass_field(default_factory=str)

    def import_from_dict(self, data: dict) -> RDS_DATA:
        """Copy every key/value pair of *data* onto this instance and return it.

        Keys are applied verbatim with ``setattr``; a key that is not a
        declared field simply becomes an additional attribute.
        """
        for pair in data.items():
            setattr(self, *pair)
        return self
@dataclass
class RDS_GENERIC_DATA:
    """Library sigil plus the list of ``RDS_DATA`` records found under it."""

    LibrarySigil: str = dataclass_field(default_factory=str)
    RDS_DATA: List[RDS_DATA] = dataclass_field(default_factory=list)

    def import_from_dict(self, data: str) -> RDS_GENERIC_DATA:
        """Fill this instance from a JSON document and return it.

        Despite the method name, *data* is a JSON **string**. Its first
        top-level key is stored as ``LibrarySigil``; each element of the value
        under that key is loaded into a fresh ``RDS_DATA`` instance, which is
        then appended to ``self.RDS_DATA``.
        """
        decoded = json.loads(data)
        # The library sigil is the first key of the decoded document.
        first_key = list(decoded.keys())[0]
        self.LibrarySigil = str(first_key)
        for raw_record in decoded[self.LibrarySigil]:
            # Each element is assumed to be a dict whose keys mirror the
            # RDS_DATA field names.
            record = RDS_DATA()
            record.import_from_dict(raw_record)
            self.RDS_DATA.append(record)
        return self
class BaseStruct:
    """Minimal attribute bag: every keyword argument becomes an attribute."""

    def __init__(self, **kwargs):
        """Set one instance attribute per keyword argument."""
        for pair in kwargs.items():
            setattr(self, *pair)
class ARRAYData:
    """Extract bibliographic fields from a textual array dump of a record.

    The input text is expected to consist of ``[key] => value`` lines and
    ``Array ( ... )`` groups; that is what the split markers and regular
    expressions below search for.
    """

    # Contents of the parenthesised group that follows "[loksatz] => Array".
    _LOKSATZ_RE = re.compile(r"\[loksatz\] => Array\s*\((.*?)\)", re.DOTALL)
    # Individual "{...}" objects inside that group.
    _OBJECT_RE = re.compile(r"{.*?}", re.DOTALL)

    def __init__(self, signature=None) -> None:
        """Remember an optional signature that overrides the parsed one.

        Args:
            signature: when not ``None``, ``transform`` uses this value
                instead of the ``signatur`` found in the data.
        """
        # Fix: the argument used to be discarded (``self.signature = None``),
        # so the override branch in ``transform`` could never be taken.
        self.signature = signature

    @staticmethod
    def _get_line(source: str, search: str) -> str:
        """Return the rest of the line after the first *search* marker.

        ``"=>"`` is removed and surrounding whitespace stripped. When the
        marker is missing the failure is logged and ``""`` is returned.
        """
        try:
            return (
                source.split(search)[1]
                .split("\n")[0]
                .strip()
                .replace("=>", "")
                .strip()
            )
        except Exception:
            log.exception(f"ARRAYData.transform failed, no string {search}")
            return ""

    @classmethod
    def _get_list_entry(cls, source: str, search: str, entry: str) -> str:
        """Return the value of *entry* inside the group that follows *search*.

        Tabs and carriage returns are dropped first; the group is everything
        between the *search* marker and the next ``")"``. Returns ``""`` when
        the *search* marker is absent.
        """
        try:
            cleaned = source.replace("\t", "").replace("\r", "")
            group = cleaned.split(search)[1].split(")")[0]
        except (AttributeError, IndexError):
            return ""
        return cls._get_line(group, entry).replace("=>", "").strip()

    @staticmethod
    def _get_isbn(source: str) -> list:
        """Return the distinct, non-empty values of the ``[isbn]`` group.

        Order of first appearance is preserved. Returns ``[]`` when the group
        cannot be located.
        """
        try:
            group = source.split("[isbn]")[1].split(")")[0].strip()
            chunks = group.split("(")[1].split("=>")
        except (AttributeError, IndexError):
            return []
        # Each chunk starts with a value and continues with the next index
        # label on the following line; only the first line is the value.
        values = (chunk.split("\n")[0].strip() for chunk in chunks)
        return list(dict.fromkeys(value for value in values if value))

    @staticmethod
    def _parse_object(text: str):
        """Parse one ``{...}`` literal taken from the input text.

        NOTE(security): the original code called ``eval()`` on this text,
        which executes arbitrary expressions embedded in the input.
        ``ast.literal_eval`` accepts only Python literals, so dict literals
        built from plain strings/numbers behave as before while anything else
        is rejected with an exception.
        NOTE(review): if the payload is strict JSON, ``json.loads`` may be the
        more precise parser - confirm against real data before switching.
        """
        return ast.literal_eval(text)

    @classmethod
    def _get_signature(cls, data: str):
        """Return ``signatur`` of the first ``[loksatz]`` entry, else ``None``."""
        try:
            raw = (
                data.split("[loksatz]")[1]
                .split("[0] => ")[1]
                .split("\n")[0]
                .strip()
            )
            return cls._parse_object(raw)["signatur"]
        except Exception:
            # Best effort: a missing or malformed entry means "no signature".
            return None

    @staticmethod
    def _get_author(data: str) -> str:
        """Return the relevant names of ``[au_display_short]`` joined by ``;``.

        Selection rule, unchanged from the original:
        * lines tagged ``[HerausgeberIn]`` if both ``[HerausgeberIn]`` and
          ``[VerfasserIn]`` tags occur,
        * otherwise lines tagged ``[VerfasserIn]`` if that tag occurs,
        * otherwise nothing (``""``).
        """
        try:
            group = data.split("[au_display_short]")[1].split(")\n")[0].strip()
        except (AttributeError, IndexError):
            return ""
        lines = [
            entry.split("=>")[1].strip()
            for entry in group.split("\n")
            if "=>" in entry
        ]
        has_editor = any("[HerausgeberIn]" in line for line in lines)
        has_author = any("[VerfasserIn]" in line for line in lines)
        if has_editor and has_author:
            role = "[HerausgeberIn]"
        elif has_author:
            role = "[VerfasserIn]"
        else:
            return ""
        # The name is whatever precedes the first "[" (the role tag).
        return ";".join(
            line.split("[")[0].strip() for line in lines if role in line
        )

    @staticmethod
    def _get_title(data: str) -> str:
        """Return the title taken from ``[ti_long]`` / ``[ti_long_f]``.

        The title is the text between the first two ``"=>"`` after the marker,
        cut at the first ``"/"``. When both markers are present the
        ``[ti_long_f]`` value wins. Returns ``""`` when neither is present.
        """
        title = ""
        for marker in ("[ti_long]", "[ti_long_f]"):
            if marker in data:
                group = data.split(marker)[1].split(")\n")[0].strip()
                title = group.split("=>")[1].strip().split("/")[0].strip()
        return title

    @classmethod
    def _iter_loksatz(cls, data: str):
        """Yield each parsed ``{...}`` object of the ``[loksatz]`` group.

        Objects are parsed lazily, so a consumer that stops early never
        touches the remaining ones.
        """
        match = cls._LOKSATZ_RE.search(data)
        if match is None:
            return
        for raw in cls._OBJECT_RE.findall(match.group(1)):
            yield cls._parse_object(raw)

    @classmethod
    def _get_adis_idn(cls, data: str, signature):
        """Return ``adis_idn`` of the loksatz object whose ``signatur`` matches.

        Returns ``None`` when there is no ``[loksatz]`` group or no match.
        """
        for record in cls._iter_loksatz(data):
            if record["signatur"] == signature:
                return record["adis_idn"]
        return None

    @classmethod
    def _get_in_apparat(cls, data: str) -> bool:
        """Tell whether any loksatz object has ``ausleihcode`` ``"R"`` and
        ``standort`` ``"40"``."""
        return any(
            record["ausleihcode"] == "R" and record["standort"] == "40"
            for record in cls._iter_loksatz(data)
        )

    def transform(self, data: str) -> BookData:
        """Build a ``BookData`` from the array dump *data*.

        Args:
            data: the full text of the dump.

        Returns:
            ``BookData`` filled with the extracted fields. Fields whose marker
            is missing are empty strings (``isbn``: empty list).

        Raises:
            Exceptions from malformed ``[ti_long]`` / ``[loksatz]`` content
            propagate, as they did before.
        """
        ppn = self._get_line(data, "[kid]")
        title = self._get_title(data).strip()
        author = self._get_author(data)
        edition = self._get_list_entry(data, "[ausgabe]", "[0]").replace(",", "")
        # Reuse the id extracted above instead of searching (and, on a miss,
        # logging) a second time.
        link = f"https://rds.ibs-bw.de/phfreiburg/link?kid={ppn}"
        isbn = self._get_isbn(data)
        language = self._get_list_entry(data, "[la_facet]", "[0]")
        publisher = self._get_list_entry(data, "[pu]", "[0]")
        year = self._get_list_entry(data, "[py_display]", "[0]")
        pages = self._get_list_entry(data, "[umfang]", "[0]").split(":")[0].strip()
        signature = (
            self.signature
            if self.signature is not None
            else self._get_signature(data)
        )
        place = self._get_list_entry(data, "[pp]", "[0]")
        adis_idn = self._get_adis_idn(data, signature=signature)
        in_apparat = self._get_in_apparat(data)
        return BookData(
            ppn=ppn,
            title=title,
            author=author,
            edition=edition,
            link=link,
            isbn=isbn,
            language=language,
            publisher=publisher,
            year=year,
            pages=pages,
            signature=signature,
            place=place,
            adis_idn=adis_idn,
            in_apparat=in_apparat,
        )
class COinSData:
    """Convert a COinS string (``rft.*`` / ``rft_id`` ``key=value`` pairs)
    into ``BookData``."""

    def __init__(self) -> None:
        pass

    @staticmethod
    def _get_line(source: str, search: str) -> str:
        """Return the value that follows ``"<search>="`` in *source*.

        The value ends (and is stripped) where the next ``"rft"`` starts; when
        no further ``"rft"`` follows, the remainder is returned unchanged.
        Returns ``""`` when the key is absent.
        """
        try:
            value = source.split(f"{search}=")[1]
        except (AttributeError, IndexError):
            # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return ""
        return value.split("rft")[0].strip() if "rft" in value else value

    def transform(self, data: str) -> BookData:
        """Build a ``BookData`` from the COinS string *data*.

        Raises:
            IndexError: when the ``rft_id`` value is missing or contains no
                ``"="`` (the ppn is the part after the first ``"="``).
        """
        # Extracted once; serves as the link and as the source of the ppn.
        link = self._get_line(data, "rft_id")
        return BookData(
            ppn=link.split("=")[1],
            title=self._get_line(data, "rft.btitle"),
            author=(
                f"{self._get_line(data, 'rft.aulast')}, "
                f"{self._get_line(data, 'rft.aufirst')}"
            ),
            edition=self._get_line(data, "rft.edition"),
            link=link,
            isbn=self._get_line(data, "rft.isbn"),
            publisher=self._get_line(data, "rft.pub"),
            year=self._get_line(data, "rft.date"),
            pages=self._get_line(data, "rft.tpages").split(":")[0].strip(),
        )
class RISData:
    """Convert RIS-style ``TAG - value`` text into ``BookData``."""

    def __init__(self) -> None:
        pass

    @staticmethod
    def _get_line(source: str, search: str) -> str:
        """Return the value that follows the first ``"<search> - "``.

        When the remainder spans several lines only the first line is
        returned (stripped); a single-line remainder is returned unchanged.
        Returns ``""`` when the tag is absent.
        """
        try:
            value = source.split(f"{search} - ")[1]
        except (AttributeError, IndexError):
            # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return ""
        return value.split("\n")[0].strip() if "\n" in value else value

    def transform(self, data: str) -> BookData:
        """Build a ``BookData`` from the RIS text *data*.

        Raises:
            IndexError: when the ``DP`` value is missing or contains no
                ``"="`` (the ppn is the part after the first ``"="``).
        """
        # Extracted once; serves as the link and as the source of the ppn.
        link = self._get_line(data, "DP")
        return BookData(
            ppn=link.split("=")[1],
            title=self._get_line(data, "TI"),
            signature=self._get_line(data, "CN"),
            edition=self._get_line(data, "ET").replace(",", ""),
            link=link,
            isbn=self._get_line(data, "SN").split(","),
            # Keep only the name in front of a trailing "[role]" tag.
            author=self._get_line(data, "AU").split("[")[0].strip(),
            language=self._get_line(data, "LA"),
            publisher=self._get_line(data, "PB"),
            year=self._get_line(data, "PY"),
            pages=self._get_line(data, "SP"),
        )
class BibTeXData:
    """Convert a BibTeX entry into ``BookData``."""

    def __init__(self):
        pass

    @staticmethod
    def _get_line(source: str, search: str) -> str:
        """Return the cleaned value of the first line mentioning *search*.

        The value is the text after the first ``"="`` on that line with
        ``{``, ``}``, ``,``, ``[`` and ``];`` removed. Returns ``""`` when
        *search* does not occur or the line has no ``"="``.

        Fix: the original helper ignored its *source* parameter and read the
        enclosing ``data`` variable instead; it now parses *source*.
        """
        try:
            raw_value = (
                source.split(search)[1]
                .split("\n")[0]
                .strip()
                .split("=")[1]
                .strip()
            )
        except (AttributeError, IndexError):
            # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return ""
        return (
            raw_value.replace("{", "")
            .replace("}", "")
            .replace(",", "")
            .replace("[", "")
            .replace("];", "")
        )

    def transform(self, data: str) -> BookData:
        """Build a ``BookData`` from the BibTeX text *data*.

        ``ppn`` is always ``None``; multiple authors separated by ``" and "``
        are re-joined with ``";"``.
        """
        return BookData(
            ppn=None,
            title=self._get_line(data, "title"),
            signature=self._get_line(data, "bestand"),
            edition=self._get_line(data, "edition"),
            isbn=self._get_line(data, "isbn"),
            author=";".join(self._get_line(data, "author").split(" and ")),
            language=self._get_line(data, "language"),
            publisher=self._get_line(data, "publisher"),
            year=self._get_line(data, "year"),
            pages=self._get_line(data, "pages"),
        )
class RDSData:
    """Split a raw RDS dump into availability data and generic RDS data."""

    def __init__(self):
        # After transform(): [rds_availability, rds_data].
        self.retlist = []

    @staticmethod
    def _get_raw_data(data: str) -> list:
        """Cut the two JSON payloads out of the raw dump.

        Everything after the ``"RDS ---..."`` marker is taken, its first nine
        lines plus one further line are skipped, the next line becomes the
        first payload and all remaining lines, concatenated without
        separators, become the second payload.

        Returns:
            ``[first_payload, second_payload]``.

        Raises:
            IndexError: if the marker is missing or too few lines follow it.
        """
        section = data.split("RDS ----------------------------------")[1].strip()
        remainder = section.split("\n", 9)[9]
        lines = remainder.split("\n")[1:]
        return [lines[0], "".join(lines[1:])]

    def transform(self, data: str):
        """Parse *data* and store the results on this instance; return self.

        The first payload is loaded into ``RDS_AVAIL_DATA`` and the second
        into ``RDS_GENERIC_DATA`` (the original inline comments stated the
        opposite of what the code did).
        """
        availability_json, generic_json = self._get_raw_data(data)
        self.rds_data = RDS_GENERIC_DATA().import_from_dict(generic_json)
        self.rds_availability = RDS_AVAIL_DATA().import_from_dict(availability_json)
        # Fix: the results used to be appended, so after a second transform()
        # on the same instance return_data() still served the first call's
        # results from retlist[0] / retlist[1].
        self.retlist = [self.rds_availability, self.rds_data]
        return self

    def return_data(self, option=None):
        """Return the parsed results.

        Args:
            option: ``"rds_availability"`` or ``"rds_data"`` to get a single
                result; anything else returns a dict holding both.

        Raises:
            IndexError: if ``transform`` has not been called yet.
        """
        if option == "rds_availability":
            return self.retlist[0]
        if option == "rds_data":
            return self.retlist[1]
        return {"rds_availability": self.retlist[0], "rds_data": self.retlist[1]}
class DictToTable:
    """Flatten a mode-specific input dictionary into a uniform result row."""

    # Attribute names in the order __init__ creates them.
    _ATTRIBUTES = (
        "work_author",
        "section_author",
        "year",
        "edition",
        "work_title",
        "chapter_title",
        "location",
        "publisher",
        "signature",
        "type",
        "pages",
        "issue",
        "isbn",
    )
    # Key order of the dictionary produced by makeResult.
    _RESULT_ORDER = (
        "work_author",
        "section_author",
        "year",
        "edition",
        "work_title",
        "chapter_title",
        "location",
        "publisher",
        "signature",
        "issue",
        "pages",
        "isbn",
        "type",
    )
    # (attribute, input key) pairs per mode, in assignment order.
    _BOOK_FIELDS = (
        ("work_author", "book_author"),
        ("signature", "book_signature"),
        ("location", "book_place"),
        ("year", "book_year"),
        ("work_title", "book_title"),
        ("edition", "book_edition"),
        ("pages", "book_pages"),
        ("publisher", "book_publisher"),
        ("isbn", "book_isbn"),
    )
    _HG_FIELDS = (
        ("section_author", "hg_author"),
        ("work_author", "hg_editor"),
        ("year", "hg_year"),
        ("work_title", "hg_title"),
        ("publisher", "hg_publisher"),
        ("location", "hg_place"),
        ("edition", "hg_edition"),
        ("chapter_title", "hg_chaptertitle"),
        ("pages", "hg_pages"),
        ("signature", "hg_signature"),
        ("isbn", "hg_isbn"),
    )
    _ZS_FIELDS = (
        ("section_author", "zs_author"),
        ("chapter_title", "zs_chapter_title"),
        ("location", "zs_place"),
        ("issue", "zs_issue"),
        ("pages", "zs_pages"),
        ("publisher", "zs_publisher"),
        ("isbn", "zs_isbn"),
        ("year", "zs_year"),
        ("signature", "zs_signature"),
        ("work_title", "zs_title"),
    )

    def __init__(self):
        """Create every result attribute with the value ``None``."""
        for name in self._ATTRIBUTES:
            setattr(self, name, None)

    def makeResult(self):
        """Return the attributes that are not ``None`` as a dictionary."""
        snapshot = {name: getattr(self, name) for name in self._RESULT_ORDER}
        return {
            name: value for name, value in snapshot.items() if value is not None
        }

    def reset(self):
        """Set every instance attribute back to ``None``."""
        for name in list(vars(self)):
            setattr(self, name, None)

    def transform(self, data: dict):
        """Dispatch on ``data["mode"]`` and return the resulting row.

        Recognised modes are ``"book"``, ``"hg"`` and ``"zs"``; any other
        mode yields ``None``. The instance is reset before dispatching.

        Raises:
            KeyError: if *data* has no ``"mode"`` key or lacks a key the
                selected mode reads.
        """
        mode = data["mode"]
        self.reset()
        handlers = (
            ("book", self.book_assign),
            ("hg", self.hg_assign),
            ("zs", self.zs_assign),
        )
        for name, handler in handlers:
            if mode == name:
                return handler(data)
        return None

    def _assign(self, kind, fields, data):
        """Set ``type`` to *kind*, copy *fields* from *data*, build the row."""
        self.type = kind
        for attribute, key in fields:
            setattr(self, attribute, data[key])
        return self.makeResult()

    def book_assign(self, data):
        """Fill the row from the ``book_*`` keys of *data*."""
        return self._assign("book", self._BOOK_FIELDS, data)

    def hg_assign(self, data):
        """Fill the row from the ``hg_*`` keys of *data*."""
        return self._assign("hg", self._HG_FIELDS, data)

    def zs_assign(self, data):
        """Fill the row from the ``zs_*`` keys of *data*."""
        return self._assign("zs", self._ZS_FIELDS, data)
if __name__ == "__main__":
    # Manual smoke run: parse the local file "daiadata" and pick out the
    # availability part of the result.
    with open("daiadata") as handle:
        raw_dump = handle.read()
    parsed = RDSData().transform(raw_dump)
    availability = parsed.return_data("rds_availability")
    # log.debug(availability)