"""
|
|
Agent Controller for managing AI interactions and paper analysis.
|
|
"""

import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List

from dotenv import load_dotenv

from src.analysis.llm_analyzer import LLMAnalyzer
from src.storage.paper_store import PaperStore
from src.storage.vector_store import VectorStore

logger = logging.getLogger(__name__)


class AgentController:
    """Controller class for managing AI agent operations."""

    def __init__(self):
        """Initialize the agent controller."""
        self.initialized = False
        self.llm_analyzer = None
        self.paper_store = None
        self.vector_store = None
        # Used by _repair_orphaned_paper; no client is constructed here, so a
        # caller must attach one before orphan repair can refetch metadata.
        self.arxiv_client = None
        self.papers_dir = Path("papers")

    async def __aenter__(self):
        """Async context manager entry."""
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()
        return None  # Don't suppress exceptions

    async def initialize(self):
        """Initialize the agent and prepare it for paper analysis."""
        if self.initialized:
            return

        try:
            # Load environment variables
            load_dotenv()

            # Get API keys from environment
            deepseek_key = os.getenv('DEEPSEEK_API_KEY')
            if not deepseek_key:
                raise ValueError("DEEPSEEK_API_KEY environment variable is required")

            # Create papers directory if it doesn't exist
            self.papers_dir.mkdir(parents=True, exist_ok=True)

            # Initialize components
            self.llm_analyzer = LLMAnalyzer(api_key=deepseek_key, provider='deepseek')
            self.paper_store = PaperStore()
            self.vector_store = VectorStore()

            # Initialize database
            await self.paper_store.initialize()

            self.initialized = True
            logger.info("Agent controller initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize agent controller: {e}")
            raise

    async def analyze_paper(self, paper_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyze a research paper using AI capabilities.

        Args:
            paper_data (dict): Paper metadata and content to analyze

        Returns:
            dict: Analysis results including summary, technical concepts, and
                fluff analysis. If the paper is already stored, the existing
                record is returned; if an orphaned vector entry had to be
                repaired, an empty dict is returned.
        """
        if not self.initialized:
            await self.initialize()

        try:
            # Get paper text (combine title and abstract)
            paper_text = f"Title: {paper_data.get('title', '')}\n\n"
            if paper_data.get('abstract'):
                paper_text += f"Abstract: {paper_data['abstract']}\n\n"

            # Analyze paper using LLM
            analysis = await self.llm_analyzer.analyze_paper(paper_text)

            # Store paper in databases
            paper_id = paper_data.get('entry_id')  # Use entry_id from arXiv
            if paper_id:
                # Extract just the ID part from the arXiv URL, e.g.
                # "2502.06788v1" from "http://arxiv.org/abs/2502.06788v1"
                paper_id = paper_id.split('/')[-1]

                logger.debug(f"Checking PostgreSQL for paper {paper_id}")
                existing = await self.paper_store.get_paper(paper_id)
                if existing:
                    logger.info(f"Paper {paper_id} already in database - skipping")
                    return existing

                logger.debug(f"Checking vector store for paper {paper_id}")
                if self.vector_store.paper_exists(paper_id):
                    logger.warning(f"Found orphaned vector entry for {paper_id} - repairing")
                    await self._repair_orphaned_paper(paper_id, paper_data)
                    return {}

                # Clean paper_id to use as filename
                safe_id = paper_id.replace('.', '_')

                # Save paper content to file
                paper_path = self.papers_dir / f"{safe_id}.json"
                with open(paper_path, 'w', encoding='utf-8') as f:
                    json.dump(paper_data, f, indent=2, ensure_ascii=False)

                # Store metadata in PostgreSQL
                metadata = {
                    'id': paper_id,
                    'title': paper_data['title'],
                    'authors': paper_data['authors'],
                    'summary': analysis.get('summary'),
                    'technical_concepts': analysis.get('technical_concepts'),
                    'fluff_score': analysis.get('fluff', {}).get('score'),
                    'fluff_explanation': analysis.get('fluff', {}).get('explanation'),
                    'pdf_url': paper_data.get('pdf_url')
                }
                await self.paper_store.store_paper(metadata)

                # Store in vector database for similarity search
                chunks = [
                    paper_text,                             # full text
                    analysis.get('summary', ''),            # summary
                    analysis.get('technical_concepts', '')  # technical concepts
                ]
                chunk_metadata = [
                    {'paper_id': paper_id, 'type': 'full_text'},
                    {'paper_id': paper_id, 'type': 'summary'},
                    {'paper_id': paper_id, 'type': 'technical_concepts'}
                ]
                chunk_ids = [
                    f"{safe_id}_text",
                    f"{safe_id}_summary",
                    f"{safe_id}_concepts"
                ]
                self.vector_store.add_chunks(chunks, chunk_metadata, chunk_ids)

            return analysis
        except Exception as e:
            logger.error(f"Failed to analyze paper: {e}")
            raise

    async def process_query(self, query: str) -> List[Dict[str, Any]]:
        """
        Process a search query and return relevant papers.

        Args:
            query (str): Search query string

        Returns:
            list: List of relevant papers with their analysis
        """
        if not self.initialized:
            await self.initialize()

        try:
            # Get similar papers from vector store. query_similar is assumed
            # to return a mapping with a flat 'metadatas' list, one metadata
            # dict per matching chunk.
            results = self.vector_store.query_similar(query)

            # Get unique paper IDs from results
            paper_ids = set()
            for metadata in results['metadatas']:
                if metadata and 'paper_id' in metadata:
                    paper_ids.add(metadata['paper_id'])

            # Get paper metadata from PostgreSQL
            papers = []
            for paper_id in paper_ids:
                paper = await self.paper_store.get_paper(paper_id)
                if paper:
                    # Load full paper data if available on disk
                    safe_id = paper_id.replace('.', '_')
                    paper_path = self.papers_dir / f"{safe_id}.json"
                    if paper_path.exists():
                        with open(paper_path, 'r', encoding='utf-8') as f:
                            paper['full_data'] = json.load(f)
                    papers.append(paper)

            return papers
        except Exception as e:
            logger.error(f"Error processing query: {e}")
            raise

    async def close(self):
        """Clean up resources."""
        if not self.initialized:
            return

        try:
            if self.llm_analyzer:
                await self.llm_analyzer.close()
            if self.paper_store:
                await self.paper_store.close()
            if self.vector_store:
                await self.vector_store.close()

            self.initialized = False
            logger.info("Agent controller closed successfully")
        except Exception as e:
            logger.error(f"Failed to close agent controller: {e}")
            raise

    async def _repair_orphaned_paper(self, paper_id: str, paper_data: Dict):
        """Repair a paper that exists in the vector store but not in PostgreSQL."""
        try:
            if self.arxiv_client is None:
                # Without an arXiv client we cannot refetch metadata; leave
                # the vector entry alone rather than guessing.
                logger.error(f"No arXiv client attached; cannot repair {paper_id}")
                return

            # Try to get fresh metadata from arXiv
            fresh_data = await self.arxiv_client.get_paper_by_id(paper_id)

            if fresh_data:
                # Store in PostgreSQL with original analysis data
                await self.paper_store.store_paper({
                    **fresh_data,
                    "technical_concepts": paper_data.get('technical_concepts'),
                    "fluff_score": paper_data.get('fluff_score'),
                    "fluff_explanation": paper_data.get('fluff_explanation')
                })
                logger.info(f"Repaired orphaned paper {paper_id}")
            else:
                # Paper no longer exists on arXiv - clean up vector store
                self.vector_store.delete_paper(paper_id)
                logger.warning(f"Deleted orphaned paper {paper_id} (not found on arXiv)")
        except Exception as e:
            logger.error(f"Failed to repair paper {paper_id}: {e}")
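

# Minimal usage sketch: assumes DEEPSEEK_API_KEY is set (e.g. via a .env
# file) and that the PaperStore and VectorStore backends are reachable.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        async with AgentController() as agent:
            papers = await agent.process_query("transformer architectures")
            print(f"Found {len(papers)} matching papers")

    asyncio.run(_demo())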