Source code for agentgit.database.repositories.external_session_repository

"""Repository for external session database operations.

Handles CRUD operations for external sessions in the LangGraph rollback agent system.
"""

import sqlite3
import json
from typing import Optional, List, Dict, Any
from datetime import datetime

from agentgit.sessions.external_session import ExternalSession
from agentgit.database.db_config import get_database_path
from agentgit.database.db_config import get_database_path



class ExternalSessionRepository:
    """Repository for ExternalSession CRUD operations with SQLite.

    Manages external sessions, which are the user-visible conversation
    containers. Each external session can contain multiple internal sessions
    with branching support.

    Every operation opens its own short-lived connection (see ``_connect``)
    and closes it before returning.

    Attributes:
        db_path: Path to the SQLite database file.

    Example:
        >>> repo = ExternalSessionRepository()
        >>> session = ExternalSession(user_id=1, session_name="My Chat")
        >>> saved_session = repo.create(session)
        >>> sessions = repo.get_user_sessions(user_id=1)
    """

    # Column list shared by every SELECT. ``_row_to_session`` unpacks rows
    # positionally, so the order here must match the unpacking order there.
    _SELECT_COLUMNS = (
        "id, user_id, session_name, created_at, updated_at, "
        "is_active, data, metadata, branch_count, total_checkpoints"
    )

    # Columns added after the first schema version, as (name, definition).
    # ``_init_db`` adds whichever of these an existing table is missing.
    _MIGRATED_COLUMNS = (
        ("metadata", "TEXT"),
        ("branch_count", "INTEGER DEFAULT 0"),
        ("total_checkpoints", "INTEGER DEFAULT 0"),
    )

    def __init__(self, db_path: Optional[str] = None):
        """Initialize the external session repository.

        Args:
            db_path: Path to SQLite database. If None (or empty), the value
                returned by ``get_database_path()`` is used.
        """
        self.db_path = db_path or get_database_path()
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a connection with foreign-key enforcement switched on.

        SQLite applies ``PRAGMA foreign_keys`` per connection, so it has to be
        issued on every new connection rather than once per database.

        Returns:
            An open connection. The caller is responsible for closing it.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("PRAGMA foreign_keys = ON")
        except Exception:
            # Do not leak the handle if the pragma itself fails.
            conn.close()
            raise
        return conn

    def _init_db(self):
        """Create the external_sessions table and its indexes if missing.

        Also migrates an older table in place by adding any column listed in
        ``_MIGRATED_COLUMNS`` that the existing table does not have yet.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS external_sessions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    session_name TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT,
                    is_active INTEGER DEFAULT 1,
                    data TEXT,
                    metadata TEXT,
                    branch_count INTEGER DEFAULT 0,
                    total_checkpoints INTEGER DEFAULT 0,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
                )
            """)

            # table_info rows are (cid, name, type, ...); index 1 is the name.
            cursor.execute("PRAGMA table_info(external_sessions)")
            existing = {column[1] for column in cursor.fetchall()}

            # Names and definitions come from the class constant above, never
            # from user input, so interpolating them into the DDL is safe.
            for name, definition in self._MIGRATED_COLUMNS:
                if name not in existing:
                    cursor.execute(
                        f"ALTER TABLE external_sessions "
                        f"ADD COLUMN {name} {definition}"
                    )

            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_external_sessions_user
                ON external_sessions(user_id)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_external_sessions_active
                ON external_sessions(user_id, is_active)
            """)
            conn.commit()
        finally:
            conn.close()

    def create(self, session: "ExternalSession") -> "ExternalSession":
        """Create a new external session.

        If ``session.created_at`` is unset it is filled with the current
        local time before the row is written.

        Args:
            session: ExternalSession object to create.

        Returns:
            The same session object, with ``id`` set to the new row id.

        Raises:
            sqlite3.IntegrityError: If ``user_id`` has no matching row in the
                ``users`` table (foreign keys are enforced).
        """
        if not session.created_at:
            session.created_at = datetime.now()

        # The whole session is serialized into the ``data`` column; selected
        # fields are also mirrored into their own columns for querying.
        json_data = json.dumps(session.to_dict())
        params = (
            session.user_id,
            session.session_name,
            session.created_at.isoformat(),
            session.updated_at.isoformat() if session.updated_at else None,
            1 if session.is_active else 0,
            json_data,
            json.dumps(session.metadata) if session.metadata else None,
            session.branch_count,
            session.total_checkpoints,
        )

        conn = self._connect()
        try:
            cursor = conn.execute("""
                INSERT INTO external_sessions
                (user_id, session_name, created_at, updated_at, is_active,
                 data, metadata, branch_count, total_checkpoints)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, params)
            new_id = cursor.lastrowid
            conn.commit()
        finally:
            conn.close()

        # Assigned only after a successful commit, so a failed insert never
        # leaves the caller holding an id for a row that was not persisted.
        session.id = new_id
        return session

    def update(self, session: "ExternalSession") -> bool:
        """Update an existing external session.

        Rewrites the serialized session data and the mirrored columns, and
        stamps ``session.updated_at`` with the current local time.

        Args:
            session: ExternalSession object with updated data.

        Returns:
            True if a row was updated, False if ``session.id`` is unset or no
            row with that id exists.
        """
        if not session.id:
            return False

        session.updated_at = datetime.now()
        json_data = json.dumps(session.to_dict())
        params = (
            session.session_name,
            session.updated_at.isoformat(),
            1 if session.is_active else 0,
            json_data,
            json.dumps(session.metadata) if session.metadata else None,
            session.branch_count,
            session.total_checkpoints,
            session.id,
        )

        conn = self._connect()
        try:
            cursor = conn.execute("""
                UPDATE external_sessions
                SET session_name = ?, updated_at = ?, is_active = ?, data = ?,
                    metadata = ?, branch_count = ?, total_checkpoints = ?
                WHERE id = ?
            """, params)
            conn.commit()
            return cursor.rowcount > 0
        finally:
            conn.close()

    def get_by_id(self, session_id: int) -> Optional["ExternalSession"]:
        """Get an external session by ID.

        Args:
            session_id: The ID of the session to retrieve.

        Returns:
            ExternalSession if found, None otherwise.
        """
        conn = self._connect()
        try:
            row = conn.execute(
                f"SELECT {self._SELECT_COLUMNS} "
                "FROM external_sessions WHERE id = ?",
                (session_id,),
            ).fetchone()
        finally:
            conn.close()

        return self._row_to_session(row) if row else None

    def get_user_sessions(
        self, user_id: int, active_only: bool = False
    ) -> List["ExternalSession"]:
        """Get all sessions for a user.

        Args:
            user_id: The ID of the user.
            active_only: If True, only return active sessions.

        Returns:
            List of ExternalSession objects, ordered by created_at descending.
        """
        query = (
            f"SELECT {self._SELECT_COLUMNS} "
            "FROM external_sessions WHERE user_id = ?"
        )
        if active_only:
            query += " AND is_active = 1"
        query += " ORDER BY created_at DESC"

        conn = self._connect()
        try:
            rows = conn.execute(query, (user_id,)).fetchall()
        finally:
            conn.close()

        return [self._row_to_session(row) for row in rows]

    def get_by_internal_session(
        self, langgraph_session_id: str
    ) -> Optional["ExternalSession"]:
        """Find the external session containing a specific internal session.

        Candidate rows are pre-filtered in SQL by searching the serialized
        ``data`` column, then confirmed against the deserialized
        ``internal_session_ids`` list.

        Args:
            langgraph_session_id: The langgraph session ID to search for.

        Returns:
            ExternalSession containing the internal session, None if not found.
        """
        # ``data`` is written with json.dumps, so the needle must be encoded
        # the same way; otherwise ids containing quotes, backslashes or
        # non-ASCII characters would never match their stored form.
        needle = json.dumps(langgraph_session_id)
        # Neutralize LIKE metacharacters so '_' and '%' in an id match only
        # themselves. The escape character is escaped first.
        escaped = (
            needle.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )

        conn = self._connect()
        try:
            rows = conn.execute(
                f"SELECT {self._SELECT_COLUMNS} FROM external_sessions "
                "WHERE data LIKE ? ESCAPE '\\'",
                (f"%{escaped}%",),
            ).fetchall()
        finally:
            conn.close()

        # The substring hit may come from any field of the JSON document, so
        # confirm actual membership before returning a match.
        for row in rows:
            session = self._row_to_session(row)
            if langgraph_session_id in session.internal_session_ids:
                return session
        return None

    def add_internal_session(
        self, external_session_id: int, langgraph_session_id: str
    ) -> bool:
        """Add an internal langgraph session to an external session.

        Args:
            external_session_id: The ID of the external session.
            langgraph_session_id: The langgraph session ID to add.

        Returns:
            True if successful, False if external session not found.
        """
        session = self.get_by_id(external_session_id)
        if not session:
            return False

        session.add_internal_session(langgraph_session_id)
        return self.update(session)

    def set_current_internal_session(
        self, external_session_id: int, langgraph_session_id: str
    ) -> bool:
        """Set the current internal session for an external session.

        Args:
            external_session_id: The ID of the external session.
            langgraph_session_id: The langgraph session ID to set as current.

        Returns:
            True if successful, False if the session is not found or the
            session object refuses the id (its
            ``set_current_internal_session`` returns a falsy value).
        """
        session = self.get_by_id(external_session_id)
        if not session:
            return False

        if session.set_current_internal_session(langgraph_session_id):
            return self.update(session)
        return False

    def deactivate(self, session_id: int) -> bool:
        """Deactivate an external session (soft delete).

        Args:
            session_id: The ID of the session to deactivate.

        Returns:
            True if deactivation successful, False otherwise.
        """
        session = self.get_by_id(session_id)
        if not session:
            return False

        # update() rewrites both the is_active column and the serialized
        # data, and stamps updated_at itself.
        session.is_active = False
        return self.update(session)

    def delete(self, session_id: int) -> bool:
        """Permanently delete an external session.

        Args:
            session_id: The ID of the session to delete.

        Returns:
            True if a row was deleted, False otherwise.

        Note:
            Foreign keys are enforced on this connection, so rows in other
            tables that reference this session are removed only if their
            foreign keys declare ``ON DELETE CASCADE``. Those tables are not
            defined in this module.
        """
        conn = self._connect()
        try:
            cursor = conn.execute(
                "DELETE FROM external_sessions WHERE id = ?", (session_id,)
            )
            conn.commit()
            return cursor.rowcount > 0
        finally:
            conn.close()

    def check_ownership(self, session_id: int, user_id: int) -> bool:
        """Check if a user owns a specific session.

        Args:
            session_id: The ID of the session.
            user_id: The ID of the user.

        Returns:
            True if the user owns the session, False otherwise.
        """
        conn = self._connect()
        try:
            # An existence probe is enough; there is no need to count rows.
            row = conn.execute(
                "SELECT 1 FROM external_sessions "
                "WHERE id = ? AND user_id = ? LIMIT 1",
                (session_id, user_id),
            ).fetchone()
        finally:
            conn.close()

        return row is not None

    def count_user_sessions(self, user_id: int, active_only: bool = False) -> int:
        """Count the number of sessions a user has.

        Args:
            user_id: The ID of the user.
            active_only: If True, only count active sessions.

        Returns:
            The number of sessions.
        """
        query = "SELECT COUNT(*) FROM external_sessions WHERE user_id = ?"
        if active_only:
            query += " AND is_active = 1"

        conn = self._connect()
        try:
            return conn.execute(query, (user_id,)).fetchone()[0]
        finally:
            conn.close()

    def _row_to_session(self, row: tuple) -> "ExternalSession":
        """Convert a database row to an ExternalSession object.

        The serialized ``data`` column is the primary source. The
        ``metadata``, ``branch_count`` and ``total_checkpoints`` columns
        override whatever the serialized document carries, and the row id
        always wins over any id stored in the document.

        Args:
            row: Tuple of column values in ``_SELECT_COLUMNS`` order, or the
                7-column legacy layout without the three newer columns.

        Returns:
            ExternalSession object with all fields including internal session
            tracking.
        """
        if len(row) == 7:
            # Legacy layout: the three newer columns are absent.
            (session_id, user_id, session_name, created_at, updated_at,
             is_active, json_data) = row
            metadata = None
            branch_count = 0
            total_checkpoints = 0
        else:
            (session_id, user_id, session_name, created_at, updated_at,
             is_active, json_data, metadata, branch_count,
             total_checkpoints) = row

        if json_data:
            session_dict = json.loads(json_data)
        else:
            # Older records have no serialized document; rebuild one from
            # the plain columns.
            session_dict = {
                "id": session_id,
                "user_id": user_id,
                "session_name": session_name,
                "created_at": created_at,
                "updated_at": updated_at,
                "is_active": bool(is_active),
                "internal_session_ids": [],
                "current_internal_session_id": None,
                "metadata": {},
                "branch_count": 0,
                "total_checkpoints": 0,
            }

        # Dedicated columns take precedence over the serialized copy.
        if metadata:
            session_dict["metadata"] = json.loads(metadata)
        session_dict["branch_count"] = branch_count or 0
        session_dict["total_checkpoints"] = total_checkpoints or 0

        session = ExternalSession.from_dict(session_dict)
        # The document is serialized before the row id exists on insert, so
        # the stored id can be missing; always take it from the row.
        session.id = session_id
        return session