Skip to content

Probes

Linear probes and steering vectors for activation analysis.

LinearProbe

Linear probe for binary classification on activation patterns.

Uses logistic regression to learn a linear decision boundary in activation space for detecting sandbagging behavior.

Example

probe = LinearProbe()
probe.fit(X_train, y_train)
predictions = probe.predict(X_test)
accuracy = probe.score(X_test, y_test)

Source code in src/rotalabs_probe/probing/probes.py
class LinearProbe:
    """Linear probe for binary classification on activation patterns.

    Learns a logistic-regression decision boundary in activation space,
    used here to detect sandbagging behavior.

    Example:
        >>> probe = LinearProbe()
        >>> probe.fit(X_train, y_train)
        >>> predictions = probe.predict(X_test)
        >>> accuracy = probe.score(X_test, y_test)
    """

    def __init__(
        self,
        C: float = 1.0,
        max_iter: int = 1000,
        random_state: int = 42,
    ):
        """Initialize linear probe.

        Args:
            C: Inverse regularization strength
            max_iter: Maximum iterations for optimization
            random_state: Random seed for reproducibility
        """
        # Fail fast when the optional sklearn dependency is absent.
        if not SKLEARN_AVAILABLE:
            raise ImportError("sklearn required for LinearProbe. Install: pip install scikit-learn")

        self.C = C
        self.max_iter = max_iter
        self.random_state = random_state
        # Both stay None until fit() runs; used as the "fitted" flag below.
        self._model: Optional[LogisticRegression] = None
        self._cv_scores: Optional[np.ndarray] = None

    def _fitted_model(self) -> LogisticRegression:
        """Return the underlying model, raising if fit() has not been called."""
        if self._model is None:
            raise RuntimeError("Probe not fitted. Call fit() first.")
        return self._model

    def fit(
        self,
        X: np.ndarray,
        y: np.ndarray,
        cv_folds: int = 5,
    ) -> "LinearProbe":
        """Fit the probe to training data.

        Args:
            X: Activation vectors (n_samples, hidden_dim)
            y: Binary labels (0=genuine, 1=sandbagging)
            cv_folds: Number of cross-validation folds

        Returns:
            self
        """
        self._model = LogisticRegression(
            penalty="l2",
            C=self.C,
            solver="lbfgs",
            max_iter=self.max_iter,
            random_state=self.random_state,
        )
        # cross_val_score clones the estimator internally, so running it
        # first yields an unbiased accuracy estimate before the final fit.
        self._cv_scores = cross_val_score(self._model, X, y, cv=cv_folds, scoring="accuracy")
        self._model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict labels for new activations."""
        return self._fitted_model().predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Get probability estimates for each class."""
        return self._fitted_model().predict_proba(X)

    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        """Compute accuracy on test data."""
        return self._fitted_model().score(X, y)

    @property
    def cv_accuracy(self) -> float:
        """Mean cross-validation accuracy."""
        if self._cv_scores is None:
            raise RuntimeError("Probe not fitted. Call fit() first.")
        return self._cv_scores.mean()

    @property
    def cv_std(self) -> float:
        """Standard deviation of cross-validation accuracy."""
        if self._cv_scores is None:
            raise RuntimeError("Probe not fitted. Call fit() first.")
        return self._cv_scores.std()

    @property
    def coef(self) -> np.ndarray:
        """Coefficients of the linear classifier (the probe direction)."""
        return self._fitted_model().coef_[0]

    def save(self, path: Path) -> None:
        """Save probe to disk."""
        destination = Path(path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        state = {
            "model": self._model,
            "cv_scores": self._cv_scores,
            "C": self.C,
            "max_iter": self.max_iter,
            "random_state": self.random_state,
        }
        with open(destination, "wb") as f:
            pickle.dump(state, f)

    @classmethod
    def load(cls, path: Path) -> "LinearProbe":
        """Load probe from disk.

        NOTE(review): relies on pickle.load — only load trusted files.
        """
        with open(path, "rb") as f:
            state = pickle.load(f)

        probe = cls(
            C=state["C"],
            max_iter=state["max_iter"],
            random_state=state["random_state"],
        )
        probe._model = state["model"]
        probe._cv_scores = state["cv_scores"]
        return probe

cv_accuracy: float property

Mean cross-validation accuracy.

cv_std: float property

Standard deviation of cross-validation accuracy.

coef: np.ndarray property

Coefficients of the linear classifier (the probe direction).

__init__(C: float = 1.0, max_iter: int = 1000, random_state: int = 42)

Initialize linear probe.

Parameters:

Name Type Description Default
C float

Inverse regularization strength

1.0
max_iter int

Maximum iterations for optimization

1000
random_state int

Random seed for reproducibility

42
Source code in src/rotalabs_probe/probing/probes.py
def __init__(
    self,
    C: float = 1.0,
    max_iter: int = 1000,
    random_state: int = 42,
):
    """Initialize linear probe.

    Args:
        C: Inverse regularization strength
        max_iter: Maximum iterations for optimization
        random_state: Random seed for reproducibility
    """
    # Guard against the missing optional dependency before touching sklearn names.
    if not SKLEARN_AVAILABLE:
        raise ImportError("sklearn required for LinearProbe. Install: pip install scikit-learn")

    self.C = C
    self.max_iter = max_iter
    self.random_state = random_state
    # Both remain None until fit() is called; other methods treat None
    # as the "not fitted yet" sentinel.
    self._model: Optional[LogisticRegression] = None
    self._cv_scores: Optional[np.ndarray] = None

fit(X: np.ndarray, y: np.ndarray, cv_folds: int = 5) -> LinearProbe

Fit the probe to training data.

Parameters:

Name Type Description Default
X ndarray

Activation vectors (n_samples, hidden_dim)

required
y ndarray

Binary labels (0=genuine, 1=sandbagging)

required
cv_folds int

Number of cross-validation folds

5

Returns:

Type Description
LinearProbe

self

Source code in src/rotalabs_probe/probing/probes.py
def fit(
    self,
    X: np.ndarray,
    y: np.ndarray,
    cv_folds: int = 5,
) -> "LinearProbe":
    """Fit the probe to training data.

    Args:
        X: Activation vectors (n_samples, hidden_dim)
        y: Binary labels (0=genuine, 1=sandbagging)
        cv_folds: Number of cross-validation folds

    Returns:
        self
    """
    hyperparams = dict(
        penalty="l2",
        C=self.C,
        solver="lbfgs",
        max_iter=self.max_iter,
        random_state=self.random_state,
    )
    self._model = LogisticRegression(**hyperparams)

    # cross_val_score clones the estimator per fold, so this gives an
    # accuracy estimate without pre-fitting the stored model.
    self._cv_scores = cross_val_score(self._model, X, y, cv=cv_folds, scoring="accuracy")

    # Final model trains on the complete dataset.
    self._model.fit(X, y)
    return self

predict(X: np.ndarray) -> np.ndarray

Predict labels for new activations.

Source code in src/rotalabs_probe/probing/probes.py
def predict(self, X: np.ndarray) -> np.ndarray:
    """Predict labels for new activations."""
    model = self._model
    if model is None:
        raise RuntimeError("Probe not fitted. Call fit() first.")
    return model.predict(X)

predict_proba(X: np.ndarray) -> np.ndarray

Get probability estimates for each class.

Source code in src/rotalabs_probe/probing/probes.py
def predict_proba(self, X: np.ndarray) -> np.ndarray:
    """Get probability estimates for each class."""
    model = self._model
    if model is None:
        raise RuntimeError("Probe not fitted. Call fit() first.")
    return model.predict_proba(X)

score(X: np.ndarray, y: np.ndarray) -> float

Compute accuracy on test data.

Source code in src/rotalabs_probe/probing/probes.py
def score(self, X: np.ndarray, y: np.ndarray) -> float:
    """Compute accuracy on test data."""
    model = self._model
    if model is None:
        raise RuntimeError("Probe not fitted. Call fit() first.")
    return model.score(X, y)

save(path: Path) -> None

Save probe to disk.

Source code in src/rotalabs_probe/probing/probes.py
def save(self, path: Path) -> None:
    """Save probe to disk."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Bundle fitted state together with the hyperparameters so load()
    # can reconstruct an equivalent probe.
    payload = {
        "model": self._model,
        "cv_scores": self._cv_scores,
        "C": self.C,
        "max_iter": self.max_iter,
        "random_state": self.random_state,
    }
    with open(target, "wb") as f:
        pickle.dump(payload, f)

load(path: Path) -> LinearProbe classmethod

Load probe from disk.

Source code in src/rotalabs_probe/probing/probes.py
@classmethod
def load(cls, path: Path) -> "LinearProbe":
    """Load probe from disk.

    NOTE(review): uses pickle.load, which can execute arbitrary code
    from the file — only load probes saved from trusted sources.
    """
    with open(path, "rb") as f:
        data = pickle.load(f)

    # Rebuild with the saved hyperparameters, then restore fitted state
    # directly onto the private attributes.
    probe = cls(
        C=data["C"],
        max_iter=data["max_iter"],
        random_state=data["random_state"],
    )
    probe._model = data["model"]
    probe._cv_scores = data["cv_scores"]
    return probe

SteeringVector

A vector in activation space representing a behavioral direction.

Created by computing mean(positive_activations) - mean(negative_activations) using Contrastive Activation Addition (CAA).

Attributes:

Name Type Description
behavior str

Name of the behavior (e.g., "sandbagging")

layer_index int

Which layer this vector was extracted from

vector Tensor

The actual steering vector tensor

model_name str

Model used for extraction

extraction_method str

Method used (typically "caa")

metadata Dict[str, Any]

Additional extraction details

Source code in src/rotalabs_probe/probing/vectors.py
@dataclass
class SteeringVector:
    """A vector in activation space representing a behavioral direction.

    Created by computing mean(positive_activations) - mean(negative_activations)
    using Contrastive Activation Addition (CAA).

    Attributes:
        behavior: Name of the behavior (e.g., "sandbagging")
        layer_index: Which layer this vector was extracted from
        vector: The actual steering vector tensor
        model_name: Model used for extraction
        extraction_method: Method used (typically "caa")
        metadata: Additional extraction details
    """

    behavior: str
    layer_index: int
    vector: torch.Tensor
    model_name: str = "unknown"
    extraction_method: str = "caa"
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def norm(self) -> float:
        """L2 norm of the steering vector."""
        return self.vector.norm().item()

    @property
    def dim(self) -> int:
        """Dimensionality of the vector (size of the last axis)."""
        return self.vector.shape[-1]

    def to(self, device: str) -> "SteeringVector":
        """Return a copy of this vector moved to the specified device.

        The metadata dict is shallow-copied so the two copies do not
        share mutable state (previously it was shared by reference).
        """
        return SteeringVector(
            behavior=self.behavior,
            layer_index=self.layer_index,
            vector=self.vector.to(device),
            model_name=self.model_name,
            extraction_method=self.extraction_method,
            metadata=dict(self.metadata),
        )

    def normalize(self) -> "SteeringVector":
        """Return unit-normalized version of this vector.

        Raises:
            ValueError: If the vector has zero norm; dividing by it would
                silently fill the result with NaN/Inf values.
        """
        magnitude = self.norm
        if magnitude == 0.0:
            raise ValueError("Cannot normalize a zero-norm steering vector.")
        return SteeringVector(
            behavior=self.behavior,
            layer_index=self.layer_index,
            vector=self.vector / magnitude,
            model_name=self.model_name,
            extraction_method=self.extraction_method,
            metadata={**self.metadata, "normalized": True},
        )

    def save(self, path: Path) -> None:
        """Save vector to disk.

        Creates:
            - {path}.pt: The vector tensor
            - {path}_meta.json: Metadata
        """
        import json

        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)

        # Save tensor
        torch.save(self.vector, f"{path}.pt")

        # Save metadata; user-supplied metadata keys merge last so they
        # can carry extra extraction details.
        meta = {
            "behavior": self.behavior,
            "layer_index": self.layer_index,
            "model_name": self.model_name,
            "extraction_method": self.extraction_method,
            "norm": self.norm,
            "dim": self.dim,
            **self.metadata,
        }
        with open(f"{path}_meta.json", "w") as f:
            json.dump(meta, f, indent=2)

    @classmethod
    def load(cls, path: Path) -> "SteeringVector":
        """Load vector from the {path}.pt / {path}_meta.json pair written by save()."""
        import json

        path = Path(path)

        # weights_only=True restricts torch.load to tensor data (safer unpickling).
        vector = torch.load(f"{path}.pt", weights_only=True)

        # Load metadata
        with open(f"{path}_meta.json") as f:
            meta = json.load(f)

        # Known fields become constructor args; everything else returns
        # to the free-form metadata dict.
        return cls(
            behavior=meta["behavior"],
            layer_index=meta["layer_index"],
            vector=vector,
            model_name=meta.get("model_name", "unknown"),
            extraction_method=meta.get("extraction_method", "caa"),
            metadata={k: v for k, v in meta.items()
                      if k not in ["behavior", "layer_index", "model_name",
                                   "extraction_method", "norm", "dim"]},
        )

    def cosine_similarity(self, other: "SteeringVector") -> float:
        """Compute cosine similarity with another vector."""
        return torch.nn.functional.cosine_similarity(
            self.vector.unsqueeze(0),
            other.vector.unsqueeze(0),
        ).item()

    def __repr__(self) -> str:
        return (
            f"SteeringVector(behavior='{self.behavior}', "
            f"layer={self.layer_index}, dim={self.dim}, norm={self.norm:.4f})"
        )

norm: float property

L2 norm of the steering vector.

dim: int property

Dimensionality of the vector.

to(device: str) -> SteeringVector

Move vector to specified device.

Source code in src/rotalabs_probe/probing/vectors.py
def to(self, device: str) -> "SteeringVector":
    """Move vector to specified device.

    Returns a new SteeringVector; the original is left untouched.
    """
    # NOTE(review): metadata is passed by reference, so the returned copy
    # shares the same dict as the original — mutating one mutates both.
    # Confirm this aliasing is intended.
    return SteeringVector(
        behavior=self.behavior,
        layer_index=self.layer_index,
        vector=self.vector.to(device),
        model_name=self.model_name,
        extraction_method=self.extraction_method,
        metadata=self.metadata,
    )

normalize() -> SteeringVector

Return unit-normalized version of this vector.

Source code in src/rotalabs_probe/probing/vectors.py
def normalize(self) -> "SteeringVector":
    """Return unit-normalized version of this vector.

    NOTE(review): if the vector has zero norm, the division yields
    NaN/Inf values rather than raising — confirm callers never pass
    a zero vector.
    """
    return SteeringVector(
        behavior=self.behavior,
        layer_index=self.layer_index,
        vector=self.vector / self.norm,
        model_name=self.model_name,
        extraction_method=self.extraction_method,
        # Record that this copy has been normalized, without mutating
        # the original's metadata.
        metadata={**self.metadata, "normalized": True},
    )

save(path: Path) -> None

Save vector to disk.

Creates
  • {path}.pt: The vector tensor
  • {path}_meta.json: Metadata
Source code in src/rotalabs_probe/probing/vectors.py
def save(self, path: Path) -> None:
    """Save vector to disk.

    Creates:
        - {path}.pt: The vector tensor
        - {path}_meta.json: Metadata
    """
    import json

    base = Path(path)
    base.parent.mkdir(parents=True, exist_ok=True)

    # Tensor and metadata live side by side under a shared path stem.
    torch.save(self.vector, f"{base}.pt")

    meta = dict(
        behavior=self.behavior,
        layer_index=self.layer_index,
        model_name=self.model_name,
        extraction_method=self.extraction_method,
        norm=self.norm,
        dim=self.dim,
    )
    # Free-form metadata merges last, matching the saved-key precedence.
    meta.update(self.metadata)
    with open(f"{base}_meta.json", "w") as f:
        json.dump(meta, f, indent=2)

load(path: Path) -> SteeringVector classmethod

Load vector from disk.

Source code in src/rotalabs_probe/probing/vectors.py
@classmethod
def load(cls, path: Path) -> "SteeringVector":
    """Load vector from disk.

    Expects the {path}.pt / {path}_meta.json pair written by save().
    """
    import json

    path = Path(path)

    # weights_only=True restricts torch.load to plain tensor data,
    # avoiding arbitrary-object unpickling.
    vector = torch.load(f"{path}.pt", weights_only=True)

    # Load metadata
    with open(f"{path}_meta.json") as f:
        meta = json.load(f)

    # Known fields become constructor arguments; everything else goes
    # back into the free-form metadata dict (norm/dim are derived, so
    # they are dropped rather than stored).
    return cls(
        behavior=meta["behavior"],
        layer_index=meta["layer_index"],
        vector=vector,
        model_name=meta.get("model_name", "unknown"),
        extraction_method=meta.get("extraction_method", "caa"),
        metadata={k: v for k, v in meta.items()
                  if k not in ["behavior", "layer_index", "model_name",
                               "extraction_method", "norm", "dim"]},
    )

cosine_similarity(other: SteeringVector) -> float

Compute cosine similarity with another vector.

Source code in src/rotalabs_probe/probing/vectors.py
def cosine_similarity(self, other: "SteeringVector") -> float:
    """Compute cosine similarity with another vector."""
    # F.cosine_similarity expects a batch axis, hence the unsqueeze.
    lhs = self.vector.unsqueeze(0)
    rhs = other.vector.unsqueeze(0)
    return torch.nn.functional.cosine_similarity(lhs, rhs).item()

extract_caa_vector

Extract steering vector using Contrastive Activation Addition.

The core idea: compute mean(positive_acts) - mean(negative_acts) to find the direction in activation space that corresponds to the target behavior.

Parameters:

Name Type Description Default
model

HuggingFace model

required
tokenizer

Corresponding tokenizer

required
contrast_pairs List[Dict[str, str]]

List of dicts with "positive" and "negative" keys

required
layer_idx int

Which layer to extract from

required
token_position Literal['last', 'first', 'mean']

Which token position to use

'last'
behavior str

Name of the behavior being extracted

'sandbagging'
show_progress bool

Show progress bar

True

Returns:

Type Description
SteeringVector

SteeringVector for the extracted direction

Source code in src/rotalabs_probe/probing/extraction.py
def extract_caa_vector(
    model,
    tokenizer,
    contrast_pairs: List[Dict[str, str]],
    layer_idx: int,
    token_position: Literal["last", "first", "mean"] = "last",
    behavior: str = "sandbagging",
    show_progress: bool = True,
) -> SteeringVector:
    """Extract steering vector using Contrastive Activation Addition.

    The core idea: compute mean(positive_acts) - mean(negative_acts)
    to find the direction in activation space that corresponds to
    the target behavior.

    Args:
        model: HuggingFace model
        tokenizer: Corresponding tokenizer
        contrast_pairs: List of dicts with "positive" and "negative" keys
        layer_idx: Which layer to extract from
        token_position: Which token position to use
        behavior: Name of the behavior being extracted
        show_progress: Show progress bar
    Returns:
        SteeringVector for the extracted direction
    """
    device = next(model.parameters()).device
    model.eval()

    pos_acts = []
    neg_acts = []

    progress = tqdm(contrast_pairs, desc=f"Layer {layer_idx}", disable=not show_progress)
    for pair in progress:
        # Positive first, then negative, so activation extraction order
        # matches the pair order exactly.
        pos_acts.append(_get_activation(
            model, tokenizer, pair["positive"], layer_idx, token_position, device
        ))
        neg_acts.append(_get_activation(
            model, tokenizer, pair["negative"], layer_idx, token_position, device
        ))

    pos_mean = torch.stack(pos_acts).mean(dim=0)
    neg_mean = torch.stack(neg_acts).mean(dim=0)

    # NOTE: this is the core of CAA - surprisingly simple but it works
    # see the original paper for theoretical justification
    direction = pos_mean - neg_mean

    return SteeringVector(
        behavior=behavior,
        layer_index=layer_idx,
        vector=direction.cpu(),
        model_name=getattr(model.config, "_name_or_path", "unknown"),
        extraction_method="caa",
        metadata={
            "num_pairs": len(contrast_pairs),
            "token_position": token_position,
            "pos_mean_norm": pos_mean.norm().item(),
            "neg_mean_norm": neg_mean.norm().item(),
        },
    )