chore(codebase): format and check code

2025-11-27 15:36:31 +01:00
parent 458174ca6d
commit ae72b22d94
3 changed files with 56 additions and 33 deletions
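
Most hunks below follow the same cleanup pattern: bare "except:" handlers are narrowed to "except Exception:", typing.List annotations become the builtin list[...], leftover debug logging is dropped, and multi-argument calls are reflowed to one argument per line with a trailing comma. A minimal sketch of the target style (all names here are illustrative, not taken from the repository):

from dataclasses import dataclass, field


@dataclass
class ExampleRecord:
    # builtin generics instead of typing.List
    items: list[str] = field(default_factory=list)


def first_item(values: list[str]) -> str:
    try:
        return values[0]
    except Exception:  # narrowed from a bare except
        return ""


record = ExampleRecord(
    items=["a", "b"],  # one argument per line, trailing comma
)
print(first_item(record.items))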

View File

@@ -43,7 +43,7 @@ class RDS_AVAIL_DATA:
     """Class to store RDS availability data"""

     library_sigil: str = dataclass_field(default_factory=str)
-    items: List[Item] = dataclass_field(default_factory=list)
+    items: list[Item] = dataclass_field(default_factory=list)

     def import_from_dict(self, data: str):
         """Import data from dict"""
@@ -87,7 +87,7 @@ class RDS_DATA:
 @dataclass
 class RDS_GENERIC_DATA:
     LibrarySigil: str = dataclass_field(default_factory=str)
-    RDS_DATA: List[RDS_DATA] = dataclass_field(default_factory=list)
+    RDS_DATA: list[RDS_DATA] = dataclass_field(default_factory=list)

     def import_from_dict(self, data: str) -> RDS_GENERIC_DATA:
         """Import data from dict"""
@@ -128,8 +128,6 @@ class ARRAYData:
             return data
         except Exception:
-            # # log.debug(f"ARRAYData.transform failed, {source}, {search}")
-            log.exception(f"ARRAYData.transform failed, no string {search}")
             return ""

     def _get_list_entry(source: str, search: str, entry: str) -> str:
@@ -137,7 +135,7 @@ class ARRAYData:
             source = source.replace("\t", "").replace("\r", "")
             source = source.split(search)[1].split(")")[0]
             return _get_line(source, entry).replace("=>", "").strip()
-        except:
+        except Exception:
             return ""

     def _get_isbn(source: str) -> list:
@@ -153,7 +151,7 @@ class ARRAYData:
                     continue
                 ret.append(isb) if isb not in ret else None
             return ret
-        except:
+        except Exception:
             isbn = []
         return isbn
@@ -212,7 +210,9 @@ class ARRAYData:
     def _get_adis_idn(data, signature):
         loksatz_match = re.search(
-            r"\[loksatz\] => Array\s*\((.*?)\)", data, re.DOTALL,
+            r"\[loksatz\] => Array\s*\((.*?)\)",
+            data,
+            re.DOTALL,
         )
         if loksatz_match:
             loksatz_content = loksatz_match.group(1)
@@ -227,7 +227,9 @@ class ARRAYData:
     def _get_in_apparat(data):
         loksatz_match = re.search(
-            r"\[loksatz\] => Array\s*\((.*?)\)", data, re.DOTALL,
+            r"\[loksatz\] => Array\s*\((.*?)\)",
+            data,
+            re.DOTALL,
         )
         if loksatz_match:
             loksatz_content = loksatz_match.group(1)
@@ -286,7 +288,7 @@ class COinSData:
         try:
             data = source.split(f"{search}=")[1]  # .split("")[0].strip()
             return data.split("rft")[0].strip() if "rft" in data else data
-        except:
+        except Exception:
             return ""

         return BookData(
@@ -311,7 +313,7 @@ class RISData:
         try:
             data = source.split(f"{search} - ")[1]  # .split("")[0].strip()
             return data.split("\n")[0].strip() if "\n" in data else data
-        except:
+        except Exception:
             return ""

         return BookData(
@@ -348,7 +350,8 @@ class BibTeXData:
                 .replace("[", "")
                 .replace("];", "")
             )
-        except:
+        except Exception as e:
+            print(e)
             return ""

         return BookData(
@@ -497,6 +500,3 @@ class DictToTable:
         self.signature = data["zs_signature"]
         self.work_title = data["zs_title"]
         return self.makeResult()
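
The loksatz regex used by _get_adis_idn and _get_in_apparat above pulls the body of a "[loksatz] => Array ( ... )" block out of a print_r-style text dump. A minimal sketch of that extraction; the sample dump and its keys are hypothetical, only the regex is taken from the code above:

import re

# Hypothetical print_r-style dump; only the "[loksatz] => Array ( ... )" shape
# mirrors what the regex above expects, the keys and values are made up.
sample = """
[titel] => Example title
[loksatz] => Array
    (
        [idn] => 123456789
        [signatur] => AB 1234
    )
"""

loksatz_match = re.search(r"\[loksatz\] => Array\s*\((.*?)\)", sample, re.DOTALL)
if loksatz_match:
    loksatz_content = loksatz_match.group(1)
    # prints the key/value lines captured inside the Array block
    print(loksatz_content.strip())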

View File

@@ -37,8 +37,7 @@ def _req_text(parent: ET.Element, path: str) -> str:
 def parse_marc_record(record_el: ET.Element) -> MarcRecord:
-    """record_el is the <marc:record> element (default ns MARC in your sample)
-    """
+    """record_el is the <marc:record> element (default ns MARC in your sample)"""

     # leader
     leader_text = _req_text(record_el, "marc:leader")
@@ -61,7 +60,9 @@ def parse_marc_record(record_el: ET.Element) -> MarcRecord:
         datafields.append(DataField(tag=tag, ind1=ind1, ind2=ind2, subfields=subfields))

     return MarcRecord(
-        leader=leader_text, controlfields=controlfields, datafields=datafields,
+        leader=leader_text,
+        controlfields=controlfields,
+        datafields=datafields,
     )
@@ -245,7 +246,9 @@ def find_datafields_with_subfields(
 def controlfield_value(
-    rec: MarcRecord, tag: str, default: str | None = None,
+    rec: MarcRecord,
+    tag: str,
+    default: str | None = None,
 ) -> str | None:
     """Get the first controlfield value by tag (e.g., '001', '005')."""
     for cf in rec.controlfields:
@@ -255,7 +258,9 @@ def controlfield_value(
 def datafields_value(
-    data: list[DataField], code: str, default: str | None = None,
+    data: list[DataField],
+    code: str,
+    default: str | None = None,
 ) -> str | None:
     """Get the first value for a specific subfield code in a list of datafields."""
     for df in data:
@@ -266,7 +271,9 @@ def datafields_value(
 def datafield_value(
-    df: DataField, code: str, default: str | None = None,
+    df: DataField,
+    code: str,
+    default: str | None = None,
 ) -> str | None:
     """Get the first value for a specific subfield code in a datafield."""
     for sf in df.subfields:
@@ -337,13 +344,17 @@ def book_from_marc(rec: MarcRecord, library_identifier: str) -> BookData:
     # Signature = 924 where $9 == "Frei 129" → take that field's $g
     frei_fields = find_datafields_with_subfields(
-        rec, "924", where_all={"9": "Frei 129"},
+        rec,
+        "924",
+        where_all={"9": "Frei 129"},
     )
     signature = first_subfield_value_from_fields(frei_fields, "g")

     # Year = 264 $c (prefer ind2="1" publication; fallback to any 264)
     year = first_subfield_value(rec, "264", "c", ind2="1") or first_subfield_value(
-        rec, "264", "c",
+        rec,
+        "264",
+        "c",
     )
     isbn = subfield_values(rec, "020", "a")
     mediatype = first_subfield_value(rec, "338", "a")
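
For context on the 924/264 lookups in this hunk, a MARCXML record of the shape book_from_marc() reads might look like the fragment below; the concrete values and the standalone ElementTree queries are illustrative, only the tag and subfield codes come from the code above:

import xml.etree.ElementTree as ET

MARC_NS = {"marc": "http://www.loc.gov/MARC21/slim"}

# Hypothetical record: 264 $c (ind2="1") holds the publication year and the
# 924 field whose $9 is "Frei 129" holds the local signature in $g.
record_xml = """
<marc:record xmlns:marc="http://www.loc.gov/MARC21/slim">
  <marc:datafield tag="264" ind1=" " ind2="1">
    <marc:subfield code="c">2024</marc:subfield>
  </marc:datafield>
  <marc:datafield tag="924" ind1=" " ind2=" ">
    <marc:subfield code="9">Frei 129</marc:subfield>
    <marc:subfield code="g">AB 1234</marc:subfield>
  </marc:datafield>
</marc:record>
"""

record_el = ET.fromstring(record_xml)
year = record_el.find("marc:datafield[@tag='264']/marc:subfield[@code='c']", MARC_NS).text
signature = record_el.find("marc:datafield[@tag='924']/marc:subfield[@code='g']", MARC_NS).text
print(year, signature)  # -> 2024 AB 1234
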
@@ -375,7 +386,8 @@ RVK_ALLOWED = r"[A-Z0-9.\-\/]" # conservative char set typically seen in RVK no
 def find_newer_edition(
-    swb_result: BookData, dnb_result: list[BookData],
+    swb_result: BookData,
+    dnb_result: list[BookData],
 ) -> list[BookData] | None:
     """New edition if:
     - year > swb.year OR
@@ -500,7 +512,8 @@ class QueryTransformer:
             value = value.replace(", ", ",")
             value = value.replace("  ", " ")
             if key.upper() == "TITLE" and hasattr(
-                schema, "ENCLOSE_TITLE_IN_QUOTES",
+                schema,
+                "ENCLOSE_TITLE_IN_QUOTES",
             ):
                 if schema.ENCLOSE_TITLE_IN_QUOTES:
                     value = f'"{value}"'
@@ -562,7 +575,8 @@ class Api:
             if not any(qa.startswith(na + "=") for na in self.notsupported_args)
         ]
         query_args = QueryTransformer(
-            api_schema=self.prefix, arguments=query_args,
+            api_schema=self.prefix,
+            arguments=query_args,
         ).transform()
         query = "+and+".join(query_args)
         for old, new in self.replace.items():
@@ -592,11 +606,14 @@ class Api:
             try:
                 # Per-attempt read timeout capped at remaining overall budget (but at most 30s)
                 remaining = max(
-                    0.0, self._overall_timeout_seconds - (time.monotonic() - start_time),
+                    0.0,
+                    self._overall_timeout_seconds - (time.monotonic() - start_time),
                 )
                 read_timeout = min(30.0, remaining if remaining > 0 else 0.001)
                 resp = self._session.get(
-                    url, headers=headers, timeout=(3.05, read_timeout),
+                    url,
+                    headers=headers,
+                    timeout=(3.05, read_timeout),
                 )
                 self._last_request_time = time.monotonic()
                 if resp.status_code == 200:
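
The comment in this hunk describes the retry timeout policy: each attempt's read timeout is whatever remains of the overall budget, capped at 30 seconds and kept strictly positive. A minimal standalone sketch of that calculation; the budget value and the helper name are illustrative, not from the Api class:

import time

overall_timeout_seconds = 120.0  # illustrative overall budget
start_time = time.monotonic()


def per_attempt_read_timeout() -> float:
    # Spend at most what is left of the overall budget on this attempt,
    # never more than 30 s, and never zero so the final attempt still gets
    # a tiny window instead of an instant timeout.
    remaining = max(0.0, overall_timeout_seconds - (time.monotonic() - start_time))
    return min(30.0, remaining if remaining > 0 else 0.001)


print(per_attempt_read_timeout())  # ~30.0 right after start
# Each attempt then passes (connect, read) timeouts to requests, e.g.
# session.get(url, headers=headers, timeout=(3.05, per_attempt_read_timeout()))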

View File

@@ -62,7 +62,7 @@ class WebRequest:
     def canrun(self) -> None:
         """Check if requests can be made."""
         try:
-            #check public IP to see if the requested data can be accessed
+            # check public IP to see if the requested data can be accessed
             ip_response = requests.get("https://api.ipify.org", timeout=self.timeout)
             ip_response.raise_for_status()
             self.public_ip = ip_response.text
@@ -125,7 +125,6 @@ class WebRequest:
     def get_data(self) -> list[str] | None:
         links = self.get_book_links(self.ppn)
-        log.debug(f"Links: {links}")
         return_data: list[str] = []
         for link in links:
             result: str = self.search(link)  # type:ignore
@@ -145,7 +144,8 @@ class WebRequest:
                 return return_data
             return return_data
         item_location = location.find(
-            "div", class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel",
+            "div",
+            class_="col-xs-12 col-md-7 col-lg-8 rds-dl-panel",
         ).text.strip()
         if self.use_any:
             pre_tag = soup.find_all("pre")
@@ -251,8 +251,15 @@ class BibTextTransformer:
         return self

     def return_data(
-        self, option: Any = None,
-    ) -> BookData | None | RDS_GENERIC_DATA | RDS_AVAIL_DATA | dict[str, RDS_AVAIL_DATA | RDS_GENERIC_DATA]:
+        self,
+        option: Any = None,
+    ) -> (
+        BookData
+        | None
+        | RDS_GENERIC_DATA
+        | RDS_AVAIL_DATA
+        | dict[str, RDS_AVAIL_DATA | RDS_GENERIC_DATA]
+    ):
         """Return Data to caller.

         Args:
@@ -279,7 +286,6 @@ class BibTextTransformer:
             return None

         def cover(isbn):
             test_url = f"https://www.buchhandel.de/cover/{isbn}/{isbn}-cover-m.jpg"
             data = requests.get(test_url, stream=True)