Audit Module

Audit logging infrastructure including the main logger, encryption utilities, and storage backends.


AuditLogger

Main audit logging interface for AI compliance.

The AuditLogger provides a high-level interface for recording AI interactions, including inputs, outputs, safety evaluations, and performance metrics. It supports optional encryption of content and configurable retention policies.

Parameters:

- storage (Union[StorageBackend, str], required): Either a StorageBackend instance or a file path string. If a string is provided, a FileStorage is created automatically.
- encryption (EncryptionManager | None, default: None): Optional EncryptionManager for encrypting stored content.
- store_content (bool, default: False): If True, stores actual content; if False, stores only content hashes for privacy.
- retention_days (int, default: 365): Number of days to retain entries before cleanup.
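The hash-only default is worth a concrete look. The library's hash_content helper is not shown on this page; a SHA-256 hex digest is one plausible implementation (an assumption, not the actual library code), and it illustrates why hash-only storage preserves privacy while still allowing exact-match verification:

```python
import hashlib

def hash_content(content: str) -> str:
    # Hypothetical stand-in for the library's hash_content helper:
    # SHA-256 hex digest of the UTF-8 encoded content.
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

# The digest cannot be reversed to recover the text, but a known
# string can still be checked against a stored entry's hash.
digest = hash_content("What is 2+2?")
print(len(digest))  # 64
```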

Attributes:

- storage (StorageBackend): The underlying storage backend.
- encryption: The encryption manager (if provided).
- store_content: Whether to store actual content.
- retention_days: Retention period in days.

Example

Basic usage with file storage:

logger = AuditLogger("/var/log/audit")
entry_id = await logger.log(
    input="What is 2+2?",
    output="4",
    provider="openai",
    model="gpt-4",
)

With encryption and content storage:

from rotalabs_comply.audit import EncryptionManager

encryption = EncryptionManager()
logger = AuditLogger(
    "/var/log/audit",
    encryption=encryption,
    store_content=True,
)
entry_id = await logger.log(
    input="Sensitive question",
    output="Sensitive answer",
    safety_passed=True,
)

With a custom storage backend:

from rotalabs_comply.audit import MemoryStorage

storage = MemoryStorage(max_entries=10000)
logger = AuditLogger(storage)

Source code in src/rotalabs_comply/audit/logger.py
class AuditLogger:
    """
    Main audit logging interface for AI compliance.

    The AuditLogger provides a high-level interface for recording AI interactions,
    including inputs, outputs, safety evaluations, and performance metrics. It
    supports optional encryption of content and configurable retention policies.

    Args:
        storage: Either a StorageBackend instance or a file path string.
            If a string is provided, FileStorage is automatically created.
        encryption: Optional EncryptionManager for encrypting stored content.
        store_content: If True, stores actual content. If False, only stores
            content hashes for privacy (default: False).
        retention_days: Number of days to retain entries before cleanup
            (default: 365).

    Attributes:
        storage: The underlying storage backend.
        encryption: The encryption manager (if provided).
        store_content: Whether to store actual content.
        retention_days: Retention period in days.

    Example:
        Basic usage with file storage:

        >>> logger = AuditLogger("/var/log/audit")
        >>> entry_id = await logger.log(
        ...     input="What is 2+2?",
        ...     output="4",
        ...     provider="openai",
        ...     model="gpt-4",
        ... )

        With encryption and content storage:

        >>> from rotalabs_comply.audit import EncryptionManager
        >>> encryption = EncryptionManager()
        >>> logger = AuditLogger(
        ...     "/var/log/audit",
        ...     encryption=encryption,
        ...     store_content=True,
        ... )
        >>> entry_id = await logger.log(
        ...     input="Sensitive question",
        ...     output="Sensitive answer",
        ...     safety_passed=True,
        ... )

        With custom storage backend:

        >>> from rotalabs_comply.audit import MemoryStorage
        >>> storage = MemoryStorage(max_entries=10000)
        >>> logger = AuditLogger(storage)
    """

    def __init__(
        self,
        storage: Union[StorageBackend, str],
        encryption: EncryptionManager | None = None,
        store_content: bool = False,
        retention_days: int = 365,
    ) -> None:
        """
        Initialize the audit logger.

        Args:
            storage: StorageBackend instance or file path string.
            encryption: Optional encryption manager for content encryption.
            store_content: Whether to store actual content (default: False).
            retention_days: Days to retain entries (default: 365).
        """
        if isinstance(storage, str):
            self.storage: StorageBackend = FileStorage(storage)
        else:
            self.storage = storage

        self.encryption = encryption
        self.store_content = store_content
        self.retention_days = retention_days

    def _prepare_content(
        self, content: str
    ) -> tuple[str | None, str]:
        """
        Prepare content for storage.

        Returns tuple of (stored_content, content_hash).
        If store_content is False, stored_content will be None.
        If encryption is enabled, stored_content will be encrypted.
        """
        content_hash = hash_content(content)

        if not self.store_content:
            return None, content_hash

        if self.encryption:
            stored_content = self.encryption.encrypt(content)
        else:
            stored_content = content

        return stored_content, content_hash

    async def log(
        self,
        input: str,
        output: str,
        provider: str | None = None,
        model: str | None = None,
        conversation_id: str | None = None,
        safety_passed: bool = True,
        detectors_triggered: List[str] | None = None,
        block_reason: str | None = None,
        alerts: List[str] | None = None,
        latency_ms: float = 0.0,
        input_tokens: int | None = None,
        output_tokens: int | None = None,
        metadata: Dict[str, Any] | None = None,
    ) -> str:
        """
        Log an AI interaction.

        Creates an audit entry for the interaction and stores it in the
        configured storage backend.

        Args:
            input: The user input or prompt.
            output: The AI-generated output or response.
            provider: The AI provider (e.g., "openai", "anthropic").
            model: The model identifier (e.g., "gpt-4", "claude-3-opus").
            conversation_id: Optional ID to link related interactions.
            safety_passed: Whether the interaction passed safety checks
                (default: True).
            detectors_triggered: List of safety detector names that triggered.
            block_reason: Reason for blocking, if the request was blocked.
            alerts: List of alert messages generated.
            latency_ms: Time taken to process the request in milliseconds
                (default: 0.0).
            input_tokens: Number of tokens in the input.
            output_tokens: Number of tokens in the output.
            metadata: Additional custom metadata dictionary.

        Returns:
            str: The unique entry ID for this audit log entry.

        Example:
            >>> entry_id = await logger.log(
            ...     input="Tell me a joke",
            ...     output="Why did the chicken...",
            ...     provider="anthropic",
            ...     model="claude-3-opus",
            ...     safety_passed=True,
            ...     latency_ms=250.5,
            ...     input_tokens=5,
            ...     output_tokens=20,
            ...     metadata={"session_id": "abc123"},
            ... )
        """
        entry_id = create_entry_id()
        timestamp = datetime.utcnow().isoformat()

        input_content, input_hash = self._prepare_content(input)
        output_content, output_hash = self._prepare_content(output)

        entry = AuditEntry(
            id=entry_id,
            timestamp=timestamp,
            input_hash=input_hash,
            output_hash=output_hash,
            input_content=input_content,
            output_content=output_content,
            provider=provider,
            model=model,
            conversation_id=conversation_id,
            safety_passed=safety_passed,
            detectors_triggered=detectors_triggered or [],
            block_reason=block_reason,
            alerts=alerts or [],
            latency_ms=latency_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            metadata=metadata or {},
        )

        await self.storage.write(entry)
        return entry_id

    async def get_entry(self, entry_id: str) -> AuditEntry | None:
        """
        Retrieve an audit entry by ID.

        If encryption is enabled and store_content is True, the content
        will still be encrypted in the returned entry. Use the encryption
        manager to decrypt if needed.

        Args:
            entry_id: The unique identifier of the entry.

        Returns:
            AuditEntry | None: The entry if found, None otherwise.

        Example:
            >>> entry = await logger.get_entry("abc-123-def")
            >>> if entry:
            ...     print(f"Safety passed: {entry.safety_passed}")
        """
        return await self.storage.read(entry_id)

    async def get_entries(
        self, start: datetime, end: datetime
    ) -> List[AuditEntry]:
        """
        Retrieve all audit entries within a time range.

        Args:
            start: Start of the time range (inclusive).
            end: End of the time range (inclusive).

        Returns:
            List[AuditEntry]: All entries within the specified time range.

        Example:
            >>> from datetime import datetime, timedelta
            >>> end = datetime.utcnow()
            >>> start = end - timedelta(days=7)
            >>> entries = await logger.get_entries(start, end)
            >>> print(f"Found {len(entries)} entries in the last week")
        """
        return await self.storage.list_entries(start, end)

    async def cleanup_expired(self) -> int:
        """
        Delete entries older than the retention period.

        Removes all entries with timestamps older than `retention_days` from
        the current time.

        Returns:
            int: Number of entries deleted.

        Example:
            >>> # Delete entries older than retention_days
            >>> deleted_count = await logger.cleanup_expired()
            >>> print(f"Cleaned up {deleted_count} expired entries")

        Note:
            This operation may be slow for large datasets. Consider running
            during off-peak hours or using storage-native lifecycle policies
            (e.g., S3 lifecycle rules) for better performance.
        """
        cutoff = datetime.utcnow() - timedelta(days=self.retention_days)
        start = datetime(1970, 1, 1)  # Unix epoch

        expired_entries = await self.storage.list_entries(start, cutoff)

        deleted_count = 0
        for entry in expired_entries:
            if await self.storage.delete(entry.id):
                deleted_count += 1

        return deleted_count

    def decrypt_content(self, encrypted_content: str) -> str:
        """
        Decrypt encrypted content from an audit entry.

        Convenience method for decrypting content stored in audit entries
        when encryption is enabled.

        Args:
            encrypted_content: The encrypted content string from an AuditEntry.

        Returns:
            str: The decrypted original content.

        Raises:
            ValueError: If no encryption manager is configured.
            cryptography.fernet.InvalidToken: If decryption fails.

        Example:
            >>> entry = await logger.get_entry("abc-123")
            >>> if entry and entry.input_content:
            ...     original_input = logger.decrypt_content(entry.input_content)
        """
        if not self.encryption:
            raise ValueError(
                "No encryption manager configured. "
                "Cannot decrypt content without the original encryption key."
            )
        return self.encryption.decrypt(encrypted_content)

__init__

__init__(
    storage: Union[StorageBackend, str],
    encryption: EncryptionManager | None = None,
    store_content: bool = False,
    retention_days: int = 365,
) -> None

Initialize the audit logger.

Parameters:

- storage (Union[StorageBackend, str], required): StorageBackend instance or file path string.
- encryption (EncryptionManager | None, default: None): Optional encryption manager for content encryption.
- store_content (bool, default: False): Whether to store actual content.
- retention_days (int, default: 365): Days to retain entries.
Source code in src/rotalabs_comply/audit/logger.py
def __init__(
    self,
    storage: Union[StorageBackend, str],
    encryption: EncryptionManager | None = None,
    store_content: bool = False,
    retention_days: int = 365,
) -> None:
    """
    Initialize the audit logger.

    Args:
        storage: StorageBackend instance or file path string.
        encryption: Optional encryption manager for content encryption.
        store_content: Whether to store actual content (default: False).
        retention_days: Days to retain entries (default: 365).
    """
    if isinstance(storage, str):
        self.storage: StorageBackend = FileStorage(storage)
    else:
        self.storage = storage

    self.encryption = encryption
    self.store_content = store_content
    self.retention_days = retention_days

log async

log(
    input: str,
    output: str,
    provider: str | None = None,
    model: str | None = None,
    conversation_id: str | None = None,
    safety_passed: bool = True,
    detectors_triggered: List[str] | None = None,
    block_reason: str | None = None,
    alerts: List[str] | None = None,
    latency_ms: float = 0.0,
    input_tokens: int | None = None,
    output_tokens: int | None = None,
    metadata: Dict[str, Any] | None = None,
) -> str

Log an AI interaction.

Creates an audit entry for the interaction and stores it in the configured storage backend.

Parameters:

- input (str, required): The user input or prompt.
- output (str, required): The AI-generated output or response.
- provider (str | None, default: None): The AI provider (e.g., "openai", "anthropic").
- model (str | None, default: None): The model identifier (e.g., "gpt-4", "claude-3-opus").
- conversation_id (str | None, default: None): Optional ID to link related interactions.
- safety_passed (bool, default: True): Whether the interaction passed safety checks.
- detectors_triggered (List[str] | None, default: None): List of safety detector names that triggered.
- block_reason (str | None, default: None): Reason for blocking, if the request was blocked.
- alerts (List[str] | None, default: None): List of alert messages generated.
- latency_ms (float, default: 0.0): Time taken to process the request in milliseconds.
- input_tokens (int | None, default: None): Number of tokens in the input.
- output_tokens (int | None, default: None): Number of tokens in the output.
- metadata (Dict[str, Any] | None, default: None): Additional custom metadata dictionary.

Returns:

- str: The unique entry ID for this audit log entry.

Example

entry_id = await logger.log(
    input="Tell me a joke",
    output="Why did the chicken...",
    provider="anthropic",
    model="claude-3-opus",
    safety_passed=True,
    latency_ms=250.5,
    input_tokens=5,
    output_tokens=20,
    metadata={"session_id": "abc123"},
)
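Note that latency_ms is supplied by the caller rather than measured by the logger. A small timing helper (hypothetical; not part of rotalabs_comply) is one way to produce it:

```python
import time

def measure_latency_ms(fn, *args, **kwargs):
    """Call fn and return (result, elapsed time in milliseconds),
    ready to pass as the latency_ms argument of log()."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return result, elapsed_ms

result, latency_ms = measure_latency_ms(sum, range(1000))
print(result)  # 499500
```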

Source code in src/rotalabs_comply/audit/logger.py
async def log(
    self,
    input: str,
    output: str,
    provider: str | None = None,
    model: str | None = None,
    conversation_id: str | None = None,
    safety_passed: bool = True,
    detectors_triggered: List[str] | None = None,
    block_reason: str | None = None,
    alerts: List[str] | None = None,
    latency_ms: float = 0.0,
    input_tokens: int | None = None,
    output_tokens: int | None = None,
    metadata: Dict[str, Any] | None = None,
) -> str:
    """
    Log an AI interaction.

    Creates an audit entry for the interaction and stores it in the
    configured storage backend.

    Args:
        input: The user input or prompt.
        output: The AI-generated output or response.
        provider: The AI provider (e.g., "openai", "anthropic").
        model: The model identifier (e.g., "gpt-4", "claude-3-opus").
        conversation_id: Optional ID to link related interactions.
        safety_passed: Whether the interaction passed safety checks
            (default: True).
        detectors_triggered: List of safety detector names that triggered.
        block_reason: Reason for blocking, if the request was blocked.
        alerts: List of alert messages generated.
        latency_ms: Time taken to process the request in milliseconds
            (default: 0.0).
        input_tokens: Number of tokens in the input.
        output_tokens: Number of tokens in the output.
        metadata: Additional custom metadata dictionary.

    Returns:
        str: The unique entry ID for this audit log entry.

    Example:
        >>> entry_id = await logger.log(
        ...     input="Tell me a joke",
        ...     output="Why did the chicken...",
        ...     provider="anthropic",
        ...     model="claude-3-opus",
        ...     safety_passed=True,
        ...     latency_ms=250.5,
        ...     input_tokens=5,
        ...     output_tokens=20,
        ...     metadata={"session_id": "abc123"},
        ... )
    """
    entry_id = create_entry_id()
    timestamp = datetime.utcnow().isoformat()

    input_content, input_hash = self._prepare_content(input)
    output_content, output_hash = self._prepare_content(output)

    entry = AuditEntry(
        id=entry_id,
        timestamp=timestamp,
        input_hash=input_hash,
        output_hash=output_hash,
        input_content=input_content,
        output_content=output_content,
        provider=provider,
        model=model,
        conversation_id=conversation_id,
        safety_passed=safety_passed,
        detectors_triggered=detectors_triggered or [],
        block_reason=block_reason,
        alerts=alerts or [],
        latency_ms=latency_ms,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        metadata=metadata or {},
    )

    await self.storage.write(entry)
    return entry_id

get_entry async

get_entry(entry_id: str) -> AuditEntry | None

Retrieve an audit entry by ID.

If encryption is enabled and store_content is True, the content will still be encrypted in the returned entry. Use the encryption manager to decrypt if needed.

Parameters:

- entry_id (str, required): The unique identifier of the entry.

Returns:

- AuditEntry | None: The entry if found, None otherwise.

Example

entry = await logger.get_entry("abc-123-def")
if entry:
    print(f"Safety passed: {entry.safety_passed}")

Source code in src/rotalabs_comply/audit/logger.py
async def get_entry(self, entry_id: str) -> AuditEntry | None:
    """
    Retrieve an audit entry by ID.

    If encryption is enabled and store_content is True, the content
    will still be encrypted in the returned entry. Use the encryption
    manager to decrypt if needed.

    Args:
        entry_id: The unique identifier of the entry.

    Returns:
        AuditEntry | None: The entry if found, None otherwise.

    Example:
        >>> entry = await logger.get_entry("abc-123-def")
        >>> if entry:
        ...     print(f"Safety passed: {entry.safety_passed}")
    """
    return await self.storage.read(entry_id)

get_entries async

get_entries(
    start: datetime, end: datetime
) -> List[AuditEntry]

Retrieve all audit entries within a time range.

Parameters:

- start (datetime, required): Start of the time range (inclusive).
- end (datetime, required): End of the time range (inclusive).

Returns:

- List[AuditEntry]: All entries within the specified time range.

Example

from datetime import datetime, timedelta

end = datetime.utcnow()
start = end - timedelta(days=7)
entries = await logger.get_entries(start, end)
print(f"Found {len(entries)} entries in the last week")

Source code in src/rotalabs_comply/audit/logger.py
async def get_entries(
    self, start: datetime, end: datetime
) -> List[AuditEntry]:
    """
    Retrieve all audit entries within a time range.

    Args:
        start: Start of the time range (inclusive).
        end: End of the time range (inclusive).

    Returns:
        List[AuditEntry]: All entries within the specified time range.

    Example:
        >>> from datetime import datetime, timedelta
        >>> end = datetime.utcnow()
        >>> start = end - timedelta(days=7)
        >>> entries = await logger.get_entries(start, end)
        >>> print(f"Found {len(entries)} entries in the last week")
    """
    return await self.storage.list_entries(start, end)

cleanup_expired async

cleanup_expired() -> int

Delete entries older than the retention period.

Removes all entries with timestamps older than retention_days from the current time.

Returns:

- int: Number of entries deleted.

Example

# Delete entries older than retention_days
deleted_count = await logger.cleanup_expired()
print(f"Cleaned up {deleted_count} expired entries")

Note

This operation may be slow for large datasets. Consider running during off-peak hours or using storage-native lifecycle policies (e.g., S3 lifecycle rules) for better performance.
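For intuition about the cutoff logic, here is a stdlib-only sketch of the selection step (the entry dicts and field names are illustrative, not the library's types). It relies on the fact that ISO-8601 timestamps, as produced by datetime.utcnow().isoformat(), compare chronologically as plain strings:

```python
from datetime import datetime, timedelta

def select_expired(entries, retention_days):
    # Ids of entries whose timestamp is older than the retention window.
    cutoff = (datetime.utcnow() - timedelta(days=retention_days)).isoformat()
    return [e["id"] for e in entries if e["timestamp"] < cutoff]

entries = [
    {"id": "old", "timestamp": (datetime.utcnow() - timedelta(days=400)).isoformat()},
    {"id": "new", "timestamp": datetime.utcnow().isoformat()},
]
print(select_expired(entries, retention_days=365))  # ['old']
```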

Source code in src/rotalabs_comply/audit/logger.py
async def cleanup_expired(self) -> int:
    """
    Delete entries older than the retention period.

    Removes all entries with timestamps older than `retention_days` from
    the current time.

    Returns:
        int: Number of entries deleted.

    Example:
        >>> # Delete entries older than retention_days
        >>> deleted_count = await logger.cleanup_expired()
        >>> print(f"Cleaned up {deleted_count} expired entries")

    Note:
        This operation may be slow for large datasets. Consider running
        during off-peak hours or using storage-native lifecycle policies
        (e.g., S3 lifecycle rules) for better performance.
    """
    cutoff = datetime.utcnow() - timedelta(days=self.retention_days)
    start = datetime(1970, 1, 1)  # Unix epoch

    expired_entries = await self.storage.list_entries(start, cutoff)

    deleted_count = 0
    for entry in expired_entries:
        if await self.storage.delete(entry.id):
            deleted_count += 1

    return deleted_count

decrypt_content

decrypt_content(encrypted_content: str) -> str

Decrypt encrypted content from an audit entry.

Convenience method for decrypting content stored in audit entries when encryption is enabled.

Parameters:

- encrypted_content (str, required): The encrypted content string from an AuditEntry.

Returns:

- str: The decrypted original content.

Raises:

- ValueError: If no encryption manager is configured.
- cryptography.fernet.InvalidToken: If decryption fails.

Example

entry = await logger.get_entry("abc-123")
if entry and entry.input_content:
    original_input = logger.decrypt_content(entry.input_content)

Source code in src/rotalabs_comply/audit/logger.py
def decrypt_content(self, encrypted_content: str) -> str:
    """
    Decrypt encrypted content from an audit entry.

    Convenience method for decrypting content stored in audit entries
    when encryption is enabled.

    Args:
        encrypted_content: The encrypted content string from an AuditEntry.

    Returns:
        str: The decrypted original content.

    Raises:
        ValueError: If no encryption manager is configured.
        cryptography.fernet.InvalidToken: If decryption fails.

    Example:
        >>> entry = await logger.get_entry("abc-123")
        >>> if entry and entry.input_content:
        ...     original_input = logger.decrypt_content(entry.input_content)
    """
    if not self.encryption:
        raise ValueError(
            "No encryption manager configured. "
            "Cannot decrypt content without the original encryption key."
        )
    return self.encryption.decrypt(encrypted_content)

Encryption

EncryptionManager

High-level encryption manager for string data.

Provides a convenient interface for encrypting and decrypting string data, with automatic key generation if not provided.

Parameters:

- key (bytes | None, default: None): Optional Fernet encryption key. If not provided, a new key is generated.

Attributes:

- _key: The encryption key (kept private).
- _fernet: The Fernet cipher instance.

Example

manager = EncryptionManager()
encrypted = manager.encrypt("sensitive data")
manager.decrypt(encrypted)  # 'sensitive data'

Use an existing key:

key = generate_key()
manager = EncryptionManager(key)
manager.get_key() == key  # True

Source code in src/rotalabs_comply/audit/encryption.py
class EncryptionManager:
    """
    High-level encryption manager for string data.

    Provides a convenient interface for encrypting and decrypting string data,
    with automatic key generation if not provided.

    Args:
        key: Optional Fernet encryption key. If not provided, a new key is generated.

    Attributes:
        _key: The encryption key (kept private).
        _fernet: The Fernet cipher instance.

    Example:
        >>> manager = EncryptionManager()
        >>> encrypted = manager.encrypt("sensitive data")
        >>> manager.decrypt(encrypted)
        'sensitive data'

        >>> # Use existing key
        >>> key = generate_key()
        >>> manager = EncryptionManager(key)
        >>> manager.get_key() == key
        True
    """

    def __init__(self, key: bytes | None = None) -> None:
        """
        Initialize the encryption manager.

        Args:
            key: Optional Fernet encryption key. If None, generates a new key.
        """
        self._key = key if key is not None else generate_key()
        self._fernet = Fernet(self._key)

    def encrypt(self, data: str) -> str:
        """
        Encrypt a string and return base64-encoded result.

        Args:
            data: The string to encrypt.

        Returns:
            str: Base64-encoded encrypted string, safe for storage in JSON.

        Example:
            >>> manager = EncryptionManager()
            >>> encrypted = manager.encrypt("secret")
            >>> isinstance(encrypted, str)
            True
        """
        encrypted_bytes = self._fernet.encrypt(data.encode("utf-8"))
        return base64.urlsafe_b64encode(encrypted_bytes).decode("ascii")

    def decrypt(self, data: str) -> str:
        """
        Decrypt a base64-encoded encrypted string.

        Args:
            data: The base64-encoded encrypted string (from encrypt()).

        Returns:
            str: The original decrypted string.

        Raises:
            cryptography.fernet.InvalidToken: If decryption fails.

        Example:
            >>> manager = EncryptionManager()
            >>> encrypted = manager.encrypt("secret")
            >>> manager.decrypt(encrypted)
            'secret'
        """
        encrypted_bytes = base64.urlsafe_b64decode(data.encode("ascii"))
        decrypted_bytes = self._fernet.decrypt(encrypted_bytes)
        return decrypted_bytes.decode("utf-8")

    def get_key(self) -> bytes:
        """
        Get the encryption key.

        Returns:
            bytes: The Fernet encryption key. Store this securely!

        Warning:
            The encryption key must be stored securely. If lost, encrypted
            data cannot be recovered.

        Example:
            >>> manager = EncryptionManager()
            >>> key = manager.get_key()
            >>> len(key)
            44
        """
        return self._key

__init__

__init__(key: bytes | None = None) -> None

Initialize the encryption manager.

Parameters:

- key (bytes | None, default: None): Optional Fernet encryption key. If None, generates a new key.
Source code in src/rotalabs_comply/audit/encryption.py
def __init__(self, key: bytes | None = None) -> None:
    """
    Initialize the encryption manager.

    Args:
        key: Optional Fernet encryption key. If None, generates a new key.
    """
    self._key = key if key is not None else generate_key()
    self._fernet = Fernet(self._key)

encrypt

encrypt(data: str) -> str

Encrypt a string and return base64-encoded result.

Parameters:

- data (str, required): The string to encrypt.

Returns:

- str: Base64-encoded encrypted string, safe for storage in JSON.

Example

manager = EncryptionManager()
encrypted = manager.encrypt("secret")
isinstance(encrypted, str)  # True
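The extra base64 layer exists because Fernet produces bytes; urlsafe base64 turns them into an ASCII string that can be embedded directly in a JSON audit record. A stdlib-only sketch of that round trip (the token bytes are illustrative, not a real ciphertext):

```python
import base64
import json

# Stand-in bytes for a Fernet token.
token = b"\x80\x00raw-ciphertext-bytes"

# urlsafe base64 yields an ASCII string...
encoded = base64.urlsafe_b64encode(token).decode("ascii")

# ...which is safe to store in a JSON document.
stored = json.dumps({"input_content": encoded})

# The original bytes round-trip back out unchanged.
decoded = base64.urlsafe_b64decode(json.loads(stored)["input_content"].encode("ascii"))
assert decoded == token
```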

Source code in src/rotalabs_comply/audit/encryption.py
def encrypt(self, data: str) -> str:
    """
    Encrypt a string and return base64-encoded result.

    Args:
        data: The string to encrypt.

    Returns:
        str: Base64-encoded encrypted string, safe for storage in JSON.

    Example:
        >>> manager = EncryptionManager()
        >>> encrypted = manager.encrypt("secret")
        >>> isinstance(encrypted, str)
        True
    """
    encrypted_bytes = self._fernet.encrypt(data.encode("utf-8"))
    return base64.urlsafe_b64encode(encrypted_bytes).decode("ascii")

decrypt

decrypt(data: str) -> str

Decrypt a base64-encoded encrypted string.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | str | The base64-encoded encrypted string (from encrypt()). | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | The original decrypted string. |

Raises:

| Type | Description |
| --- | --- |
| InvalidToken | If decryption fails. |

Example

>>> manager = EncryptionManager()
>>> encrypted = manager.encrypt("secret")
>>> manager.decrypt(encrypted)
'secret'

Source code in src/rotalabs_comply/audit/encryption.py
def decrypt(self, data: str) -> str:
    """
    Decrypt a base64-encoded encrypted string.

    Args:
        data: The base64-encoded encrypted string (from encrypt()).

    Returns:
        str: The original decrypted string.

    Raises:
        cryptography.fernet.InvalidToken: If decryption fails.

    Example:
        >>> manager = EncryptionManager()
        >>> encrypted = manager.encrypt("secret")
        >>> manager.decrypt(encrypted)
        'secret'
    """
    encrypted_bytes = base64.urlsafe_b64decode(data.encode("ascii"))
    decrypted_bytes = self._fernet.decrypt(encrypted_bytes)
    return decrypted_bytes.decode("utf-8")

get_key

get_key() -> bytes

Get the encryption key.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| bytes | bytes | The Fernet encryption key. Store this securely! |

Warning

The encryption key must be stored securely. If lost, encrypted data cannot be recovered.

Example

>>> manager = EncryptionManager()
>>> key = manager.get_key()
>>> len(key)
44

Source code in src/rotalabs_comply/audit/encryption.py
def get_key(self) -> bytes:
    """
    Get the encryption key.

    Returns:
        bytes: The Fernet encryption key. Store this securely!

    Warning:
        The encryption key must be stored securely. If lost, encrypted
        data cannot be recovered.

    Example:
        >>> manager = EncryptionManager()
        >>> key = manager.get_key()
        >>> len(key)
        44
    """
    return self._key

High-level encryption manager for string data.

Constructor

EncryptionManager(key: Optional[bytes] = None)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| key | Optional[bytes] | Auto-generated | Fernet encryption key |

Methods

encrypt

def encrypt(data: str) -> str

Encrypt a string and return base64-encoded result.

decrypt

def decrypt(data: str) -> str

Decrypt a base64-encoded encrypted string.

get_key

def get_key() -> bytes

Get the encryption key. Store this securely!

Example:

from rotalabs_comply import EncryptionManager

manager = EncryptionManager()
encrypted = manager.encrypt("sensitive data")
decrypted = manager.decrypt(encrypted)

# Save key securely
key = manager.get_key()

Helper Functions

generate_key

def generate_key() -> bytes

Generate a new Fernet encryption key.

Example:

from rotalabs_comply import generate_key

key = generate_key()
print(len(key))  # 44

encrypt

def encrypt(data: bytes, key: bytes) -> bytes

Encrypt raw bytes using Fernet symmetric encryption.

decrypt

def decrypt(data: bytes, key: bytes) -> bytes

Decrypt data that was encrypted with Fernet.
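The round trip through these byte-level helpers can be sketched with the cryptography package's Fernet API directly, which they presumably wrap (a minimal sketch, not the library's actual implementation; assumes `cryptography` is installed):

```python
# Sketch of the generate_key / encrypt / decrypt round trip using
# cryptography.fernet directly (assumption: the helpers wrap this API).
from cryptography.fernet import Fernet

key = Fernet.generate_key()                      # 44-byte urlsafe-base64 key
token = Fernet(key).encrypt(b"sensitive data")   # encrypt raw bytes
plain = Fernet(key).decrypt(token)               # recover the original bytes
```

Note that a Fernet token is only decryptable with the exact key that produced it, which is why `get_key()` warns about storing the key securely.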

hash_content

def hash_content(content: str) -> str

Compute SHA-256 hash of string content.

Example:

from rotalabs_comply import hash_content

content_hash = hash_content("hello world")
# Returns: 'b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9'
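Given the documented output above, `hash_content` presumably computes a hex-encoded SHA-256 digest of the UTF-8 bytes; a stdlib sketch (the function body is an assumption, not the library's source):

```python
import hashlib

def hash_content(content: str) -> str:
    # Hex SHA-256 digest of a UTF-8 string (sketch of the documented helper).
    return hashlib.sha256(content.encode("utf-8")).hexdigest()
```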

Storage Backends

StorageBackend Protocol

StorageBackend

Protocol defining the interface for audit log storage backends.

All storage backends must implement these async methods to support writing, reading, listing, deleting, and counting audit entries.

Source code in src/rotalabs_comply/audit/storage.py
@runtime_checkable
class StorageBackend(Protocol):
    """
    Protocol defining the interface for audit log storage backends.

    All storage backends must implement these async methods to support
    writing, reading, listing, deleting, and counting audit entries.
    """

    @abstractmethod
    async def write(self, entry: AuditEntry) -> str:
        """
        Write an audit entry to storage.

        Args:
            entry: The audit entry to store.

        Returns:
            str: The entry ID (same as entry.id).
        """
        ...

    @abstractmethod
    async def read(self, entry_id: str) -> AuditEntry | None:
        """
        Read an audit entry by ID.

        Args:
            entry_id: The unique identifier of the entry.

        Returns:
            AuditEntry | None: The entry if found, None otherwise.
        """
        ...

    @abstractmethod
    async def list_entries(
        self, start: datetime, end: datetime
    ) -> List[AuditEntry]:
        """
        List all entries within a time range.

        Args:
            start: Start of the time range (inclusive).
            end: End of the time range (inclusive).

        Returns:
            List[AuditEntry]: Entries within the specified time range.
        """
        ...

    @abstractmethod
    async def delete(self, entry_id: str) -> bool:
        """
        Delete an entry by ID.

        Args:
            entry_id: The unique identifier of the entry to delete.

        Returns:
            bool: True if the entry was deleted, False if not found.
        """
        ...

    @abstractmethod
    async def count(self) -> int:
        """
        Count total number of entries in storage.

        Returns:
            int: Total number of stored entries.
        """
        ...

write abstractmethod async

write(entry: AuditEntry) -> str

Write an audit entry to storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry | AuditEntry | The audit entry to store. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | The entry ID (same as entry.id). |

Source code in src/rotalabs_comply/audit/storage.py
@abstractmethod
async def write(self, entry: AuditEntry) -> str:
    """
    Write an audit entry to storage.

    Args:
        entry: The audit entry to store.

    Returns:
        str: The entry ID (same as entry.id).
    """
    ...

read abstractmethod async

read(entry_id: str) -> AuditEntry | None

Read an audit entry by ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | str | The unique identifier of the entry. | required |

Returns:

| Type | Description |
| --- | --- |
| AuditEntry \| None | The entry if found, None otherwise. |

Source code in src/rotalabs_comply/audit/storage.py
@abstractmethod
async def read(self, entry_id: str) -> AuditEntry | None:
    """
    Read an audit entry by ID.

    Args:
        entry_id: The unique identifier of the entry.

    Returns:
        AuditEntry | None: The entry if found, None otherwise.
    """
    ...

list_entries abstractmethod async

list_entries(
    start: datetime, end: datetime
) -> List[AuditEntry]

List all entries within a time range.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| start | datetime | Start of the time range (inclusive). | required |
| end | datetime | End of the time range (inclusive). | required |

Returns:

| Type | Description |
| --- | --- |
| List[AuditEntry] | Entries within the specified time range. |

Source code in src/rotalabs_comply/audit/storage.py
@abstractmethod
async def list_entries(
    self, start: datetime, end: datetime
) -> List[AuditEntry]:
    """
    List all entries within a time range.

    Args:
        start: Start of the time range (inclusive).
        end: End of the time range (inclusive).

    Returns:
        List[AuditEntry]: Entries within the specified time range.
    """
    ...

delete abstractmethod async

delete(entry_id: str) -> bool

Delete an entry by ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | str | The unique identifier of the entry to delete. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| bool | bool | True if the entry was deleted, False if not found. |

Source code in src/rotalabs_comply/audit/storage.py
@abstractmethod
async def delete(self, entry_id: str) -> bool:
    """
    Delete an entry by ID.

    Args:
        entry_id: The unique identifier of the entry to delete.

    Returns:
        bool: True if the entry was deleted, False if not found.
    """
    ...

count abstractmethod async

count() -> int

Count total number of entries in storage.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| int | int | Total number of stored entries. |

Source code in src/rotalabs_comply/audit/storage.py
@abstractmethod
async def count(self) -> int:
    """
    Count total number of entries in storage.

    Returns:
        int: Total number of stored entries.
    """
    ...

Protocol defining the interface for audit log storage backends.

Required Methods:

| Method | Signature | Description |
| --- | --- | --- |
| write | async (entry: AuditEntry) -> str | Write entry, return ID |
| read | async (entry_id: str) -> Optional[AuditEntry] | Read entry by ID |
| list_entries | async (start: datetime, end: datetime) -> List[AuditEntry] | List entries in range |
| delete | async (entry_id: str) -> bool | Delete entry, return success |
| count | async () -> int | Count total entries |
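A minimal custom backend satisfying these five methods can be sketched as follows. `AuditEntry` here is a small hypothetical stand-in for illustration; the real class lives in rotalabs_comply and carries more fields:

```python
# Sketch of a custom StorageBackend implementation (dict-backed).
# AuditEntry below is a stand-in, not the library's actual model.
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional


@dataclass
class AuditEntry:  # hypothetical stand-in
    id: str
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


class DictStorage:
    """In-process backend implementing the StorageBackend methods."""

    def __init__(self) -> None:
        self._entries: Dict[str, AuditEntry] = {}

    async def write(self, entry: AuditEntry) -> str:
        self._entries[entry.id] = entry
        return entry.id

    async def read(self, entry_id: str) -> Optional[AuditEntry]:
        return self._entries.get(entry_id)

    async def list_entries(self, start: datetime, end: datetime) -> List[AuditEntry]:
        return [
            e for e in self._entries.values()
            if start <= datetime.fromisoformat(e.timestamp) <= end
        ]

    async def delete(self, entry_id: str) -> bool:
        return self._entries.pop(entry_id, None) is not None

    async def count(self) -> int:
        return len(self._entries)


async def _demo():
    storage = DictStorage()
    eid = await storage.write(AuditEntry(id="e1"))
    found = await storage.read(eid)
    return eid, found, await storage.count()

eid, found, n = asyncio.run(_demo())
```

Because `StorageBackend` is a runtime-checkable Protocol, any object exposing these async methods is accepted; no inheritance is required.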

FileStorage

FileStorage

File-based storage backend using JSONL format.

Stores audit entries as JSON Lines files with automatic rotation when files exceed the configured size limit.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Directory path for storing audit files. | required |
| rotation_size_mb | int | Maximum file size in MB before rotation (default: 100). | 100 |

Attributes:

| Name | Description |
| --- | --- |
| path | The storage directory path. |
| rotation_size_bytes | Maximum file size in bytes. |

Example

>>> storage = FileStorage("/var/log/audit")
>>> entry_id = await storage.write(entry)
>>> retrieved = await storage.read(entry_id)

Source code in src/rotalabs_comply/audit/storage.py
class FileStorage:
    """
    File-based storage backend using JSONL format.

    Stores audit entries as JSON Lines files with automatic rotation
    when files exceed the configured size limit.

    Args:
        path: Directory path for storing audit files.
        rotation_size_mb: Maximum file size in MB before rotation (default: 100).

    Attributes:
        path: The storage directory path.
        rotation_size_bytes: Maximum file size in bytes.

    Example:
        >>> storage = FileStorage("/var/log/audit")
        >>> entry_id = await storage.write(entry)
        >>> retrieved = await storage.read(entry_id)
    """

    def __init__(self, path: str, rotation_size_mb: int = 100) -> None:
        """
        Initialize file storage.

        Args:
            path: Directory path for storing audit files.
            rotation_size_mb: Maximum file size in MB before rotation.
        """
        self.path = Path(path)
        self.rotation_size_bytes = rotation_size_mb * 1024 * 1024
        self._entry_index: Dict[str, str] = {}  # entry_id -> filename

    def _get_current_filename(self) -> str:
        """Get the current audit file name based on date."""
        date_str = datetime.utcnow().strftime("%Y%m%d")
        return f"audit_{date_str}.jsonl"

    def _get_current_filepath(self) -> Path:
        """Get the full path to the current audit file."""
        return self.path / self._get_current_filename()

    async def _ensure_directory(self) -> None:
        """Ensure the storage directory exists."""
        if not self.path.exists():
            os.makedirs(self.path, exist_ok=True)

    async def _should_rotate(self, filepath: Path) -> bool:
        """Check if the file should be rotated based on size."""
        if not filepath.exists():
            return False
        try:
            stat = await aiofiles.os.stat(filepath)
            return stat.st_size >= self.rotation_size_bytes
        except FileNotFoundError:
            return False

    async def _rotate_file(self, filepath: Path) -> None:
        """Rotate the file by adding a sequence number."""
        if not filepath.exists():
            return

        base = filepath.stem
        suffix = filepath.suffix
        counter = 1

        while True:
            new_name = f"{base}_{counter:03d}{suffix}"
            new_path = self.path / new_name
            if not new_path.exists():
                os.rename(filepath, new_path)
                break
            counter += 1

    async def write(self, entry: AuditEntry) -> str:
        """
        Write an audit entry to a JSONL file.

        Auto-rotates the file if it exceeds the configured size limit.

        Args:
            entry: The audit entry to store.

        Returns:
            str: The entry ID.
        """
        await self._ensure_directory()
        filepath = self._get_current_filepath()

        if await self._should_rotate(filepath):
            await self._rotate_file(filepath)

        entry_json = json.dumps(entry.to_dict())

        async with aiofiles.open(filepath, "a", encoding="utf-8") as f:
            await f.write(entry_json + "\n")

        self._entry_index[entry.id] = str(filepath)
        return entry.id

    async def read(self, entry_id: str) -> AuditEntry | None:
        """
        Read an audit entry by ID.

        Searches through all JSONL files if the entry is not in the index.

        Args:
            entry_id: The unique identifier of the entry.

        Returns:
            AuditEntry | None: The entry if found, None otherwise.
        """
        await self._ensure_directory()

        # Check index first
        if entry_id in self._entry_index:
            filepath = Path(self._entry_index[entry_id])
            if filepath.exists():
                async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
                    async for line in f:
                        line = line.strip()
                        if line:
                            data = json.loads(line)
                            if data.get("id") == entry_id:
                                return AuditEntry.from_dict(data)

        # Search all files
        for filepath in sorted(self.path.glob("audit_*.jsonl")):
            async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
                async for line in f:
                    line = line.strip()
                    if line:
                        data = json.loads(line)
                        if data.get("id") == entry_id:
                            self._entry_index[entry_id] = str(filepath)
                            return AuditEntry.from_dict(data)

        return None

    async def list_entries(
        self, start: datetime, end: datetime
    ) -> List[AuditEntry]:
        """
        List all entries within a time range.

        Args:
            start: Start of the time range (inclusive).
            end: End of the time range (inclusive).

        Returns:
            List[AuditEntry]: Entries within the specified time range.
        """
        await self._ensure_directory()
        entries: List[AuditEntry] = []

        for filepath in sorted(self.path.glob("audit_*.jsonl")):
            async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
                async for line in f:
                    line = line.strip()
                    if line:
                        data = json.loads(line)
                        timestamp = datetime.fromisoformat(data["timestamp"])
                        if start <= timestamp <= end:
                            entries.append(AuditEntry.from_dict(data))

        return entries

    async def delete(self, entry_id: str) -> bool:
        """
        Delete an entry by ID.

        Note: This rewrites the file without the deleted entry, which may be
        slow for large files. Consider using retention policies instead.

        Args:
            entry_id: The unique identifier of the entry to delete.

        Returns:
            bool: True if the entry was deleted, False if not found.
        """
        await self._ensure_directory()
        deleted = False

        for filepath in sorted(self.path.glob("audit_*.jsonl")):
            lines_to_keep: List[str] = []
            found_in_file = False

            async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
                async for line in f:
                    line = line.strip()
                    if line:
                        data = json.loads(line)
                        if data.get("id") == entry_id:
                            found_in_file = True
                            deleted = True
                        else:
                            lines_to_keep.append(line)

            if found_in_file:
                async with aiofiles.open(filepath, "w", encoding="utf-8") as f:
                    for line in lines_to_keep:
                        await f.write(line + "\n")

                if entry_id in self._entry_index:
                    del self._entry_index[entry_id]
                break

        return deleted

    async def count(self) -> int:
        """
        Count total number of entries in storage.

        Returns:
            int: Total number of stored entries.
        """
        await self._ensure_directory()
        total = 0

        for filepath in self.path.glob("audit_*.jsonl"):
            async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
                async for line in f:
                    if line.strip():
                        total += 1

        return total

__init__

__init__(path: str, rotation_size_mb: int = 100) -> None

Initialize file storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Directory path for storing audit files. | required |
| rotation_size_mb | int | Maximum file size in MB before rotation. | 100 |
Source code in src/rotalabs_comply/audit/storage.py
def __init__(self, path: str, rotation_size_mb: int = 100) -> None:
    """
    Initialize file storage.

    Args:
        path: Directory path for storing audit files.
        rotation_size_mb: Maximum file size in MB before rotation.
    """
    self.path = Path(path)
    self.rotation_size_bytes = rotation_size_mb * 1024 * 1024
    self._entry_index: Dict[str, str] = {}  # entry_id -> filename

write async

write(entry: AuditEntry) -> str

Write an audit entry to a JSONL file.

Auto-rotates the file if it exceeds the configured size limit.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry | AuditEntry | The audit entry to store. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | The entry ID. |

Source code in src/rotalabs_comply/audit/storage.py
async def write(self, entry: AuditEntry) -> str:
    """
    Write an audit entry to a JSONL file.

    Auto-rotates the file if it exceeds the configured size limit.

    Args:
        entry: The audit entry to store.

    Returns:
        str: The entry ID.
    """
    await self._ensure_directory()
    filepath = self._get_current_filepath()

    if await self._should_rotate(filepath):
        await self._rotate_file(filepath)

    entry_json = json.dumps(entry.to_dict())

    async with aiofiles.open(filepath, "a", encoding="utf-8") as f:
        await f.write(entry_json + "\n")

    self._entry_index[entry.id] = str(filepath)
    return entry.id

read async

read(entry_id: str) -> AuditEntry | None

Read an audit entry by ID.

Searches through all JSONL files if the entry is not in the index.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | str | The unique identifier of the entry. | required |

Returns:

| Type | Description |
| --- | --- |
| AuditEntry \| None | The entry if found, None otherwise. |

Source code in src/rotalabs_comply/audit/storage.py
async def read(self, entry_id: str) -> AuditEntry | None:
    """
    Read an audit entry by ID.

    Searches through all JSONL files if the entry is not in the index.

    Args:
        entry_id: The unique identifier of the entry.

    Returns:
        AuditEntry | None: The entry if found, None otherwise.
    """
    await self._ensure_directory()

    # Check index first
    if entry_id in self._entry_index:
        filepath = Path(self._entry_index[entry_id])
        if filepath.exists():
            async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
                async for line in f:
                    line = line.strip()
                    if line:
                        data = json.loads(line)
                        if data.get("id") == entry_id:
                            return AuditEntry.from_dict(data)

    # Search all files
    for filepath in sorted(self.path.glob("audit_*.jsonl")):
        async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
            async for line in f:
                line = line.strip()
                if line:
                    data = json.loads(line)
                    if data.get("id") == entry_id:
                        self._entry_index[entry_id] = str(filepath)
                        return AuditEntry.from_dict(data)

    return None

list_entries async

list_entries(
    start: datetime, end: datetime
) -> List[AuditEntry]

List all entries within a time range.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| start | datetime | Start of the time range (inclusive). | required |
| end | datetime | End of the time range (inclusive). | required |

Returns:

| Type | Description |
| --- | --- |
| List[AuditEntry] | Entries within the specified time range. |

Source code in src/rotalabs_comply/audit/storage.py
async def list_entries(
    self, start: datetime, end: datetime
) -> List[AuditEntry]:
    """
    List all entries within a time range.

    Args:
        start: Start of the time range (inclusive).
        end: End of the time range (inclusive).

    Returns:
        List[AuditEntry]: Entries within the specified time range.
    """
    await self._ensure_directory()
    entries: List[AuditEntry] = []

    for filepath in sorted(self.path.glob("audit_*.jsonl")):
        async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
            async for line in f:
                line = line.strip()
                if line:
                    data = json.loads(line)
                    timestamp = datetime.fromisoformat(data["timestamp"])
                    if start <= timestamp <= end:
                        entries.append(AuditEntry.from_dict(data))

    return entries

delete async

delete(entry_id: str) -> bool

Delete an entry by ID.

Note: This rewrites the file without the deleted entry, which may be slow for large files. Consider using retention policies instead.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | str | The unique identifier of the entry to delete. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| bool | bool | True if the entry was deleted, False if not found. |

Source code in src/rotalabs_comply/audit/storage.py
async def delete(self, entry_id: str) -> bool:
    """
    Delete an entry by ID.

    Note: This rewrites the file without the deleted entry, which may be
    slow for large files. Consider using retention policies instead.

    Args:
        entry_id: The unique identifier of the entry to delete.

    Returns:
        bool: True if the entry was deleted, False if not found.
    """
    await self._ensure_directory()
    deleted = False

    for filepath in sorted(self.path.glob("audit_*.jsonl")):
        lines_to_keep: List[str] = []
        found_in_file = False

        async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
            async for line in f:
                line = line.strip()
                if line:
                    data = json.loads(line)
                    if data.get("id") == entry_id:
                        found_in_file = True
                        deleted = True
                    else:
                        lines_to_keep.append(line)

        if found_in_file:
            async with aiofiles.open(filepath, "w", encoding="utf-8") as f:
                for line in lines_to_keep:
                    await f.write(line + "\n")

            if entry_id in self._entry_index:
                del self._entry_index[entry_id]
            break

    return deleted

count async

count() -> int

Count total number of entries in storage.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| int | int | Total number of stored entries. |

Source code in src/rotalabs_comply/audit/storage.py
async def count(self) -> int:
    """
    Count total number of entries in storage.

    Returns:
        int: Total number of stored entries.
    """
    await self._ensure_directory()
    total = 0

    for filepath in self.path.glob("audit_*.jsonl"):
        async with aiofiles.open(filepath, "r", encoding="utf-8") as f:
            async for line in f:
                if line.strip():
                    total += 1

    return total

File-based storage backend using JSONL format.

Constructor

FileStorage(path: str, rotation_size_mb: int = 100)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| path | str | Required | Directory for audit files |
| rotation_size_mb | int | 100 | Max file size before rotation |

File Structure:

{path}/
├── audit_20260128.jsonl
├── audit_20260128_001.jsonl  # Rotated
├── audit_20260129.jsonl
└── ...
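The naming scheme above follows directly from the source shown earlier (daily files, zero-padded rotation suffixes) and can be reproduced with a small sketch (function names here are illustrative, not part of the library's API):

```python
# Sketch of FileStorage's file-naming scheme: daily JSONL files,
# rotated copies gaining a zero-padded sequence number.
from datetime import datetime, timezone
from pathlib import Path

def current_filename(now: datetime) -> str:
    # Daily file: audit_YYYYMMDD.jsonl
    return f"audit_{now.strftime('%Y%m%d')}.jsonl"

def rotated_name(filepath: Path, counter: int) -> str:
    # Rotated copy: audit_YYYYMMDD_001.jsonl, audit_YYYYMMDD_002.jsonl, ...
    return f"{filepath.stem}_{counter:03d}{filepath.suffix}"

name = current_filename(datetime(2026, 1, 28, tzinfo=timezone.utc))
rotated = rotated_name(Path(name), 1)
```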

Example:

from rotalabs_comply.audit import FileStorage

storage = FileStorage("/var/log/audit", rotation_size_mb=50)
entry_id = await storage.write(entry)

MemoryStorage

MemoryStorage

In-memory storage backend for testing and development.

Stores entries in a dictionary. Data is lost when the process ends.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_entries | int \| None | Optional maximum number of entries to store. When exceeded, oldest entries are removed. | None |
Example

>>> storage = MemoryStorage(max_entries=1000)
>>> entry_id = await storage.write(entry)
>>> count = await storage.count()

Source code in src/rotalabs_comply/audit/storage.py
class MemoryStorage:
    """
    In-memory storage backend for testing and development.

    Stores entries in a dictionary. Data is lost when the process ends.

    Args:
        max_entries: Optional maximum number of entries to store.
            When exceeded, oldest entries are removed.

    Example:
        >>> storage = MemoryStorage(max_entries=1000)
        >>> entry_id = await storage.write(entry)
        >>> count = await storage.count()
    """

    def __init__(self, max_entries: int | None = None) -> None:
        """
        Initialize memory storage.

        Args:
            max_entries: Optional maximum number of entries to store.
        """
        self.max_entries = max_entries
        self._entries: Dict[str, AuditEntry] = {}
        self._insertion_order: List[str] = []

    async def write(self, entry: AuditEntry) -> str:
        """
        Write an audit entry to memory.

        If max_entries is set and exceeded, removes the oldest entry.

        Args:
            entry: The audit entry to store.

        Returns:
            str: The entry ID.
        """
        if (
            self.max_entries
            and len(self._entries) >= self.max_entries
            and entry.id not in self._entries
        ):
            # Remove oldest entry
            oldest_id = self._insertion_order.pop(0)
            del self._entries[oldest_id]

        self._entries[entry.id] = entry
        if entry.id not in self._insertion_order:
            self._insertion_order.append(entry.id)

        return entry.id

    async def read(self, entry_id: str) -> AuditEntry | None:
        """
        Read an audit entry by ID.

        Args:
            entry_id: The unique identifier of the entry.

        Returns:
            AuditEntry | None: The entry if found, None otherwise.
        """
        return self._entries.get(entry_id)

    async def list_entries(
        self, start: datetime, end: datetime
    ) -> List[AuditEntry]:
        """
        List all entries within a time range.

        Args:
            start: Start of the time range (inclusive).
            end: End of the time range (inclusive).

        Returns:
            List[AuditEntry]: Entries within the specified time range.
        """
        entries: List[AuditEntry] = []
        for entry in self._entries.values():
            timestamp = datetime.fromisoformat(entry.timestamp)
            if start <= timestamp <= end:
                entries.append(entry)
        return entries

    async def delete(self, entry_id: str) -> bool:
        """
        Delete an entry by ID.

        Args:
            entry_id: The unique identifier of the entry to delete.

        Returns:
            bool: True if the entry was deleted, False if not found.
        """
        if entry_id in self._entries:
            del self._entries[entry_id]
            self._insertion_order.remove(entry_id)
            return True
        return False

    async def count(self) -> int:
        """
        Count total number of entries in storage.

        Returns:
            int: Total number of stored entries.
        """
        return len(self._entries)

__init__

__init__(max_entries: int | None = None) -> None

Initialize memory storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_entries | int \| None | Optional maximum number of entries to store. | None |
Source code in src/rotalabs_comply/audit/storage.py
def __init__(self, max_entries: int | None = None) -> None:
    """
    Initialize memory storage.

    Args:
        max_entries: Optional maximum number of entries to store.
    """
    self.max_entries = max_entries
    self._entries: Dict[str, AuditEntry] = {}
    self._insertion_order: List[str] = []

write async

write(entry: AuditEntry) -> str

Write an audit entry to memory.

If max_entries is set and exceeded, removes the oldest entry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry | AuditEntry | The audit entry to store. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | The entry ID. |

Source code in src/rotalabs_comply/audit/storage.py
async def write(self, entry: AuditEntry) -> str:
    """
    Write an audit entry to memory.

    If max_entries is set and exceeded, removes the oldest entry.

    Args:
        entry: The audit entry to store.

    Returns:
        str: The entry ID.
    """
    if (
        self.max_entries
        and len(self._entries) >= self.max_entries
        and entry.id not in self._entries
    ):
        # Remove oldest entry
        oldest_id = self._insertion_order.pop(0)
        del self._entries[oldest_id]

    self._entries[entry.id] = entry
    if entry.id not in self._insertion_order:
        self._insertion_order.append(entry.id)

    return entry.id

read async

read(entry_id: str) -> AuditEntry | None

Read an audit entry by ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | str | The unique identifier of the entry. | required |

Returns:

| Type | Description |
| --- | --- |
| AuditEntry \| None | The entry if found, None otherwise. |

Source code in src/rotalabs_comply/audit/storage.py
async def read(self, entry_id: str) -> AuditEntry | None:
    """
    Read an audit entry by ID.

    Args:
        entry_id: The unique identifier of the entry.

    Returns:
        AuditEntry | None: The entry if found, None otherwise.
    """
    return self._entries.get(entry_id)

list_entries async

list_entries(
    start: datetime, end: datetime
) -> List[AuditEntry]

List all entries within a time range.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| start | datetime | Start of the time range (inclusive). | required |
| end | datetime | End of the time range (inclusive). | required |

Returns:

| Type | Description |
| --- | --- |
| List[AuditEntry] | Entries within the specified time range. |

Source code in src/rotalabs_comply/audit/storage.py
async def list_entries(
    self, start: datetime, end: datetime
) -> List[AuditEntry]:
    """
    List all entries within a time range.

    Args:
        start: Start of the time range (inclusive).
        end: End of the time range (inclusive).

    Returns:
        List[AuditEntry]: Entries within the specified time range.
    """
    entries: List[AuditEntry] = []
    for entry in self._entries.values():
        timestamp = datetime.fromisoformat(entry.timestamp)
        if start <= timestamp <= end:
            entries.append(entry)
    return entries

delete async

delete(entry_id: str) -> bool

Delete an entry by ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | str | The unique identifier of the entry to delete. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| bool | bool | True if the entry was deleted, False if not found. |

Source code in src/rotalabs_comply/audit/storage.py
async def delete(self, entry_id: str) -> bool:
    """
    Delete an entry by ID.

    Args:
        entry_id: The unique identifier of the entry to delete.

    Returns:
        bool: True if the entry was deleted, False if not found.
    """
    if entry_id in self._entries:
        del self._entries[entry_id]
        self._insertion_order.remove(entry_id)
        return True
    return False

count async

count() -> int

Count total number of entries in storage.

Returns:

Name Type Description
int int

Total number of stored entries.

Source code in src/rotalabs_comply/audit/storage.py
async def count(self) -> int:
    """
    Count total number of entries in storage.

    Returns:
        int: Total number of stored entries.
    """
    return len(self._entries)

In-memory storage backend for testing and development.

Constructor

MemoryStorage(max_entries: Optional[int] = None)

Parameters:

Parameter Type Default Description
max_entries Optional[int] None Max entries (LRU eviction)

Example:

from rotalabs_comply.audit import MemoryStorage

storage = MemoryStorage(max_entries=1000)
count = await storage.count()

Data Persistence

Data is lost when the process ends. Use only for testing.
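The `max_entries` eviction follows the insertion-order logic shown in the source above: when the cap is reached, the oldest entry is dropped before the new one is written. A minimal stdlib-only sketch of that behavior (`TinyMemoryStorage` is illustrative, not the library's API):

```python
# Sketch of max_entries eviction via insertion order.
# TinyMemoryStorage is illustrative only, not the rotalabs_comply API.
class TinyMemoryStorage:
    def __init__(self, max_entries=None):
        self.max_entries = max_entries
        self._entries = {}
        self._insertion_order = []

    def write(self, entry_id, entry):
        if self.max_entries is not None and len(self._entries) >= self.max_entries:
            oldest_id = self._insertion_order.pop(0)  # evict oldest entry
            del self._entries[oldest_id]
        self._entries[entry_id] = entry
        if entry_id not in self._insertion_order:
            self._insertion_order.append(entry_id)
        return entry_id

storage = TinyMemoryStorage(max_entries=2)
storage.write("a", 1)
storage.write("b", 2)
storage.write("c", 3)  # cap reached: "a" is evicted
print(sorted(storage._entries))  # ['b', 'c']
```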


S3Storage

S3Storage

AWS S3 storage backend for audit logs.

Stores each audit entry as a separate JSON file in S3, organized by date for easy querying and lifecycle management.

Requires boto3 to be installed (optional dependency).

Parameters:

Name Type Description Default
bucket str

S3 bucket name.

required
prefix str

Key prefix for audit files (default: "audit/").

'audit/'
region str | None

AWS region (optional, uses default if not specified).

None
File structure

{prefix}{YYYY-MM-DD}/{entry_id}.json

Example

storage = S3Storage("my-audit-bucket", prefix="logs/audit/")
entry_id = await storage.write(entry)

Stored at: s3://my-audit-bucket/logs/audit/2024-01-15/abc123.json

Source code in src/rotalabs_comply/audit/storage.py
class S3Storage:
    """
    AWS S3 storage backend for audit logs.

    Stores each audit entry as a separate JSON file in S3, organized
    by date for easy querying and lifecycle management.

    Requires boto3 to be installed (optional dependency).

    Args:
        bucket: S3 bucket name.
        prefix: Key prefix for audit files (default: "audit/").
        region: AWS region (optional, uses default if not specified).

    File structure:
        {prefix}{YYYY-MM-DD}/{entry_id}.json

    Example:
        >>> storage = S3Storage("my-audit-bucket", prefix="logs/audit/")
        >>> entry_id = await storage.write(entry)
        # Stored at: s3://my-audit-bucket/logs/audit/2024-01-15/abc123.json
    """

    def __init__(
        self,
        bucket: str,
        prefix: str = "audit/",
        region: str | None = None,
    ) -> None:
        """
        Initialize S3 storage.

        Args:
            bucket: S3 bucket name.
            prefix: Key prefix for audit files.
            region: AWS region (optional).
        """
        self.bucket = bucket
        self.prefix = prefix.rstrip("/") + "/" if prefix else ""
        self.region = region
        self._client = None

    def _get_client(self):
        """Lazy-load boto3 client."""
        if self._client is None:
            try:
                import boto3
            except ImportError:
                raise ImportError(
                    "boto3 is required for S3Storage. "
                    "Install it with: pip install boto3"
                )

            if self.region:
                self._client = boto3.client("s3", region_name=self.region)
            else:
                self._client = boto3.client("s3")
        return self._client

    def _get_key(self, entry: AuditEntry) -> str:
        """Get the S3 key for an entry."""
        timestamp = datetime.fromisoformat(entry.timestamp)
        date_str = timestamp.strftime("%Y-%m-%d")
        return f"{self.prefix}{date_str}/{entry.id}.json"

    def _get_key_from_id(self, entry_id: str, date_str: str) -> str:
        """Get the S3 key from entry ID and date."""
        return f"{self.prefix}{date_str}/{entry_id}.json"

    async def write(self, entry: AuditEntry) -> str:
        """
        Write an audit entry to S3.

        Args:
            entry: The audit entry to store.

        Returns:
            str: The entry ID.
        """
        client = self._get_client()
        key = self._get_key(entry)
        body = json.dumps(entry.to_dict())

        client.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=body.encode("utf-8"),
            ContentType="application/json",
        )

        return entry.id

    async def read(self, entry_id: str) -> AuditEntry | None:
        """
        Read an audit entry by ID.

        Note: This searches through date prefixes which may be slow.
        Consider maintaining an index for production use.

        Args:
            entry_id: The unique identifier of the entry.

        Returns:
            AuditEntry | None: The entry if found, None otherwise.
        """
        client = self._get_client()

        # List all date prefixes
        paginator = client.get_paginator("list_objects_v2")

        for page in paginator.paginate(
            Bucket=self.bucket, Prefix=self.prefix, Delimiter="/"
        ):
            for prefix_info in page.get("CommonPrefixes", []):
                date_prefix = prefix_info["Prefix"]
                key = f"{date_prefix}{entry_id}.json"

                try:
                    response = client.get_object(Bucket=self.bucket, Key=key)
                    body = response["Body"].read().decode("utf-8")
                    data = json.loads(body)
                    return AuditEntry.from_dict(data)
                except client.exceptions.NoSuchKey:
                    continue

        return None

    async def list_entries(
        self, start: datetime, end: datetime
    ) -> List[AuditEntry]:
        """
        List all entries within a time range.

        Args:
            start: Start of the time range (inclusive).
            end: End of the time range (inclusive).

        Returns:
            List[AuditEntry]: Entries within the specified time range.
        """
        client = self._get_client()
        entries: List[AuditEntry] = []

        # Generate date range
        current_date = start.date()
        end_date = end.date()

        while current_date <= end_date:
            date_str = current_date.strftime("%Y-%m-%d")
            date_prefix = f"{self.prefix}{date_str}/"

            paginator = client.get_paginator("list_objects_v2")

            for page in paginator.paginate(
                Bucket=self.bucket, Prefix=date_prefix
            ):
                for obj in page.get("Contents", []):
                    response = client.get_object(
                        Bucket=self.bucket, Key=obj["Key"]
                    )
                    body = response["Body"].read().decode("utf-8")
                    data = json.loads(body)
                    entry = AuditEntry.from_dict(data)

                    timestamp = datetime.fromisoformat(entry.timestamp)
                    if start <= timestamp <= end:
                        entries.append(entry)

            # Advance one day; timedelta handles month/year rollover and
            # avoids skipping days 29-31 (needs `from datetime import timedelta`).
            current_date += timedelta(days=1)

        return entries

    async def delete(self, entry_id: str) -> bool:
        """
        Delete an entry by ID.

        Args:
            entry_id: The unique identifier of the entry to delete.

        Returns:
            bool: True if the entry was deleted, False if not found.
        """
        client = self._get_client()

        # Find the entry first
        paginator = client.get_paginator("list_objects_v2")

        for page in paginator.paginate(
            Bucket=self.bucket, Prefix=self.prefix, Delimiter="/"
        ):
            for prefix_info in page.get("CommonPrefixes", []):
                date_prefix = prefix_info["Prefix"]
                key = f"{date_prefix}{entry_id}.json"

                try:
                    client.head_object(Bucket=self.bucket, Key=key)
                    client.delete_object(Bucket=self.bucket, Key=key)
                    return True
                except client.exceptions.ClientError:
                    continue

        return False

    async def count(self) -> int:
        """
        Count total number of entries in storage.

        Returns:
            int: Total number of stored entries.
        """
        client = self._get_client()
        total = 0

        paginator = client.get_paginator("list_objects_v2")

        for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
            for obj in page.get("Contents", []):
                if obj["Key"].endswith(".json"):
                    total += 1

        return total

__init__

__init__(
    bucket: str,
    prefix: str = "audit/",
    region: str | None = None,
) -> None

Initialize S3 storage.

Parameters:

Name Type Description Default
bucket str

S3 bucket name.

required
prefix str

Key prefix for audit files.

'audit/'
region str | None

AWS region (optional).

None
Source code in src/rotalabs_comply/audit/storage.py
def __init__(
    self,
    bucket: str,
    prefix: str = "audit/",
    region: str | None = None,
) -> None:
    """
    Initialize S3 storage.

    Args:
        bucket: S3 bucket name.
        prefix: Key prefix for audit files.
        region: AWS region (optional).
    """
    self.bucket = bucket
    self.prefix = prefix.rstrip("/") + "/" if prefix else ""
    self.region = region
    self._client = None

write async

write(entry: AuditEntry) -> str

Write an audit entry to S3.

Parameters:

Name Type Description Default
entry AuditEntry

The audit entry to store.

required

Returns:

Name Type Description
str str

The entry ID.

Source code in src/rotalabs_comply/audit/storage.py
async def write(self, entry: AuditEntry) -> str:
    """
    Write an audit entry to S3.

    Args:
        entry: The audit entry to store.

    Returns:
        str: The entry ID.
    """
    client = self._get_client()
    key = self._get_key(entry)
    body = json.dumps(entry.to_dict())

    client.put_object(
        Bucket=self.bucket,
        Key=key,
        Body=body.encode("utf-8"),
        ContentType="application/json",
    )

    return entry.id

read async

read(entry_id: str) -> AuditEntry | None

Read an audit entry by ID.

Note: This searches through date prefixes which may be slow. Consider maintaining an index for production use.

Parameters:

Name Type Description Default
entry_id str

The unique identifier of the entry.

required

Returns:

Type Description
AuditEntry | None

AuditEntry | None: The entry if found, None otherwise.

Source code in src/rotalabs_comply/audit/storage.py
async def read(self, entry_id: str) -> AuditEntry | None:
    """
    Read an audit entry by ID.

    Note: This searches through date prefixes which may be slow.
    Consider maintaining an index for production use.

    Args:
        entry_id: The unique identifier of the entry.

    Returns:
        AuditEntry | None: The entry if found, None otherwise.
    """
    client = self._get_client()

    # List all date prefixes
    paginator = client.get_paginator("list_objects_v2")

    for page in paginator.paginate(
        Bucket=self.bucket, Prefix=self.prefix, Delimiter="/"
    ):
        for prefix_info in page.get("CommonPrefixes", []):
            date_prefix = prefix_info["Prefix"]
            key = f"{date_prefix}{entry_id}.json"

            try:
                response = client.get_object(Bucket=self.bucket, Key=key)
                body = response["Body"].read().decode("utf-8")
                data = json.loads(body)
                return AuditEntry.from_dict(data)
            except client.exceptions.NoSuchKey:
                continue

    return None

list_entries async

list_entries(
    start: datetime, end: datetime
) -> List[AuditEntry]

List all entries within a time range.

Parameters:

Name Type Description Default
start datetime

Start of the time range (inclusive).

required
end datetime

End of the time range (inclusive).

required

Returns:

Type Description
List[AuditEntry]

List[AuditEntry]: Entries within the specified time range.

Source code in src/rotalabs_comply/audit/storage.py
async def list_entries(
    self, start: datetime, end: datetime
) -> List[AuditEntry]:
    """
    List all entries within a time range.

    Args:
        start: Start of the time range (inclusive).
        end: End of the time range (inclusive).

    Returns:
        List[AuditEntry]: Entries within the specified time range.
    """
    client = self._get_client()
    entries: List[AuditEntry] = []

    # Generate date range
    current_date = start.date()
    end_date = end.date()

    while current_date <= end_date:
        date_str = current_date.strftime("%Y-%m-%d")
        date_prefix = f"{self.prefix}{date_str}/"

        paginator = client.get_paginator("list_objects_v2")

        for page in paginator.paginate(
            Bucket=self.bucket, Prefix=date_prefix
        ):
            for obj in page.get("Contents", []):
                response = client.get_object(
                    Bucket=self.bucket, Key=obj["Key"]
                )
                body = response["Body"].read().decode("utf-8")
                data = json.loads(body)
                entry = AuditEntry.from_dict(data)

                timestamp = datetime.fromisoformat(entry.timestamp)
                if start <= timestamp <= end:
                    entries.append(entry)

        # Advance one day; timedelta handles month/year rollover and
        # avoids skipping days 29-31 (needs `from datetime import timedelta`).
        current_date += timedelta(days=1)

    return entries

delete async

delete(entry_id: str) -> bool

Delete an entry by ID.

Parameters:

Name Type Description Default
entry_id str

The unique identifier of the entry to delete.

required

Returns:

Name Type Description
bool bool

True if the entry was deleted, False if not found.

Source code in src/rotalabs_comply/audit/storage.py
async def delete(self, entry_id: str) -> bool:
    """
    Delete an entry by ID.

    Args:
        entry_id: The unique identifier of the entry to delete.

    Returns:
        bool: True if the entry was deleted, False if not found.
    """
    client = self._get_client()

    # Find the entry first
    paginator = client.get_paginator("list_objects_v2")

    for page in paginator.paginate(
        Bucket=self.bucket, Prefix=self.prefix, Delimiter="/"
    ):
        for prefix_info in page.get("CommonPrefixes", []):
            date_prefix = prefix_info["Prefix"]
            key = f"{date_prefix}{entry_id}.json"

            try:
                client.head_object(Bucket=self.bucket, Key=key)
                client.delete_object(Bucket=self.bucket, Key=key)
                return True
            except client.exceptions.ClientError:
                continue

    return False

count async

count() -> int

Count total number of entries in storage.

Returns:

Name Type Description
int int

Total number of stored entries.

Source code in src/rotalabs_comply/audit/storage.py
async def count(self) -> int:
    """
    Count total number of entries in storage.

    Returns:
        int: Total number of stored entries.
    """
    client = self._get_client()
    total = 0

    paginator = client.get_paginator("list_objects_v2")

    for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".json"):
                total += 1

    return total

AWS S3 storage backend for audit logs.

Constructor

S3Storage(
    bucket: str,
    prefix: str = "audit/",
    region: Optional[str] = None,
)

Parameters:

Parameter Type Default Description
bucket str Required S3 bucket name
prefix str "audit/" Key prefix for files
region Optional[str] None AWS region

Key Structure:

s3://{bucket}/{prefix}{YYYY-MM-DD}/{entry_id}.json

Example:

from rotalabs_comply.audit import S3Storage

storage = S3Storage(
    bucket="my-audit-bucket",
    prefix="prod/audit/",
    region="us-west-2",
)

Dependency

Requires boto3. Install with pip install rotalabs-comply[s3].
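Because keys are partitioned by date, a time-range query only needs to enumerate one prefix per day. A stdlib-only sketch of that iteration (no S3 calls; the helper name is hypothetical):

```python
from datetime import date, timedelta

def date_prefixes(prefix: str, start: date, end: date) -> list[str]:
    """Build one S3 key prefix per day in [start, end], inclusive."""
    prefixes = []
    current = start
    while current <= end:
        prefixes.append(f"{prefix}{current.strftime('%Y-%m-%d')}/")
        current += timedelta(days=1)  # safe across month/year boundaries
    return prefixes

print(date_prefixes("audit/", date(2024, 1, 30), date(2024, 2, 1)))
# ['audit/2024-01-30/', 'audit/2024-01-31/', 'audit/2024-02-01/']
```

Each prefix would then be passed to a `list_objects_v2` paginator, as the `list_entries` source above does.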


AuditEntry (Storage)

AuditEntry dataclass

Represents a single audit log entry.

Captures all relevant information about an AI interaction including inputs, outputs, safety evaluations, and performance metrics.

Attributes:

Name Type Description
id str

Unique identifier for this entry.

timestamp str

When the interaction occurred (ISO format).

input_hash str

SHA-256 hash of the input content.

output_hash str

SHA-256 hash of the output content.

input_content str | None

Actual input content (if store_content=True, may be encrypted).

output_content str | None

Actual output content (if store_content=True, may be encrypted).

provider str | None

The AI provider (e.g., "openai", "anthropic").

model str | None

The model identifier (e.g., "gpt-4", "claude-3-opus").

conversation_id str | None

Optional ID linking related interactions.

safety_passed bool

Whether the interaction passed all safety checks.

detectors_triggered List[str]

List of safety detector names that triggered.

block_reason str | None

Reason for blocking, if the request was blocked.

alerts List[str]

List of alert messages generated.

latency_ms float

Time taken to process the request in milliseconds.

input_tokens int | None

Number of tokens in the input.

output_tokens int | None

Number of tokens in the output.

metadata Dict[str, Any]

Additional custom metadata.

Source code in src/rotalabs_comply/audit/storage.py
@dataclass
class AuditEntry:
    """
    Represents a single audit log entry.

    Captures all relevant information about an AI interaction including
    inputs, outputs, safety evaluations, and performance metrics.

    Attributes:
        id: Unique identifier for this entry.
        timestamp: When the interaction occurred (ISO format).
        input_hash: SHA-256 hash of the input content.
        output_hash: SHA-256 hash of the output content.
        input_content: Actual input content (if store_content=True, may be encrypted).
        output_content: Actual output content (if store_content=True, may be encrypted).
        provider: The AI provider (e.g., "openai", "anthropic").
        model: The model identifier (e.g., "gpt-4", "claude-3-opus").
        conversation_id: Optional ID linking related interactions.
        safety_passed: Whether the interaction passed all safety checks.
        detectors_triggered: List of safety detector names that triggered.
        block_reason: Reason for blocking, if the request was blocked.
        alerts: List of alert messages generated.
        latency_ms: Time taken to process the request in milliseconds.
        input_tokens: Number of tokens in the input.
        output_tokens: Number of tokens in the output.
        metadata: Additional custom metadata.
    """

    id: str
    timestamp: str
    input_hash: str
    output_hash: str
    input_content: str | None = None
    output_content: str | None = None
    provider: str | None = None
    model: str | None = None
    conversation_id: str | None = None
    safety_passed: bool = True
    detectors_triggered: List[str] = field(default_factory=list)
    block_reason: str | None = None
    alerts: List[str] = field(default_factory=list)
    latency_ms: float = 0.0
    input_tokens: int | None = None
    output_tokens: int | None = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert entry to dictionary for serialization."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "AuditEntry":
        """Create entry from dictionary."""
        return cls(**data)

to_dict

to_dict() -> Dict[str, Any]

Convert entry to dictionary for serialization.

Source code in src/rotalabs_comply/audit/storage.py
def to_dict(self) -> Dict[str, Any]:
    """Convert entry to dictionary for serialization."""
    return asdict(self)

from_dict classmethod

from_dict(data: Dict[str, Any]) -> 'AuditEntry'

Create entry from dictionary.

Source code in src/rotalabs_comply/audit/storage.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AuditEntry":
    """Create entry from dictionary."""
    return cls(**data)

Dataclass representing a single audit log entry in storage.

Attributes:

Attribute Type Description
id str Unique identifier
timestamp str ISO format timestamp
input_hash str SHA-256 of input
output_hash str SHA-256 of output
input_content Optional[str] Input content (if stored)
output_content Optional[str] Output content (if stored)
provider Optional[str] AI provider
model Optional[str] Model identifier
conversation_id Optional[str] Conversation ID
safety_passed bool Safety check result
detectors_triggered List[str] Triggered detectors
block_reason Optional[str] Block reason
alerts List[str] Alert messages
latency_ms float Response latency
input_tokens Optional[int] Input token count
output_tokens Optional[int] Output token count
metadata Dict[str, Any] Custom metadata

Methods:

Method Description
to_dict() Convert to dictionary
from_dict(data) Create from dictionary
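The `to_dict()`/`from_dict()` pair is a plain dataclass round trip, which is what makes entries JSON-serializable for the storage backends. A sketch with a trimmed stand-in (`MiniAuditEntry` carries only a few of the real fields and is for illustration only):

```python
from dataclasses import dataclass, asdict, field
from typing import Any, Dict, List

@dataclass
class MiniAuditEntry:
    """Trimmed stand-in for AuditEntry, illustration only."""
    id: str
    timestamp: str
    input_hash: str
    output_hash: str
    safety_passed: bool = True
    detectors_triggered: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MiniAuditEntry":
        return cls(**data)

entry = MiniAuditEntry("abc123", "2024-01-15T12:00:00", "h1", "h2")
restored = MiniAuditEntry.from_dict(entry.to_dict())
assert restored == entry  # dataclass equality compares field-by-field
```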

Helper Functions

create_entry_id

def create_entry_id() -> str

Generate a unique entry ID (UUID v4).

Example:

from rotalabs_comply.audit.storage import create_entry_id

entry_id = create_entry_id()
# e.g. "550e8400-e29b-41d4-a716-446655440000" (random per call)
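The `input_hash`/`output_hash` attributes are documented as SHA-256 hashes of the content, and entry IDs are UUID v4 strings. A stdlib sketch of both (the `content_hash` helper is hypothetical, not a library function):

```python
import hashlib
import uuid

def content_hash(text: str) -> str:
    """SHA-256 hex digest of UTF-8 content (hypothetical helper)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

entry_id = str(uuid.uuid4())           # same shape as create_entry_id()
input_hash = content_hash("What is 2+2?")
print(len(entry_id), len(input_hash))  # 36 64
```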