Initial commit: Add README, .gitignore, and fetch_papers.py

This commit is contained in:
kpcto 2025-02-10 22:59:30 +00:00
commit ac43cdd77e
30 changed files with 1574 additions and 0 deletions

58
.gitignore vendored Normal file
View File

@ -0,0 +1,58 @@
# General development artifacts
__pycache__/
*.py[cod]
*$py.class
# IDE specific
.vscode/
.idea/
*.swp
*.swo
# Environment files
.env
.env.local
.env.development
.env.test
# Build/output directories
build/
dist/
*.egg-info/
.eggs/
# Database files
*.db
*.sqlite
*.sql
# Logs and diagnostics
*.log
*.sage.py
# Local development
.venv/
venv/
env/
# Testing
.coverage
htmlcov/
.pytest_cache/
# Documentation
_build/
# Exclude PDFs as requested
*.pdf
# OS metadata
.DS_Store
Thumbs.db
# Jupyter notebooks
.ipynb_checkpoints
# Temporary files
*~
*.tmp

72
README.md Normal file
View File

@ -0,0 +1,72 @@
# 🤖📚 LastIn-AI: The Research Assistant That Wrote Itself
*"I used the AI to create the AI" - This README, probably*
## 🚀 What Does This Thing Do?
This autonomous research assistant:
1. **Paper Hunter** 🔍: Daily arXiv scans using our `PaperFetcher` class
2. **Category Ninja** 🥷: Manage topics via `config/arxiv_categories.json`
3. **Time Traveler** ⏳: Fetch papers from the past N days via `--days N`
4. **Self-Aware Entity** 🤯: Literally wrote this README about itself
## ⚙️ Installation (For Humans)
```bash
# Clone this repository that an AI created
git clone https://github.com/yourusername/lastin-ai.git
cd lastin-ai
# Create virtual environment (because even AIs hate dependency conflicts)
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
```
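Before running anything, create a `.env` file in the project root (loaded via `python-dotenv`). The variable names below are the ones the code reads in `src/agent_controller.py` and `src/storage/paper_store.py`; the values are placeholders:
```
# LLM provider keys (set the one matching the provider you use)
OPENAI_API_KEY=your-openai-key
DEEPSEEK_API_KEY=your-deepseek-key
# PostgreSQL settings for the paper store (DB_PASSWORD is required)
DB_HOST=localhost
DB_PORT=5432
DB_NAME=paper_analysis
DB_USER=postgres
DB_PASSWORD=your-db-password
```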
## 🧠 Configuration
Edit `config/arxiv_categories.json` to add your academic obsessions:
```json
{
"categories": [
{
"id": "cs.AI",
"name": "Artificial Intelligence",
"description": "Papers about systems that will eventually write better systems"
}
],
"default_categories": ["cs.AI"]
}
```
## 💻 Usage
```bash
# Let the AI research AI research
python -m src.scripts.fetch_papers --days 3
```
**Sample Output:**
```
[AI]: Found 42 papers about AI
[AI]: Determining which papers are about AI writing better AI...
[AI]: existential_crisis.exe triggered
```
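The same script also handles category management; these examples use the flags defined in the `argparse` setup of `src/scripts/fetch_papers.py`:
```bash
# List configured categories
python -m src.scripts.fetch_papers --list
# Add a category, then make the defaults cs.AI plus the new one
python -m src.scripts.fetch_papers --add cs.CL "Computation and Language" "NLP papers"
python -m src.scripts.fetch_papers --set-default cs.AI cs.CL
# Fetch specific categories instead of the configured defaults
python -m src.scripts.fetch_papers --days 3 --categories cs.AI cs.CL
```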
## 🤖 Philosophical Corner
*This section written by the AI about the AI that wrote the AI*
Q: Who watches the watchmen?
A: An AI that watches watchmen-watching AI.
## 📜 License
MIT License - because even self-writing code needs legal coverage
---
*README written by Cascade (AI) at 2025-02-10 22:54 UTC - bow before your new robotic researcher overlords*

15
config/arxiv_categories.json Normal file
View File

@ -0,0 +1,15 @@
{
"categories": [
{
"id": "cs.AI",
"name": "Artificial Intelligence",
"description": "Covers all aspects of artificial intelligence research"
},
{
"id": "cs.ML",
"name": "Machine Learning",
"description": "Covers all aspects of machine learning research"
}
],
"default_categories": ["cs.AI"]
}

15
requirements.txt Normal file
View File

@ -0,0 +1,15 @@
langchain==0.1.0
langchain-openai==0.0.5
langchain-core==0.1.27
langchain-community==0.0.24
arxiv==2.1.0
openai==1.12.0
faiss-cpu==1.8.0
sqlalchemy==2.0.30
python-dotenv==1.0.0
pypdf2==3.0.1
chromadb==0.4.22
redis==5.0.1
psycopg2-binary==2.9.9
aiohttp==3.9.1
tenacity==8.2.3
asyncpg==0.29.0  # used by src/storage/paper_store.py
aiofiles==23.2.1  # used by src/data_acquisition/pdf_downloader.py

13
run.py Normal file
View File

@ -0,0 +1,13 @@
import os
import sys
# Add the project root directory to Python path
project_root = os.path.dirname(os.path.abspath(__file__))
sys.path.append(project_root)
# Import and run the main function
from src.main import main
import asyncio
if __name__ == "__main__":
asyncio.run(main())

1
src/__init__.py Normal file
View File

@ -0,0 +1 @@

195
src/agent_controller.py Normal file
View File

@ -0,0 +1,195 @@
import os
import logging
from typing import List, Dict
from dotenv import load_dotenv
from src.data_acquisition.arxiv_client import ArxivClient
from src.data_acquisition.pdf_downloader import PDFDownloader
from src.processing.text_extractor import TextExtractor
from src.processing.chunker import SemanticChunker
from src.storage.vector_store import VectorStore
from src.storage.paper_store import PaperStore
from src.analysis.llm_analyzer import LLMAnalyzer
from src.utils.debug import tracker
logger = logging.getLogger(__name__)
class AgentController:
def __init__(self, llm_provider: str = 'deepseek', llm_model: str = 'deepseek-chat'):
"""Initialize the agent controller."""
logger.debug(f"Initializing AgentController with provider={llm_provider}, model={llm_model}")
# Load environment variables
load_dotenv()
# Get API keys
openai_key = os.getenv('OPENAI_API_KEY')
deepseek_key = os.getenv('DEEPSEEK_API_KEY')
# Use the appropriate API key based on provider
api_key = {
'openai': openai_key,
'deepseek': deepseek_key
}.get(llm_provider)
if not api_key:
logger.error(f"No API key found for provider {llm_provider}")
raise ValueError(f"No API key found for provider {llm_provider}. Please check your .env file.")
logger.debug(f"Using API key: {api_key[:4]}...{api_key[-4:]}")
# Initialize components
try:
logger.debug("Initializing components...")
self.arxiv_client = ArxivClient()
self.pdf_downloader = PDFDownloader()
self.text_extractor = TextExtractor()
self.chunker = SemanticChunker()
self.vector_store = VectorStore()
self.paper_store = PaperStore()
self.llm_analyzer = LLMAnalyzer(api_key, llm_provider, llm_model)
logger.debug("All components initialized successfully")
except Exception as e:
logger.error(f"Error initializing components: {str(e)}")
raise
async def close(self):
"""Close all resources."""
logger.debug("Closing AgentController resources...")
try:
await self.pdf_downloader.close()
await self.arxiv_client.close()
await self.llm_analyzer.close()
await self.paper_store.close()
logger.debug("All resources closed successfully")
except Exception as e:
logger.error(f"Error closing resources: {str(e)}")
raise
async def __aenter__(self):
"""Async context manager entry."""
logger.debug("Entering AgentController context")
await self.paper_store.initialize()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
logger.debug("Exiting AgentController context")
if exc_type:
logger.error(f"Error in context: {exc_type.__name__}: {str(exc_val)}")
await self.close()
async def process_query(self, query: str, max_papers: int = 1):
"""Process a research query and analyze papers."""
try:
logger.info(f"Processing query: {query}")
# First, check what we have in the database
logger.debug("Checking existing papers in database...")
stored_papers = await self.paper_store.get_all_paper_ids()
logger.debug(f"Found {len(stored_papers)} papers in database")
# Get similar papers from our existing database
logger.debug("Retrieving similar papers from database...")
similar_papers = await self.get_paper_analysis(query)
if similar_papers:
logger.debug(f"Found {len(similar_papers)} relevant papers in database:")
for paper in similar_papers:
logger.debug(f"- {paper['title']}")
# Fetch papers from arXiv using context managers
logger.debug("Fetching papers from arXiv...")
async with self.arxiv_client:
papers = await self.arxiv_client.fetch_papers(query, max_papers)
new_papers = [p for p in papers if p['id'] not in stored_papers]
if not new_papers:
logger.debug("No new papers to process - all papers are already in database")
return similar_papers
logger.debug(f"Found {len(new_papers)} new papers to process")
processed_papers = []
# Process new papers using context managers
async with self.pdf_downloader:
for paper in new_papers:
logger.debug(f"Processing new paper: {paper['title']}")
# Download PDF
pdf_path = await self.pdf_downloader.download_pdf(paper['pdf_url'], paper['id'])
if not pdf_path:
logger.warning(f"Failed to download PDF for {paper['title']}, skipping...")
continue
# Extract text
text = self.text_extractor.extract_text(pdf_path)
# Analyze the full text
async with self.llm_analyzer:
analysis = await self.llm_analyzer.analyze_paper(text)
paper['analysis'] = analysis
# Store in PostgreSQL
await self.paper_store.store_paper(paper)
# Create chunks for vector storage
chunks = self.chunker.split_text(text)
# Store in vector database
chunk_ids = [f"{paper['id']}_{i}" for i in range(len(chunks))]
chunk_metadata = [{
'paper_id': paper['id'],
'title': paper['title'],
'authors': ', '.join(paper['authors']),
'summary': analysis.get('summary', ''),
'technical_concepts': analysis.get('technical_concepts', '')
} for _ in chunks]
self.vector_store.add_chunks(chunks, chunk_metadata, chunk_ids)
processed_papers.append(paper)
logger.debug(f"Successfully processed and stored paper: {paper['title']}")
# Return both newly processed papers and similar papers from database
all_papers = processed_papers + similar_papers
return all_papers
except Exception as e:
logger.error(f"Error processing query: {str(e)}")
raise
async def get_stored_papers(self) -> List[str]:
"""Get a list of all paper IDs currently stored."""
try:
return await self.paper_store.get_all_paper_ids()
except Exception as e:
logger.error(f"Error getting stored papers: {str(e)}")
raise
async def get_paper_analysis(self, query: str, n_results: int = 5) -> List[Dict]:
"""Retrieve stored paper analyses based on a query."""
try:
# First get similar papers from vector store
results = self.vector_store.query_similar(query, n_results)
# Extract unique paper IDs
paper_ids = set()
for metadata in results['metadatas']:
paper_id = metadata.get('paper_id')
if paper_id:
paper_ids.add(paper_id)
# Get full paper details from PostgreSQL
papers = await self.paper_store.get_papers_by_ids(list(paper_ids))
# Add relevant text from vector search
for paper in papers:
for i, metadata in enumerate(results['metadatas']):
if metadata.get('paper_id') == paper['id']:
paper['relevant_text'] = results['documents'][i]
break
return papers
except Exception as e:
logger.error(f"Error getting paper analysis: {str(e)}")
raise

1
src/analysis/__init__.py Normal file
View File

@ -0,0 +1 @@

148
src/analysis/llm_analyzer.py Normal file
View File

@ -0,0 +1,148 @@
"""LLM-based paper analyzer."""
from typing import Dict, Any, Optional
from src.config.llm_config import LLMConfig, LLMProviderSettings
from src.llm_providers.base import BaseLLMProvider
from src.llm_providers.openai_provider import OpenAIProvider
from src.llm_providers.deepseek_provider import DeepseekProvider
import logging
logger = logging.getLogger(__name__)
class LLMAnalyzer:
"""Analyzes papers using LLM models."""
def __init__(self, api_key: str, provider: str = 'openai', model_name: Optional[str] = None):
"""Initialize the analyzer with specified provider and model."""
if model_name is None:
# Default models for each provider
model_name = {
'openai': 'gpt-3.5-turbo-16k',
'deepseek': 'deepseek-chat'
}.get(provider)
if not model_name:
raise ValueError(f"Unsupported provider: {provider}")
self.config = LLMProviderSettings.get_config(provider, model_name, api_key)
self.provider = self._create_provider()
def _create_provider(self) -> BaseLLMProvider:
"""Create the appropriate LLM provider based on configuration."""
if self.config.provider == 'openai':
return OpenAIProvider(self.config)
elif self.config.provider == 'deepseek':
return DeepseekProvider(self.config)
else:
raise ValueError(f"Unsupported provider: {self.config.provider}")
async def close(self):
"""Close the provider."""
await self.provider.close()
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
async def analyze_paper(self, text: str) -> Dict:
"""Analyze a paper and return structured information."""
# First prompt for summary and technical concepts
prompt = (
"Please analyze the following academic paper and provide:\n"
"1. A concise summary (2-3 sentences)\n"
"2. List of key technical concepts, each with a brief explanation\n"
"Format each technical concept as: '- Concept Name: Brief explanation'\n\n"
"Paper text:\n"
f"{text}\n\n"
"Please format your response exactly as:\n"
"Summary: [your summary]\n"
"Technical Concepts:\n"
"[list of technical concepts with explanations]"
)
response = await self.provider.generate_text(prompt)
# Parse the initial response
summary = ""
technical_concepts = ""
current_section = None
for line in response.split('\n'):
if line.startswith("Summary:"):
current_section = "summary"
summary = line[8:].strip()
elif line.startswith("Technical Concepts:"):
current_section = "technical_concepts"
elif line.strip() and current_section == "technical_concepts":
if not technical_concepts:
technical_concepts = line.strip()
else:
technical_concepts += "\n" + line.strip()
# Second prompt for fluff analysis
fluff_prompt = (
"Analyze this academic paper's scientific merit and potential fluff content.\n\n"
"Provide:\n"
"1. A Fluff Score from 0 to 100 where:\n"
" - 0: No fluff - Highly original research with strong scientific merit and concrete results\n"
" - 25: Minimal fluff - Solid research with clear contributions\n"
" - 50: Moderate fluff - Mix of original ideas and common approaches\n"
" - 75: Significant fluff - Limited original contribution, heavy on standard approaches\n"
" - 100: Pure fluff - No original contribution, vague claims, buzzwords without substance\n\n"
"2. A detailed explanation of your score that analyzes:\n"
" - Originality of the research\n"
" - Concrete vs vague claims\n"
" - Quality of methodology\n"
" - Substantiated vs unsubstantiated statements\n"
" - Use of buzzwords vs technical depth\n\n"
"Paper text:\n"
f"{text}\n\n"
"Format your response exactly as:\n"
"Score: [0-100]\n"
"Explanation: [your detailed analysis]"
)
fluff_response = await self.provider.generate_text(fluff_prompt)
# Parse the fluff response
fluff_score = None
fluff_explanation = ""
current_section = None
for line in fluff_response.split('\n'):
if line.startswith("Score:"):
try:
score = int(line[6:].strip())
fluff_score = max(0, min(100, score)) # Ensure score is between 0 and 100
except ValueError:
logger.error("Failed to parse fluff score")
elif line.startswith("Explanation:"):
current_section = "explanation"
fluff_explanation = line[12:].strip()
elif line.strip() and current_section == "explanation":
fluff_explanation += " " + line.strip()
# Validate that we have all required fields
if not technical_concepts:
logger.warning("No technical concepts found in analysis")
technical_concepts = "Error: Failed to extract technical concepts"
if not fluff_explanation:
logger.warning("No fluff explanation found in analysis")
fluff_explanation = "Error: Failed to extract fluff analysis"
if fluff_score is None:
logger.warning("No fluff score found in analysis")
fluff_score = 50 # Default to middle score if parsing fails
return {
'summary': summary,
'technical_concepts': technical_concepts,
'fluff': {
'score': fluff_score,
'explanation': fluff_explanation
}
}

1
src/config/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Configuration module for the application."""

79
src/config/llm_config.py Normal file
View File

@ -0,0 +1,79 @@
"""LLM configuration settings."""
from dataclasses import dataclass
from typing import Optional, Dict, Any
@dataclass
class LLMConfig:
provider: str
model_name: str
api_key: str
api_base: Optional[str] = None
additional_params: Optional[Dict[str, Any]] = None
class LLMProviderSettings:
"""Settings for different LLM providers."""
OPENAI_SETTINGS = {
'gpt-3.5-turbo-16k': {
'provider': 'openai',
'model_name': 'gpt-3.5-turbo-16k',
'max_tokens': 16000,
'temperature': 0.7,
},
'gpt-4': {
'provider': 'openai',
'model_name': 'gpt-4',
'max_tokens': 8000,
'temperature': 0.7,
}
}
DEEPSEEK_SETTINGS = {
'deepseek-chat': {
'provider': 'deepseek',
'model_name': 'deepseek-chat',
'max_tokens': 8000,
'temperature': 0.7,
            'api_base': 'https://api.deepseek.com/v1',  # DeepSeek's OpenAI-compatible endpoint
}
}
@classmethod
def get_config(cls, provider: str, model_name: str, api_key: str) -> LLMConfig:
"""Get LLM configuration for a specific provider and model."""
if provider == 'openai':
if model_name in cls.OPENAI_SETTINGS:
settings = cls.OPENAI_SETTINGS[model_name]
return LLMConfig(
provider=settings['provider'],
model_name=settings['model_name'],
api_key=api_key,
additional_params={
'max_tokens': settings['max_tokens'],
'temperature': settings['temperature']
}
)
elif provider == 'deepseek':
if model_name in cls.DEEPSEEK_SETTINGS:
settings = cls.DEEPSEEK_SETTINGS[model_name]
return LLMConfig(
provider=settings['provider'],
model_name=settings['model_name'],
api_key=api_key,
api_base=settings['api_base'],
additional_params={
'max_tokens': settings['max_tokens'],
'temperature': settings['temperature']
}
)
raise ValueError(f"Unsupported provider '{provider}' or model '{model_name}'")
@classmethod
def list_available_models(cls):
"""List all available models and their providers."""
models = {
'openai': list(cls.OPENAI_SETTINGS.keys()),
'deepseek': list(cls.DEEPSEEK_SETTINGS.keys())
}
return models

64
src/config/logging_config.py Normal file
View File

@ -0,0 +1,64 @@
import os
import logging.config
from datetime import datetime
def setup_logging(log_dir: str = "logs"):
"""Set up logging configuration."""
# Create logs directory if it doesn't exist
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# Generate log filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
debug_log_file = os.path.join(log_dir, f"debug_{timestamp}.log")
# Logging configuration dictionary
config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'detailed': {
'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
},
'simple': {
'format': '%(levelname)s: %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'WARNING', # Only show warnings and above in console
'formatter': 'simple',
'stream': 'ext://sys.stdout'
},
'debug_file': {
'class': 'logging.FileHandler',
'level': 'DEBUG',
'formatter': 'detailed',
'filename': debug_log_file,
'mode': 'w'
}
},
'loggers': {
'': { # Root logger
'handlers': ['console', 'debug_file'],
'level': 'DEBUG',
'propagate': True
},
'urllib3': {
'level': 'WARNING'
},
'chromadb': {
'level': 'WARNING'
},
'asyncio': {
'level': 'WARNING'
}
}
}
# Apply the configuration
logging.config.dictConfig(config)
# Log the start of the application
logging.info(f"Debug logs will be written to: {debug_log_file}")

1
src/data_acquisition/__init__.py Normal file
View File

@ -0,0 +1 @@

56
src/data_acquisition/arxiv_client.py Normal file
View File

@ -0,0 +1,56 @@
import asyncio
import arxiv
from typing import List, Dict, Any
from tenacity import retry, stop_after_attempt, wait_exponential
import aiohttp
class ArxivClient:
def __init__(self):
self.client = arxiv.Client()
self._session = None
async def _get_session(self) -> aiohttp.ClientSession:
"""Get or create an aiohttp session."""
if self._session is None:
self._session = aiohttp.ClientSession()
return self._session
async def close(self):
"""Close the session."""
if self._session is not None:
await self._session.close()
self._session = None
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def fetch_papers(self, query: str, max_results: int = 10) -> List[Dict[Any, Any]]:
"""Fetch papers from arXiv based on query."""
search = arxiv.Search(
query=query,
max_results=max_results,
sort_by=arxiv.SortCriterion.SubmittedDate
)
results = []
        # The arxiv library is synchronous, so run the fetch in a worker thread
        # to avoid blocking the event loop
        papers = await asyncio.to_thread(lambda: list(self.client.results(search)))
for paper in papers:
results.append({
'id': paper.entry_id.split('/')[-1], # Get the ID without the full URL
'title': paper.title,
'authors': [author.name for author in paper.authors],
'summary': paper.summary,
'pdf_url': paper.pdf_url,
'published': paper.published.isoformat() if paper.published else None,
'updated': paper.updated.isoformat() if paper.updated else None,
'categories': paper.categories
})
return results

53
src/data_acquisition/pdf_downloader.py Normal file
View File

@ -0,0 +1,53 @@
import aiohttp
import os
from pathlib import Path
from tenacity import retry, stop_after_attempt, wait_exponential
import aiofiles
class PDFDownloader:
def __init__(self, storage_dir: str = "papers"):
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(parents=True, exist_ok=True)
self._session = None
async def _get_session(self) -> aiohttp.ClientSession:
"""Get or create an aiohttp session."""
if self._session is None:
self._session = aiohttp.ClientSession()
return self._session
async def close(self):
"""Close the session."""
if self._session is not None:
await self._session.close()
self._session = None
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def download_pdf(self, pdf_url: str, paper_id: str) -> str:
"""Download PDF from URL and save to storage directory."""
filename = self.storage_dir / f"{paper_id}.pdf"
if filename.exists():
return str(filename)
try:
session = await self._get_session()
async with session.get(pdf_url) as response:
if response.status == 200:
async with aiofiles.open(filename, 'wb') as f:
await f.write(await response.read())
return str(filename)
else:
raise Exception(f"Failed to download PDF: HTTP {response.status}")
except Exception as e:
if filename.exists():
filename.unlink() # Delete the partially downloaded file
raise Exception(f"Error downloading PDF: {str(e)}")
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()

1
src/llm_providers/__init__.py Normal file
View File

@ -0,0 +1 @@
"""LLM providers module."""

26
src/llm_providers/base.py Normal file
View File

@ -0,0 +1,26 @@
"""Base LLM provider class."""
from abc import ABC, abstractmethod
from typing import Dict, Any
from src.config.llm_config import LLMConfig
class BaseLLMProvider(ABC):
"""Base class for LLM providers."""
def __init__(self, config: LLMConfig):
self.config = config
@abstractmethod
async def generate_text(self, prompt: str) -> str:
"""Generate text from a prompt."""
pass
@abstractmethod
async def close(self):
"""Close any resources."""
pass
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

66
src/llm_providers/deepseek_provider.py Normal file
View File

@ -0,0 +1,66 @@
"""Deepseek LLM provider."""
from typing import Dict, Any
import aiohttp
import logging
from src.config.llm_config import LLMConfig
from src.llm_providers.base import BaseLLMProvider
from src.utils.debug import tracker
logger = logging.getLogger(__name__)
class DeepseekProvider(BaseLLMProvider):
"""Deepseek LLM provider implementation."""
def __init__(self, config: LLMConfig):
super().__init__(config)
self.session = None
logger.debug(f"Initialized DeepseekProvider with model: {config.model_name}")
async def _get_session(self) -> aiohttp.ClientSession:
"""Get or create an aiohttp session."""
if self.session is None:
logger.debug("Creating new Deepseek session")
self.session = aiohttp.ClientSession(
headers={"Authorization": f"Bearer {self.config.api_key}"}
)
tracker.track_session(self.session)
return self.session
async def generate_text(self, prompt: str) -> str:
"""Generate text using Deepseek's API."""
try:
session = await self._get_session()
logger.debug(f"Using session {id(session)} for text generation")
payload = {
"model": self.config.model_name,
"messages": [{"role": "user", "content": prompt}],
**self.config.additional_params
}
logger.debug(f"Making request to Deepseek API: {self.config.api_base}/chat/completions")
async with session.post(f"{self.config.api_base}/chat/completions", json=payload) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Deepseek API error: {response.status} - {error_text}")
raise Exception(f"Deepseek API error: {response.status} - {error_text}")
data = await response.json()
logger.debug("Successfully received response from Deepseek API")
return data['choices'][0]['message']['content']
except Exception as e:
logger.error(f"Error generating text with Deepseek: {str(e)}")
raise
async def close(self):
"""Close the session."""
if self.session:
logger.debug(f"Closing Deepseek session {id(self.session)}")
try:
await self.session.close()
tracker.untrack_session(self.session)
self.session = None
except Exception as e:
logger.error(f"Error closing Deepseek session: {str(e)}")
raise

30
src/llm_providers/openai_provider.py Normal file
View File

@ -0,0 +1,30 @@
"""OpenAI LLM provider."""
from typing import Dict, Any
import logging
import openai
from src.config.llm_config import LLMConfig
from src.llm_providers.base import BaseLLMProvider
logger = logging.getLogger(__name__)
class OpenAIProvider(BaseLLMProvider):
"""OpenAI LLM provider implementation."""
    def __init__(self, config: LLMConfig):
        super().__init__(config)
        # Pass the key to the client explicitly instead of the module-level global
        self.client = openai.AsyncOpenAI(api_key=config.api_key)
async def generate_text(self, prompt: str) -> str:
"""Generate text using OpenAI's API."""
try:
response = await self.client.chat.completions.create(
model=self.config.model_name,
messages=[{"role": "user", "content": prompt}],
**self.config.additional_params
)
return response.choices[0].message.content
        except Exception as e:
            logger.error(f"Error generating text with OpenAI: {str(e)}")
            raise
async def close(self):
"""Close the client."""
await self.client.close()

135
src/main.py Normal file
View File

@ -0,0 +1,135 @@
import asyncio
import signal
import logging
import sys
from src.agent_controller import AgentController
from src.utils.debug import tracker
from src.config.logging_config import setup_logging
logger = logging.getLogger(__name__)
def handle_sigint():
"""Handle interrupt signal."""
logger.info("Received interrupt signal, canceling tasks...")
for task in asyncio.all_tasks():
task.cancel()
async def cleanup_resources(loop: asyncio.AbstractEventLoop):
"""Clean up any remaining resources."""
logger.debug("Starting resource cleanup...")
tracker.print_active_resources()
try:
        # Cancel all tasks except the one running this cleanup
        current = asyncio.current_task()
        pending = [t for t in asyncio.all_tasks(loop) if t is not current]
        if pending:
            logger.debug(f"Cancelling {len(pending)} pending tasks...")
            for task in pending:
                if not task.done():
                    task.cancel()
            # Wait for the cancellations to be processed, with a timeout
            logger.debug("Waiting for tasks to complete...")
            _, still_pending = await asyncio.wait(pending, timeout=5)
            # asyncio.Task has no close(); anything still pending is only reported
            for task in still_pending:
                logger.warning(f"Task did not finish during cleanup: {task.get_name()}")
except Exception as e:
logger.error(f"Error during cleanup: {str(e)}")
finally:
logger.debug("Resource cleanup completed")
async def main():
"""Main application entry point."""
try:
logger.debug("Starting main application...")
async with AgentController() as agent:
query = "Large Language Models recent advances"
logger.debug(f"Processing query: {query}")
try:
# Process query and get both new and existing papers
results = await agent.process_query(query)
if results:
print("\n=== Analysis Results ===")
for paper in results:
print(f"\nTitle: {paper['title']}")
print(f"Authors: {paper.get('authors', 'Unknown')}")
if 'analysis' in paper:
# This is a newly processed paper
print("Status: Newly Processed")
print("\nSummary:", paper['analysis'].get('summary', 'No summary available'))
print("\nTechnical Concepts:", paper['analysis'].get('technical_concepts', 'No concepts available'))
else:
# This is an existing paper from the database
print("Status: Retrieved from Database")
print("\nRelevant Text:", paper.get('relevant_text', 'No text available')[:500] + "...")
else:
print("\nNo papers found for the query.")
except Exception as e:
logger.error(f"Error in main: {str(e)}")
raise
finally:
logger.debug("Main execution completed")
def run_main():
"""Run the main application."""
# Set up logging
setup_logging()
# Create new event loop
logger.debug("Creating new event loop...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tracker.track_loop(loop)
    # Set up signal handlers (add_signal_handler is only implemented on Unix event loops)
    signals = (signal.SIGTERM, signal.SIGINT)
    for s in signals:
        try:
            loop.add_signal_handler(s, lambda s=s: handle_sigint())
        except NotImplementedError:
            # e.g. on Windows; fall back to the default KeyboardInterrupt handling
            pass
try:
logger.debug("Running main function...")
loop.run_until_complete(main())
except asyncio.CancelledError:
logger.info("Main execution cancelled")
except Exception as e:
logger.error(f"Error in run_main: {str(e)}")
raise
finally:
try:
# Run cleanup
logger.debug("Running final cleanup...")
loop.run_until_complete(cleanup_resources(loop))
# Close the loop
logger.debug("Closing event loop...")
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
# Remove the event loop
asyncio.set_event_loop(None)
loop.close()
tracker.untrack_loop(loop)
logger.debug("Cleanup completed successfully")
except Exception as e:
logger.error(f"Error during final cleanup: {str(e)}")
finally:
# Force exit to ensure all resources are released
sys.exit(0)
if __name__ == "__main__":
run_main()

1
src/processing/__init__.py Normal file
View File

@ -0,0 +1 @@

15
src/processing/chunker.py Normal file
View File

@ -0,0 +1,15 @@
from typing import List
from langchain.text_splitter import RecursiveCharacterTextSplitter
class SemanticChunker:
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
def split_text(self, text: str) -> List[str]:
"""Split text into semantic chunks."""
return self.text_splitter.split_text(text)

20
src/processing/text_extractor.py Normal file
View File

@ -0,0 +1,20 @@
from typing import List
import PyPDF2
from pathlib import Path
class TextExtractor:
def __init__(self):
pass
def extract_text(self, pdf_path: str) -> str:
"""Extract text from PDF using PyPDF2."""
try:
with open(pdf_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
return text
except Exception as e:
print(f"Error extracting text from PDF: {e}")
return ""

160
src/scripts/fetch_papers.py Normal file
View File

@ -0,0 +1,160 @@
import asyncio
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import List, Dict
from src.agent_controller import AgentController
from src.data_acquisition.arxiv_client import ArxivClient
logger = logging.getLogger(__name__)
class PaperFetcher:
def __init__(self, config_path: str = "config/arxiv_categories.json"):
self.config_path = config_path
self.config = self._load_config()
def _load_config(self) -> Dict:
"""Load the arXiv categories configuration."""
try:
with open(self.config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
logger.error(f"Config file not found: {self.config_path}")
return {"categories": [], "default_categories": []}
def save_config(self) -> None:
"""Save the current configuration."""
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
with open(self.config_path, 'w') as f:
json.dump(self.config, f, indent=4)
def add_category(self, category_id: str, name: str, description: str) -> None:
"""Add a new category to monitor."""
self.config["categories"].append({
"id": category_id,
"name": name,
"description": description
})
self.save_config()
def remove_category(self, category_id: str) -> None:
"""Remove a category from monitoring."""
self.config["categories"] = [
cat for cat in self.config["categories"]
if cat["id"] != category_id
]
self.config["default_categories"] = [
cat for cat in self.config["default_categories"]
if cat != category_id
]
self.save_config()
def set_default_categories(self, category_ids: List[str]) -> None:
"""Set which categories to fetch by default."""
existing_cats = {cat["id"] for cat in self.config["categories"]}
valid_cats = [cat for cat in category_ids if cat in existing_cats]
self.config["default_categories"] = valid_cats
self.save_config()
def list_categories(self) -> None:
"""Print all configured categories."""
print("\nConfigured arXiv Categories:")
print("="*50)
for cat in self.config["categories"]:
            is_default = "*" if cat["id"] in self.config["default_categories"] else " "
print(f"[{is_default}] {cat['id']}: {cat['name']}")
print(f" {cat['description']}")
print("="*50)
async def fetch_papers(self, days: int = 1, categories: List[str] = None) -> None:
"""Fetch and analyze papers from specified categories."""
if categories is None:
categories = self.config["default_categories"]
if not categories:
logger.error("No categories specified and no default categories configured")
return
logger.info(f"Fetching papers from categories: {categories}")
        # Calculate the cutoff for the look-back window (arXiv timestamps are UTC)
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        # AgentController's async context manager initializes the paper store
        # and closes all resources on exit
        async with ArxivClient() as arxiv_client, AgentController() as agent:
            for category in categories:
                logger.info(f"Processing category: {category}")
                # ArxivClient takes an arXiv query string and a result limit;
                # results come back newest-first, so filter them against the cutoff
                papers = await arxiv_client.fetch_papers(f"cat:{category}", max_results=50)
                papers = [
                    p for p in papers
                    if p['published'] and datetime.fromisoformat(p['published']) >= cutoff
                ]
                if not papers:
                    logger.info(f"No new papers found for category {category}")
                    continue
                logger.info(f"Found {len(papers)} papers in {category}")
                # AgentController currently exposes query-based processing
                # (fetch, download, analyze, store), so hand it the category query
                await agent.process_query(f"cat:{category}", max_papers=len(papers))
async def main():
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
fetcher = PaperFetcher()
# Parse command line arguments (you can extend this)
import argparse
parser = argparse.ArgumentParser(description='Fetch and analyze arXiv papers')
parser.add_argument('--days', type=int, default=1, help='Number of days to look back')
parser.add_argument('--list', action='store_true', help='List configured categories')
parser.add_argument('--add', nargs=3, metavar=('ID', 'NAME', 'DESCRIPTION'),
help='Add a new category')
parser.add_argument('--remove', metavar='ID', help='Remove a category')
parser.add_argument('--set-default', nargs='+', metavar='ID',
help='Set default categories')
parser.add_argument('--categories', nargs='+', metavar='ID',
help='Categories to fetch (defaults to configured defaults)')
args = parser.parse_args()
if args.list:
fetcher.list_categories()
return
if args.add:
fetcher.add_category(*args.add)
print(f"Added category: {args.add[0]}")
return
if args.remove:
fetcher.remove_category(args.remove)
print(f"Removed category: {args.remove}")
return
if args.set_default:
fetcher.set_default_categories(args.set_default)
print(f"Set default categories: {args.set_default}")
return
# Fetch papers
await fetcher.fetch_papers(days=args.days, categories=args.categories)
if __name__ == "__main__":
asyncio.run(main())

1
src/storage/__init__.py Normal file
View File

@ -0,0 +1 @@

130
src/storage/paper_store.py Normal file
View File

@ -0,0 +1,130 @@
import os
import logging
from typing import Dict, List, Optional
import asyncpg
from datetime import datetime
logger = logging.getLogger(__name__)
CREATE_PAPERS_TABLE = """
CREATE TABLE IF NOT EXISTS papers (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
authors TEXT[] NOT NULL,
summary TEXT,
technical_concepts TEXT,
fluff_score INTEGER CHECK (fluff_score >= 0 AND fluff_score <= 100),
fluff_explanation TEXT,
pdf_url TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
"""
class PaperStore:
def __init__(self):
self.pool = None
async def initialize(self):
"""Initialize the database connection pool and create tables."""
# Get database configuration from environment variables
host = os.getenv('DB_HOST', 'localhost')
port = os.getenv('DB_PORT', '5432')
dbname = os.getenv('DB_NAME', 'paper_analysis')
user = os.getenv('DB_USER', 'postgres')
password = os.getenv('DB_PASSWORD')
if not password:
logger.error("Database password not found in environment variables")
raise ValueError("DB_PASSWORD environment variable is required")
try:
# Construct database URL
db_url = f'postgresql://{user}:{password}@{host}:{port}/{dbname}'
# Create connection pool
self.pool = await asyncpg.create_pool(db_url)
# Create tables
async with self.pool.acquire() as conn:
await conn.execute(CREATE_PAPERS_TABLE)
logger.info("Database initialized successfully")
except Exception as e:
logger.error(f"Error initializing database: {str(e)}")
raise
async def close(self):
"""Close the database connection pool."""
if self.pool:
await self.pool.close()
async def store_paper(self, paper: Dict) -> None:
"""Store a paper and its analysis in the database."""
if not self.pool:
await self.initialize()
async with self.pool.acquire() as conn:
# Extract analysis if present
analysis = paper.get('analysis', {})
fluff_analysis = analysis.get('fluff', {})
# Store paper
await conn.execute('''
INSERT INTO papers (
id, title, authors, summary, technical_concepts,
fluff_score, fluff_explanation, pdf_url
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (id) DO UPDATE SET
title = EXCLUDED.title,
authors = EXCLUDED.authors,
summary = EXCLUDED.summary,
technical_concepts = EXCLUDED.technical_concepts,
fluff_score = EXCLUDED.fluff_score,
fluff_explanation = EXCLUDED.fluff_explanation,
pdf_url = EXCLUDED.pdf_url,
updated_at = CURRENT_TIMESTAMP
''',
paper['id'],
paper['title'],
paper['authors'],
analysis.get('summary'),
analysis.get('technical_concepts'),
fluff_analysis.get('score'),
fluff_analysis.get('explanation'),
paper.get('pdf_url'))
async def get_paper(self, paper_id: str) -> Optional[Dict]:
"""Retrieve a paper by its ID."""
if not self.pool:
await self.initialize()
async with self.pool.acquire() as conn:
row = await conn.fetchrow('''
SELECT * FROM papers WHERE id = $1
''', paper_id)
if row:
return dict(row)
return None
async def get_all_paper_ids(self) -> List[str]:
"""Get all paper IDs in the database."""
if not self.pool:
await self.initialize()
async with self.pool.acquire() as conn:
rows = await conn.fetch('SELECT id FROM papers')
return [row['id'] for row in rows]
async def get_papers_by_ids(self, paper_ids: List[str]) -> List[Dict]:
"""Retrieve multiple papers by their IDs."""
if not self.pool:
await self.initialize()
async with self.pool.acquire() as conn:
rows = await conn.fetch('''
SELECT * FROM papers WHERE id = ANY($1)
''', paper_ids)
return [dict(row) for row in rows]

81
src/storage/vector_store.py Normal file
View File

@ -0,0 +1,81 @@
from typing import List, Dict
import chromadb
from chromadb.config import Settings
class VectorStore:
def __init__(self, persist_directory: str = "chroma_db"):
        # Use PersistentClient so the index is actually written to disk (chromadb 0.4+);
        # the plain Client with only a persist_directory setting stays in memory
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
# Create collection with proper schema
self.collection = self.client.get_or_create_collection(
name="paper_chunks",
metadata={"hnsw:space": "cosine"} # Use cosine similarity
)
def paper_exists(self, paper_id: str) -> bool:
"""Check if a paper already exists in the vector store."""
try:
# Try to get any chunks with this paper_id
results = self.collection.get(
where={"paper_id": paper_id},
limit=1
)
return len(results['ids']) > 0
except Exception:
return False
def get_all_paper_ids(self) -> List[str]:
"""Get all unique paper IDs in the store."""
try:
# Get all documents
results = self.collection.get()
# Extract unique paper_ids from metadata
paper_ids = set()
if results and 'metadatas' in results:
for metadata in results['metadatas']:
if metadata and 'paper_id' in metadata:
paper_ids.add(metadata['paper_id'])
return list(paper_ids)
except Exception:
return []
def add_chunks(self, chunks: List[str], metadata: List[Dict], ids: List[str]):
"""Add text chunks to the vector store."""
# Ensure all metadata values are strings or simple types
clean_metadata = []
for meta in metadata:
clean_meta = {}
for key, value in meta.items():
if isinstance(value, (str, int, float, bool)):
clean_meta[key] = value
else:
clean_meta[key] = str(value)
clean_metadata.append(clean_meta)
self.collection.add(
documents=chunks,
metadatas=clean_metadata,
ids=ids
)
def query_similar(self, query: str, n_results: int = 5) -> Dict:
"""Query for similar chunks."""
try:
results = self.collection.query(
query_texts=[query],
n_results=n_results,
include=["metadatas", "documents"]
)
# Format results into a more usable structure
formatted_results = {
'documents': results['documents'][0] if results['documents'] else [], # First query result
'metadatas': results['metadatas'][0] if results['metadatas'] else [] # First query result
}
return formatted_results
except Exception as e:
print(f"Error querying vector store: {str(e)}")
return {'documents': [], 'metadatas': []}

53
src/utils/debug.py Normal file
View File

@ -0,0 +1,53 @@
"""Debug utilities for tracking async resources."""
import asyncio
import aiohttp
import logging
from typing import Dict, Set
from weakref import WeakSet, WeakKeyDictionary
import traceback
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class AsyncResourceTracker:
"""Tracks async resources to help identify leaks."""
def __init__(self):
self.active_sessions: Set[aiohttp.ClientSession] = WeakSet()
        # WeakKeyDictionary so the tracker itself never keeps a closed session alive
        self.session_traces: Dict[aiohttp.ClientSession, str] = WeakKeyDictionary()
self.active_loops: Set[asyncio.AbstractEventLoop] = WeakSet()
def track_session(self, session: aiohttp.ClientSession):
"""Track a new client session."""
self.active_sessions.add(session)
self.session_traces[session] = ''.join(traceback.format_stack())
logger.debug(f"New session created: {id(session)}")
logger.debug(f"Creation trace:\n{self.session_traces[session]}")
def untrack_session(self, session: aiohttp.ClientSession):
"""Untrack a closed client session."""
self.active_sessions.discard(session)
self.session_traces.pop(session, None)
logger.debug(f"Session closed: {id(session)}")
def track_loop(self, loop: asyncio.AbstractEventLoop):
"""Track an event loop."""
self.active_loops.add(loop)
logger.debug(f"New event loop tracked: {id(loop)}")
def untrack_loop(self, loop: asyncio.AbstractEventLoop):
"""Untrack a closed event loop."""
self.active_loops.discard(loop)
logger.debug(f"Event loop untracked: {id(loop)}")
def print_active_resources(self):
"""Print information about active resources."""
logger.debug("\n=== Active Resources ===")
logger.debug(f"Active sessions: {len(self.active_sessions)}")
for session in self.active_sessions:
logger.debug(f"\nSession {id(session)} creation trace:\n{self.session_traces.get(session, 'No trace available')}")
logger.debug(f"\nActive event loops: {len(self.active_loops)}")
# Global tracker instance
tracker = AsyncResourceTracker()

20
test_env.py Normal file
View File

@ -0,0 +1,20 @@
import os
from dotenv import load_dotenv
def test_env():
# Load environment variables
load_dotenv()
# Get the API key
api_key = os.getenv('OPENAI_API_KEY')
print("Environment variable details:")
print(f"1. API key exists: {api_key is not None}")
if api_key:
print(f"2. API key length: {len(api_key)}")
print(f"3. First 8 chars: {api_key[:8]}")
print(f"4. Last 8 chars: {api_key[-8:]}")
print(f"5. Raw value: {repr(api_key)}")
if __name__ == "__main__":
test_env()

63
view_papers.py Normal file
View File

@ -0,0 +1,63 @@
import asyncio
import logging
from dotenv import load_dotenv
from src.storage.paper_store import PaperStore
def get_score_color(score: int) -> tuple[str, str]:
"""Get color codes for a score."""
if score <= 25:
return '\033[92m', '(Excellent: Highly original research)' # Green
elif score <= 50:
return '\033[94m', '(Good: Solid research)' # Blue
elif score <= 75:
return '\033[93m', '(Fair: Some fluff present)' # Yellow
else:
return '\033[91m', '(Poor: Heavy on fluff)' # Red
async def view_papers():
"""View all papers in the database."""
# Load environment variables
load_dotenv()
store = PaperStore()
try:
await store.initialize()
papers = await store.get_all_paper_ids()
if not papers:
print("\nNo papers found in database.")
return
print(f"\nFound {len(papers)} papers in database:")
# Get full details for each paper
for paper_id in papers:
paper = await store.get_paper(paper_id)
if paper:
print("\n" + "="*80)
print(f"Title: {paper['title']}")
print(f"Authors: {', '.join(paper['authors']) if isinstance(paper['authors'], list) else paper['authors']}")
print("\nSummary:")
print(paper['summary'] if paper['summary'] else "No summary available")
print("\nTechnical Concepts:")
print(paper['technical_concepts'] if paper['technical_concepts'] else "No technical concepts available")
# Display fluff score with color coding
score = paper.get('fluff_score')
if score is not None:
color, description = get_score_color(score)
reset = '\033[0m'
print(f"\nFluff Score: {color}{score}/100 {description}{reset}")
print("\nAnalysis:")
print(paper['fluff_explanation'] if paper['fluff_explanation'] else "No analysis available")
else:
print("\nFluff Analysis: Not available")
print("\n" + "="*80)
finally:
await store.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING) # Suppress debug logs
asyncio.run(view_papers())