DocsIntegrationsDSPy
This is a Jupyter notebook

DSPy - Observability & Tracing

This cookbook demonstrates how to use DSPy with Langfuse. DSPy is a framework that systematically optimizes language model prompts and weights, making it easier to build and refine complex systems with LMs by automating the tuning process and improving reliability. For further information on DSPy, please visit the documentation.

Prerequisites

Install the required packages:

%pip install dspy langfuse litellm

Step 1: Setup Langfuse Environment Variables

First, configure your Langfuse environment variables. You can get your Langfuse API keys by signing up for Langfuse Cloud or self-hosting Langfuse.

import os
 
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."

Step 2: Create a DSPy Callback for Langfuse

Create a custom callback class that extends DSPy’s BaseCallback to integrate with Langfuse. This callback will handle the tracing and observability of your DSPy modules.

from dspy.utils.callback import BaseCallback
from langfuse.decorators import langfuse_context
from langfuse import Langfuse
from litellm import completion_cost
from typing import Optional
import dspy
import contextvars
import logging
 
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
class LangFuseDSPYCallback(BaseCallback):
    def __init__(self, signature: dspy.Signature):
        super().__init__()
        # Use contextvars for per-call state
        self.current_system_prompt = contextvars.ContextVar("current_system_prompt")
        self.current_prompt = contextvars.ContextVar("current_prompt")
        self.current_completion = contextvars.ContextVar("current_completion")
        self.current_span = contextvars.ContextVar("current_span")
        self.model_name_at_span_creation = contextvars.ContextVar("model_name_at_span_creation")
        self.input_field_values = contextvars.ContextVar("input_field_values")
        # Initialize Langfuse client
        self.langfuse = Langfuse()
        self.input_field_names = signature.input_fields.keys()
 
    def on_module_start(self, call_id, *args, **kwargs):
        inputs = kwargs.get("inputs")
        extracted_args = inputs["kwargs"]
        input_field_values = {}
        for input_field_name in self.input_field_names:
            if input_field_name in extracted_args:
                input_field_values[input_field_name] = extracted_args[input_field_name]
        self.input_field_values.set(input_field_values)
 
    def on_module_end(self, call_id, outputs, exception):
        metadata = {
            "existing_trace_id": langfuse_context.get_current_trace_id(),
            "parent_observation_id": langfuse_context.get_current_observation_id(),
        }
        outputs_extracted = {}
        if outputs is not None:
            try:
                outputs_extracted = {k: v for k, v in outputs.items()}
            except AttributeError:
                outputs_extracted = {"value": outputs}
            except Exception as e:
                outputs_extracted = {"error_extracting_module_output": str(e)}
        langfuse_context.update_current_observation(
            input=self.input_field_values.get({}),
            output=outputs_extracted,
            metadata=metadata
        )
 
    def on_lm_start(self, call_id, *args, **kwargs):
        if self.current_span.get(None):
            return
        lm_instance = kwargs.get("instance")
        lm_dict = lm_instance.__dict__
        model_name = lm_dict.get("model")
        temperature = lm_dict.get("kwargs", {}).get("temperature")
        max_tokens = lm_dict.get("kwargs", {}).get("max_tokens")
        inputs = kwargs.get("inputs")
        messages = inputs.get("messages")
        system_prompt = messages[0].get("content")
        user_input = messages[1].get("content")
        self.current_system_prompt.set(system_prompt)
        self.current_prompt.set(user_input)
        self.model_name_at_span_creation.set(model_name)
        trace_id = langfuse_context.get_current_trace_id()
        parent_observation_id = langfuse_context.get_current_observation_id()
        span_obj = None
        if trace_id:
            span_obj = self.langfuse.generation(
                input=user_input,
                name=model_name,
                trace_id=trace_id,
                parent_observation_id=parent_observation_id,
                metadata={
                    "model": model_name,
                    "temperature": temperature,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                },
            )
        self.current_span.set(span_obj)
 
    def on_lm_end(self, call_id, outputs, exception, **kwargs):
        completion_content = None
        model_name_for_span = None
        usage_for_span = None
        level = "DEFAULT"
        status_message = None
        span = self.current_span.get(None)
        system_prompt = self.current_system_prompt.get(None)
        prompt = self.current_prompt.get(None)
        model_name_for_span = self.model_name_at_span_creation.get(None)
        # Use the model name stored at span creation as the primary source
        if exception:
            level = "ERROR"
            status_message = str(exception)
        elif outputs is None:
            level = "ERROR"
            status_message = "LM call returned None outputs without an explicit exception."
        elif isinstance(outputs, list):
            if outputs:
                completion_content = outputs[0]
            else:
                level = "WARNING"
                status_message = "LM call returned an empty list as outputs."
        else:
            try:
                if hasattr(outputs, "model") and outputs.model is not None:
                    model_name_for_span = outputs.model
                if (
                    outputs.choices
                    and outputs.choices[0]
                    and outputs.choices[0].message
                ):
                    completion_content = outputs.choices[0].message.content
                else:
                    level = "WARNING"
                    status_message = "LM output structure did not contain expected choices or message."
            except AttributeError as e:
                level = "ERROR"
                status_message = f"Error processing LM output structure: {e}. Output: {str(outputs)[:200]}"
            except Exception as e:
                level = "ERROR"
                status_message = f"Unexpected error processing LM output: {e}. Output: {str(outputs)[:200]}"
        # Calculate usage if we have the necessary information
        if (
            completion_content
            and system_prompt is not None
            and prompt is not None
            and model_name_for_span
        ):
            try:
                if hasattr(outputs, "usage"):
                    prompt_tokens = outputs.usage.prompt_tokens
                    completion_tokens = outputs.usage.completion_tokens
                    total_tokens = outputs.usage.total_tokens
                else:
                    prompt_tokens = len(system_prompt + prompt)
                    completion_tokens = len(completion_content)
                    total_tokens = prompt_tokens + completion_tokens
                total_cost = completion_cost(
                    model=model_name_for_span,
                    prompt=system_prompt + prompt,
                    completion=completion_content,
                )
                if span:
                    span.update(
                        usage_details={
                            "input": prompt_tokens,
                            "output": completion_tokens,
                            "cache_read_input_tokens": 0,
                            "total": total_tokens,
                        },
                        cost_details={
                            "input": total_cost * (prompt_tokens / total_tokens) if total_tokens else 0,
                            "output": total_cost * (completion_tokens / total_tokens) if total_tokens else 0,
                            "cache_read_input_tokens": 0.0,
                            "total": total_cost,
                        },
                    )
            except Exception as e:
                logger.warning(f"Failed to calculate usage/cost: {str(e)}")
                level = "WARNING"
                status_message = f"Usage/cost calculation failed: {str(e)}"
        else:
            missing_info = []
            if not completion_content:
                missing_info.append("completion content")
            if not system_prompt:
                missing_info.append("system prompt")
            if not prompt:
                missing_info.append("user prompt")
            if not model_name_for_span:
                missing_info.append("model name")
            logger.warning(
                f"Missing required information for usage/cost calculation: {', '.join(missing_info)}"
            )
        if span:
            end_args = {
                "output": completion_content,
                "model": model_name_for_span,
                "level": level,
                "status_message": status_message,
            }
            final_end_args = {
                k: v
                for k, v in end_args.items()
                if v is not None or k in ["output", "model", "level", "status_message"]
            }
            span.end(**final_end_args)
            self.current_span.set(None)
        if level == "OK" and completion_content is not None:
            self.current_completion.set(completion_content)

Step 3: Using the Callback with DSPy

Here’s how to use the Langfuse callback with your DSPy modules:

import dspy
from utils.llm import LangFuseDSPYCallback
 
# Define your signature
class ExtractInfo(dspy.Signature):
    """Extract structured information from text."""
    text: str = dspy.InputField()
    title: str = dspy.OutputField()
    headings: list[str] = dspy.OutputField()
    entities: list[dict[str, str]] = dspy.OutputField(desc="a list of entities and their metadata")
 
# Initialize the language model
lm = dspy.LM('openai/your-model-name', api_key='PROVIDER_API_KEY')
 
# Create and configure the callback
callback = LangFuseDSPYCallback(ExtractInfo)
dspy.configure(lm=lm, callbacks=[callback])
 
# Create and use your module
module = dspy.Predict(ExtractInfo)
 
# Example usage
text = "Apple Inc. announced its latest iPhone 14 today. The CEO, Tim Cook, highlighted its new features in a press release."
response = module(text=text)
 
print(response.title)
print(response.headings)
print(response.entities)

Step 4: Viewing Traces in Langfuse

After running your DSPy application, you can inspect the traced events in Langfuse:

Example trace in Langfuse

The traces will include:

  • Input and output values for each module
  • Model usage and costs
  • System prompts and completions
  • Error handling and status messages
  • Metadata about the model configuration

This integration provides a comprehensive view of your DSPy application’s behavior and performance, making it easier to debug and optimize your language model pipelines.

Was this page useful?

Questions? We're here to help

Subscribe to updates