"""Clean crawled site records stored as a JSON list of objects.

Each record is expected to be a dict with ``site_url``, ``url``, ``title`` and
``text`` keys.  Cleaning normalises the URLs, repairs mojibake, removes known
boilerplate snippets, normalises spacing via spaCy tokenisation, optionally
removes user-supplied terms, de-duplicates sentences, removes chunks that are
repeated across many records and, optionally, strips punctuation.
"""

import argparse
import json
import re
from collections import Counter
from pathlib import Path
from typing import Iterable, List
from urllib.parse import urlparse

NOISE_SNIPPETS = [
    "القائمة الرئيسية",
    "جميع الحقوق محفوظة",
    "حقوق النشر",
    "تواصل معنا",
    "AR EN",
    "KO KO",
]

# Encodings that UTF-8 bytes are commonly mis-decoded as.  cp1256 is the one
# that produces the "ط§ظ„..." pattern the scorer's ط/ظ markers look for.
_MOJIBAKE_SOURCE_ENCODINGS = ("latin1", "cp1252", "cp1256")
_MOJIBAKE_MARKERS = ("Ø", "Ù", "ظ", "ط", "Ã", "Â")

_WHITESPACE_RE = re.compile(r"\s+")
_ARABIC_RE = re.compile(r"[\u0600-\u06FF]")
# Sentence-like boundaries: whitespace after Latin/Arabic terminators, or newlines.
_CHUNK_SPLIT_RE = re.compile(r"(?<=[\.\!\?؟؛])\s+|\n+")
# Anything that is not a word char, whitespace or in the Arabic block, plus
# the underscore and the Arabic punctuation marks that live inside that block.
_SIGNS_RE = re.compile(
    r"[^\w\s\u0600-\u06FF]|[_\u060C\u060D\u061B\u061E\u061F\u066A-\u066D\u06D4]"
)

_NO_SPACE_BEFORE = frozenset(
    {".", ",", ":", ";", "!", "?", ")", "]", "}", "،", "؛", "؟"}
)
_NO_SPACE_AFTER = frozenset({"(", "[", "{"})

# Chunks shorter than this are ignored by the cross-record repeat detection.
_MIN_REPEATED_CHUNK_LEN = 8


def _collapse_whitespace(text: str) -> str:
    """Collapse every whitespace run to one space and trim the ends."""
    return _WHITESPACE_RE.sub(" ", text).strip()


def _as_text(value) -> str:
    """Return ``value`` as a string, mapping ``None`` to the empty string.

    A plain ``str(None)`` would yield the literal text ``"None"``, which then
    passes the empty-URL filter and pollutes titles/texts.
    """
    return "" if value is None else str(value)


def normalize_url(u: str) -> str:
    """Return ``u`` without query string, fragment or trailing slashes.

    ``None`` and empty input yield ``""``.
    """
    parsed = urlparse((u or "").strip())
    parsed = parsed._replace(fragment="", query="")
    return parsed.geturl().rstrip("/")


def _mojibake_score(s: str) -> int:
    """Score a candidate: Arabic-block characters minus twice the marker count."""
    arabic = len(_ARABIC_RE.findall(s))
    broken = sum(s.count(ch) for ch in _MOJIBAKE_MARKERS)
    return arabic - (2 * broken)


def fix_mojibake(text: str) -> str:
    """Return the most plausible reading of ``text`` if it is mojibake.

    The text is re-encoded with each legacy encoding and decoded as UTF-8
    (best effort: unencodable/undecodable pieces are dropped).  The original
    and every non-empty re-decoding are scored and the best one is returned;
    on a tie the original wins because it is the first candidate.
    """
    if not text:
        return ""

    candidates = [text]
    for src_enc in _MOJIBAKE_SOURCE_ENCODINGS:
        try:
            decoded = text.encode(src_enc, errors="ignore").decode(
                "utf-8", errors="ignore"
            )
        except UnicodeError:
            # Best effort only: a failing round trip just means no candidate.
            continue
        if decoded and decoded != text:
            candidates.append(decoded)

    return max(candidates, key=_mojibake_score)


def strip_noise(text: str) -> str:
    """Remove every ``NOISE_SNIPPETS`` entry (as written and lower-cased).

    NOTE(review): matching is plain substring replacement, so the lower-cased
    Latin snippets (e.g. ``"ar en"``) are also removed inside ordinary text.
    """
    result = text
    for snippet in NOISE_SNIPPETS:
        result = result.replace(snippet, " ")
        result = result.replace(snippet.lower(), " ")
    return _collapse_whitespace(result)


def remove_terms(text: str, terms: Iterable[str]) -> str:
    """Remove every non-empty term from ``text`` and normalise whitespace.

    Longer terms are removed first so that a term which is a prefix or
    substring of another one cannot leave a fragment of the longer term
    behind.
    """
    result = text
    for term in sorted((t for t in terms if t), key=len, reverse=True):
        result = result.replace(term, " ")
    return _collapse_whitespace(result)


def strip_signs(text: str) -> str:
    """Replace punctuation and symbols with spaces.

    Letters, digits and Arabic diacritics are kept; ASCII punctuation
    (including ``_``) and Arabic punctuation such as ``،`` ``؛`` ``؟`` are
    removed.
    """
    return _collapse_whitespace(_SIGNS_RE.sub(" ", text))


def split_chunks(text: str) -> List[str]:
    """Split ``text`` into whitespace-normalised, non-empty sentence-like chunks."""
    pieces = (_collapse_whitespace(piece) for piece in _CHUNK_SPLIT_RE.split(text))
    return [piece for piece in pieces if piece]


def dedupe_sentences(text: str) -> str:
    """Drop repeated sentence-like chunks (case-insensitively), keeping order."""
    seen = set()
    kept = []
    for chunk in split_chunks(text):
        key = chunk.lower()
        if key in seen:
            continue
        seen.add(key)
        kept.append(chunk)
    return " ".join(kept).strip()


def remove_global_repeated_chunks(text: str, repeated_chunks) -> str:
    """Return ``text`` without the chunks contained in ``repeated_chunks``."""
    kept = [chunk for chunk in split_chunks(text) if chunk not in repeated_chunks]
    return " ".join(kept).strip()


def detokenize(tokens) -> str:
    """Join tokens with spaces, except before closing and after opening marks.

    The decision is based on the previous *token*, not on the accumulated
    output, so an opening bracket in the middle of the text is handled the
    same way as one at the start.
    """
    pieces = []
    prev_tok = None
    for tok in tokens:
        glue = (
            prev_tok is None
            or tok in _NO_SPACE_BEFORE
            or prev_tok in _NO_SPACE_AFTER
        )
        pieces.append(tok if glue else " " + tok)
        prev_tok = tok
    return "".join(pieces).strip()


def clean_text_with_spacy(nlp, text: str) -> str:
    """Tokenise ``text`` with ``nlp`` and rebuild it without whitespace tokens."""
    doc = nlp(text)
    return detokenize([tok.text for tok in doc if not tok.is_space])


def _load_tokenizer():
    """Create the blank multi-language spaCy pipeline used for tokenisation.

    spaCy is imported here rather than at module level so that ``--help``,
    argument errors and the pure-text helpers do not pay for (or require)
    the spaCy import.
    """
    import spacy

    return spacy.blank("xx")


def clean_records(
    records,
    terms_to_remove,
    strip_signs_flag: bool,
    cross_record_repeat_min: int,
):
    """Clean crawled records and return them as a new list of dicts.

    Args:
        records: Iterable of raw records; entries that are not dicts are
            skipped, as are records whose normalised ``url`` is empty.
        terms_to_remove: Terms removed verbatim from title and text.
        strip_signs_flag: When true, punctuation/symbols are stripped from
            title and text as the final step.
        cross_record_repeat_min: Chunks (8+ characters) present in at least
            this many records are removed from every record.  Values below 2
            disable the step, because they would match every chunk.

    Returns:
        A list of dicts with ``site_url``, ``url``, ``title`` and ``text``.
    """
    nlp = _load_tokenizer()

    def clean_field(raw) -> str:
        value = clean_text_with_spacy(
            nlp, strip_noise(fix_mojibake(_as_text(raw)))
        )
        if terms_to_remove:
            value = remove_terms(value, terms_to_remove)
        return dedupe_sentences(value)

    cleaned = []
    for row in records:
        if not isinstance(row, dict):
            continue
        url = normalize_url(_as_text(row.get("url")))
        if not url:
            # Skip before the (comparatively expensive) text cleaning.
            continue
        cleaned.append(
            {
                "site_url": normalize_url(_as_text(row.get("site_url"))),
                "url": url,
                "title": clean_field(row.get("title")),
                "text": clean_field(row.get("text")),
            }
        )

    # Remove chunks repeated across many records without deleting records.
    if cross_record_repeat_min >= 2:
        chunk_doc_count = Counter()
        for rec in cleaned:
            uniq_chunks = set(split_chunks(rec["title"]) + split_chunks(rec["text"]))
            chunk_doc_count.update(
                ch for ch in uniq_chunks if len(ch) >= _MIN_REPEATED_CHUNK_LEN
            )
        repeated_chunks = {
            ch
            for ch, cnt in chunk_doc_count.items()
            if cnt >= cross_record_repeat_min
        }
        if repeated_chunks:
            for rec in cleaned:
                for field in ("title", "text"):
                    rec[field] = remove_global_repeated_chunks(
                        rec[field], repeated_chunks
                    )

    # Punctuation is stripped last: the sentence de-duplication and the
    # cross-record pass above both rely on it to find chunk boundaries.
    for rec in cleaned:
        for field in ("title", "text"):
            value = rec[field]
            if strip_signs_flag:
                value = strip_signs(value)
            rec[field] = _collapse_whitespace(value)

    return cleaned


def main():
    """Command-line entry point: read, clean and write the JSON records."""
    parser = argparse.ArgumentParser(
        description="Clean crawled JSON data using spaCy tokenization."
    )
    parser.add_argument("--input", default="site_data_raw.json")
    parser.add_argument("--output", default="site_data_clean.json")
    parser.add_argument(
        "--compact",
        action="store_true",
        help="Write compact JSON without indentation",
    )
    parser.add_argument(
        "--remove-terms",
        default="",
        help="Comma-separated terms to remove from title/text, e.g. \"حمزة المصطفى,محمد\"",
    )
    parser.add_argument(
        "--strip-signs",
        action="store_true",
        help="Remove punctuation/symbol signs from title/text.",
    )
    parser.add_argument(
        "--cross-record-repeat-min",
        type=int,
        default=3,
        help="Remove chunks repeated in this many records or more (without deleting records).",
    )
    args = parser.parse_args()

    in_path = Path(args.input)
    if not in_path.exists():
        raise SystemExit(f"Input file not found: {args.input}")

    try:
        # utf-8-sig transparently drops a leading BOM if the crawler wrote one.
        data = json.loads(in_path.read_text(encoding="utf-8-sig"))
    except json.JSONDecodeError as err:
        raise SystemExit(f"Input file is not valid JSON: {err}") from err
    if not isinstance(data, list):
        raise SystemExit("Input JSON must be a list of objects.")

    terms_to_remove = [x.strip() for x in args.remove_terms.split(",") if x.strip()]
    cleaned = clean_records(
        data,
        terms_to_remove=terms_to_remove,
        strip_signs_flag=args.strip_signs,
        cross_record_repeat_min=args.cross_record_repeat_min,
    )

    if args.compact:
        payload = json.dumps(cleaned, ensure_ascii=False, separators=(",", ":"))
    else:
        payload = json.dumps(cleaned, ensure_ascii=False, indent=2)
    Path(args.output).write_text(payload, encoding="utf-8")

    print(f"Input records: {len(data)}")
    print(f"Cleaned records: {len(cleaned)}")
    print(f"Saved to: {args.output}")


if __name__ == "__main__":
    main()