minor and major reworks: rename swb to SRU, add a test for pdf parsing
major: rework mail to send mail as plaintext instead of html, preventing the bleed-in of html text
This commit is contained in:
@@ -1,16 +1,15 @@
|
||||
import sys
|
||||
import zipfile
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Union
|
||||
from typing import Any
|
||||
|
||||
import fitz # PyMuPDF
|
||||
import loguru
|
||||
import pandas as pd
|
||||
from bs4 import BeautifulSoup
|
||||
from docx import Document
|
||||
|
||||
from src import LOG_DIR
|
||||
from src.backend.semester import Semester
|
||||
from src.logic.openai import name_tester, run_shortener, semester_converter
|
||||
from src.logic.dataclass import Book, SemapDocument
|
||||
|
||||
log = loguru.logger
|
||||
log.remove()
|
||||
@@ -18,116 +17,6 @@ log.add(sys.stdout, level="INFO")
|
||||
log.add(f"{LOG_DIR}/application.log", rotation="1 MB", retention="10 days")
|
||||
|
||||
|
||||
letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Book:
|
||||
author: str = None
|
||||
year: str = None
|
||||
edition: str = None
|
||||
title: str = None
|
||||
location: str = None
|
||||
publisher: str = None
|
||||
signature: str = None
|
||||
internal_notes: str = None
|
||||
|
||||
@property
|
||||
def has_signature(self) -> bool:
|
||||
return self.signature is not None and self.signature != ""
|
||||
|
||||
@property
|
||||
def is_empty(self) -> bool:
|
||||
return all(
|
||||
[
|
||||
self.author == "",
|
||||
self.year == "",
|
||||
self.edition == "",
|
||||
self.title == "",
|
||||
self.location == "",
|
||||
self.publisher == "",
|
||||
self.signature == "",
|
||||
self.internal_notes == "",
|
||||
]
|
||||
)
|
||||
|
||||
def from_dict(self, data: dict[str, Any]):
|
||||
for key, value in data.items():
|
||||
value = value.strip()
|
||||
if value == "\u2002\u2002\u2002\u2002\u2002":
|
||||
value = ""
|
||||
|
||||
if key == "Autorenname(n):Nachname, Vorname":
|
||||
self.author = value
|
||||
elif key == "Jahr/Auflage":
|
||||
self.year = value.split("/")[0] if "/" in value else value
|
||||
self.edition = value.split("/")[1] if "/" in value else ""
|
||||
elif key == "Titel":
|
||||
self.title = value
|
||||
elif key == "Ort und Verlag":
|
||||
self.location = value.split(",")[0] if "," in value else value
|
||||
self.publisher = value.split(",")[1] if "," in value else ""
|
||||
elif key == "Standnummer":
|
||||
self.signature = value.strip()
|
||||
elif key == "Interne Vermerke":
|
||||
self.internal_notes = value
|
||||
|
||||
|
||||
@dataclass
|
||||
class SemapDocument:
|
||||
subject: str = None
|
||||
phoneNumber: int = None
|
||||
mail: str = None
|
||||
title: str = None
|
||||
title_suggestions: list[str] = None
|
||||
semester: Union[str, Semester] = None
|
||||
books: list[Book] = None
|
||||
eternal: bool = False
|
||||
personName: str = None
|
||||
personTitle: str = None
|
||||
title_length = 0
|
||||
title_max_length = 0
|
||||
|
||||
def __post_init__(self):
|
||||
self.title_suggestions = []
|
||||
|
||||
@property
|
||||
def nameSetter(self):
|
||||
data = name_tester(self.personTitle)
|
||||
name = f"{data['last_name']}, {data['first_name']}"
|
||||
if data["title"] is not None:
|
||||
title = data["title"]
|
||||
self.personTitle = title
|
||||
self.personName = name
|
||||
self.title_length = len(self.title) + 3 + len(self.personName.split(",")[0])
|
||||
if self.title_length > 40:
|
||||
log.warning("Title is too long")
|
||||
name_len = len(self.personName.split(",")[0])
|
||||
self.title_max_length = 38 - name_len
|
||||
suggestions = run_shortener(self.title, self.title_max_length)
|
||||
for suggestion in suggestions:
|
||||
self.title_suggestions.append(suggestion["shortened_string"])
|
||||
else:
|
||||
self.title_suggestions = []
|
||||
pass
|
||||
|
||||
@property
|
||||
def renameSemester(self) -> None:
|
||||
if ", Dauer" in self.semester:
|
||||
self.semester = self.semester.split(",")[0]
|
||||
self.eternal = True
|
||||
self.semester = Semester().from_string(self.semester)
|
||||
else:
|
||||
log.warning("Semester {} is not valid", self.semester)
|
||||
self.semester = Semester().from_string(semester_converter(self.semester))
|
||||
|
||||
@property
|
||||
def signatures(self) -> list[str]:
|
||||
if self.books is not None:
|
||||
return [book.signature for book in self.books if book.has_signature]
|
||||
return []
|
||||
|
||||
|
||||
def word_docx_to_csv(path: str) -> list[pd.DataFrame]:
|
||||
doc = Document(path)
|
||||
tables = doc.tables
|
||||
@@ -272,7 +161,7 @@ def word_to_semap(word_path: str, ai: bool = True) -> SemapDocument:
|
||||
apparatdata = df[0]
|
||||
apparatdata = apparatdata.to_dict()
|
||||
keys = list(apparatdata.keys())
|
||||
print(apparatdata, keys)
|
||||
# print(apparatdata, keys)
|
||||
|
||||
appdata = {keys[i]: keys[i + 1] for i in range(0, len(keys) - 1, 2)}
|
||||
semap.phoneNumber = appdata["Telefon:"]
|
||||
@@ -309,6 +198,182 @@ def word_to_semap(word_path: str, ai: bool = True) -> SemapDocument:
|
||||
return semap
|
||||
|
||||
|
||||
def pdf_to_semap(pdf_path: str, ai: bool = True) -> SemapDocument:
|
||||
"""
|
||||
Parse a Semesterapparat PDF like the sample you provided and return a SemapDocument.
|
||||
- No external programs, only PyMuPDF.
|
||||
- Robust to multi-line field values (e.g., hyphenated emails) and multi-line table cells.
|
||||
- Works across multiple pages; headers only need to exist on the first page.
|
||||
"""
|
||||
doc = fitz.open(pdf_path)
|
||||
semap = SemapDocument()
|
||||
|
||||
# ---------- helpers ----------
|
||||
def _join_tokens(tokens: list[str]) -> str:
|
||||
"""Join tokens, preserving hyphen/URL joins across line wraps."""
|
||||
parts = []
|
||||
for tok in tokens:
|
||||
if parts and (
|
||||
parts[-1].endswith("-")
|
||||
or parts[-1].endswith("/")
|
||||
or parts[-1].endswith(":")
|
||||
):
|
||||
parts[-1] = parts[-1] + tok # no space after '-', '/' or ':'
|
||||
else:
|
||||
parts.append(tok)
|
||||
return " ".join(parts).strip()
|
||||
|
||||
def _extract_row_values_multiline(
|
||||
page, labels: list[str], y_window: float = 24
|
||||
) -> dict[str, str]:
|
||||
"""For a row of inline labels (e.g., Name/Fach/Telefon/Mail), grab text to the right of each label."""
|
||||
rects = []
|
||||
for lab in labels:
|
||||
hits = page.search_for(lab)
|
||||
if hits:
|
||||
rects.append((lab, hits[0]))
|
||||
if not rects:
|
||||
return {}
|
||||
|
||||
rects.sort(key=lambda t: t[1].x0)
|
||||
words = page.get_text("words")
|
||||
out = {}
|
||||
for i, (lab, r) in enumerate(rects):
|
||||
x0 = r.x1 + 1
|
||||
x1 = rects[i + 1][1].x0 - 1 if i + 1 < len(rects) else page.rect.width - 5
|
||||
y0 = r.y0 - 3
|
||||
y1 = r.y0 + y_window
|
||||
toks = [w for w in words if x0 <= w[0] <= x1 and y0 <= w[1] <= y1]
|
||||
toks.sort(key=lambda w: (w[1], w[0])) # line, then x
|
||||
out[lab] = _join_tokens([w[4] for w in toks])
|
||||
return out
|
||||
|
||||
def _compute_columns_from_headers(page0):
|
||||
"""Find column headers (once) and derive column centers + header baseline."""
|
||||
headers = [
|
||||
("Autorenname(n):", "Autorenname(n):Nachname, Vorname"),
|
||||
("Jahr/Auflage", "Jahr/Auflage"),
|
||||
("Titel", "Titel"),
|
||||
("Ort und Verlag", "Ort und Verlag"),
|
||||
("Standnummer", "Standnummer"),
|
||||
("Interne Vermerke", "Interne Vermerke"),
|
||||
]
|
||||
found = []
|
||||
for label, canon in headers:
|
||||
rects = [
|
||||
r for r in page0.search_for(label) if r.y0 > 200
|
||||
] # skip top-of-form duplicates
|
||||
if rects:
|
||||
found.append((canon, rects[0]))
|
||||
found.sort(key=lambda t: t[1].x0)
|
||||
cols = [(canon, r.x0, r.x1, (r.x0 + r.x1) / 2.0) for canon, r in found]
|
||||
header_y = min(r.y0 for _, r in found) if found else 0
|
||||
return cols, header_y
|
||||
|
||||
def _extract_table_rows_from_page(
|
||||
page, cols, header_y, y_top_margin=5, y_bottom_margin=40, y_tol=26.0
|
||||
):
|
||||
"""
|
||||
Group words into logical rows (tolerant to wrapped lines), then map each word
|
||||
to the nearest column by x-center and join tokens per column.
|
||||
"""
|
||||
words = [
|
||||
w
|
||||
for w in page.get_text("words")
|
||||
if w[1] > header_y + y_top_margin
|
||||
and w[3] < page.rect.height - y_bottom_margin
|
||||
]
|
||||
|
||||
# group into row bands by y (tolerance big enough to capture wrapped lines, but below next row gap)
|
||||
rows = []
|
||||
for w in sorted(words, key=lambda w: w[1]):
|
||||
y = w[1]
|
||||
for row in rows:
|
||||
if abs(row["y_mean"] - y) <= y_tol:
|
||||
row["ys"].append(y)
|
||||
row["y_mean"] = sum(row["ys"]) / len(row["ys"])
|
||||
row["words"].append(w)
|
||||
break
|
||||
else:
|
||||
rows.append({"y_mean": y, "ys": [y], "words": [w]})
|
||||
|
||||
# map to columns + join
|
||||
joined_rows = []
|
||||
for row in rows:
|
||||
rowdict = {canon: "" for canon, *_ in cols}
|
||||
words_by_col = {canon: [] for canon, *_ in cols}
|
||||
for w in sorted(row["words"], key=lambda w: (w[1], w[0])):
|
||||
xmid = (w[0] + w[2]) / 2.0
|
||||
canon = min(cols, key=lambda c: abs(xmid - c[3]))[0]
|
||||
words_by_col[canon].append(w[4])
|
||||
for canon, toks in words_by_col.items():
|
||||
rowdict[canon] = _join_tokens(toks)
|
||||
if any(v for v in rowdict.values()):
|
||||
joined_rows.append(rowdict)
|
||||
return joined_rows
|
||||
|
||||
# ---------- top-of-form fields ----------
|
||||
p0 = doc[0]
|
||||
row1 = _extract_row_values_multiline(
|
||||
p0,
|
||||
["Ihr Name und Titel:", "Ihr Fach:", "Telefon:", "Mailadresse:"],
|
||||
y_window=22,
|
||||
)
|
||||
row2 = _extract_row_values_multiline(
|
||||
p0, ["Veranstaltung:", "Semester:"], y_window=20
|
||||
)
|
||||
|
||||
name_title = row1.get("Ihr Name und Titel:", "") or ""
|
||||
semap.subject = row1.get("Ihr Fach:", None)
|
||||
semap.phoneNumber = row1.get("Telefon:", None) # keep as-is (string like "682-308")
|
||||
semap.mail = row1.get("Mailadresse:", None)
|
||||
semap.personName = ",".join(name_title.split(",")[:-1]) if name_title else None
|
||||
semap.personTitle = (
|
||||
",".join(name_title.split(",")[-1:]).strip() if name_title else None
|
||||
)
|
||||
|
||||
semap.title = row2.get("Veranstaltung:", None)
|
||||
semap.semester = row2.get("Semester:", None)
|
||||
|
||||
# ---------- table extraction (all pages) ----------
|
||||
cols, header_y = _compute_columns_from_headers(p0)
|
||||
all_rows: list[dict[str, Any]] = []
|
||||
for pn in range(len(doc)):
|
||||
all_rows.extend(_extract_table_rows_from_page(doc[pn], cols, header_y))
|
||||
|
||||
# drop the sub-header line "Nachname, Vorname" etc.
|
||||
filtered = []
|
||||
for r in all_rows:
|
||||
if r.get("Autorenname(n):Nachname, Vorname", "").strip() in (
|
||||
"",
|
||||
"Nachname, Vorname",
|
||||
):
|
||||
# skip if it's just the sub-header line
|
||||
if all(not r[c] for c in r if c != "Autorenname(n):Nachname, Vorname"):
|
||||
continue
|
||||
filtered.append(r)
|
||||
|
||||
# build Book objects (same filters as your word parser)
|
||||
booklist: list[Book] = []
|
||||
for row in filtered:
|
||||
b = Book()
|
||||
b.from_dict(row)
|
||||
if b.is_empty:
|
||||
continue
|
||||
if not b.has_signature:
|
||||
continue
|
||||
booklist.append(b)
|
||||
|
||||
semap.books = booklist
|
||||
|
||||
# keep parity with your post-processing
|
||||
if ai:
|
||||
_ = semap.renameSemester
|
||||
_ = semap.nameSetter
|
||||
|
||||
return semap
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
else_df = word_to_semap("C:/Users/aky547/Desktop/semap/db/temp/tmpzsz_hgdr.docx")
|
||||
print(else_df)
|
||||
else_df = pdf_to_semap("C:/Users/aky547/Dokumente/testsemap.pdf")
|
||||
# print(else_df)
|
||||
|
||||
Reference in New Issue
Block a user