
comet_mpm.CometMPM

CometMPM(
    api_key: Optional[str] = None,
    workspace_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_version: Optional[str] = None,
    disabled: Optional[bool] = None,
    asyncio: bool = False,
    max_batch_size: Optional[int] = None,
    max_batch_time: Optional[int] = None,
    raise_on_error_during_init: bool = False,
)

The Comet MPM class is used to upload a model's input and output features to MPM.

Creates the Comet MPM Event logger object.

Parameters:

  • api_key (Optional[str], default: None ) –

    The Comet API Key.

  • workspace_name (Optional[str], default: None ) –

    The Comet Workspace Name of the model.

  • model_name (Optional[str], default: None ) –

    The Comet Model Name of the model.

  • model_version (Optional[str], default: None ) –

    The Comet Model Version of the model.

  • disabled (Optional[bool], default: None ) –

    If set to True, CometMPM will not send anything to the backend.

  • asyncio (bool, default: False ) –

    Set to True if you are using an asyncio-based framework like FastAPI.

  • max_batch_size (Optional[int], default: None ) –

    Maximum number of MPM events sent in a batch. Can also be configured using the environment variable MPM_MAX_BATCH_SIZE.

  • max_batch_time (Optional[int], default: None ) –

    Maximum time before a batch of events is submitted to MPM. Can also be configured using the environment variable MPM_MAX_BATCH_TIME.

  • raise_on_error_during_init (bool, default: False ) –

    If set to True, CometMPM will raise exceptions instead of just logging errors during initialization. Defaults to False for backwards compatibility.

Functions

connect

connect() -> Optional[Awaitable[None]]

When using CometMPM in asyncio mode, this coroutine needs to be awaited at the server start.
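The start-up pattern can be sketched with a stand-in object. `FakeMPM` below is a hypothetical stub that mirrors only the `connect`/`end` surface of CometMPM in asyncio mode, used so the sketch runs without a server or the comet-mpm package:

```python
import asyncio

# Hypothetical stub mirroring CometMPM's asyncio-mode surface (connect/end);
# a real application would use CometMPM(asyncio=True) instead.
class FakeMPM:
    async def connect(self) -> None:
        self.connected = True

    async def end(self) -> None:
        self.connected = False

async def serve(mpm: FakeMPM) -> str:
    await mpm.connect()  # must be awaited once at server start
    try:
        return "handled request"
    finally:
        await mpm.end()  # flush and close at shutdown

result = asyncio.run(serve(FakeMPM()))
```

With an asyncio framework such as FastAPI, the same two awaits would typically live in the application's startup and shutdown hooks.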

end

end(timeout: Optional[int] = None) -> Optional[Awaitable[None]]

Ensure that all data has been sent to Comet and close the MPM object. After that, no data can be logged anymore. Waits for up to 30 seconds if timeout is not set.

get_logging_errors

get_logging_errors(clear: bool = True) -> List[Dict[str, Any]]

Get any logging errors that occurred during background processing.

This method allows users to programmatically check for and handle errors that occurred in background threads, such as network failures when sending batch data to the backend.

Parameters:

  • clear (bool, default: True ) –

    If True, clear the error store after retrieving errors. Defaults to True to prevent memory leaks.

Returns:

  • List[Dict[str, Any]] –

    List of error dictionaries, each containing:

    • 'message': The error message
    • 'logger_name': Name of the logger that produced the error
    • 'timestamp': ISO timestamp when the error occurred
    • 'data_affected': Description of what data was affected (optional)
    • 'traceback': Exception traceback information (optional)
Example

mpm = CometMPM(...)
mpm.log_event(...)
mpm.end()
errors = mpm.get_logging_errors()
if errors:
    print(f"Found {len(errors)} errors:")
    for error in errors:
        print(f"  {error['timestamp']}: {error['message']}")

has_logging_error

has_logging_error() -> bool

Check if there are any stored logging errors without retrieving them.

Returns:

  • bool

    True if there are errors in the store, False otherwise.

Example

mpm = CometMPM(...)
mpm.log_event(...)
mpm.end()
if mpm.has_logging_error():
    errors = mpm.get_logging_errors()
    # Handle errors

join

join(timeout: Optional[int] = None) -> Optional[Awaitable[None]]

MPM.join is deprecated, use MPM.end instead.

log_dataframe

log_dataframe(
    dataframe,
    prediction_id_column: str,
    feature_columns: Optional[List[str]] = None,
    output_value_column: Optional[str] = None,
    output_probability_column: Optional[str] = None,
    output_features_columns: Optional[List[str]] = None,
    labels_columns: Optional[List[str]] = None,
    timestamp_column: Optional[str] = None,
) -> LogEventsResult

This function logs each row of a Pandas DataFrame as an MPM event. The events are structured as described in the log_event method, so please refer to it for full context.

Parameters:

  • dataframe

    The Pandas DataFrame to be logged.

  • prediction_id_column (str) –

    This column should contain the prediction_id values for the events.

  • feature_columns (Optional[List[str]], default: None ) –

    If provided, these columns will be used as the input_features for the events.

  • output_features_columns (Optional[List[str]], default: None ) –

    If provided, these columns will be used as the output_features for the events.

  • output_value_column (Optional[str], default: None ) –

    Deprecated, please use the output_features_columns field instead. If provided, this column will be used as the output_value for the events.

  • output_probability_column (Optional[str], default: None ) –

    Deprecated, please use the output_features_columns field instead. If provided, this column will be used as the output_probability for the events.

  • labels_columns (Optional[List[str]], default: None ) –

    If provided, these columns will be used as the labels for the events.

  • timestamp_column (Optional[str], default: None ) –

    If provided, this column will be used as the timestamp (seconds since epoch in UTC timezone) for the events.
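As a sketch (the column names below are invented for illustration, and pandas is assumed to be available), a DataFrame might be prepared like this; the logging call itself is shown commented out because it needs the comet-mpm package and a configured CometMPM instance:

```python
import pandas as pd
from uuid import uuid4

# One row per prediction; column names are illustrative only.
df = pd.DataFrame(
    {
        "prediction_id": [str(uuid4()) for _ in range(3)],
        "age": [42, 35, 61],                                 # input feature
        "income": [42894.89, 51000.0, 38000.50],             # input feature
        "prediction": ["approved", "rejected", "approved"],  # output feature
    }
)

# mpm = CometMPM(...)  # requires the comet-mpm package and credentials
# result = mpm.log_dataframe(
#     df,
#     prediction_id_column="prediction_id",
#     feature_columns=["age", "income"],
#     output_features_columns=["prediction"],
# )
```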

log_event

log_event(
    prediction_id: str,
    input_features: Optional[Dict[str, Any]] = None,
    output_value: Optional[Any] = None,
    output_probability: Optional[Any] = None,
    output_features: Optional[Dict[str, Any]] = None,
    labels: Optional[Dict[str, Union[int, float, bool, str]]] = None,
    timestamp: Optional[float] = None,
) -> Optional[Awaitable[None]]

Asynchronously log a single event to MPM. Events are identified by the mandatory prediction_id parameter. If you send multiple events with the same prediction_id, the Comet platform will automatically reject the duplicate events.

Parameters:

  • prediction_id (str) –

    The unique prediction ID. It can be provided by the framework or by you, or it can be a random unique value such as str(uuid4()).

  • input_features (Optional[Dict[str, Any]], default: None ) –

    If provided, it must be a flat dictionary where the keys are the feature names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"age": 42, "income": 42894.89}.

  • output_value (Optional[Any], default: None ) –

    The prediction as a native Python scalar, such as an integer, float, boolean, or string.

  • output_probability (Optional[Any], default: None ) –

    If provided, it must be a float between 0 and 1, indicating the model's confidence in the prediction.

  • output_features (Optional[Dict[str, Any]], default: None ) –

    A dictionary of output features.

  • labels (Optional[Dict[str, Union[int, float, bool, str]]], default: None ) –

    If provided, it must be a flat dictionary where the keys are the label names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"person": 2, "bicycle": 1, "car": 3}.

  • timestamp (Optional[float], default: None ) –

    An optional timestamp to associate with the event (seconds since epoch in UTC timezone). If not provided, the current time will be used.
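Putting the parameter contracts together, a minimal payload sketch (the feature names are invented for illustration; the call itself is commented out since it needs the comet-mpm package and a configured CometMPM instance):

```python
from uuid import uuid4

# Flat dictionaries of native Python scalars, as log_event requires.
prediction_id = str(uuid4())
input_features = {"age": 42, "income": 42894.89}
output_features = {"value": "approved", "probability": 0.87}

# mpm = CometMPM(...)  # requires the comet-mpm package and credentials
# mpm.log_event(
#     prediction_id=prediction_id,
#     input_features=input_features,
#     output_features=output_features,
# )
```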

log_label

log_label(
    prediction_id: str,
    label: Optional[Any] = None,
    labels: Optional[Dict[str, Union[int, float, bool, str]]] = None,
    timestamp: Optional[float] = None,
) -> Optional[Awaitable[None]]

Send an MPM event containing the ground truth value for a prediction whose input and output features are already stored in Comet. If you send multiple labels with the same prediction_id, the Comet platform will automatically reject the duplicate labels.

Parameters:

  • prediction_id (str) –

    The unique prediction ID.

  • label (Optional[Any], default: None ) –

    Deprecated, please use labels instead. If provided, this value will be stored as 'value' within the labels.

  • labels (Optional[Dict[str, Union[int, float, bool, str]]], default: None ) –

    The ground truth values for the prediction. It must be a flat dictionary where the keys are the label names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"person": 2, "bicycle": 1, "car": 3}.

  • timestamp (Optional[float], default: None ) –

    An optional timestamp to associate with the label (seconds since epoch in UTC timezone). If not provided, the current time will be used.
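A ground-truth payload sketch (the label names are taken from the example above; the call is commented out since it needs the comet-mpm package and a configured CometMPM instance):

```python
from uuid import uuid4

# prediction_id must match the previously logged prediction; a fresh UUID
# stands in for it in this sketch.
prediction_id = str(uuid4())
labels = {"person": 2, "bicycle": 1, "car": 3}  # flat dict of native scalars

# mpm = CometMPM(...)  # requires the comet-mpm package and credentials
# mpm.log_label(prediction_id=prediction_id, labels=labels)
```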

upload_dataset_csv

upload_dataset_csv(
    file_path: str,
    dataset_type: str,
    dataset_name: Optional[str] = None,
    na_values: Optional[str] = None,
    keep_default_na_values: Optional[str] = None,
) -> None

Uploads dataset from a local CSV file to the backend, streaming the data line by line.

This method facilitates the creation or updating of a dataset with data streamed from a specified CSV file. The uploaded data is used as a reference for detecting drift in a production model within the same workspace. Each line of the CSV file is sent as an event to the dataset, allowing for incremental updates.

Parameters:

  • file_path (`str`) –

    The path to the local CSV file whose data is to be streamed to the backend.

  • dataset_type (`Literal['EVENTS', 'LATE_LABELS', 'TRAINING_EVENTS']`) –

    Type of the dataset to be updated

  • dataset_name (`str`, default: None ) –

    The name of the dataset where the data will be stored. If a model with this name does not exist, a new model will be created. If the model already exists, new records will be added to it (duplicated predictionIds will be ignored). When dataset_type is TRAINING_EVENTS, this parameter is mandatory, since the MPM model_name refers to the production model while dataset_name refers to the training dataset.

  • na_values (`str`, optional, default: None ) –

    Additional strings to recognize as NA/NaN. By default, the system recognizes standard missing values (such as empty fields, 'NaN', 'NULL', etc.). Specifying this parameter allows custom missing-value identifiers to be included, adding flexibility in data handling. If specified, it should be a comma-delimited string. The default list is (note that the empty string is also in this list): None,,null,NULL,N/A,NA,NaN,n/a,nan

  • keep_default_na_values (`str`, optional, default: None ) –

    A boolean flag that determines whether to include the default set of NA identifiers in addition to the values specified in 'na_values'. If True, both the default and the specified missing-value identifiers are used. If False, only the values specified in 'na_values' are considered.

Notes

CSV Format:

  • The first line of the CSV file must contain headers.
  • Columns:
    1. timestamp (optional): If missing, the current timestamp will be used as the event time. If specified, it should be milliseconds since epoch.
    2. predictionId (optional): Unique identifier for each event. If missing, a UUID will be generated. Duplicate predictionIds in new events will be ignored.
    3. feature_* columns: These prefixed columns specify the input features for the model, e.g., 'feature_age' or 'feature_color'.
    4. prediction_* columns: These prefixed columns are for the output features, e.g., 'prediction_animal' or 'prediction_probability'.
    5. label_value_* columns: These columns are for the label values of the event, e.g., 'label_value_price' or 'label_value_animal'.

Sample CSV content:

timestamp,predictionId,feature_oneMoreFeature,feature_anotherFeature,feature_someFeature,prediction_fingers_count,prediction_probability,prediction_value,label_value_fingers_count,label_value_animal
1713006000001,someAssetId_-1895825684,Dog,special,53.09863247819340,7,0.87,Bird,4,Fish
1713006600001,someAssetId_926457604,null,special,55.73110218323990,1,0.69,Fish,6,Fish
1713007200001,someAssetId_2145792990,Rabbit,special,49.40627545548700,4,0.59,Bird,1,Fish
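A file in this layout can be produced with the standard csv module. This sketch builds a two-line file in memory (header plus one event) with invented feature names:

```python
import csv
import io
import time
import uuid

# Header first, then one event per line, following the column conventions
# described above (feature_*, prediction_*, label_value_*).
header = [
    "timestamp", "predictionId",
    "feature_age", "feature_income",
    "prediction_value", "prediction_probability",
    "label_value_animal",
]
row = [
    int(time.time() * 1000),   # milliseconds since epoch
    str(uuid.uuid4()),         # unique prediction id
    42, 42894.89,              # input features
    "Bird", 0.87,              # output features
    "Fish",                    # ground-truth label
]

buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(header)
writer.writerow(row)
csv_text = buf.getvalue()
```

Writing csv_text to disk yields a file suitable for the file_path parameter.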

Examples:

from comet_mpm import CometMPM

MPM = CometMPM()
MPM.upload_dataset_csv(
    file_path="path/to/your/data.csv",
    dataset_type="TRAINING_EVENTS",  # Or use 'EVENTS', 'LATE_LABELS' as needed
    dataset_name="your-dataset-name"
)
Nov. 5, 2025