Upload files to "/"
هذا الالتزام موجود في:
2
.gitignore
مباع
Normal file
2
.gitignore
مباع
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
venv
|
||||||
|
.env
|
||||||
306
main.py
Normal file
306
main.py
Normal file
@@ -0,0 +1,306 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
Interactive R&D Assistant: generates one research idea at a time (user decides when to stop),
|
||||||
|
then suggests 3 search terms per idea, searches mithal.space,
|
||||||
|
and saves everything in a markdown file.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
import requests
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
from urllib.parse import quote
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv()
|
||||||
|
import os
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# 1. LLM configuration
|
||||||
|
# =============================================================================
|
||||||
|
# Bearer token for the chat API, read from the process environment
# (load_dotenv() is called earlier in this file, before this lookup).
# NOTE(review): os.getenv returns None when AUTH_TOKEN is unset, in which case
# the Authorization header below becomes the literal "Bearer None" - confirm
# whether a start-up check is wanted.
AUTH_TOKEN = os.getenv("AUTH_TOKEN")
# Root URL from which the conversation endpoints in this file are built.
BASE_URL = "https://chat.cumin.dev/api"
# Headers passed to every requests call made against BASE_URL in this file.
HEADERS = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
    "Content-Type": "application/json"
}
|
||||||
|
|
||||||
|
# Prompt sent verbatim to the LLM to request a SINGLE research idea. It asks
# for a bare JSON object with the four keys that parse_single_idea() checks
# for: "title", "explanation", "novelty" and "reference".
SINGLE_IDEA_PROMPT = """You are an expert R&D assistant specialized in AI, Machine Learning, and LLMs.
Generate exactly ONE creative, underexplored research direction.
Respond ONLY with a valid JSON object with exactly these fields:
- "title": short title in Arabic
- "explanation": brief explanation (2-3 sentences) in Arabic
- "novelty": why it's promising and novel evidence (1 sentence) in Arabic
- "reference": at least one paper or source (as a short text) in Arabic

Example:
{
"title": "عنوان الفكرة",
"explanation": "شرح موجز للفكرة...",
"novelty": "لماذا هذه الفكرة جديدة وواعدة...",
"reference": "مثال: ورقة علمية من arXiv..."
}
Output ONLY the JSON object, no extra text."""
|
||||||
|
|
||||||
|
def create_conversation(title="R&D Session"):
    """Create a new conversation on the chat API and return its id.

    Args:
        title: Title sent in the JSON body of the create request.

    Returns:
        The value of the "id" field of the JSON response when the server
        answers 200 or 201. None when the request fails, the status is
        anything else, or the body is not a JSON object; a diagnostic is
        printed in each of those cases.
    """
    url = f"{BASE_URL}/conversations"
    try:
        # Without a timeout, requests waits indefinitely on a stalled server.
        resp = requests.post(url, headers=HEADERS, json={"title": title}, timeout=30)
    except requests.RequestException as exc:
        print(f"Failed to create conversation: {exc}")
        return None

    if resp.status_code not in (200, 201):
        print(f"Failed to create conversation: {resp.status_code} - {resp.text}")
        return None

    try:
        return resp.json().get("id")
    except (ValueError, AttributeError):
        # ValueError: body is not JSON. AttributeError: JSON but not an object.
        print(f"Failed to create conversation: unexpected response body - {resp.text}")
        return None
|
||||||
|
|
||||||
|
def send_message(conv_id, text):
    """Post ``text`` as a message to conversation ``conv_id``.

    Returns True only when the server answers with HTTP 200.
    """
    endpoint = f"{BASE_URL}/conversations/{conv_id}/chat"
    payload = {"message": text, "files": []}
    response = requests.post(endpoint, headers=HEADERS, json=payload)
    delivered = response.status_code == 200
    return delivered
|
||||||
|
|
||||||
|
def get_conversation(conv_id):
    """Fetch a conversation from the chat API.

    Args:
        conv_id: Identifier interpolated into the conversation URL.

    Returns:
        The decoded JSON body when the server answers 200. None for any other
        status, for a network error or timeout, or for a body that is not
        valid JSON.
    """
    url = f"{BASE_URL}/conversations/{conv_id}"
    try:
        # Bounded wait: wait_for_assistant_reply() polls this function and can
        # only honour its own timeout if each individual call returns.
        resp = requests.get(url, headers=HEADERS, timeout=30)
    except requests.RequestException:
        return None

    if resp.status_code != 200:
        return None

    try:
        return resp.json()
    except ValueError:
        return None
|
||||||
|
|
||||||
|
def wait_for_assistant_reply(conv_id, last_msg_count, timeout=120, poll_interval=2):
    """Poll a conversation until its newest message comes from the assistant.

    Args:
        conv_id: Conversation to poll via get_conversation().
        last_msg_count: Number of messages already known; a reply is only
            looked for once the conversation holds more messages than this.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between two polls.

    Returns:
        The "content" of the newest message when its "role" is "assistant",
        or None when the timeout elapses first.
    """
    # Monotonic clock: the elapsed-time budget must not shrink or grow when
    # the system wall clock is adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        conv = get_conversation(conv_id)
        messages = conv.get("messages") if isinstance(conv, dict) else None
        if messages and len(messages) > last_msg_count:
            newest = messages[-1]
            # .get() so an unexpected message shape is treated as
            # "not answered yet" instead of raising KeyError.
            if isinstance(newest, dict) and newest.get("role") == "assistant":
                return newest.get("content")
        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval, remaining))
    return None
|
||||||
|
|
||||||
|
def get_llm_response(prompt, timeout=120):
    """Send ``prompt`` in a fresh conversation and return the assistant reply.

    Args:
        prompt: Text sent as the only user message of the new conversation.
        timeout: Seconds to wait for the assistant's answer.

    Returns:
        The reply content, or None when the conversation cannot be created or
        read, the message cannot be sent, or no reply arrives within
        ``timeout`` seconds.
    """
    conv_id = create_conversation()
    if not conv_id:
        return None

    # The baseline is taken BEFORE sending. Taken afterwards, a reply that has
    # already arrived would be counted in the baseline and the wait below
    # would time out even though the answer is sitting in the conversation.
    conv_data = get_conversation(conv_id)
    if not isinstance(conv_data, dict):
        return None
    msg_count = len(conv_data.get("messages") or [])

    if not send_message(conv_id, prompt):
        return None

    print(f" ⏳ Waiting for LLM response (up to {timeout}s)...")
    return wait_for_assistant_reply(conv_id, msg_count, timeout=timeout)
|
||||||
|
|
||||||
|
def parse_single_idea(raw_json_str):
    """Parse the LLM reply for one idea into a dict.

    A leading ```json / ``` fence and a trailing ``` fence are removed first.
    If the remaining text is not valid JSON as a whole, the span from the
    first "{" to the last "}" is tried, so a reply that wraps the object in
    extra prose is still accepted.

    Args:
        raw_json_str: Raw reply text from the LLM.

    Returns:
        The parsed dict when it contains all of "title", "explanation",
        "novelty" and "reference"; otherwise None (a diagnostic is printed).
    """
    required = ("title", "explanation", "novelty", "reference")

    clean = raw_json_str.strip()
    if clean.startswith("```json"):
        clean = clean[7:]
    if clean.startswith("```"):
        clean = clean[3:]
    if clean.endswith("```"):
        clean = clean[:-3]

    try:
        idea = json.loads(clean)
    except json.JSONDecodeError:
        # Fallback for replies that surround the object with extra text.
        start, end = clean.find("{"), clean.rfind("}")
        recovered = False
        if 0 <= start < end:
            try:
                idea = json.loads(clean[start:end + 1])
                recovered = True
            except json.JSONDecodeError:
                pass
        if not recovered:
            print(" ❌ Failed to parse JSON from LLM.")
            print(" Raw response (first 500 chars):", raw_json_str[:500])
            return None

    if isinstance(idea, dict) and all(k in idea for k in required):
        return idea

    print(" ❌ LLM response missing required fields.")
    return None
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# 2. Search engine integration (no screenshots)
|
||||||
|
# =============================================================================
|
||||||
|
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
|
||||||
|
|
||||||
|
# CSS selector matched against the results page; search_mithal() treats each
# match as one search result.
RESULT_LINK_SELECTOR = "a.result-link"
# Queried inside a result element to find the element holding its title.
TITLE_SELECTOR = "h3"
# Queried inside a result element; search_mithal() reads the "data-url"
# attribute of the match as the result's target URL.
URL_SELECTOR = "small.arabic-url"
|
||||||
|
|
||||||
|
def _collect_top_results(page, search_url, limit=3):
    """Load ``search_url`` in ``page`` and return up to ``limit`` (title, url) pairs."""
    page.goto(search_url, wait_until="domcontentloaded", timeout=15000)
    page.wait_for_selector(RESULT_LINK_SELECTOR, timeout=10000)

    results = []
    for link in page.query_selector_all(RESULT_LINK_SELECTOR)[:limit]:
        title_elem = link.query_selector(TITLE_SELECTOR)
        title = title_elem.inner_text() if title_elem else "(بدون عنوان)"

        url_elem = link.query_selector(URL_SELECTOR)
        url = url_elem.get_attribute("data-url") if url_elem else None
        if not url:
            # No URL element, or it carries no data-url: use the anchor's href.
            url = link.get_attribute("href")

        results.append((title.strip(), url.strip() if url else "#"))
    return results


def search_mithal(query, retry_user_choice=True):
    """Search mithal.space for ``query`` and return the top 3 results.

    A headless Chromium instance is launched for the search and is always
    closed before returning, whichever path is taken.

    Args:
        query: Search phrase; it is percent-encoded into the search URL.
        retry_user_choice: When True, a failed search asks the user whether
            to retry ('r'), skip the term ('s') or abort the script ('a').
            Any other answer is treated as skip. When False, a failed search
            returns an empty list without asking.

    Returns:
        A list of up to three (title, url) tuples; an empty list when the
        search failed and was skipped.

    Raises:
        SystemExit: When the user chooses to abort.
    """
    search_url = f"https://mithal.space/search?q={quote(query, safe='')}"
    print(f" 🔍 Searching: {query}")

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            context = browser.new_context(locale="ar-SA", viewport={"width": 1280, "height": 900})
            page = context.new_page()

            while True:
                try:
                    return _collect_top_results(page, search_url)
                except PlaywrightTimeoutError:
                    # Also reached when no result element appears in time.
                    print(f" ⚠️ Search timed out for: {query}")
                    question = " Retry (r), Skip this term (s), or Abort whole script (a)? "
                except Exception as e:
                    print(f" ❌ Error searching: {e}")
                    question = " Retry (r), Skip (s), or Abort (a)? "

                if not retry_user_choice:
                    return []

                choice = input(question).strip().lower()
                if choice == 'r':
                    continue
                if choice == 'a':
                    sys.exit(0)
                # 's' and any unrecognised answer both skip this term, so a
                # typo can no longer trigger a silent retry.
                return []
        finally:
            # Single cleanup point for every return, exit and exception path.
            browser.close()
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# 3. Search term generation for a given idea
|
||||||
|
# =============================================================================
|
||||||
|
def get_search_terms_for_idea(idea_title, idea_explanation, timeout=90):
    """Ask the LLM for 3 Arabic search queries related to an idea.

    Args:
        idea_title: Title of the idea, embedded in the prompt.
        idea_explanation: Explanation of the idea, embedded in the prompt.
        timeout: Seconds to wait for the LLM reply.

    Returns:
        A list with at most three strings when the reply is a JSON array of
        strings. None when no reply arrives, the reply cannot be parsed, or
        it has a different shape; a warning is printed in the last two cases.
    """
    prompt = f"""Based on the following R&D idea:
Title: {idea_title}
Explanation: {idea_explanation}

Generate exactly 3 Arabic search queries (short phrases) that would help find related research, papers, or projects online.
Return ONLY a JSON array of strings, e.g. ["استعلام 1", "استعلام 2", "استعلام 3"].
No extra text."""
    reply = get_llm_response(prompt, timeout=timeout)
    if not reply:
        return None

    try:
        clean = reply.strip()
        if clean.startswith("```json"):
            clean = clean[7:]
        if clean.startswith("```"):
            clean = clean[3:]
        if clean.endswith("```"):
            clean = clean[:-3]
        terms = json.loads(clean)
    except (AttributeError, TypeError, ValueError):
        # Non-string reply or malformed JSON. Named exceptions only, so that
        # KeyboardInterrupt and SystemExit are no longer swallowed here.
        print(f" ⚠️ Could not parse search terms for idea: {idea_title}")
        return None

    if isinstance(terms, list) and all(isinstance(t, str) for t in terms):
        return terms[:3]

    print(f" ⚠️ Invalid search terms format for idea: {idea_title}")
    return None
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# 4. Main interactive loop
|
||||||
|
# =============================================================================
|
||||||
|
def _append_markdown(path, text):
    """Append ``text`` to the UTF-8 report file at ``path``."""
    with path.open("a", encoding="utf-8") as f:
        f.write(text)


def main():
    """Run the interactive session: one idea at a time until the user stops.

    For each idea the LLM is asked for the idea itself and then for search
    terms; every term is searched on mithal.space and everything is appended
    to a timestamped markdown report in the current directory as it is
    produced, so a partial report survives an interrupted run.

    Ideas are numbered by how many were actually written, so a failed attempt
    does not leave a gap. Both a missing reply and an unparseable reply ask
    the user whether to try again, instead of retrying without limit.
    """
    print("="*70)
    print("INTERACTIVE R&D ASSISTANT with integrated search")
    print("Type 'stop' when asked for another idea, or press Ctrl+C to abort.")
    print("="*70)

    # One clock reading, so the file name and the report header always agree.
    started = datetime.now()
    out_file = Path(f"rd_ideas_{started.strftime('%Y%m%d_%H%M%S')}.md")
    out_file.write_text(f"# R&D Ideas Report\n*Generated on {started.strftime('%Y-%m-%d %H:%M:%S')}*\n\n", encoding="utf-8")

    idea_counter = 0  # number of ideas written to the report so far
    while True:
        idea_number = idea_counter + 1
        print(f"\n📡 Generating idea #{idea_number}...")
        raw_idea = get_llm_response(SINGLE_IDEA_PROMPT, timeout=120)
        idea = parse_single_idea(raw_idea) if raw_idea else None

        if not idea:
            if not raw_idea:
                print("❌ Failed to get idea from LLM. Do you want to try again?")
            else:
                print("❌ Could not parse idea. Skipping this one.")
            retry = input("Try again? (y/n): ").strip().lower()
            if retry == 'y':
                continue
            break

        # Only a successfully parsed idea consumes a number.
        idea_counter = idea_number

        title = idea.get("title", "بدون عنوان")
        explanation = idea.get("explanation", "")
        novelty = idea.get("novelty", "")
        reference = idea.get("reference", "")

        _append_markdown(
            out_file,
            f"## فكرة {idea_counter}: {title}\n\n"
            f"**الشرح:** {explanation}\n\n"
            f"**الجدة:** {novelty}\n\n"
            f"**مرجع:** {reference}\n\n",
        )

        print(f"\n✅ Idea {idea_counter}: {title}")
        print(" Generating 3 search terms (LLM)...")

        terms = get_search_terms_for_idea(title, explanation, timeout=90)
        if not terms:
            _append_markdown(out_file, "**❌ فشل في توليد مصطلحات البحث**\n\n---\n\n")
            print(" ⚠️ No search terms generated. Moving to next idea decision.\n")
        else:
            _append_markdown(out_file, "### مصطلحات البحث والنتائج\n\n")

            for term_idx, term in enumerate(terms, 1):
                print(f" Term {term_idx}: {term}")
                _append_markdown(out_file, f"#### مصطلح {term_idx}: `{term}`\n\n")

                results = search_mithal(term, retry_user_choice=True)
                if not results:
                    _append_markdown(out_file, "*لا توجد نتائج* \n\n")
                else:
                    listing = "".join(
                        f"{rank}. [{res_title}]({res_url}) \n"
                        for rank, (res_title, res_url) in enumerate(results, 1)
                    )
                    _append_markdown(out_file, listing + "\n")

            _append_markdown(out_file, "---\n\n")

        # Ask if the user wants another idea. 'stop' is accepted as well,
        # because the banner printed at start-up tells the user to type it.
        while True:
            again = input("\n🔁 Do you want another research idea? (y/n): ").strip().lower()
            if again in ('y', 'n', 'stop'):
                break
            print("Please answer 'y' or 'n'.")
        if again != 'y':
            break

    print(f"\n🎉 Finished! Results saved to: {out_file.resolve()}")
|
||||||
|
|
||||||
|
# Script entry point: run the interactive session only when executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C raised anywhere inside main() lands here. The process exits
        # with status 0 even though the session was cut short.
        print("\n\n⚠️ User interrupted. Exiting gracefully.")
        sys.exit(0)
|
||||||
ثنائية
requirements.txt
Normal file
ثنائية
requirements.txt
Normal file
ملف ثنائي غير معروض.
المرجع في مشكلة جديدة
حظر مستخدم