Datapool

The Datapool feature can be used to manage batch processing.

This section shows how to interact with this BotCity Maestro feature from your automation code.

Tip

See more details on how to create a Datapool and add new items through the Maestro interface by accessing this link.

Consuming items from a Datapool

The first step is to obtain the Datapool reference through its unique identifier (label).

With the Datapool reference in hand, we can use a loop that keeps running as long as there are items to be processed.

# Consuming the next available item and reporting the finishing state at the end
datapool = maestro.get_datapool(label="Items-To-Process")

while datapool.has_next():
    # Fetch the next Datapool item
    item = datapool.next(task_id=execution.task_id)
    if item is None:
        # Item could be None if another process consumed it before
        break

    try:
        # Processing item...

        # Finishing as 'DONE' after processing
        item.report_done(finish_message="Processed successfully!")

    except Exception:
        # Finishing item processing as 'ERROR'
        item.report_error(finish_message="Processing failed.")
// Consuming the next available item and reporting the finishing state at the end
Datapool datapool = await maestro.GetDatapoolAsync("Items-To-Process");

while (await datapool.HasNextAsync()) {
    // Fetch the next Datapool item
    DatapoolEntry item = await datapool.NextAsync(execution.TaskId);
    if (item == null) {
        // Item could be 'null' if another process consumed it before
        break;
    }

    try {
        // Processing item...

        // Finishing as 'DONE' after processing
        await item.ReportDoneAsync();

    } catch (Exception ex) {
        // Finishing item processing as 'ERROR'
        await item.ReportErrorAsync();
    }
}

Warning

Always report the finishing state of each processed item in your code.

This is extremely important so that item states are updated within the Datapool in Maestro.
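
One way to make sure no item is left without a finishing state is to route all processing through a small helper that always ends in either report_done or report_error. The sketch below is self-contained and only illustrates the pattern: FakeEntry is a hypothetical in-memory stand-in for a Datapool item (it is not the SDK class) that mimics the two report calls shown above, and process_with_report and check_price are illustrative names.

```python
class FakeEntry:
    """Hypothetical in-memory stand-in for a Datapool item (not the SDK class)."""

    def __init__(self, values):
        self.values = values
        self.state = "PROCESSING"
        self.finish_message = None

    def report_done(self, finish_message=""):
        self.state = "DONE"
        self.finish_message = finish_message

    def report_error(self, finish_message=""):
        self.state = "ERROR"
        self.finish_message = finish_message


def process_with_report(item, handler):
    """Run handler(item) and always report a finishing state for the item."""
    try:
        handler(item)
    except Exception as exc:
        # Any failure in the handler ends the item as 'ERROR'
        item.report_error(finish_message=f"Processing failed: {exc}")
        return False
    else:
        # Reached only when the handler raised nothing
        item.report_done(finish_message="Processed successfully!")
        return True


def check_price(item):
    # Illustrative business rule: the price must be a positive number
    if float(item.values["price"]) <= 0:
        raise ValueError("price must be positive")


ok_item = FakeEntry({"price": "2000"})
bad_item = FakeEntry({"price": "-1"})
process_with_report(ok_item, check_price)
process_with_report(bad_item, check_price)
print(ok_item.state, bad_item.state)
```

With a real Datapool item, the same helper could be called inside the consumption loop, so that every exit path of the processing code reports a state.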

Manipulating Datapool items

In addition to reporting the finishing state of an item, we can perform other operations in the code.

Accessing data

It is possible to obtain some information about the item, as well as specific data based on the Schema that was created.

# Fetch the next Datapool item
item = datapool.next(task_id=execution.task_id)

# Using the get_entry() method, we can also obtain an item through its ID
item = datapool.get_entry(entry_id="<ENTRY_ID>")

# Getting the value of some specific field of the item
item_data = item["data-label"]

# Using the get_value() method will have the same effect
item_data = item.get_value("data-label")
// Fetch the next Datapool item
DatapoolEntry item = await datapool.NextAsync(execution.TaskId);

// Getting the value of some specific field of the item
string item_data = await item.GetValueAsync("data-label");

Reporting the finishing state

Reporting an item's finish state is essential for accurate accounting.

Each processed item can be finished with a status of DONE or ERROR.

In both cases, a finish message can be provided. For errors, it's also possible to specify the error type.

# Fetch the next Datapool item
item = datapool.next(task_id=execution.task_id)

# Finishing as 'DONE' after processing
item.report_done(finish_message="Processed successfully!")

# Finishing item processing, indicating a system error
item.report_error(error_type=ErrorType.SYSTEM, finish_message="System unavailable.")

# Finishing item processing, indicating a business error
item.report_error(error_type=ErrorType.BUSINESS, finish_message="Invalid data.")

Info

If no error type is specified in the report, the error is treated as SYSTEM by default.

Use the ErrorType class to specify the appropriate error type.

// Fetch the next Datapool item
DatapoolEntry item = await datapool.NextAsync(execution.TaskId);

// Finishing as 'DONE' after processing
await item.ReportDoneAsync();

// Finishing item processing as 'ERROR'
await item.ReportErrorAsync();
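
When the processing code raises different kinds of exceptions, it can help to decide the error type in one place. The following is a minimal sketch under stated assumptions: the ErrorType enum here is a local stand-in that only mirrors the two names used in this section (so the example runs without the SDK), and BusinessRuleError and classify_error are hypothetical names introduced for illustration.

```python
from enum import Enum


class ErrorType(Enum):
    """Local stand-in mirroring the two error types named in this section."""
    SYSTEM = "SYSTEM"
    BUSINESS = "BUSINESS"


class BusinessRuleError(Exception):
    """Hypothetical exception raised when the item data itself is invalid."""


def classify_error(exc):
    """Map an exception to an error type, falling back to SYSTEM."""
    if isinstance(exc, BusinessRuleError):
        return ErrorType.BUSINESS
    return ErrorType.SYSTEM


print(classify_error(BusinessRuleError("Invalid data.")))
print(classify_error(ConnectionError("System unavailable.")))
```

In an automation, the returned value could be passed as the error_type argument of report_error, keeping the SYSTEM fallback consistent with the default described above.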

Datapool operations

Through the Maestro SDK, we can perform other operations with the Datapool in addition to checking and consuming the items to be processed.

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Checking if the Datapool is active
print(datapool.is_active())

# Checking if the Datapool is empty
print(datapool.is_empty())

# Marking the Datapool as active
datapool.activate()

# Marking the Datapool as inactive
datapool.deactivate()
// Getting the Datapool reference
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");

// Checking if the Datapool is active
Console.WriteLine(await datapool.IsActiveAsync());

// Checking if the Datapool is empty
Console.WriteLine(await datapool.IsEmptyAsync());

// Marking the Datapool as active
await datapool.ActivateAsync();

// Marking the Datapool as inactive
await datapool.DeactivateAsync();

Creating a Datapool via code

The Maestro SDK allows us to create the structure of a new Datapool from scratch.

Besides the basic configurations, we can define the rules to be used and also the base Schema for the items.

Schema Creation

The fields that will compose the Schema must be created using the SchemaField class.

For each field, in addition to the label and type, we can define whether the field should have a unique value (unique_id) and also whether it should be displayed in the item list in the queue (display_value).

You can also create the Datapool by passing an empty Schema if a base structure for the items is not required.

# Creating the fields that will compose the Schema: 'id', 'name', and 'price'
# Schema fields must be created using the 'SchemaField' class

product_id = SchemaField(
    label="id",           # Unique identifier for the field
    type=FieldType.TEXT,  # Field type
    unique_id=True,       # The 'id' field will be a unique value field
    display_value=True    # This field will also be displayed in the item list
)

product_name = SchemaField(
    label="name",
    type=FieldType.TEXT,
    unique_id=False,
    display_value=True
)

product_price = SchemaField(
    label="price",
    type=FieldType.DOUBLE,
    unique_id=False,
    display_value=False
)

# Fields that will be used in the Datapool Schema
schema = [product_id, product_name, product_price]

# Creating the Datapool object
datapool = DataPool(
    label="ProductsData",   # Unique identifier for the Datapool
    name="ProductsData",    # Display name
    max_auto_retry=2,       # Maximum number of retries in case of error
    max_errors_before_inactive=5,   # Maximum number of consecutive errors allowed
    item_max_processing_time=3,     # Maximum processing time for each item (in minutes)
    schema=schema   # Fields that will compose the structure of each item ('schema' created above)
)

# Creating the Datapool structure in the Orchestrator
maestro.create_datapool(datapool)
// Not yet implemented

Tip

Although the Maestro SDK allows Datapool creation via code, we recommend using the Orchestrator's web interface for more intuitive configuration. More details here.

Once the Datapool structure is created on the platform, you can use the Maestro SDK to create and consume new items.

Adding new items

We can add new items to the Datapool using the Maestro SDK.

Tip

This method can be useful if you want to use a Python script to populate the Datapool with the items that will be processed.

# Instantiating a new Datapool item based on the Schema that has been defined
new_item = DataPoolEntry(
    values={
        "id": "Electronic#001",
        "name": "Smartphone",
        "price": "2000"
    }
)

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Adding a new item
datapool.create_entry(new_item)
// Instantiating a new Datapool item based on the Schema that has been defined
var values = new Dictionary<string, object>
{
    { "id", "Electronic#001" },
    { "name", "Smartphone" },
    { "price", 2000 }
};
DatapoolEntry new_item = new DatapoolEntry(0,values);

// Getting the Datapool reference
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");

// Adding a new item
await datapool.CreateEntryAsync(new_item);
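
To populate a Datapool from a script, the values for each item usually come from some external source. The sketch below, which is only an illustration, reads rows from CSV text and builds one values dictionary per row using the 'id', 'name', and 'price' fields from the Schema example. The sample data and the rows_to_values helper are assumptions made for this example; the SDK calls are left as comments so the block runs on its own.

```python
import csv
import io

# Sample data; in practice this could come from open("products.csv", newline="")
CSV_TEXT = """id,name,price
Electronic#001,Smartphone,2000
Electronic#002,Headphones,150
"""


def rows_to_values(csv_file, fields=("id", "name", "price")):
    """Yield one values dict per CSV row, keeping only the Schema fields."""
    for row in csv.DictReader(csv_file):
        yield {field: row[field] for field in fields}


all_values = list(rows_to_values(io.StringIO(CSV_TEXT)))
for values in all_values:
    # With the SDK, each dict could be wrapped in DataPoolEntry(values=values)
    # and sent with datapool.create_entry(...), as in the example above
    print(values)
```

Keeping the parsing separate from the SDK calls makes it possible to check the generated values before anything is sent to the Datapool.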

Cancelling Items

If, for any reason, you need to ignore a specific item that has been created, you can use the cancel functionality.

This way, a cancelled item won't be pulled for processing, but it will still appear in the Datapool's history.

Info

Only items with a PENDING state can be cancelled.

# Cancelling a pending item in the queue
datapool.cancel_entry(
    entry_id="<ENTRY_ID>",
    finish_message="Item with missing data"
)
// Not yet implemented

Deleting Items

If you want to remove an item from the Datapool, you can use the delete functionality.

In this case, pending items will be removed from the queue, and processed items will be removed from the history.

Info

Items with a PROCESSING or TIMEOUT state cannot be deleted.

# Removing item from the queue
datapool.delete_entry(entry_id="<ENTRY_ID>")
// Not yet implemented
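
The state rules for cancelling and deleting can be summarized in a small guard that a script might run before calling cancel_entry or delete_entry. This is a sketch under assumptions: states are represented as plain strings using the names that appear in this section, and can_cancel and can_delete are hypothetical helpers, not SDK functions.

```python
# State names as they appear in this section; using plain strings is an assumption
CANCELLABLE_STATES = {"PENDING"}
NON_DELETABLE_STATES = {"PROCESSING", "TIMEOUT"}


def can_cancel(state):
    """Only items in the PENDING state can be cancelled."""
    return state in CANCELLABLE_STATES


def can_delete(state):
    """Items in the PROCESSING or TIMEOUT state cannot be deleted."""
    return state not in NON_DELETABLE_STATES


for state in ("PENDING", "PROCESSING", "TIMEOUT", "DONE", "ERROR"):
    print(state, "cancel:", can_cancel(state), "delete:", can_delete(state))
```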