Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
"""

import json
import numpy as np
try:
import numpy as np
_NP_AVAILABLE = True
except ImportError:
np = None # type: ignore[assignment]
_NP_AVAILABLE = False
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
import logging
Expand Down Expand Up @@ -57,10 +62,12 @@ def __init__(self, agent_id: str):

def get_action(self, state: str, available_actions: List[str]) -> str:
"""Get action using epsilon-greedy policy"""
import random
with self.lock:
if np.random.random() < self.epsilon:
if (_NP_AVAILABLE and np.random.random() < self.epsilon) or \
(not _NP_AVAILABLE and random.random() < self.epsilon):
# Exploration: random action
return np.random.choice(available_actions)
return np.random.choice(available_actions) if _NP_AVAILABLE else random.choice(available_actions)
else:
# Exploitation: best action
return max(available_actions, key=lambda a: self.q_table[state][a])
Expand Down Expand Up @@ -313,8 +320,11 @@ def optimize_learning_strategy(self, current_performance: Dict[str, Any]) -> Dic
# Analyze performance trends
if len(self.performance_history) >= 5:
recent_performance = self.performance_history[-5:]
avg_performance = np.mean([p['performance'].get('success_rate', 0.5)
for p in recent_performance])
avg_performance = (np.mean([p['performance'].get('success_rate', 0.5)
for p in recent_performance])
if _NP_AVAILABLE else
sum(p['performance'].get('success_rate', 0.5)
for p in recent_performance) / len(recent_performance))

# Adjust learning parameters based on performance
strategy_adjustments = {}
Expand Down Expand Up @@ -357,7 +367,10 @@ def get_learning_insights(self) -> Dict[str, Any]:

insights = {
'current_performance': recent_performance[-1] if recent_performance else 0.5,
'average_performance': np.mean(recent_performance),
'average_performance': (np.mean(recent_performance)
if _NP_AVAILABLE and recent_performance
else (sum(recent_performance) / len(recent_performance)
if recent_performance else 0.0)),
'performance_trend': 'improving' if len(recent_performance) >= 2 and
recent_performance[-1] > recent_performance[0] else 'stable',
'learning_efficiency': len(self.performance_history),
Expand All @@ -374,7 +387,8 @@ def _generate_recommendations(self, performance_history: List[float]) -> List[st
recommendations.append("Need more data for meaningful analysis")
return recommendations

avg_performance = np.mean(performance_history)
avg_performance = (np.mean(performance_history) if _NP_AVAILABLE
else sum(performance_history) / len(performance_history))

if avg_performance < 0.6:
recommendations.extend([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import asyncio
import logging
import json
import numpy as np
try:
import numpy as np
except ImportError:
np = None # type: ignore[assignment]
from typing import Dict, List, Optional, Any, Tuple, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
Expand Down Expand Up @@ -77,7 +80,30 @@ async def record_learning_event(self, event: LearningEvent):
logger.info(f"Recorded learning event {event.event_id}")
except Exception as e:
logger.error(f"Error recording learning event: {e}")


async def record_event(self, event_data: Union[LearningEvent, Dict[str, Any]]) -> Union[LearningEvent, str]:
"""Record an event; accepts a LearningEvent or a plain dict for convenience.

Returns the LearningEvent that was recorded.
"""
if isinstance(event_data, LearningEvent):
learning_event = event_data
else:
learning_event = LearningEvent(
event_id=event_data.get('event_id', f"evt_{datetime.now().timestamp()}"),
agent_id=event_data.get('agent_id', 'unknown'),
action_type=event_data.get('action_type', 'generic'),
input_features=event_data.get('input_features', {}),
output_result=event_data.get('output_result'),
feedback_score=float(event_data.get('feedback_score', 0.0)),
context=event_data.get('context', {}),
timestamp=event_data.get('timestamp', datetime.now().isoformat()),
success=bool(event_data.get('success', True)),
execution_time=float(event_data.get('execution_time', 0.0)),
)
await self.record_learning_event(learning_event)
return learning_event

async def get_action_recommendation(self, agent_id: str, context: Dict[str, Any], action_type: str) -> Dict[str, Any]:
"""Get ML-based action recommendations"""
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple, TYPE_CHECKING
import numpy as np
try:
import numpy as np
_NP_AVAILABLE = True
except ImportError:
np = None # type: ignore[assignment]
_NP_AVAILABLE = False
from dataclasses import dataclass, asdict
import pickle
import threading
Expand Down Expand Up @@ -40,7 +45,7 @@ class VectorEmbedding:
"""Represents a vector embedding for semantic search"""
id: str
content_hash: str
embedding: np.ndarray
embedding: Any
metadata: Dict[str, Any]
created_at: datetime

Expand Down Expand Up @@ -246,7 +251,7 @@ def retrieve_memory(self, agent_id: str, memory_type: str = None,
conn.commit()
return entries

def store_vector_embedding(self, content_hash: str, embedding: np.ndarray,
def store_vector_embedding(self, content_hash: str, embedding: Any,
metadata: Dict[str, Any] = None) -> str:
"""
Store a vector embedding for semantic search
Expand Down Expand Up @@ -287,7 +292,7 @@ def store_vector_embedding(self, content_hash: str, embedding: np.ndarray,
logger.info(f"Stored vector embedding {embedding_id}")
return embedding_id

def find_similar_vectors(self, query_embedding: np.ndarray, limit: int = 5) -> List[Tuple[str, float]]:
def find_similar_vectors(self, query_embedding: Any, limit: int = 5) -> List[Tuple[str, float]]:
"""
Find similar vectors using cosine similarity

Expand All @@ -306,9 +311,16 @@ def find_similar_vectors(self, query_embedding: np.ndarray, limit: int = 5) -> L
similarities = []
for row in rows:
stored_embedding = pickle.loads(row['embedding'])
similarity = np.dot(query_embedding, stored_embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(stored_embedding)
)
if _NP_AVAILABLE:
similarity = np.dot(query_embedding, stored_embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(stored_embedding)
)
else:
import math
dot = sum(a * b for a, b in zip(query_embedding, stored_embedding))
norm_q = math.sqrt(sum(a * a for a in query_embedding))
norm_s = math.sqrt(sum(b * b for b in stored_embedding))
similarity = dot / (norm_q * norm_s) if norm_q and norm_s else 0.0
similarities.append((row['id'], float(similarity)))

# Sort by similarity and return top results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@

import json
import os
import numpy as np
try:
import numpy as np
_NP_AVAILABLE = True
except ImportError:
np = None # type: ignore[assignment]
_NP_AVAILABLE = False
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
import logging
Expand Down Expand Up @@ -392,7 +397,7 @@ def _load_or_initialize_model(self):
logger.info("Initializing new quality assessment model")
self._initialize_dummy_model()

def extract_features(self, manuscript: Dict[str, Any]) -> np.ndarray:
def extract_features(self, manuscript: Dict[str, Any]) -> Any:
"""Extract features from manuscript"""
features = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async def optimize_formatting(self, document: Document, target_format: Optional[
if target_format is None:
# Use document's own format_type or call _predict_optimal_format
prediction = self._predict_optimal_format(document)
target_format = prediction.get('recommended_format', document.format_type or FormatType.PDF)
target_format = prediction.get('recommended_format', getattr(document, 'format_type', None) or FormatType.PDF)
confidence = float(prediction.get('confidence', 0.8))
optimization_rules: List[str] = list(prediction.get('optimization_rules', []))
else:
Expand Down Expand Up @@ -210,7 +210,7 @@ async def optimize_formatting(self, document: Document, target_format: Optional[
logger.info(f"Applied {len(applied_rules)} formatting rules to document {document.document_id}")

return OptimizationResult(
original_format=document.format_type,
original_format=getattr(document, 'format_type', None),
optimized_format=target_format,
confidence_score=confidence,
applied_optimizations=applied_rules,
Expand All @@ -219,7 +219,7 @@ async def optimize_formatting(self, document: Document, target_format: Optional[
except Exception as e:
logger.error(f"Error optimizing document formatting: {e}")
return OptimizationResult(
original_format=document.format_type,
original_format=getattr(document, 'format_type', None),
optimized_format=target_format or FormatType.PDF,
confidence_score=0.0,
applied_optimizations=[],
Expand All @@ -228,7 +228,7 @@ async def optimize_formatting(self, document: Document, target_format: Optional[
def _predict_optimal_format(self, document: Document) -> Dict[str, Any]:
"""Predict the optimal output format for a document (can be patched in tests)"""
# Simple heuristic: prefer PDF unless metadata says otherwise
preferred = document.format_type or FormatType.PDF
preferred = getattr(document, 'format_type', None) or FormatType.PDF
return {
'recommended_format': preferred,
'confidence': 0.8,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
np = None # type: ignore[assignment]
NUMPY_AVAILABLE = False

try:
Expand Down Expand Up @@ -874,7 +875,7 @@ def _predict_growth_transformer(self, documents: List[Dict[str, Any]]) -> Dict[s
logger.error(f"Error in transformer growth prediction: {e}")
return {'predicted_growth_rate': 0.0}

def _extract_trend_features(self, documents: List[Dict[str, Any]]) -> np.ndarray:
def _extract_trend_features(self, documents: List[Dict[str, Any]]) -> Any:
"""Extract features for trend analysis"""
features = []

Expand Down Expand Up @@ -984,7 +985,7 @@ def _calculate_author_expertise(self, authors: List[str]) -> float:

return total_expertise / len(authors)

def _identify_trending_topics(self, features: np.ndarray) -> List[str]:
def _identify_trending_topics(self, features: Any) -> List[str]:
"""Identify trending topics using clustering"""
try:
if len(features) < 3:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import asyncio
import logging
import json
import numpy as np
try:
import numpy as np
_NP_AVAILABLE = True
except ImportError:
np = None # type: ignore[assignment]
_NP_AVAILABLE = False
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field, asdict
from datetime import datetime
Expand Down Expand Up @@ -480,8 +485,10 @@ def _calculate_growth_indicators(self, citation_trends: Dict[str, List[int]]) ->

if len(sorted_months) >= 2:
# Calculate month-over-month growth
recent_avg = np.mean(citation_trends[sorted_months[-1]]) if citation_trends[sorted_months[-1]] else 0
previous_avg = np.mean(citation_trends[sorted_months[-2]]) if citation_trends[sorted_months[-2]] else 0
def _mean(lst):
return (np.mean(lst) if _NP_AVAILABLE else sum(lst) / len(lst)) if lst else 0
recent_avg = _mean(citation_trends[sorted_months[-1]])
previous_avg = _mean(citation_trends[sorted_months[-2]])

if previous_avg > 0:
mom_growth = (recent_avg - previous_avg) / previous_avg
Expand All @@ -490,13 +497,21 @@ def _calculate_growth_indicators(self, citation_trends: Dict[str, List[int]]) ->
# Calculate overall trend slope
monthly_avgs = []
for month in sorted_months:
avg_citations = np.mean(citation_trends[month]) if citation_trends[month] else 0
monthly_avgs.append(avg_citations)
monthly_avgs.append(_mean(citation_trends[month]))

if len(monthly_avgs) > 1:
# Simple linear trend
x = np.arange(len(monthly_avgs))
trend_slope = np.polyfit(x, monthly_avgs, 1)[0]
if _NP_AVAILABLE:
x = np.arange(len(monthly_avgs))
trend_slope = np.polyfit(x, monthly_avgs, 1)[0]
else:
n = len(monthly_avgs)
x_vals = list(range(n))
x_mean = sum(x_vals) / n
y_mean = sum(monthly_avgs) / n
num = sum((x_vals[i] - x_mean) * (monthly_avgs[i] - y_mean) for i in range(n))
den = sum((x_vals[i] - x_mean) ** 2 for i in range(n))
trend_slope = num / den if den else 0.0
indicators['trend_slope'] = trend_slope

return indicators
Expand Down
Loading
Loading