comet_mpm.CometMPM ¶
CometMPM(
api_key: Optional[str] = None,
workspace_name: Optional[str] = None,
model_name: Optional[str] = None,
model_version: Optional[str] = None,
disabled: Optional[bool] = None,
asyncio: bool = False,
max_batch_size: Optional[int] = None,
max_batch_time: Optional[int] = None,
raise_on_error_during_init: bool = False,
)
The Comet MPM class is used to upload a model's input and output features to MPM.
Creates the Comet MPM event logger object.
Parameters:
api_key(Optional[str], default:None) –The Comet API key.
workspace_name(Optional[str], default:None) –The Comet workspace name of the model.
model_name(Optional[str], default:None) –The Comet model name of the model.
model_version(Optional[str], default:None) –The Comet model version of the model.
disabled(Optional[bool], default:None) –If set to True, CometMPM will not send anything to the backend.
asyncio(bool, default:False) –Set to True if you are using an asyncio-based framework like FastAPI.
max_batch_size(Optional[int], default:None) –Maximum number of MPM events sent in a batch. Can also be configured using the environment variable MPM_MAX_BATCH_SIZE.
max_batch_time(Optional[int], default:None) –Maximum time before a batch of events is submitted to MPM. Can also be configured using the environment variable MPM_MAX_BATCH_TIME.
raise_on_error_during_init(bool, default:False) –If set to True, CometMPM will raise exceptions instead of just logging errors during initialization. Default is False for backwards compatibility.
Functions¶
connect ¶
connect() -> Optional[Awaitable[None]]
When using CometMPM in asyncio mode, this coroutine needs to be awaited at the server start.
end ¶
end(timeout: Optional[int] = None) -> Optional[Awaitable[None]]
Ensure that all data has been sent to Comet and close the MPM object. After that, no data can be logged anymore. Waits for up to 30 seconds if timeout is not set.
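The connect and end calls bracket the lifetime of the logger in asyncio mode. The following is a minimal sketch of that lifecycle, assuming `mpm` is a CometMPM instance created with asyncio=True so that both calls return awaitables. The names `run_with_mpm`, `run_blocking`, and `serve` are hypothetical helpers, not part of the library.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_mpm(mpm: Any, serve: Callable[[Any], Awaitable[None]]) -> None:
    """Await connect() before serving and end() afterwards, even on error.

    `mpm` is assumed to be a CometMPM created with asyncio=True;
    `serve` is a hypothetical coroutine that handles requests.
    """
    await mpm.connect()
    try:
        await serve(mpm)
    finally:
        # Flush pending events before shutdown. With no timeout argument,
        # the documented wait is up to 30 seconds.
        await mpm.end()


def run_blocking(mpm: Any, serve: Callable[[Any], Awaitable[None]]) -> None:
    """Convenience wrapper for scripts that have no running event loop."""
    asyncio.run(run_with_mpm(mpm, serve))
```

In a web framework, the same two awaits would normally live in the framework's own startup and shutdown hooks rather than in a wrapper like this.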
get_logging_errors ¶
get_logging_errors(clear: bool = True) -> List[Dict[str, Any]]
Get any logging errors that occurred during background processing.
This method allows users to programmatically check for and handle errors that occurred in background threads, such as network failures when sending batch data to the backend.
Parameters:
clear(bool, default:True) –If True, clear the error store after retrieving errors. Defaults to True to prevent memory leaks.
Returns:
List[Dict[str, Any]]–List of error dictionaries, each containing:
- 'message': The error message
- 'logger_name': Name of the logger that produced the error
- 'timestamp': ISO timestamp when the error occurred
- 'data_affected': Description of what data was affected (optional)
- 'traceback': Exception traceback information (optional)
Example
mpm = CometMPM(...)
mpm.log_event(...)
mpm.end()
errors = mpm.get_logging_errors()
if errors:
    print(f"Found {len(errors)} errors:")
    for error in errors:
        print(f"  {error['timestamp']}: {error['message']}")
has_logging_error ¶
has_logging_error() -> bool
Check if there are any stored logging errors without retrieving them.
Returns:
bool–True if there are errors in the store, False otherwise.
Example
mpm = CometMPM(...)
mpm.log_event(...)
mpm.end()
if mpm.has_logging_error():
    errors = mpm.get_logging_errors()
    # Handle errors
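Because 'data_affected' and 'traceback' are optional keys, code that reports errors should not index them directly. The sketch below combines has_logging_error and get_logging_errors into one reporting step. The helper names `format_logging_errors` and `drain_logging_errors` are hypothetical, and `mpm` is assumed to be a CometMPM instance.

```python
from typing import Any, Dict, List


def format_logging_errors(errors: List[Dict[str, Any]]) -> List[str]:
    """Render error dictionaries as one-line summaries.

    'timestamp', 'logger_name' and 'message' are read directly;
    'data_affected' is optional, so it is read with .get().
    """
    lines = []
    for error in errors:
        line = f"{error['timestamp']} [{error['logger_name']}] {error['message']}"
        affected = error.get("data_affected")
        if affected:
            line += f" (affected: {affected})"
        lines.append(line)
    return lines


def drain_logging_errors(mpm: Any) -> List[str]:
    """Return formatted errors, or an empty list when nothing was stored."""
    if not mpm.has_logging_error():
        return []
    # clear=True empties the store, so the same errors are not reported twice.
    return format_logging_errors(mpm.get_logging_errors(clear=True))
```

Calling `drain_logging_errors(mpm)` after `mpm.end()` gives the background threads a chance to finish before the store is read.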
join ¶
join(timeout: Optional[int] = None) -> Optional[Awaitable[None]]
MPM.join is deprecated, use MPM.end instead.
log_dataframe ¶
log_dataframe(
dataframe,
prediction_id_column: str,
feature_columns: Optional[List[str]] = None,
output_value_column: Optional[str] = None,
output_probability_column: Optional[str] = None,
output_features_columns: Optional[List[str]] = None,
labels_columns: Optional[List[str]] = None,
timestamp_column: Optional[str] = None,
) -> LogEventsResult
This function logs each row of a Pandas DataFrame as an MPM event. The events are structured as described in the log_event method, so please refer to it for full context.
Parameters:
dataframe–The Pandas DataFrame to be logged.
prediction_id_column(str) –This column should contain the prediction_id values for the events.
feature_columns(Optional[List[str]], default:None) –If provided, these columns will be used as the input_features for the events.
output_features_columns(Optional[List[str]], default:None) –If provided, these columns will be used as the output_features for the events.
output_value_column(Optional[str], default:None) –Deprecated, please use the output_features_columns parameter instead. If provided, this column will be used as the output_value for the events.
output_probability_column(Optional[str], default:None) –Deprecated, please use the output_features_columns parameter instead. If provided, this column will be used as the output_probability for the events.
labels_columns(Optional[List[str]], default:None) –If provided, these columns will be used as the labels for the events.
timestamp_column(Optional[str], default:None) –If provided, this column will be used as the timestamp (seconds since epoch start in UTC timezone) for the events.
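When a DataFrame follows a consistent column naming scheme, the column arguments can be derived from the column names. The sketch below assumes a hypothetical convention ('feature_*', 'prediction_*', 'label_*', plus fixed 'prediction_id' and 'timestamp' columns). Neither the convention nor the helper `dataframe_logging_kwargs` comes from the library; only the keyword names match the log_dataframe signature above.

```python
from typing import Any, Dict, List, Optional


def dataframe_logging_kwargs(columns: List[str]) -> Dict[str, Any]:
    """Build log_dataframe keyword arguments from a list of column names.

    Assumes a hypothetical naming convention: 'feature_*' for inputs,
    'prediction_*' for outputs, 'label_*' for labels.
    """
    if "prediction_id" not in columns:
        raise ValueError("a 'prediction_id' column is required")

    def pick(prefix: str) -> Optional[List[str]]:
        # 'prediction_id' also starts with 'prediction_', so exclude it.
        found = [c for c in columns if c.startswith(prefix) and c != "prediction_id"]
        return found or None  # None keeps the documented default

    kwargs: Dict[str, Any] = {
        "prediction_id_column": "prediction_id",
        "feature_columns": pick("feature_"),
        "output_features_columns": pick("prediction_"),
        "labels_columns": pick("label_"),
    }
    if "timestamp" in columns:
        kwargs["timestamp_column"] = "timestamp"
    return kwargs


# Usage, assuming `df` is a Pandas DataFrame and `mpm` a CometMPM instance:
# mpm.log_dataframe(df, **dataframe_logging_kwargs(list(df.columns)))
```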
log_event ¶
log_event(
prediction_id: str,
input_features: Optional[Dict[str, Any]] = None,
output_value: Optional[Any] = None,
output_probability: Optional[Any] = None,
output_features: Optional[Dict[str, Any]] = None,
labels: Optional[Dict[str, Union[int, float, bool, str]]] = None,
timestamp: Optional[float] = None,
) -> Optional[Awaitable[None]]
Asynchronously log a single event to MPM. Events are identified by the mandatory prediction_id parameter. If you send multiple events with the same prediction_id, the Comet platform will automatically reject the duplicate events.
Parameters:
prediction_id(str) –The unique prediction ID. It can be provided by the framework, supplied by you, or generated as a random unique value such as str(uuid4()).
input_features(Optional[Dict[str, Any]], default:None) –If provided, it must be a flat dictionary where the keys are the feature names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"age": 42, "income": 42894.89}.
output_value(Optional[Any], default:None) –The prediction as a native Python scalar, such as an integer, float, boolean, or string.
output_probability(Optional[Any], default:None) –If provided, it must be a float between 0 and 1, indicating the model's confidence in the prediction.
output_features(Optional[Dict[str, Any]], default:None) –A dictionary of output features.
labels(Optional[Dict[str, Union[int, float, bool, str]]], default:None) –If provided, it must be a flat dictionary where the keys are the label names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"person": 2, "bicycle": 1, "car": 3}.
timestamp(Optional[float], default:None) –An optional timestamp to associate with the event (seconds since epoch in UTC timezone). If not provided, the current time will be used.
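Since input_features must be a flat dictionary of native Python scalars, nested request payloads need flattening before they are logged. The following is a sketch of one way to prepare the arguments. The helpers `flatten_scalars` and `build_event` are hypothetical, and joining nested keys with an underscore is an assumption made here, not a library convention.

```python
from typing import Any, Dict, Optional
from uuid import uuid4

SCALAR_TYPES = (int, float, bool, str)


def flatten_scalars(nested: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into a flat dict of scalars.

    Nested keys are joined with '_' (an assumption for this sketch).
    Non-scalar leaves raise TypeError instead of being logged silently.
    """
    flat: Dict[str, Any] = {}
    for key, value in nested.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten_scalars(value, prefix=f"{name}_"))
        elif isinstance(value, SCALAR_TYPES):
            flat[name] = value
        else:
            raise TypeError(f"feature {name!r} is not a native Python scalar")
    return flat


def build_event(
    input_features: Dict[str, Any],
    output_features: Dict[str, Any],
    prediction_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Build keyword arguments for log_event, generating an ID if none is given."""
    return {
        "prediction_id": prediction_id or str(uuid4()),
        "input_features": flatten_scalars(input_features),
        "output_features": flatten_scalars(output_features),
    }


# Usage, assuming `mpm` is a CometMPM instance:
# mpm.log_event(**build_event({"age": 42, "income": 42894.89}, {"value": "approved"}))
```

Keeping the generated prediction_id from `build_event` is what makes it possible to attach labels to the same prediction later.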
log_label ¶
log_label(
prediction_id: str,
label: Optional[Any] = None,
labels: Optional[Dict[str, Union[int, float, bool, str]]] = None,
timestamp: Optional[float] = None,
) -> Optional[Awaitable[None]]
Send an MPM event containing the ground truth value for a prediction whose input and output features are already stored in Comet. If you send multiple labels with the same prediction_id, the Comet platform will automatically reject the duplicate labels.
Parameters:
prediction_id(str) –The unique prediction ID.
label(Optional[Any], default:None) –Deprecated, please use labels instead. If provided, this value will be added to the labels under the key 'value'.
labels(Optional[Dict[str, Union[int, float, bool, str]]], default:None) –The ground truth values for the prediction. It must be a flat dictionary where the keys are the label names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"person": 2, "bicycle": 1, "car": 3}.
timestamp(Optional[float], default:None) –An optional timestamp to associate with the label (seconds since epoch in UTC timezone). If not provided, the current time will be used.
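Ground truth often arrives with its own observation time, which has to be converted to seconds since the epoch in UTC. The sketch below builds the log_label arguments from a datetime. The helper `label_kwargs` is hypothetical, and treating naive datetimes as UTC is an assumption of this example.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def label_kwargs(
    prediction_id: str, labels: Dict[str, Any], observed_at: datetime
) -> Dict[str, Any]:
    """Build log_label keyword arguments with a UTC epoch timestamp in seconds."""
    if observed_at.tzinfo is None:
        # Assumption: naive datetimes are treated as UTC, not local time.
        observed_at = observed_at.replace(tzinfo=timezone.utc)
    return {
        "prediction_id": prediction_id,
        "labels": labels,
        "timestamp": observed_at.timestamp(),
    }


# Usage, assuming `mpm` is a CometMPM instance and "p-1" was logged earlier:
# mpm.log_label(**label_kwargs("p-1", {"car": 3}, datetime.now(timezone.utc)))
```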
upload_dataset_csv ¶
upload_dataset_csv(
file_path: str,
dataset_type: str,
dataset_name: Optional[str] = None,
na_values: Optional[str] = None,
keep_default_na_values: Optional[str] = None,
) -> None
Uploads dataset from a local CSV file to the backend, streaming the data line by line.
This method facilitates the creation or updating of a dataset with data streamed from a specified CSV file. The uploaded data is used as a reference for detecting drift in a production model within the same workspace. Each line of the CSV file is sent as an event to the dataset, allowing for incremental updates.
Parameters:
file_path(`str`) –The path to the local CSV file whose data is to be streamed to the backend.
dataset_type(`Literal['EVENTS', 'LATE_LABELS', 'TRAINING_EVENTS']`) –Type of the dataset to be updated
dataset_name(`str`, default:None) –The name of the dataset where the data will be stored. If a model with this name does not exist, a new model will be created. If the model already exists, new records will be added to it (duplicated predictionIds will be ignored). In case dataset_type is TRAINING_EVENTS this is mandatory as the MPM model_name is the production model, which the dataset_name parameter is referring to.
na_values(`str`, *optional*, default:None) –Additional strings to recognize as NA/NaN. By default, the system recognizes standard missing values (such as empty fields, 'NaN', 'NULL', etc.). Use this parameter to add custom missing value identifiers. If specified, it should be a comma-delimited string. The default list is (note that the empty string is also in this list): None,,null,NULL,N/A,NA,NaN,n/a,nan
keep_default_na_values(`str`, *optional*, default:None) –A boolean that determines whether to include the default set of NA identifiers in addition to the values specified in 'na_values'. If True, both default and specified missing value identifiers are used. If False, only the values specified in 'na_values' are considered.
Notes
CSV Format:
- The first line of the CSV file must contain headers.
- Columns:
1. timestamp (optional): If missing, the current timestamp will be used as the event time. If specified, it should be in milliseconds since epoch.
2. predictionId (optional): Unique identifier for each event. If missing, a UUID will be generated. Duplicate predictionIds in new events will be ignored.
3. feature_* columns: These prefixed columns specify the input features for the model, e.g., 'feature_age' or 'feature_color'.
4. prediction_* columns: These prefixed columns are for the output features, e.g., 'prediction_animal' or 'prediction_probability'.
5. label_value_* columns: These columns are for the label values of the event, e.g., 'label_value_price' or 'label_value_animal'.
Sample CSV content:
timestamp,predictionId,feature_oneMoreFeature,feature_anotherFeature,feature_someFeature,prediction_fingers_count,prediction_probability,prediction_value,label_value_fingers_count,label_value_animal
1713006000001,someAssetId_-1895825684,Dog,special,53.09863247819340,7,0.87,Bird,4,Fish
1713006600001,someAssetId_926457604,null,special,55.73110218323990,1,0.69,Fish,6,Fish
1713007200001,someAssetId_2145792990,Rabbit,special,49.40627545548700,4,0.59,Bird,1,Fish
Examples:
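One way to produce a file in the CSV format described in the notes is to render it from in-memory records. This is a sketch under assumptions: the record keys ('timestamp_ms', 'prediction_id', 'features', 'predictions', 'labels') and the helpers `flatten_record` and `render_dataset_csv` are hypothetical. Only the column prefixes and the upload_dataset_csv parameter names come from this page.

```python
import csv
import io
from typing import Any, Dict, Iterable, List


def flatten_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Map one record onto the documented CSV column names."""
    flat: Dict[str, Any] = {
        "timestamp": record["timestamp_ms"],  # milliseconds since epoch
        "predictionId": record["prediction_id"],
    }
    flat.update({f"feature_{k}": v for k, v in record.get("features", {}).items()})
    flat.update({f"prediction_{k}": v for k, v in record.get("predictions", {}).items()})
    flat.update({f"label_value_{k}": v for k, v in record.get("labels", {}).items()})
    return flat


def render_dataset_csv(records: Iterable[Dict[str, Any]]) -> str:
    """Return CSV text whose first line is the header row."""
    rows = [flatten_record(r) for r in records]
    header: List[str] = []
    for row in rows:
        for name in row:
            if name not in header:
                header.append(name)
    buffer = io.StringIO()
    # Missing values are written as empty fields, which the default
    # NA list above already recognizes.
    writer = csv.DictWriter(buffer, fieldnames=header, restval="")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Usage, assuming `mpm` is a CometMPM instance and "reference.csv" is writable:
# with open("reference.csv", "w", newline="", encoding="utf-8") as handle:
#     handle.write(render_dataset_csv(records))
# mpm.upload_dataset_csv(
#     file_path="reference.csv",
#     dataset_type="TRAINING_EVENTS",
#     dataset_name="my-training-data",  # hypothetical dataset name
# )
```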