# lastin-ai-2/src/main.py
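"""Command-line entry point for the AI Paper Analysis System.

Subcommands: fetch recent papers from arXiv (fetch), bulk-fetch whole
categories (fetch-all), fetch one paper by ID (fetch-paper), search the
local database (search), and search arXiv directly (arxiv-search).
"""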

import argparse
import asyncio
import logging
import signal
import sys
from datetime import datetime, timedelta
from typing import List, Optional

from src.utils.agent_controller import AgentController
from src.data_acquisition.arxiv_client import ArxivClient
from src.utils.debug import tracker
from src.config.logging_config import setup_logging

logger = logging.getLogger(__name__)


def handle_sigint(signum, frame):
    """Handle interrupt signal."""
    logger.info("Received interrupt signal, exiting...")
    sys.exit(0)
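

# Colour-coded fluff-score display shared by fetch_papers, fetch_single_paper,
# and process_query below, so the score thresholds live in one place.
def print_fluff_score(analysis: dict) -> None:
    """Print a colour-coded fluff score and its explanation, if present."""
    fluff = analysis.get('fluff', {})
    score = fluff.get('score')
    if score is not None:
        # ANSI colours: green under 30, yellow under 70, red otherwise.
        color = '\033[92m' if score < 30 else '\033[93m' if score < 70 else '\033[91m'
        reset = '\033[0m'
        print(f"\nFluff Score: {color}{score}/100{reset}")
        print("Analysis:")
        print(fluff.get('explanation', 'No explanation available'))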


async def cleanup_resources():
    """Clean up any remaining resources."""
    logger.debug("Starting resource cleanup...")
    tracker.print_active_resources()
    try:
        # Cancel all tasks except the current one
        pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        if pending:
            logger.debug(f"Cancelling {len(pending)} pending tasks...")
            for task in pending:
                if not task.done() and not task.cancelled():
                    task.cancel()
            # Wait for the cancelled tasks to finish, with a timeout
            try:
                logger.debug("Waiting for tasks to complete...")
                await asyncio.wait(pending, timeout=5)
            except asyncio.CancelledError:
                logger.debug("Task wait cancelled")
        logger.debug("Cleanup completed successfully")
    except Exception as e:
        logger.error(f"Error during cleanup: {e}")


async def fetch_papers(days: int = 7, categories: Optional[List[str]] = None) -> None:
    """Fetch and analyze papers from arXiv."""
    logger.info(f"Fetching papers from the last {days} days")
    if categories is None:
        categories = ["cs.AI"]
    async with ArxivClient() as client, AgentController() as agent:
        try:
            # Calculate date range
            end_date = datetime.now()
            start_date = end_date - timedelta(days=days)
            logger.info(f"Date range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
            # Fetch and analyze papers for each category
            for category in categories:
                logger.info(f"Fetching papers for category: {category}")
                papers = await client.fetch_papers(category=category,
                                                   start_date=start_date,
                                                   end_date=end_date)
                if not papers:
                    print(f"\nNo recent papers found in {category}")
                    continue
                print(f"\nProcessing papers in {category}:")
                print("=" * 80)
                print()
                for i, paper in enumerate(papers, 1):
                    print(f"Processing paper {i}/{len(papers)}: {paper['title']}")
                    try:
                        # Analyze paper
                        analysis = await agent.analyze_paper(paper)
                        # Print analysis
                        print("\nAnalysis:")
                        print(f"Summary: {analysis.get('summary', 'No summary available')}")
                        print("\nTechnical Concepts:")
                        print(analysis.get('technical_concepts', 'No technical concepts available'))
                        # Print fluff analysis
                        print_fluff_score(analysis)
                    except Exception as e:
                        logger.error(f"Error processing paper: {e}")
                        print("Failed to analyze paper")
                    print("-" * 80)
                    print()
        except Exception as e:
            logger.error(f"Error fetching papers: {e}")
            raise


async def fetch_all_papers(categories: List[str], max_results: int = 1000):
    """Fetch all papers from specified categories."""
    async with ArxivClient() as client, AgentController() as agent:
        for category in categories:
            papers = await client.fetch_papers(category=category, max_results=max_results)
            print(f"Found {len(papers)} papers in {category}")
            for paper in papers:
                await agent.analyze_paper(paper)


async def fetch_single_paper(paper_id: str) -> None:
    """Fetch and analyze a single paper by ID."""
    print(f"\nFetching paper: {paper_id}")
    async with ArxivClient() as client, AgentController() as agent:
        try:
            # Get paper from arXiv
            paper = await client.get_paper_by_id(paper_id)
            if not paper:
                print(f"\nPaper {paper_id} not found on arXiv.")
                return
            print(f"\nFound paper: {paper['title']}")
            print(f"Authors: {', '.join(paper['authors'])}")
            # Analyze the paper
            analysis = await agent.analyze_paper(paper)
            if analysis:
                print("\nAnalysis:")
                print("=" * 80)
                print("\nSummary:")
                print(analysis.get('summary', 'No summary available'))
                print("\nTechnical Concepts:")
                print(analysis.get('technical_concepts', 'No technical concepts available'))
                print_fluff_score(analysis)
            else:
                print("\nPaper was already analyzed or an error occurred during analysis.")
        except Exception as e:
            logger.error(f"Error processing paper: {e}")
            raise


async def process_query(query: str) -> None:
    """Process a search query and display results."""
    async with AgentController() as agent:
        try:
            results = await agent.process_query(query)
            if not results:
                print("\nNo matching papers found.")
                return
            print(f"\nFound {len(results)} matching papers:")
            print("=" * 80)
            for i, paper in enumerate(results, 1):
                print(f"\n{i}. {paper['title']}")
                print(f" Authors: {', '.join(paper['authors'])}")
                analysis = paper.get('analysis', {})
                print("\nSummary:")
                print(analysis.get('summary', 'No summary available'))
                print("\nTechnical Concepts:")
                print(analysis.get('technical_concepts', 'No technical concepts available'))
                print_fluff_score(analysis)
                print("-" * 80)
        except Exception as e:
            logger.error(f"Error processing query: {e}")
            raise


async def search_arxiv(query: str, category: Optional[str] = None, max_results: int = 10) -> None:
    """Search papers directly on arXiv."""
    print(f"\nSearching arXiv for: {query}")
    if category:
        print(f"Category: {category}")
    async with ArxivClient() as client:
        try:
            papers = await client.fetch_papers(query=query, category=category, max_results=max_results)
            if not papers:
                print("\nNo papers found matching your query.")
                return
            print(f"\nFound {len(papers)} papers:")
            print("=" * 80)
            for i, paper in enumerate(papers, 1):
                print(f"\n{i}. {paper['title']}")
                print(f" Authors: {', '.join(paper['authors'])}")
                print(f" arXiv ID: {paper['entry_id']}")
                print(f" PDF: {paper['pdf_url']}")
                print("\nAbstract:")
                print(paper.get('abstract', 'No abstract available'))
                print("-" * 80)
        except Exception as e:
            logger.error(f"Error searching arXiv: {e}")
            raise


async def main():
    """Main application entry point."""
    parser = argparse.ArgumentParser(description='AI Paper Analysis System')
    subparsers = parser.add_subparsers(dest='command', help='Command to run')
    # Fetch papers command
    fetch_parser = subparsers.add_parser('fetch', help='Fetch recent papers')
    fetch_parser.add_argument('--days', type=int, default=7,
                              help='Number of days to look back')
    fetch_parser.add_argument('--categories', nargs='+', default=['cs.AI'],
                              help='arXiv categories to fetch')
    # Fetch all papers command
    fetch_all_parser = subparsers.add_parser('fetch-all', help='Fetch all papers from categories')
    fetch_all_parser.add_argument('--categories', nargs='+', default=['cs.AI'],
                                  help='arXiv categories to fetch')
    fetch_all_parser.add_argument('--max-results', type=int, default=1000,
                                  help='Maximum number of papers to fetch per category')
    # Fetch single paper command
    fetch_one_parser = subparsers.add_parser('fetch-paper', help='Fetch and analyze a single paper')
    fetch_one_parser.add_argument('paper_id', help='arXiv paper ID (e.g., 2502.06788v1)')
    # Search local papers command
    search_parser = subparsers.add_parser('search', help='Search papers in local database')
    search_parser.add_argument('query', help='Search query')
    # Search arXiv directly command
    arxiv_parser = subparsers.add_parser('arxiv-search', help='Search papers directly on arXiv')
    arxiv_parser.add_argument('query', help='Search query')
    arxiv_parser.add_argument('--category', help='arXiv category (e.g., cs.AI)')
    arxiv_parser.add_argument('--max-results', type=int, default=10,
                              help='Maximum number of results to return')
    args = parser.parse_args()
    if args.command == 'fetch':
        await fetch_papers(days=args.days, categories=args.categories)
    elif args.command == 'fetch-all':
        await fetch_all_papers(args.categories, args.max_results)
    elif args.command == 'fetch-paper':
        await fetch_single_paper(args.paper_id)
    elif args.command == 'search':
        await process_query(args.query)
    elif args.command == 'arxiv-search':
        await search_arxiv(args.query, args.category, args.max_results)
    else:
        parser.print_help()


def run_main():
    """Run the main application."""
    # Set up logging
    setup_logging()
    # Register signal handlers so interrupt/termination signals exit cleanly
    signal.signal(signal.SIGINT, handle_sigint)
    signal.signal(signal.SIGTERM, handle_sigint)
    try:
        # Run main application
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Application cancelled by user")
    except Exception as e:
        logger.error(f"Application error: {e}")
        sys.exit(1)
    finally:
        # Clean up (runs in a fresh event loop after the main one has closed)
        try:
            asyncio.run(cleanup_resources())
        except Exception as e:
            logger.error(f"Error during cleanup: {e}")


if __name__ == "__main__":
    run_main()
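
# Example invocations, run from the repository root (the search queries are
# illustrative placeholders; the paper ID is the one cited in the help text):
#   python -m src.main fetch --days 3 --categories cs.AI cs.LG
#   python -m src.main fetch-paper 2502.06788v1
#   python -m src.main search "diffusion models"
#   python -m src.main arxiv-search "agent benchmarks" --category cs.AI --max-results 5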