import zipfile
from typing import Any, Optional

import fitz  # PyMuPDF
import pandas as pd
from bs4 import BeautifulSoup
from docx import Document

from src.logic.dataclass import Book, SemapDocument
from src.shared.logging import log


def word_docx_to_csv(path: str) -> list[pd.DataFrame]:
    """Read every table in a .docx file into a DataFrame, using the first
    row as the header."""
    doc = Document(path)
    tables = doc.tables
    m_data = []
    for table in tables:
        data = []
        for row in table.rows:
            row_data: list[Any] = []
            for cell in row.cells:
                text = cell.text.replace("\n", "")
                row_data.append(text)
                # if text == "Ihr Fach:":
                #     row_data.append(get_fach(path))
            data.append(row_data)
        df = pd.DataFrame(data)
        df.columns = df.iloc[0]
        df = df.iloc[1:]
        m_data.append(df)
    return m_data


def get_fach(path: str) -> Optional[str]:
    """Extract the subject ("Fach") from the raw document XML; the text we
    need is in w:p -> w:r -> w:t."""
    with zipfile.ZipFile(path) as document:
        xml_data = document.read("word/document.xml")
    soup = BeautifulSoup(xml_data, "xml")
    paragraphs = soup.find_all("w:p")
    for para in paragraphs:
        para_id = para.get("w14:paraId")
        if para_id == "12456A32":
            # get the data in the w:t
            for run in para.find_all("w:r"):
                data = run.find("w:t")
                if data and data.contents:
                    return data.contents[0]
    return None


def makeDict() -> dict[str, Optional[str]]:
    """Return an empty bibliographic record."""
    return {
        "work_author": None,
        "section_author": None,
        "year": None,
        "edition": None,
        "work_title": None,
        "chapter_title": None,
        "location": None,
        "publisher": None,
        "signature": None,
        "issue": None,
        "pages": None,
        "isbn": None,
        "type": None,
    }


def tuple_to_dict(tlist: list[tuple], doctype: str) -> list[dict[str, Optional[str]]]:
    """Map the positional columns of each row tuple onto named fields,
    depending on the document type."""
    ret: list[dict[str, Optional[str]]] = []
    for line in tlist:
        data = makeDict()
        if doctype == "Monografien":
            data["type"] = doctype
            data["work_author"] = line[0]
            data["year"] = line[1]
            data["edition"] = line[2]
            data["work_title"] = line[3]
            data["location"] = line[4]
            data["publisher"] = line[5]
            data["signature"] = line[6]
            data["pages"] = line[7]
        elif doctype == "Herausgeberwerke":
            data["type"] = doctype
            data["section_author"] = line[0]
            data["year"] = line[1]
            data["edition"] = line[2]
            data["chapter_title"] = line[3]
            data["work_author"] = line[4]
            data["work_title"] = line[5]
            data["location"] = line[6]
            data["publisher"] = line[7]
            data["signature"] = line[9]
            data["pages"] = line[8]
        elif doctype == "Zeitschriftenaufsätze":
            data["type"] = doctype
            data["section_author"] = line[0]
            data["year"] = line[1]
            data["issue"] = line[2]
            data["chapter_title"] = line[3]
            data["work_title"] = line[4]
            data["location"] = line[5]
            data["publisher"] = line[6]
            data["signature"] = line[8]
            data["pages"] = line[7]
        ret.append(data)
    return ret


def elsa_word_to_csv(path: str) -> tuple[list[dict[str, Optional[str]]], str]:
    """Parse an ELSA .docx: detect the document type from the last non-empty
    paragraph, read the first table, and drop placeholder rows."""
    doc = Document(path)
    doctype = [para.text for para in doc.paragraphs if para.text != ""][-1]
    # an all-empty row per document type, used to filter placeholder rows
    empty_rows = {
        "Monografien": ("",) * 9,
        "Herausgeberwerke": ("",) * 11,
        "Zeitschriftenaufsätze": ("",) * 10,
    }
    tables = doc.tables
    m_data: list[pd.DataFrame] = []
    for table in tables:
        data: list[list[str]] = []
        for row in table.rows:
            row_data: list[str] = []
            for cell in row.cells:
                text = cell.text.replace("\n", "").replace("\u2002", "")
                row_data.append(text)
            data.append(row_data)
        df = pd.DataFrame(data)
        df.columns = df.iloc[0]
        df = df.iloc[1:]
        m_data.append(df)
    df = m_data[0]
    # split the frame into row tuples, skipping all-empty placeholder rows
    rows = [
        row
        for row in df.itertuples(index=False, name=None)
        if row != empty_rows[doctype]
    ]
    return tuple_to_dict(rows, doctype), doctype
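

# Hedged usage sketch (illustrative only, not wired into the app): shows the
# shape of elsa_word_to_csv()'s result. The path default is a hypothetical
# example file, not something this repo ships.
def _elsa_usage_example(path: str = "beispiel_elsa.docx") -> None:
    rows, doctype = elsa_word_to_csv(path)
    # doctype is the last non-empty paragraph, e.g. "Monografien";
    # each row carries the full makeDict() key set, unfilled fields stay None
    log.debug("parsed {} rows of type {}", len(rows), doctype)
    for row in rows:
        log.debug("{} -> {}", row["work_title"], row["signature"])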
SemapDocument: log.info("Parsing Word Document {}", word_path) semap = SemapDocument() df = word_docx_to_csv(word_path) apparatdata = df[0] apparatdata = apparatdata.to_dict() keys = list(apparatdata.keys()) # print(apparatdata, keys) appdata = {keys[i]: keys[i + 1] for i in range(0, len(keys) - 1, 2)} semap.phoneNumber = appdata["Telefon:"] semap.subject = appdata["Ihr Fach:"] semap.mail = appdata["Mailadresse:"] semap.personName = ",".join(appdata["Ihr Name und Titel:"].split(",")[:-1]) semap.personTitle = ",".join(appdata["Ihr Name und Titel:"].split(",")[-1:]).strip() apparatdata = df[1] apparatdata = apparatdata.to_dict() keys = list(apparatdata.keys()) appdata = {keys[i]: keys[i + 1] for i in range(0, len(keys), 2)} semap.title = appdata["Veranstaltung:"] semap.semester = appdata["Semester:"] if ai: semap.renameSemester semap.nameSetter books = df[2] booklist = [] for i in range(len(books)): if books.iloc[i].isnull().all(): continue data = books.iloc[i].to_dict() book = Book() book.from_dict(data) if book.is_empty: continue elif not book.has_signature: continue else: booklist.append(book) log.info("Found {} books", len(booklist)) semap.books = booklist return semap def pdf_to_semap(pdf_path: str, ai: bool = True) -> SemapDocument: """ Parse a Semesterapparat PDF like the sample you provided and return a SemapDocument. - No external programs, only PyMuPDF. - Robust to multi-line field values (e.g., hyphenated emails) and multi-line table cells. - Works across multiple pages; headers only need to exist on the first page. """ doc = fitz.open(pdf_path) semap = SemapDocument() # ---------- helpers ---------- def _join_tokens(tokens: list[str]) -> str: """Join tokens, preserving hyphen/URL joins across line wraps.""" parts = [] for tok in tokens: if parts and ( parts[-1].endswith("-") or parts[-1].endswith("/") or parts[-1].endswith(":") ): parts[-1] = parts[-1] + tok # no space after '-', '/' or ':' else: parts.append(tok) return " ".join(parts).strip() def _extract_row_values_multiline( page, labels: list[str], y_window: float = 24 ) -> dict[str, str]: """For a row of inline labels (e.g., Name/Fach/Telefon/Mail), grab text to the right of each label.""" rects = [] for lab in labels: hits = page.search_for(lab) if hits: rects.append((lab, hits[0])) if not rects: return {} rects.sort(key=lambda t: t[1].x0) words = page.get_text("words") out = {} for i, (lab, r) in enumerate(rects): x0 = r.x1 + 1 x1 = rects[i + 1][1].x0 - 1 if i + 1 < len(rects) else page.rect.width - 5 y0 = r.y0 - 3 y1 = r.y0 + y_window toks = [w for w in words if x0 <= w[0] <= x1 and y0 <= w[1] <= y1] toks.sort(key=lambda w: (w[1], w[0])) # line, then x out[lab] = _join_tokens([w[4] for w in toks]) return out def _compute_columns_from_headers(page0): """Find column headers (once) and derive column centers + header baseline.""" headers = [ ("Autorenname(n):", "Autorenname(n):Nachname, Vorname"), ("Jahr/Auflage", "Jahr/Auflage"), ("Titel", "Titel"), ("Ort und Verlag", "Ort und Verlag"), ("Standnummer", "Standnummer"), ("Interne Vermerke", "Interne Vermerke"), ] found = [] for label, canon in headers: rects = [ r for r in page0.search_for(label) if r.y0 > 200 ] # skip top-of-form duplicates if rects: found.append((canon, rects[0])) found.sort(key=lambda t: t[1].x0) cols = [(canon, r.x0, r.x1, (r.x0 + r.x1) / 2.0) for canon, r in found] header_y = min(r.y0 for _, r in found) if found else 0 return cols, header_y def _extract_table_rows_from_page( page, cols, header_y, y_top_margin=5, y_bottom_margin=40, y_tol=26.0 ): """ 
    def _extract_table_rows_from_page(
        page, cols, header_y, y_top_margin=5, y_bottom_margin=40, y_tol=26.0
    ):
        """
        Group words into logical rows (tolerant to wrapped lines), then map
        each word to the nearest column by x-center and join the tokens per
        column.
        """
        words = [
            w
            for w in page.get_text("words")
            if w[1] > header_y + y_top_margin
            and w[3] < page.rect.height - y_bottom_margin
        ]
        # group into row bands by y (tolerance big enough to capture wrapped
        # lines, but below the next row gap)
        rows = []
        for w in sorted(words, key=lambda w: w[1]):
            y = w[1]
            for row in rows:
                if abs(row["y_mean"] - y) <= y_tol:
                    row["ys"].append(y)
                    row["y_mean"] = sum(row["ys"]) / len(row["ys"])
                    row["words"].append(w)
                    break
            else:
                rows.append({"y_mean": y, "ys": [y], "words": [w]})
        # map to columns + join
        joined_rows = []
        for row in rows:
            rowdict = {canon: "" for canon, *_ in cols}
            words_by_col = {canon: [] for canon, *_ in cols}
            for w in sorted(row["words"], key=lambda w: (w[1], w[0])):
                xmid = (w[0] + w[2]) / 2.0
                canon = min(cols, key=lambda c: abs(xmid - c[3]))[0]
                words_by_col[canon].append(w[4])
            for canon, toks in words_by_col.items():
                rowdict[canon] = _join_tokens(toks)
            if any(v for v in rowdict.values()):
                joined_rows.append(rowdict)
        return joined_rows

    # ---------- top-of-form fields ----------
    p0 = doc[0]
    row1 = _extract_row_values_multiline(
        p0,
        ["Ihr Name und Titel:", "Ihr Fach:", "Telefon:", "Mailadresse:"],
        y_window=22,
    )
    row2 = _extract_row_values_multiline(
        p0, ["Veranstaltung:", "Semester:"], y_window=20
    )
    name_title = row1.get("Ihr Name und Titel:", "") or ""
    semap.subject = row1.get("Ihr Fach:", None)
    semap.phoneNumber = row1.get("Telefon:", None)  # keep as-is (string like "682-308")
    semap.mail = row1.get("Mailadresse:", None)
    semap.personName = ",".join(name_title.split(",")[:-1]) if name_title else None
    semap.personTitle = (
        ",".join(name_title.split(",")[-1:]).strip() if name_title else None
    )
    semap.title = row2.get("Veranstaltung:", None)
    semap.semester = row2.get("Semester:", None)

    # ---------- table extraction (all pages) ----------
    cols, header_y = _compute_columns_from_headers(p0)
    all_rows: list[dict[str, Any]] = []
    for pn in range(len(doc)):
        all_rows.extend(_extract_table_rows_from_page(doc[pn], cols, header_y))

    # drop the sub-header line ("Nachname, Vorname" with otherwise empty cells)
    filtered = []
    for r in all_rows:
        if r.get("Autorenname(n):Nachname, Vorname", "").strip() in (
            "",
            "Nachname, Vorname",
        ):
            # skip if it's just the sub-header line
            if all(not r[c] for c in r if c != "Autorenname(n):Nachname, Vorname"):
                continue
        filtered.append(r)

    # build Book objects (same filters as the Word parser)
    booklist: list[Book] = []
    for row in filtered:
        b = Book()
        b.from_dict(row)
        if b.is_empty:
            continue
        if not b.has_signature:
            continue
        booklist.append(b)
    semap.books = booklist

    # keep parity with the Word parser's post-processing
    if ai:
        _ = semap.renameSemester
        _ = semap.nameSetter
    return semap


if __name__ == "__main__":
    semap = pdf_to_semap("C:/Users/aky547/Dokumente/testsemap.pdf")
    # print(semap)
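    # Hedged round-trip sketch: compare against the Word parser using a
    # hypothetical matching .docx export of the same form.
    # word_semap = word_to_semap("C:/Users/aky547/Dokumente/testsemap.docx")
    # assert len(word_semap.books) == len(semap.books)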