# Reconstructed from git patch b14346f5afda3a9fe0c0124064ed6fc7c4eaf639
# (author boutmoun123, Mon, 9 Mar 2026 15:25:02 +0300, subject
# "[PATCH] Add boutmoun12.py and clean_data_spacy.py").
#
# The patch added two scripts. Both are kept here as clearly separated
# sections of one importable module:
#   * Section 1 (was boutmoun12.py): breadth-first crawler for a single site.
#   * Section 2 (was clean_data_spacy.py): cleaner for the crawled JSON.
# Run "python <this file> crawl" for the crawler; any other command line is
# handled by the cleaner's main().

import argparse
import json
import re
import sys
import time
from collections import Counter, deque
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urljoin, urlparse

if TYPE_CHECKING:
    # requests/bs4 (crawler) and spacy (cleaner) are imported inside the
    # functions that use them, so running one half of this module does not
    # require the other half's third-party packages.
    from bs4 import BeautifulSoup


# ---------------------------------------------------------------------------
# Section 1: crawler (originally boutmoun12.py)
# ---------------------------------------------------------------------------

START_URL = "https://sana.sy"
MAX_PAGES = 10000
REQUEST_DELAY_SECONDS = 0.4
TIMEOUT_SECONDS = 15
OUTPUT_FILE = "site_data2.json"
# NOTE(review): TLS certificate verification is disabled, so responses can be
# tampered with in transit. Set to True unless the target's certificate is
# known to be broken.
VERIFY_SSL = False

_WHITESPACE_RE = re.compile(r"\s+")


def is_same_domain(url: str, base_netloc: str) -> bool:
    """Return True when *url* is served from *base_netloc*.

    Host names are case-insensitive, so both sides are lower-cased before
    they are compared.
    """
    return urlparse(url).netloc.lower() == base_netloc.lower()


def normalize_url(url: str) -> str:
    """Return *url* without query string, fragment, or trailing "/".

    Surrounding whitespace is removed first; ``None`` and empty input yield
    ``""``. Both original scripts defined this function; this is the
    None-safe variant, which behaves the same for ordinary strings.

    NOTE(review): dropping the query string merges pages that differ only by
    query (for example "?p=123" permalinks) into a single URL.
    """
    parsed = urlparse((url or "").strip())
    return parsed._replace(fragment="", query="").geturl().rstrip("/")


def extract_text(soup: "BeautifulSoup") -> str:
    """Return the visible text of *soup* with whitespace collapsed.

    Side effect: ``<script>``, ``<style>`` and ``<noscript>`` elements are
    removed from *soup* itself.
    """
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    text = soup.get_text(separator=" ", strip=True)
    return _WHITESPACE_RE.sub(" ", text).strip()


def crawl_site(start_url: str, max_pages: int = 100):
    """Crawl the site of *start_url* breadth-first and return its HTML pages.

    Args:
        start_url: First URL to fetch; only links on the same network
            location are followed.
        max_pages: Upper bound on the number of URLs requested. Failed and
            non-HTML responses count towards it.

    Returns:
        A list of dicts with the keys ``site_url``, ``url``, ``title`` and
        ``text``, one per successfully parsed HTML page.
    """
    import requests
    from bs4 import BeautifulSoup

    session = requests.Session()
    session.headers.update(
        {
            "User-Agent": "Mozilla/5.0 (compatible; simple-crawler/1.0; +https://example.com/bot)"
        }
    )
    if not VERIFY_SSL:
        # Without this, every unverified request prints a warning.
        requests.packages.urllib3.disable_warnings()

    base_netloc = urlparse(start_url).netloc
    first_url = normalize_url(start_url)
    queue = deque([first_url])
    # Every URL ever queued. Checking only `visited` let the same link be
    # queued once per page that mentions it (navigation menus, footers).
    enqueued = {first_url}
    visited = set()
    results = []

    while queue and len(visited) < max_pages:
        current_url = queue.popleft()
        if visited:
            # Pause before every request after the first, so failed and
            # non-HTML responses are throttled as well.
            time.sleep(REQUEST_DELAY_SECONDS)
        visited.add(current_url)

        try:
            response = session.get(
                current_url,
                timeout=TIMEOUT_SECONDS,
                verify=VERIFY_SSL,
            )
            response.raise_for_status()
        except requests.RequestException as exc:
            print(f"[SKIP] {current_url} -> {exc}")
            continue

        content_type = response.headers.get("Content-Type", "")
        if "text/html" not in content_type.lower():
            continue

        # Parse the raw bytes. When the server sends no charset,
        # `response.text` falls back to ISO-8859-1 and garbles UTF-8 pages;
        # BeautifulSoup can instead read the charset declared in the markup.
        declared_charset = response.encoding if "charset" in content_type.lower() else None
        soup = BeautifulSoup(response.content, "html.parser", from_encoding=declared_charset)
        page_title = soup.title.get_text(strip=True) if soup.title else ""
        page_text = extract_text(soup)

        results.append(
            {
                # The site actually crawled, not the module-level default.
                "site_url": start_url,
                "url": current_url,
                "title": page_title,
                "text": page_text,
            }
        )
        print(f"[OK] ({len(visited)}/{max_pages}) {current_url}")

        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"].strip()
            # Relative links are resolved against the final URL after any
            # redirects, which is where the page was really served from.
            absolute_url = normalize_url(urljoin(response.url, href))
            if urlparse(absolute_url).scheme not in {"http", "https"}:
                continue
            if not is_same_domain(absolute_url, base_netloc):
                continue
            if absolute_url not in enqueued:
                enqueued.add(absolute_url)
                queue.append(absolute_url)

    return results


def crawl_main() -> None:
    """Crawl START_URL and write the pages to OUTPUT_FILE as JSON."""
    data = crawl_site(START_URL, MAX_PAGES)
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"\nSaved {len(data)} pages to {OUTPUT_FILE}")


# ---------------------------------------------------------------------------
# Section 2: cleaner (originally clean_data_spacy.py)
# ---------------------------------------------------------------------------

NOISE_SNIPPETS = [
    "القائمة الرئيسية",
    "جميع الحقوق محفوظة",
    "حقوق النشر",
    "تواصل معنا",
    "AR EN",
    "KO KO",
]

_SENTENCE_SPLIT_RE = re.compile(r"(?<=[\.\!\?؟؛])\s+|\n+")
# Anything that is not a word character, whitespace, or in the Arabic block,
# plus "_" and the punctuation that lives inside the Arabic block. The block
# itself must stay allowed because Arabic vowel marks are not matched by \w.
_SIGNS_RE = re.compile(
    r"[^\w\s\u0600-\u06FF]"
    r"|[_\u0609\u060A\u060C\u060D\u061B\u061D-\u061F\u066A-\u066D\u06D4]"
)
_NO_SPACE_BEFORE = frozenset(
    {".", ",", ":", ";", "!", "?", ")", "]", "}", "،", "؛", "؟"}
)
_NO_SPACE_AFTER = frozenset({"(", "[", "{"})


def _compile_noise_pattern(snippets):
    """Return a regex matching any snippet, or its lower-case form, as whole words."""
    variants = {form for snippet in snippets if snippet for form in (snippet, snippet.lower())}
    if not variants:
        return None
    # Longest first, so a snippet is never shadowed by a shorter alternative.
    ordered = sorted(variants, key=lambda v: (-len(v), v))
    body = "|".join(re.escape(v) for v in ordered)
    return re.compile(r"(?<!\w)(?:" + body + r")(?!\w)")


# Built once at import time from NOISE_SNIPPETS.
_NOISE_PATTERN = _compile_noise_pattern(NOISE_SNIPPETS)


def fix_mojibake(text: str) -> str:
    """Repair UTF-8 text that was wrongly decoded as Latin-1 / CP-1252.

    The text is re-encoded with each suspect codec and decoded as UTF-8; the
    candidate with the most Arabic letters and the fewest typical garbage
    characters wins. Ties keep the original text.
    """
    if not text:
        return ""

    candidates = [text]
    for src_enc in ("latin1", "cp1252"):
        try:
            # Strict on purpose: with errors="ignore" every character outside
            # the codec (i.e. all correctly decoded Arabic) was silently
            # dropped, and the remaining fragment could outscore the original.
            raw = text.encode(src_enc)
        except UnicodeEncodeError:
            continue
        # Decoding stays lenient so text whose bytes were damaged after the
        # bad decode (for example by whitespace normalisation) is still
        # mostly recovered.
        decoded = raw.decode("utf-8", errors="ignore")
        if decoded:
            candidates.append(decoded)

    def score(s: str) -> int:
        arabic = len(re.findall(r"[\u0600-\u06FF]", s))
        broken = sum(s.count(ch) for ch in ("Ø", "Ù", "ظ", "ط", "Ã", "Â"))
        return arabic - (2 * broken)

    return max(candidates, key=score)


def strip_noise(text: str) -> str:
    """Remove the boilerplate phrases in NOISE_SNIPPETS and collapse whitespace.

    Phrases are matched as whole words only, so the lower-cased "ar en" no
    longer eats the middle of text such as "Nuclear energy".
    """
    t = _NOISE_PATTERN.sub(" ", text) if _NOISE_PATTERN else text
    return _WHITESPACE_RE.sub(" ", t).strip()


def remove_terms(text: str, terms):
    """Remove every occurrence of each term from *text* and collapse whitespace.

    Longer terms are removed first, so a term that contains another term is
    not broken up before it can match.
    """
    t = text
    for term in sorted((term for term in terms if term), key=len, reverse=True):
        t = t.replace(term, " ")
    return _WHITESPACE_RE.sub(" ", t).strip()


def strip_signs(text: str) -> str:
    """Replace punctuation and symbols with spaces, keeping letters and digits.

    Arabic vowel marks are kept; Arabic punctuation and "_" are removed along
    with the Latin punctuation.
    """
    return _WHITESPACE_RE.sub(" ", _SIGNS_RE.sub(" ", text)).strip()


def split_chunks(text: str):
    """Split *text* into sentence-like chunks.

    Chunks end at ``. ! ? ؟ ؛`` followed by whitespace, or at newlines. Each
    chunk has its whitespace collapsed; empty chunks are dropped.
    """
    pieces = (_WHITESPACE_RE.sub(" ", piece).strip() for piece in _SENTENCE_SPLIT_RE.split(text))
    return [piece for piece in pieces if piece]


def dedupe_sentences(text: str) -> str:
    """Drop repeated sentence-like chunks, keeping the first one of each.

    Chunks are compared case-insensitively; order is preserved.
    """
    seen = set()
    kept = []
    for chunk in split_chunks(text):
        key = chunk.lower()
        if key not in seen:
            seen.add(key)
            kept.append(chunk)
    return " ".join(kept).strip()


def remove_global_repeated_chunks(text: str, repeated_chunks):
    """Return *text* without the chunks listed in *repeated_chunks*."""
    filtered = [c for c in split_chunks(text) if c not in repeated_chunks]
    return " ".join(filtered).strip()


def detokenize(tokens):
    """Join *tokens* into a string with conventional spacing around punctuation.

    No space is put before closing punctuation or after an opening bracket.
    """
    parts = []
    prev_tok = None
    for tok in tokens:
        # Compare against the previous *token*. The original compared against
        # the previous output piece, which starts with a space, so an opening
        # bracket was only recognised when it was the very first token.
        glue = prev_tok is None or tok in _NO_SPACE_BEFORE or prev_tok in _NO_SPACE_AFTER
        if not glue:
            parts.append(" ")
        parts.append(tok)
        prev_tok = tok
    return "".join(parts).strip()


def clean_text_with_spacy(nlp, text: str) -> str:
    """Tokenize *text* with *nlp*, drop whitespace tokens, and re-join the rest."""
    doc = nlp(text)
    tokens = [t.text for t in doc if not t.is_space]
    return detokenize(tokens)


def _as_text(value) -> str:
    """Return *value* as a string, mapping a missing or null field to ""."""
    return "" if value is None else str(value)


def clean_records(
    records,
    terms_to_remove,
    strip_signs_flag: bool,
    cross_record_repeat_min: int,
):
    """Clean crawled records and remove text repeated across many of them.

    Args:
        records: Iterable of crawled rows; entries that are not dicts or have
            no URL are skipped.
        terms_to_remove: Terms deleted from every title and text.
        strip_signs_flag: When true, punctuation and symbols are removed.
        cross_record_repeat_min: Chunks of 8 or more characters found in at
            least this many records are removed from all of them. Values
            below 2 disable the step, because a chunk seen in one record is
            not repeated.

    Returns:
        A list of dicts with the keys ``site_url``, ``url``, ``title`` and
        ``text``.
    """
    import spacy

    nlp = spacy.blank("xx")
    cleaned = []

    for row in records:
        if not isinstance(row, dict):
            continue

        # Checked first so rows that will be dropped cost no tokenizer work.
        url = normalize_url(_as_text(row.get("url")))
        if not url:
            continue

        site_url = normalize_url(_as_text(row.get("site_url")))
        title = clean_text_with_spacy(nlp, strip_noise(fix_mojibake(_as_text(row.get("title")))))
        text = clean_text_with_spacy(nlp, strip_noise(fix_mojibake(_as_text(row.get("text")))))

        if terms_to_remove:
            title = remove_terms(title, terms_to_remove)
            text = remove_terms(text, terms_to_remove)

        cleaned.append(
            {
                "site_url": site_url,
                "url": url,
                "title": dedupe_sentences(title),
                "text": dedupe_sentences(text),
            }
        )

    # Remove chunks repeated across many records without deleting records.
    repeated_chunks = set()
    if cross_record_repeat_min >= 2:
        chunk_doc_count = Counter()
        for rec in cleaned:
            uniq_chunks = set(split_chunks(rec["title"])) | set(split_chunks(rec["text"]))
            chunk_doc_count.update(ch for ch in uniq_chunks if len(ch) >= 8)
        repeated_chunks = {
            ch
            for ch, cnt in chunk_doc_count.items()
            if cnt >= cross_record_repeat_min
        }

    for rec in cleaned:
        for field in ("title", "text"):
            value = remove_global_repeated_chunks(rec[field], repeated_chunks)
            if strip_signs_flag:
                # Done last: the de-duplication steps above split on sentence
                # punctuation, which this step removes.
                value = strip_signs(value)
            rec[field] = _WHITESPACE_RE.sub(" ", value).strip()

    return cleaned


def main():
    """Command-line entry point of the cleaner."""
    parser = argparse.ArgumentParser(description="Clean crawled JSON data using spaCy tokenization.")
    parser.add_argument("--input", default="site_data_raw.json")
    parser.add_argument("--output", default="site_data_clean.json")
    parser.add_argument("--compact", action="store_true", help="Write compact JSON without indentation")
    parser.add_argument(
        "--remove-terms",
        default="",
        help="Comma-separated terms to remove from title/text, e.g. \"حمزة المصطفى,محمد\"",
    )
    parser.add_argument(
        "--strip-signs",
        action="store_true",
        help="Remove punctuation/symbol signs from title/text.",
    )
    parser.add_argument(
        "--cross-record-repeat-min",
        type=int,
        default=3,
        help="Remove chunks repeated in this many records or more (without deleting records).",
    )
    args = parser.parse_args()

    in_path = Path(args.input)
    if not in_path.exists():
        raise SystemExit(f"Input file not found: {args.input}")

    try:
        data = json.loads(in_path.read_text(encoding="utf-8-sig"))
    except (OSError, ValueError) as exc:
        # ValueError covers both malformed JSON and undecodable bytes.
        raise SystemExit(f"Could not read {args.input}: {exc}")
    if not isinstance(data, list):
        raise SystemExit("Input JSON must be a list of objects.")

    terms_to_remove = [x.strip() for x in args.remove_terms.split(",") if x.strip()]
    cleaned = clean_records(
        data,
        terms_to_remove=terms_to_remove,
        strip_signs_flag=args.strip_signs,
        cross_record_repeat_min=args.cross_record_repeat_min,
    )

    out_path = Path(args.output)
    if args.compact:
        payload = json.dumps(cleaned, ensure_ascii=False, separators=(",", ":"))
    else:
        payload = json.dumps(cleaned, ensure_ascii=False, indent=2)

    out_path.write_text(payload, encoding="utf-8")
    print(f"Input records: {len(data)}")
    print(f"Cleaned records: {len(cleaned)}")
    print(f"Saved to: {args.output}")


if __name__ == "__main__":
    # Each original script had its own entry point. "crawl" selects the
    # crawler; everything else keeps the cleaner's command line unchanged.
    if sys.argv[1:2] == ["crawl"]:
        crawl_main()
    else:
        main()