Add boutmoun12.py and clean_data_spacy.py

هذا الالتزام موجود في:
2026-03-09 15:25:02 +03:00
الأصل f0a58c084e
التزام b14346f5af
2 ملفات معدلة مع 367 إضافات و0 حذوفات

109
boutmoun12.py Normal file
عرض الملف

@@ -0,0 +1,109 @@
import json
import re
import time
from collections import deque
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
START_URL = "https://sana.sy"
MAX_PAGES = 10000
REQUEST_DELAY_SECONDS = 0.4
TIMEOUT_SECONDS = 15
OUTPUT_FILE = "site_data2.json"
VERIFY_SSL = False
def is_same_domain(url: str, base_netloc: str) -> bool:
return urlparse(url).netloc == base_netloc
def normalize_url(url: str) -> str:
parsed = urlparse(url)
clean = parsed._replace(fragment="", query="")
return clean.geturl().rstrip("/")
def extract_text(soup: BeautifulSoup) -> str:
for tag in soup(["script", "style", "noscript"]):
tag.decompose()
text = soup.get_text(separator=" ", strip=True)
return re.sub(r"\s+", " ", text).strip()
def crawl_site(start_url: str, max_pages: int = 100):
session = requests.Session()
session.headers.update(
{
"User-Agent": "Mozilla/5.0 (compatible; simple-crawler/1.0; +https://example.com/bot)"
}
)
if not VERIFY_SSL:
requests.packages.urllib3.disable_warnings()
base_netloc = urlparse(start_url).netloc
queue = deque([normalize_url(start_url)])
visited = set()
results = []
while queue and len(visited) < max_pages:
current_url = queue.popleft()
if current_url in visited:
continue
try:
response = session.get(
current_url,
timeout=TIMEOUT_SECONDS,
verify=VERIFY_SSL,
)
response.raise_for_status()
except requests.RequestException as exc:
print(f"[SKIP] {current_url} -> {exc}")
visited.add(current_url)
continue
content_type = response.headers.get("Content-Type", "")
if "text/html" not in content_type:
visited.add(current_url)
continue
soup = BeautifulSoup(response.text, "html.parser")
page_title = soup.title.get_text(strip=True) if soup.title else ""
page_text = extract_text(soup)
results.append(
{
"site_url":START_URL,
"url": current_url,
"title": page_title,
"text": page_text,
}
)
visited.add(current_url)
print(f"[OK] ({len(visited)}/{max_pages}) {current_url}")
for a_tag in soup.find_all("a", href=True):
href = a_tag["href"].strip()
absolute_url = normalize_url(urljoin(current_url, href))
parsed = urlparse(absolute_url)
if parsed.scheme not in {"http", "https"}:
continue
if not is_same_domain(absolute_url, base_netloc):
continue
if absolute_url not in visited:
queue.append(absolute_url)
time.sleep(REQUEST_DELAY_SECONDS)
return results
if __name__ == "__main__":
data = crawl_site(START_URL, MAX_PAGES)
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"\nSaved {len(data)} pages to {OUTPUT_FILE}")

258
clean_data_spacy.py Normal file
عرض الملف

@@ -0,0 +1,258 @@
import argparse
import json
import re
from pathlib import Path
from urllib.parse import urlparse
import spacy
NOISE_SNIPPETS = [
"القائمة الرئيسية",
"جميع الحقوق محفوظة",
"حقوق النشر",
"تواصل معنا",
"AR EN",
"KO KO",
]
def normalize_url(u: str) -> str:
p = urlparse((u or "").strip())
p = p._replace(fragment="", query="")
return p.geturl().rstrip("/")
def fix_mojibake(text: str) -> str:
if not text:
return ""
candidates = [text]
for src_enc in ("latin1", "cp1252"):
try:
decoded = text.encode(src_enc, errors="ignore").decode("utf-8", errors="ignore")
if decoded:
candidates.append(decoded)
except Exception:
pass
def score(s: str) -> int:
arabic = len(re.findall(r"[\u0600-\u06FF]", s))
broken = sum(s.count(ch) for ch in ("Ø", "Ù", "ظ", "ط", "Ã", "Â"))
return arabic - (2 * broken)
return max(candidates, key=score)
def strip_noise(text: str) -> str:
t = text
for snippet in NOISE_SNIPPETS:
t = t.replace(snippet, " ")
t = t.replace(snippet.lower(), " ")
return re.sub(r"\s+", " ", t).strip()
def remove_terms(text: str, terms):
t = text
for term in terms:
if not term:
continue
t = t.replace(term, " ")
return re.sub(r"\s+", " ", t).strip()
def strip_signs(text: str) -> str:
# Keep Arabic, English letters, and digits; remove punctuation/symbols.
t = re.sub(r"[^\w\s\u0600-\u06FF]", " ", text)
return re.sub(r"\s+", " ", t).strip()
def dedupe_sentences(text: str) -> str:
# Remove repeated sentence-like chunks while preserving order.
chunks = re.split(r"(?<=[\.\!\?؟؛])\s+|\n+", text)
seen = set()
out = []
for chunk in chunks:
c = re.sub(r"\s+", " ", chunk).strip()
if not c:
continue
key = c.lower()
if key in seen:
continue
seen.add(key)
out.append(c)
return " ".join(out).strip()
def split_chunks(text: str):
chunks = re.split(r"(?<=[\.\!\?؟؛])\s+|\n+", text)
out = []
for c in chunks:
c = re.sub(r"\s+", " ", c).strip()
if c:
out.append(c)
return out
def remove_global_repeated_chunks(text: str, repeated_chunks):
chunks = split_chunks(text)
filtered = [c for c in chunks if c not in repeated_chunks]
return " ".join(filtered).strip()
def detokenize(tokens):
no_space_before = {
".",
",",
":",
";",
"!",
"?",
")",
"]",
"}",
"،",
"؛",
"؟",
}
no_space_after = {"(", "[", "{"}
out = []
for tok in tokens:
if not out:
out.append(tok)
continue
prev = out[-1]
if tok in no_space_before or prev in no_space_after:
out[-1] = prev + tok
else:
out.append(" " + tok)
return "".join(out).strip()
def clean_text_with_spacy(nlp, text: str) -> str:
doc = nlp(text)
tokens = [t.text for t in doc if not t.is_space]
return detokenize(tokens)
def clean_records(
records,
terms_to_remove,
strip_signs_flag: bool,
cross_record_repeat_min: int,
):
nlp = spacy.blank("xx")
cleaned = []
for row in records:
if not isinstance(row, dict):
continue
site_url = normalize_url(str(row.get("site_url", "")))
url = normalize_url(str(row.get("url", "")))
title = clean_text_with_spacy(nlp, strip_noise(fix_mojibake(str(row.get("title", "")))))
text = clean_text_with_spacy(nlp, strip_noise(fix_mojibake(str(row.get("text", "")))))
if terms_to_remove:
title = remove_terms(title, terms_to_remove)
text = remove_terms(text, terms_to_remove)
if strip_signs_flag:
title = strip_signs(title)
text = strip_signs(text)
title = dedupe_sentences(title)
text = dedupe_sentences(text)
if not url:
continue
cleaned.append(
{
"site_url": site_url,
"url": url,
"title": title,
"text": text,
}
)
# Remove chunks repeated across many records without deleting records.
chunk_doc_count = {}
for idx, rec in enumerate(cleaned):
uniq_chunks = set(split_chunks(rec["title"]) + split_chunks(rec["text"]))
for ch in uniq_chunks:
if len(ch) < 8:
continue
chunk_doc_count[ch] = chunk_doc_count.get(ch, 0) + 1
repeated_chunks = {
ch
for ch, cnt in chunk_doc_count.items()
if cnt >= cross_record_repeat_min
}
for rec in cleaned:
rec["title"] = remove_global_repeated_chunks(rec["title"], repeated_chunks)
rec["text"] = remove_global_repeated_chunks(rec["text"], repeated_chunks)
rec["title"] = re.sub(r"\s+", " ", rec["title"]).strip()
rec["text"] = re.sub(r"\s+", " ", rec["text"]).strip()
return cleaned
def main():
parser = argparse.ArgumentParser(description="Clean crawled JSON data using spaCy tokenization.")
parser.add_argument("--input", default="site_data_raw.json")
parser.add_argument("--output", default="site_data_clean.json")
parser.add_argument("--compact", action="store_true", help="Write compact JSON without indentation")
parser.add_argument(
"--remove-terms",
default="",
help="Comma-separated terms to remove from title/text, e.g. \"حمزة المصطفى,محمد\"",
)
parser.add_argument(
"--strip-signs",
action="store_true",
help="Remove punctuation/symbol signs from title/text.",
)
parser.add_argument(
"--cross-record-repeat-min",
type=int,
default=3,
help="Remove chunks repeated in this many records or more (without deleting records).",
)
args = parser.parse_args()
in_path = Path(args.input)
if not in_path.exists():
raise SystemExit(f"Input file not found: {args.input}")
data = json.loads(in_path.read_text(encoding="utf-8-sig"))
if not isinstance(data, list):
raise SystemExit("Input JSON must be a list of objects.")
terms_to_remove = [x.strip() for x in args.remove_terms.split(",") if x.strip()]
cleaned = clean_records(
data,
terms_to_remove=terms_to_remove,
strip_signs_flag=args.strip_signs,
cross_record_repeat_min=args.cross_record_repeat_min,
)
out_path = Path(args.output)
if args.compact:
payload = json.dumps(cleaned, ensure_ascii=False, separators=(",", ":"))
else:
payload = json.dumps(cleaned, ensure_ascii=False, indent=2)
out_path.write_text(payload, encoding="utf-8")
print(f"Input records: {len(data)}")
print(f"Cleaned records: {len(cleaned)}")
print(f"Saved to: {args.output}")
if __name__ == "__main__":
main()