"""Check the HTTP status code of every link stored in the LFER database.

Each link is requested once; the resulting status code and the final URL
(or an error description) are written back through ``Database``.
Sentinel codes used instead of a real HTTP status:

*  0 -- the request raised an exception (timeout, DNS failure, ...)
* -1 -- the stored URL contains the marker text "Error"
* -2 -- the URL matches an entry of ``non_support`` and is not requested
"""

import threading
import time

import loguru
import requests
from ratelimit import limits, sleep_and_retry

from src.database import Database

log = loguru.logger
# Drop the handlers configured so far and log to a rotating file instead.
log.remove()
log.add("status_code.log", rotation="100 MB")

THREADS = 10
threadlist = []
db = Database("lfer.db")
links = db.get_links()
LINKLEN = len(links)
LINKPROGRESS = 0
RESPONSES = []
non_support = ["d-nb.info", ".jpg", ".png", ".jpeg"]

# Sent with every request so site operators can identify and contact the bot.
_REQUEST_HEADERS = {
    "User-Agent": (
        "Automated LFER Status Code Checker/1.0 (alexander.kirchner@ph-freiburg.de)"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
_REQUEST_TIMEOUT_SECONDS = 50
# Pause between two requests in the sequential modes.
_REQUEST_DELAY_SECONDS = 1
# Guards LINKPROGRESS: "+=" on a global is a read-modify-write sequence and
# loses updates when several worker threads execute it concurrently.
_progress_lock = threading.Lock()


@log.catch()
def get_status_code(url: str):
    """Request *url* and report its HTTP status.

    Args:
        url: The address to check.

    Returns:
        A ``(code, detail)`` tuple:

        * ``(-2, "Site not supported")`` if the URL contains an entry of
          ``non_support``; no request is made.
        * ``(-1, "No data found")`` if the URL contains ``"Error"``; no
          request is made.
        * ``(status_code, final_url)`` after a completed request, where
          ``final_url`` is ``response.url``.
        * ``(0, str(exception))`` if the request raised.
    """
    if any(marker in url for marker in non_support):
        log.error(f"URL: {url}, ERROR: Site not supported")
        return -2, "Site not supported"
    if "Error" in url:
        log.error(f"URL: {url}, ERROR: No data found")
        return -1, "No data found"

    try:
        response = requests.get(
            url, headers=_REQUEST_HEADERS, timeout=_REQUEST_TIMEOUT_SECONDS
        )
    except Exception as e:
        # Deliberately broad: a single unreachable or malformed link must not
        # abort the whole run. The error text is handed back to the caller.
        log.error(f"URL: {url}, Status Code: 0")
        return 0, str(e)

    log.info(f"URL: {url}, Status Code: {response.status_code}")
    return response.status_code, response.url


def worker(listpart) -> None:
    """Check every ``(id, url)`` pair in *listpart* and collect the results.

    Results are appended to the module-level ``RESPONSES`` list as
    ``(id, response_code, destination_link)`` tuples; nothing is written to
    the database here. ``LINKPROGRESS`` is advanced once per link.
    """
    global LINKPROGRESS
    for link_id, url in listpart:
        response_code, destination_link = get_status_code(url)
        RESPONSES.append((link_id, response_code, destination_link))
        with _progress_lock:
            LINKPROGRESS += 1
            done = LINKPROGRESS
        print("Progress: ", done, "/", LINKLEN, end="\r")


def main_threaded() -> None:
    """Check all links with ``THREADS`` worker threads, then store the results.

    ``links`` is cut into ``THREADS`` contiguous slices; the last slice also
    takes the remainder of the integer division. Database updates happen
    only after every thread has been joined, from the calling thread.
    """
    chunk = LINKLEN // THREADS
    threads = []
    for i in range(THREADS):
        start = i * chunk
        end = LINKLEN if i == THREADS - 1 else (i + 1) * chunk
        threads.append(threading.Thread(target=worker, args=(links[start:end],)))
    # Keep the module-level registry populated, but start/join only the
    # threads created by this call: a Thread object can be started only once.
    threadlist.extend(threads)

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    for link_id, response_code, destination_link in RESPONSES:
        db.update_response_code(link_id, response_code, destination_link)
    print("Done")


def main() -> None:
    """Check all links one after another, storing each result immediately."""
    for position, (link_id, url) in enumerate(links, start=1):
        response_code, destination_link = get_status_code(url)
        db.update_response_code(link_id, response_code, destination_link)
        print("Progress: ", position, "/", LINKLEN, end="\r")
        time.sleep(_REQUEST_DELAY_SECONDS)
    print("Done")


def check_by_status_code(status_code: int) -> None:
    """Re-check the links currently stored with *status_code*.

    A link's record is updated only when the new request yields a different
    code than the one it was selected by.

    Args:
        status_code: The stored response code whose links are re-requested.
    """
    matching = db.get_links_by_response_code(status_code)
    total = len(matching)
    for position, (link_id, url) in enumerate(matching, start=1):
        response_code, destination_link = get_status_code(url)
        # Previously this compared with "==", which stored a result only when
        # nothing had changed and so could never record a new status.
        if response_code != status_code:
            db.update_response_code(link_id, response_code, destination_link)
        # Progress is relative to the re-checked subset, not to all links.
        print("Progress: ", position, "/", total, end="\r")
        time.sleep(_REQUEST_DELAY_SECONDS)
    print("Done")


if __name__ == "__main__":
    # Default run: check every link sequentially.
    main()
    # To re-check only links stored with a particular code, call
    # check_by_status_code(<code>) instead, e.g. check_by_status_code(429).