Initial commit: Academic paper processing system

kpcto 2025-01-31 02:00:36 +00:00
commit ec6a6b0cd0
9 changed files with 470 additions and 0 deletions

.env.example
@@ -0,0 +1,20 @@
# LLM API Keys
DEEPSEEK_API_KEY=your-deepseek-key-here
OPENAI_API_KEY=your-openai-key-here  # Optional backup provider

# Database Credentials
POSTGRES_USER=your-postgres-user
POSTGRES_PASSWORD=your-postgres-password
POSTGRES_DB=paper_review

# Redis Configuration
REDIS_URL=redis://localhost:6379/0

# Storage Paths
CACHE_DIR=cache/papers
VECTOR_STORE_PATH=data/chroma
LOG_PATH=logs/agent.log

# Security
ENCRYPTION_KEY=your-encryption-key-here
MAX_REQUESTS_PER_MINUTE=100
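
These variables are presumably consumed with `python-dotenv` (listed in `requirements.txt`) and `os.getenv`, as the analysis engine later in this commit does. A minimal loading sketch; the variable names match `.env.example`, everything else is illustrative:

```python
import os

from dotenv import load_dotenv

# Read .env into the process environment (deployments may inject these directly instead)
load_dotenv()

deepseek_key = os.getenv("DEEPSEEK_API_KEY")
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
if deepseek_key is None:
    raise RuntimeError("DEEPSEEK_API_KEY is not set")
```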

.gitignore
@@ -0,0 +1,9 @@
venv/
.venv/
.env
__pycache__/
*.pyc
.vscode/
.idea/
*.sqlite3
*.comments

architecture.md
@@ -0,0 +1,62 @@
# AI Paper Review Agent Architecture

## Overview

Modular system for fetching, processing, and summarizing academic papers using LangChain.

### Core Components

1. **Data Acquisition Layer**
   - arXiv API Client (REST interface)
   - PDF Downloader Service
   - Metadata Extraction Module
2. **Processing Pipeline**
   - PDF Text Extractor (pypdf/Unstructured)
   - Semantic Chunker
   - Metadata Enricher (author institutions, citations)
3. **Analysis Engine**
   - LangChain Document Loaders
   - Multi-stage Summary Chain (DeepSeek-R1)
   - Technical Concept Extractor
   - Cross-Paper Insight Aggregator
4. **Storage Layer**
   - Relational Storage (PostgreSQL)
   - Vector Store (Chroma; see the sketch after this list)
   - Cache System (Redis)
5. **Orchestration**
   - Agent Controller Class
   - Retry Mechanism with Exponential Backoff
   - Quality Assurance Checks
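
The vector store is only declared at this level (`chromadb` appears in `requirements.txt`, and the `data/chroma` path and `papers` collection in `config/settings.yaml`); nothing in this commit writes to it yet. A minimal sketch of how chunked paper text could be indexed, assuming Chroma's default embedding function; the id scheme and metadata are illustrative:

```python
import chromadb

# Persist location and collection name taken from config/settings.yaml
client = chromadb.PersistentClient(path="data/chroma")
collection = client.get_or_create_collection("papers")

def index_chunks(paper_id: str, chunks: list[str]) -> None:
    # One entry per chunk, keyed by paper id plus chunk index (illustrative scheme)
    collection.add(
        ids=[f"{paper_id}-{i}" for i in range(len(chunks))],
        documents=chunks,
        metadatas=[{"paper_id": paper_id, "chunk": i} for i in range(len(chunks))],
    )
```
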
## Architectural Diagram
```mermaid
graph LR
A[User Query] --> B(arXiv API)
B --> C[PDF Storage]
C --> D{Processing Queue}
D --> E[Text Extraction]
E --> F[Chunking]
F --> G[Embedding]
G --> H[Vector Store]
H --> I[LLM Analysis]
I --> J[Report Generation]
```
## Key Decisions

1. **Modular Design**: Components communicate via clean interfaces for easy replacement
2. **Batch Processing**: Asynchronous pipeline for parallel paper processing (see the sketch after this list)
3. **Caching Layer**: Reduces API calls and improves performance
4. **Fallback Strategies**: Multiple PDF parsers with automatic fallback
5. **Security**: Environment variables for credentials, encrypted storage
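
Decision 2 is a design intent; the controller later in this commit processes papers sequentially. A minimal sketch of one way the batch step could be parallelised, assuming the `AgentController.process_paper` interface defined below; the concurrency limit is an illustrative choice:

```python
import asyncio
from typing import Dict, List

async def process_papers_concurrently(controller, papers: List[Dict], limit: int = 4) -> List[Dict]:
    # Bound concurrency so the arXiv and LLM APIs are not flooded
    semaphore = asyncio.Semaphore(limit)

    async def process_one(paper: Dict):
        async with semaphore:
            # process_paper is synchronous, so run each call in a worker thread
            return await asyncio.to_thread(
                controller.process_paper, paper["id"], paper["pdf_url"]
            )

    results = await asyncio.gather(*(process_one(p) for p in papers))
    return [r for r in results if r is not None]
```
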
## Dependencies

```text
langchain==0.2.1
arxiv==2.1.0
unstructured==0.12.2
openai==1.30.1
faiss-cpu==1.8.0
sqlalchemy==2.0.30
```

config/settings.yaml
@@ -0,0 +1,63 @@
# AI Paper Review Agent Configuration

# LLM Settings
llm:
  temperature: 0.5
  max_tokens: 4096
  model: deepseek-r1

# ArXiv Client Settings
arxiv:
  max_results_per_query: 10
  cache_dir: cache/papers
  retry_attempts: 3
  retry_delay: 2  # seconds

# PDF Processing
pdf:
  chunk_size: 1000
  chunk_overlap: 100
  fallback_enabled: true
  supported_formats:
    - pdf
    - PDF

# Analysis Settings
analysis:
  summary_chain_type: map_reduce
  min_concepts: 3
  max_summary_length: 2000

# Storage Configuration
storage:
  # PostgreSQL settings
  postgres:
    host: localhost
    port: 5432
    database: paper_review
    user: ${POSTGRES_USER}
    password: ${POSTGRES_PASSWORD}
  # Vector store settings
  chroma:
    persist_directory: data/chroma
    collection_name: papers
  # Redis cache settings
  redis:
    host: localhost
    port: 6379
    db: 0
    ttl: 86400  # 24 hours

# Security
security:
  encrypt_storage: true
  api_rate_limit: 100  # requests per minute
  max_file_size: 50000000  # 50 MB

# Logging
logging:
  level: INFO
  format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  file: logs/agent.log
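
Note that `yaml.safe_load` does not expand the `${POSTGRES_USER}`-style placeholders above on its own; a minimal loading sketch that substitutes them from the environment first (the `load_settings` helper is illustrative, not part of this commit):

```python
import os

import yaml

def load_settings(path: str = "config/settings.yaml") -> dict:
    # Expand ${VAR} placeholders against the process environment before parsing
    with open(path) as f:
        raw = os.path.expandvars(f.read())
    return yaml.safe_load(raw)
```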

requirements.txt
@@ -0,0 +1,12 @@
langchain==0.2.1
arxiv==2.1.0
unstructured==0.12.2
deepseek-ai==1.0.0
faiss-cpu==1.8.0
sqlalchemy==2.0.30
python-dotenv==1.0.0
pyyaml==6.0.1
pypdf==4.2.0
redis==5.0.3
chromadb==0.5.0
requests>=2.31.0

@@ -0,0 +1,53 @@
from langchain.schema import Document
from langchain.chains.summarize import load_summarize_chain
from langchain_community.llms import OpenAI
from typing import List, Dict
import yaml
import os


class TechnicalConceptExtractor:
    def __init__(self, llm):
        self.llm = llm

    def extract_concepts(self, text: str) -> List[str]:
        prompt = f"""Identify key technical concepts from this text:
{text[:5000]}
List them as comma-separated values:"""
        result = self.llm(prompt)
        return [c.strip() for c in result.split(",")]


class InsightAggregator:
    def __init__(self, llm):
        self.llm = llm

    def find_connections(self, papers: List[Dict]) -> str:
        summaries = "\n\n".join([p["summary"] for p in papers])
        prompt = f"""Analyze these paper summaries and identify cross-cutting themes:
{summaries[:10000]}
Provide a structured analysis:"""
        return self.llm(prompt)


class PaperAnalyzer:
    def __init__(self, config_path: str = "config/settings.yaml"):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)
        self.llm = OpenAI(
            temperature=0.5,  # More conservative temperature for academic analysis
            model_name="deepseek-r1",
            api_key=os.getenv("DEEPSEEK_API_KEY"),
            base_url="https://api.deepseek.com/v1",
            max_tokens=4096  # Increased token limit for complex papers
        )
        self.summary_chain = load_summarize_chain(self.llm, chain_type="map_reduce")
        self.concept_extractor = TechnicalConceptExtractor(self.llm)
        self.insight_aggregator = InsightAggregator(self.llm)

    def analyze_document(self, document: Document) -> Dict:
        summary = self.summary_chain.run([document])
        concepts = self.concept_extractor.extract_concepts(document.page_content)
        return {
            "summary": summary,
            "concepts": concepts,
            "metadata": document.metadata
        }
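
A brief usage sketch for `PaperAnalyzer` above; the document text and metadata are made up for illustration, and `DEEPSEEK_API_KEY` must be set in the environment:

```python
from langchain.schema import Document

analyzer = PaperAnalyzer()  # reads config/settings.yaml and DEEPSEEK_API_KEY

doc = Document(
    page_content="We study retrieval-augmented generation for scientific question answering ...",  # illustrative text
    metadata={"id": "0000.00000", "title": "Illustrative paper"},  # illustrative metadata
)
report = analyzer.analyze_document(doc)
print(report["concepts"])
```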

@@ -0,0 +1,61 @@
import arxiv
import os
from typing import List, Dict
import requests
from pathlib import Path


class ArxivClient:
    def __init__(self, cache_dir: str = "cache/papers"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def search_papers(self, query: str, max_results: int = 10) -> List[Dict]:
        """Search arXiv for papers matching the query"""
        search = arxiv.Search(
            query=query,
            max_results=max_results,
            sort_by=arxiv.SortCriterion.SubmittedDate
        )
        results = []
        for paper in search.results():
            results.append({
                "id": paper.entry_id.split("/")[-1],
                "title": paper.title,
                "authors": [author.name for author in paper.authors],
                "summary": paper.summary,
                "pdf_url": paper.pdf_url,
                "published": paper.published.isoformat(),
                "categories": paper.categories
            })
        return results

    def download_paper(self, paper_id: str, pdf_url: str) -> str:
        """Download paper PDF and return local path"""
        cache_path = self.cache_dir / f"{paper_id}.pdf"
        if cache_path.exists():
            return str(cache_path)
        response = requests.get(pdf_url, stream=True)
        response.raise_for_status()
        with open(cache_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return str(cache_path)

    def extract_metadata(self, paper_id: str) -> Dict:
        """Extract additional metadata from the arXiv API"""
        url = f"http://export.arxiv.org/api/query?id_list={paper_id}"
        response = requests.get(url)
        response.raise_for_status()
        # Basic metadata extraction from the API response;
        # could be enhanced with XML parsing for more detailed info
        return {
            "id": paper_id,
            "retrieved_at": response.headers.get("Last-Modified"),
            "size": len(response.content)
        }
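
A brief usage sketch for `ArxivClient` above; the query string is illustrative:

```python
client = ArxivClient()
papers = client.search_papers("large language model evaluation", max_results=3)
for paper in papers:
    pdf_path = client.download_paper(paper["id"], paper["pdf_url"])
    print(paper["title"], "->", pdf_path)
```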

@@ -0,0 +1,107 @@
import time
import json
from typing import Dict, List, Optional
import logging
from pathlib import Path

import redis
from langchain.schema import Document

from ..data_acquisition.arxiv_client import ArxivClient
from ..processing.pdf_processor import PDFProcessor
from ..analysis.analysis_engine import PaperAnalyzer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AgentController:
    def __init__(self, config: Dict):
        self.config = config
        self.arxiv_client = ArxivClient()
        self.pdf_processor = PDFProcessor()
        self.paper_analyzer = PaperAnalyzer()
        # Initialize Redis for caching (settings.yaml nests these under storage.redis)
        redis_config = config.get("storage", {}).get("redis", {})
        self.cache = redis.Redis(
            host=redis_config.get("host", "localhost"),
            port=redis_config.get("port", 6379),
            db=redis_config.get("db", 0)
        )
        self.cache_ttl = redis_config.get("ttl", 86400)  # Default 24h TTL

    def process_paper(self, paper_id: str, pdf_url: str, max_retries: int = 3) -> Optional[Dict]:
        """Process a single paper with retry mechanism"""
        for attempt in range(max_retries):
            try:
                # Download PDF
                pdf_path = self.arxiv_client.download_paper(paper_id, pdf_url)
                # Extract text with fallback options
                text = self.pdf_processor.extract_text(pdf_path)
                if not text.strip():
                    raise ValueError("Extracted text is empty")
                # Create document with metadata
                metadata = {
                    **self.arxiv_client.extract_metadata(paper_id),
                    **self.pdf_processor.extract_metadata(pdf_path)
                }
                document = Document(page_content=text, metadata=metadata)
                # Analyze content
                return self.paper_analyzer.analyze_document(document)
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    time.sleep(wait_time)
                else:
                    logger.error(f"All attempts failed for paper {paper_id}")
                    return None

    def process_query(self, query: str, max_papers: int = 5) -> List[Dict]:
        """Process multiple papers for a given query"""
        # Search for papers
        papers = self.arxiv_client.search_papers(query, max_results=max_papers)
        results = []
        for paper in papers:
            # Check cache first
            cache_key = f"paper:{paper['id']}"
            cached_result = self.cache.get(cache_key)
            if cached_result:
                # Cached entries are stored as JSON (avoids eval on untrusted data)
                results.append(json.loads(cached_result))
                continue
            # Process paper if not in cache
            result = self.process_paper(paper["id"], paper["pdf_url"])
            if result:
                # Cache successful results
                self.cache.setex(
                    cache_key,
                    self.cache_ttl,
                    json.dumps(result)
                )
                results.append(result)
        return results

    def run_quality_checks(self, results: List[Dict]) -> bool:
        """Verify quality of processed results"""
        if not results:
            return False
        for result in results:
            # Check for required fields
            if not all(k in result for k in ["summary", "concepts", "metadata"]):
                return False
            # Validate summary length
            if len(result["summary"]) < 100:  # Arbitrary minimum length
                return False
            # Validate concepts extraction
            if not result["concepts"] or len(result["concepts"]) < 3:
                return False
        return True
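
A minimal end-to-end driver sketch for the controller above; import paths depend on the repository's package layout, which this diff does not show, and the query string is illustrative:

```python
# Illustrative driver; assumes AgentController has been imported from its package
# and that config/settings.yaml and the required environment variables are in place.
import yaml

with open("config/settings.yaml") as f:
    config = yaml.safe_load(f)

controller = AgentController(config)
results = controller.process_query("graph neural networks for molecules", max_papers=3)

if controller.run_quality_checks(results):
    for paper in results:
        print(paper["summary"][:200])
else:
    print("Quality checks failed; see logs/agent.log")
```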

@@ -0,0 +1,83 @@
from typing import Dict, List
from pathlib import Path
import logging

import pypdf
from unstructured.partition.pdf import partition_pdf

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PDFProcessor:
    def __init__(self, fallback_enabled: bool = True):
        self.fallback_enabled = fallback_enabled

    def extract_text(self, pdf_path: str) -> str:
        """Extract text from PDF with fallback options"""
        try:
            # First attempt: use unstructured
            elements = partition_pdf(pdf_path)
            text = "\n".join([str(element) for element in elements])
            if text.strip():
                return text
            if not self.fallback_enabled:
                raise ValueError("Primary extraction failed and fallback disabled")
            # Fallback: use pypdf
            logger.info("Falling back to pypdf for extraction")
            return self._extract_with_pypdf(pdf_path)
        except Exception as e:
            logger.error(f"PDF extraction failed: {str(e)}")
            if self.fallback_enabled:
                logger.info("Attempting pypdf fallback")
                return self._extract_with_pypdf(pdf_path)
            raise

    def _extract_with_pypdf(self, pdf_path: str) -> str:
        """Fallback extraction using pypdf"""
        text = []
        with open(pdf_path, "rb") as file:
            reader = pypdf.PdfReader(file)
            for page in reader.pages:
                text.append(page.extract_text())
        return "\n".join(text)

    def chunk_text(self, text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
        """Split text into overlapping chunks for processing"""
        chunks = []
        start = 0
        text_len = len(text)
        while start < text_len:
            end = start + chunk_size
            chunk = text[start:end]
            # Adjust chunk to end at a sentence boundary if possible
            if end < text_len:
                last_period = chunk.rfind(".")
                # Only shrink to the boundary if the chunk stays longer than the
                # overlap window; otherwise the start index would stop advancing
                if last_period > overlap:
                    end = start + last_period + 1
                    chunk = text[start:end]
            chunks.append(chunk)
            start = end - overlap
        return chunks

    def extract_metadata(self, pdf_path: str) -> Dict:
        """Extract PDF metadata"""
        with open(pdf_path, "rb") as file:
            reader = pypdf.PdfReader(file)
            info = reader.metadata
            if info:
                return {
                    "title": info.get("/Title", ""),
                    "author": info.get("/Author", ""),
                    "subject": info.get("/Subject", ""),
                    "keywords": info.get("/Keywords", ""),
                    "creator": info.get("/Creator", ""),
                    "producer": info.get("/Producer", ""),
                    "page_count": len(reader.pages)
                }
            return {"page_count": len(reader.pages)}