Source code for agentgit.agents.agent_service

"""Service for managing RollbackAgent instances.

Handles agent creation, rollback operations, and session management.
"""

from typing import Optional, Dict, Any, List, Callable
from datetime import datetime
import os
from langchain_openai import ChatOpenAI

from agentgit.agents.rollback_agent import RollbackAgent
from agentgit.sessions.external_session import ExternalSession
from agentgit.sessions.internal_session import InternalSession
from agentgit.database.repositories.external_session_repository import ExternalSessionRepository
from agentgit.database.repositories.internal_session_repository import InternalSessionRepository
from agentgit.database.repositories.checkpoint_repository import CheckpointRepository


[docs] class AgentService: """Service for managing RollbackAgent instances. Provides high-level operations for creating agents, handling rollbacks, and managing the interaction between external and internal sessions. """ def __init__(self, model_config: Optional[Dict[str, Any]] = None): """Initialize the agent service. Args: model_config: Optional configuration for the AI model. """ self.external_session_repo = ExternalSessionRepository() self.internal_session_repo = InternalSessionRepository() self.checkpoint_repo = CheckpointRepository() # Default model configuration base_url = os.getenv("BASE_URL") api_key = os.getenv("OPENAI_API_KEY") base_url = self._sanitize_base_url(base_url) self.model_config = model_config or { "id": "gpt-4o-mini", "temperature": 0.7, "api_key": api_key, "base_url": base_url } self.current_agent: Optional[RollbackAgent] = None def _sanitize_base_url(self, raw_url: Optional[str]) -> Optional[str]: if not raw_url: return None url = raw_url.strip().rstrip("/") if not url: return None if not (url.startswith("http://") or url.startswith("https://")): url = "https://" + url return url
[docs] def create_new_agent( self, external_session_id: int, session_name: Optional[str] = None, base_url: Optional[str] = None, api_key: Optional[str] = None, **agent_kwargs ) -> RollbackAgent: """Create a new RollbackAgent for an external session. Args: external_session_id: ID of the external session. session_name: Optional name for the internal session. base_url: Optional base URL for the model provider (overrides defaults/env if provided). api_key: Optional API key for the model provider (overrides defaults/env if provided). **agent_kwargs: Additional arguments for the agent. Returns: The created RollbackAgent instance. """ # Resolve effective model configuration (allow per-call overrides) effective_model_config = dict(self.model_config) if api_key: effective_model_config["api_key"] = api_key if base_url: effective_model_config["base_url"] = self._sanitize_base_url(base_url) # Create the model using LangChain's ChatOpenAI # Map config keys to LangChain format model = ChatOpenAI( model=effective_model_config.get("id", "gpt-4o-mini"), temperature=effective_model_config.get("temperature", 0.7), openai_api_key=effective_model_config.get("api_key"), openai_api_base=effective_model_config.get("base_url") ) # Create the agent with repositories agent = RollbackAgent( external_session_id=external_session_id, model=model, internal_session_repo=self.internal_session_repo, checkpoint_repo=self.checkpoint_repo, add_history_to_messages=True, num_history_runs=5, show_tool_calls=True, **agent_kwargs ) # Set session name if provided if session_name and agent.internal_session: # Update the internal session name via checkpoint name # (Internal sessions don't have names, but we can track it in metadata) pass self.current_agent = agent return agent
[docs] def resume_agent( self, external_session_id: int, internal_session_id: Optional[int] = None, base_url: Optional[str] = None, api_key: Optional[str] = None, tools: Optional[List] = None, reverse_tools: Optional[Dict[str, Callable]] = None ) -> Optional[RollbackAgent]: """Resume an existing agent session. Args: external_session_id: ID of the external session. internal_session_id: Optional specific internal session to resume. If None, uses the current internal session. base_url: Optional base URL for the model provider (overrides defaults/env if provided). api_key: Optional API key for the model provider (overrides defaults/env if provided). tools: Optional list of tools to register with the agent. reverse_tools: Optional mapping of tool names to reverse handlers. Returns: The resumed RollbackAgent instance, or None if not found. """ # Get the external session external_session = self.external_session_repo.get_by_id(external_session_id) if not external_session: return None # Get the internal session to resume if internal_session_id: internal_session = self.internal_session_repo.get_by_id(internal_session_id) else: internal_session = self.internal_session_repo.get_current_session(external_session_id) if not internal_session: # No existing internal session, create new agent return self.create_new_agent(external_session_id) # Create agent and restore state effective_model_config = dict(self.model_config) if api_key: effective_model_config["api_key"] = api_key if base_url: effective_model_config["base_url"] = self._sanitize_base_url(base_url) # Create the model using LangChain's ChatOpenAI model = ChatOpenAI( model=effective_model_config.get("id", "gpt-4o-mini"), temperature=effective_model_config.get("temperature", 0.7), openai_api_key=effective_model_config.get("api_key"), openai_api_base=effective_model_config.get("base_url") ) agent = RollbackAgent( external_session_id=external_session_id, model=model, internal_session_repo=self.internal_session_repo, checkpoint_repo=self.checkpoint_repo, session_state=internal_session.session_state, skip_session_creation=True, # Don't create a new session, we're resuming tools=tools, reverse_tools=reverse_tools, add_history_to_messages=True, num_history_runs=5, show_tool_calls=True ) # Set the existing internal session (since we skipped creation) agent.internal_session = internal_session agent.langgraph_session_id = internal_session.langgraph_session_id # CRITICAL FIX: Restore conversation history for resumed sessions # This ensures the agent remembers previous conversations if internal_session.conversation_history: agent._restored_from_checkpoint = True agent._restored_history = internal_session.conversation_history.copy() # Restore tool track from latest checkpoint latest_checkpoint = self.checkpoint_repo.get_latest_checkpoint(internal_session.id) if latest_checkpoint and latest_checkpoint.tool_invocations: from agentgit.core.rollback_protocol import ToolInvocationRecord for inv in latest_checkpoint.tool_invocations: record = ToolInvocationRecord( tool_name=inv.get("tool_name"), args=inv.get("args", {}), result=inv.get("result"), success=inv.get("success", True), error_message=inv.get("error_message") ) agent.tool_rollback_registry._track.append(record) self.current_agent = agent return agent
[docs] def rollback_to_checkpoint( self, external_session_id: int, checkpoint_id: int, base_url: Optional[str] = None, api_key: Optional[str] = None, rollback_tools: bool = True, tools: Optional[List] = None, reverse_tools: Optional[Dict[str, Callable]] = None ) -> Optional[RollbackAgent]: """Create a new agent from a checkpoint (rollback operation). Args: external_session_id: ID of the external session. checkpoint_id: ID of the checkpoint to rollback to. base_url: Optional base URL for the model provider (overrides defaults/env if provided). api_key: Optional API key for the model provider (overrides defaults/env if provided). rollback_tools: Whether to rollback tool operations after the checkpoint. tools: Optional list of tools to register with the agent. reverse_tools: Optional mapping of tool names to reverse handlers. Returns: A new RollbackAgent with the checkpoint's state, or None if failed. """ try: # If we have a current agent and rollback_tools is enabled, # rollback tool operations after the checkpoint first if rollback_tools and self.current_agent: checkpoint = self.checkpoint_repo.get_by_id(checkpoint_id) if checkpoint and "tool_track_position" in checkpoint.metadata: tool_track_position = checkpoint.metadata["tool_track_position"] print(f"Rolling back tools from position {tool_track_position}...") reverse_results = self.current_agent.rollback_tools_from_track_index(tool_track_position) for rr in reverse_results: if not rr.reversed_successfully: print(f"Warning: Failed to reverse {rr.tool_name}: {rr.error_message}") effective_model_config = dict(self.model_config) if api_key: effective_model_config["api_key"] = api_key if base_url: effective_model_config["base_url"] = self._sanitize_base_url(base_url) # Create the model using LangChain's ChatOpenAI model = ChatOpenAI( model=effective_model_config.get("id", "gpt-4o-mini"), temperature=effective_model_config.get("temperature", 0.7), openai_api_key=effective_model_config.get("api_key"), openai_api_base=effective_model_config.get("base_url") ) agent = RollbackAgent.from_checkpoint( checkpoint_id=checkpoint_id, external_session_id=external_session_id, model=model, checkpoint_repo=self.checkpoint_repo, internal_session_repo=self.internal_session_repo, tools=tools, reverse_tools=reverse_tools, add_history_to_messages=True, num_history_runs=5, show_tool_calls=True ) self.current_agent = agent return agent except ValueError as e: print(f"Rollback failed: {e}") return None
[docs] def handle_agent_response(self, agent: RollbackAgent, response: Any) -> bool: """Handle agent response and check for rollback requests. Args: agent: The RollbackAgent that generated the response. response: The response from the agent. Returns: True if a rollback was requested and should be handled, False otherwise. """ # Check if rollback was requested if agent.internal_session and agent.internal_session.session_state.get('rollback_requested'): checkpoint_id = agent.internal_session.session_state.get('rollback_checkpoint_id') if checkpoint_id: # Don't clear the checkpoint_id yet - the caller needs it! # Only clear the request flag agent.internal_session.session_state['rollback_requested'] = False # Keep rollback_checkpoint_id for the caller to use agent._save_internal_session() return True # Signal that rollback should be performed return False
[docs] def list_internal_sessions(self, external_session_id: int) -> list: """List all internal sessions for an external session. Args: external_session_id: ID of the external session. Returns: List of internal sessions. """ return self.internal_session_repo.get_by_external_session(external_session_id)
[docs] def list_checkpoints(self, internal_session_id: int) -> list: """List all checkpoints for an internal session. Args: internal_session_id: ID of the internal session. Returns: List of checkpoints. """ return self.checkpoint_repo.get_by_internal_session(internal_session_id)
[docs] def get_conversation_summary(self, agent: RollbackAgent) -> str: """Get a summary of the conversation history. Args: agent: The RollbackAgent instance. Returns: A formatted summary of the conversation. """ history = agent.get_conversation_history() if not history: return "No conversation history yet." summary = f"Conversation ({len(history)} messages):\n" for msg in history[-10:]: # Show last 10 messages role = msg.get('role', 'unknown') content = msg.get('content', '') timestamp = msg.get('timestamp', '') # Truncate long messages if len(content) > 100: content = content[:97] + "..." summary += f"\n[{role}] {content}\n" return summary
[docs] def get_active_agent(self, external_session_id: int) -> Optional[RollbackAgent]: """Get an active agent for a session if one exists. Args: external_session_id: ID of the external session. Returns: The active RollbackAgent if found, None otherwise. """ return self.active_agents.get(external_session_id)
[docs] def cleanup_agent(self, external_session_id: int): """Remove an agent from active tracking. Args: external_session_id: ID of the external session. """ if external_session_id in self.active_agents: del self.active_agents[external_session_id] if self.current_agent and self.current_agent.external_session_id == external_session_id: self.current_agent = None
[docs] def get_branch_tree(self, external_session_id: int) -> Dict[str, Any]: """Get the branch tree structure for an external session. Args: external_session_id: ID of the external session. Returns: Dictionary representing the branch tree structure. """ internal_sessions = self.internal_session_repo.get_by_external_session(external_session_id) # Build session info map first session_info_map = {} for session in internal_sessions: session_info_map[session.id] = { "id": session.id, "session_id": session.langgraph_session_id, "created_at": session.created_at.isoformat() if session.created_at else None, "is_current": session.is_current, "is_branch": session.is_branch(), "checkpoint_count": session.checkpoint_count, "tool_invocations": session.tool_invocation_count, "children": [] } # Build tree structure - first pass: identify roots tree = {} for session in internal_sessions: if not session.parent_session_id: # Root session tree[session.id] = session_info_map[session.id] # Second pass: add children to parents for session in internal_sessions: if session.parent_session_id and session.parent_session_id in session_info_map: parent_info = session_info_map[session.parent_session_id] parent_info["children"].append(session_info_map[session.id]) return tree