Datapool¶
The Datapool feature can be used to manage batch processing.
In this section, you will see more details on how to interact with this BotCity Maestro feature through your automation code.
Tip
See more details on how to create a Datapool and add new items through the Maestro interface by accessing this link.
Consuming items from a Datapool¶
The first step is to obtain the Datapool reference through its unique identifier (label).
After obtaining the Datapool reference, we can use a loop that runs as long as there are items to be processed.
# Consuming the next available item and reporting the finishing state at the end
datapool = maestro.get_datapool(label="Items-To-Process")

while datapool.has_next():
    # Fetch the next Datapool item
    item = datapool.next(task_id=execution.task_id)
    if item is None:
        # Item could be None if another process consumed it before
        break

    try:
        # Processing item...

        # Finishing as 'DONE' after processing
        item.report_done(finish_message="Processed successfully!")
    except Exception:
        # Finishing item processing as 'ERROR'
        item.report_error(finish_message="Processing failed.")
// Consuming the next available item and reporting the finishing state at the end
Datapool datapool = await maestro.GetDatapoolAsync("Items-To-Process");

while (await datapool.HasNextAsync()) {
    // Fetch the next Datapool item
    DatapoolEntry item = await datapool.NextAsync(execution.TaskId);
    if (item == null) {
        // Item could be 'null' if another process consumed it before
        break;
    }

    try {
        // Processing item...

        // Finishing as 'DONE' after processing
        await item.ReportDoneAsync();
    } catch (Exception ex) {
        // Finishing item processing as 'ERROR'
        await item.ReportErrorAsync();
    }
}
Warning
Always report the finishing state of each processed item in your code.
This is extremely important so that item states are updated within the Datapool in Maestro.
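One way to make sure no item is left without a finishing state is to wrap the processing step in a helper that always ends in a report. The sketch below is illustrative only: `consume_all` and `handler` are hypothetical names, not part of the Maestro SDK, and the helper assumes only the `has_next()`, `next()`, `report_done()`, and `report_error()` calls shown above.

```python
def consume_all(datapool, task_id, handler):
    """Process every available item and always report a finishing state.

    `handler` is a hypothetical callable supplied by your automation; it
    receives the item and raises an exception if processing fails.
    Returns a (done, failed) tuple with the number of items in each state.
    """
    done = failed = 0
    while datapool.has_next():
        item = datapool.next(task_id=task_id)
        if item is None:
            # Another process may have consumed the last item first
            break
        try:
            handler(item)
        except Exception as exc:
            # Any failure still ends in an explicit 'ERROR' report
            item.report_error(finish_message=f"Processing failed: {exc}")
            failed += 1
        else:
            # Reached only when the handler finished without raising
            item.report_done(finish_message="Processed successfully!")
            done += 1
    return done, failed
```

In an automation, this could be called as `consume_all(datapool, execution.task_id, process_item)`, where `process_item` is your own function.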
Manipulating Datapool items¶
In addition to reporting the finishing state of an item, we can perform other operations in the code.
Accessing data¶
It is possible to obtain some information about the item, as well as specific data based on the Schema that was created.
# Fetch the next Datapool item
item = datapool.next(task_id=execution.task_id)
# Using the get_entry() method, we can also obtain an item through its ID
item = datapool.get_entry(entry_id="<ENTRY_ID>")
# Getting the value of some specific field of the item
item_data = item["data-label"]
# Using the get_value() method will have the same effect
item_data = item.get_value("data-label")
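When several fields are needed at once, it can be convenient to copy them into a plain dictionary before processing. The following is a minimal sketch, assuming only the `item["label"]` access shown above; `read_fields` is a hypothetical helper, not an SDK function.

```python
def read_fields(item, labels):
    """Return a dict with the value of each requested Schema field.

    Relies only on `item[label]` access, as in the example above.
    """
    return {label: item[label] for label in labels}
```

For example, `read_fields(item, ["id", "name", "price"])` would collect three fields, assuming those labels exist in the Schema.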
Reporting the finishing state¶
Reporting an item's finish state is essential for accurate accounting.
Each processed item can be finished with a status of DONE or ERROR.
In both cases, a finish message can be provided. For errors, it's also possible to specify the error type.
# Fetch the next Datapool item
item = datapool.next(task_id=execution.task_id)
# Finishing as 'DONE' after processing
item.report_done(finish_message="Processed successfully!")
# Finishing item processing, indicating a system error
item.report_error(error_type=ErrorType.SYSTEM, finish_message="System unavailable.")
# Finishing item processing, indicating a business error
item.report_error(error_type=ErrorType.BUSINESS, finish_message="Invalid data.")
Info
By default, all errors will be considered of type SYSTEM if not specified in the report.
Use the ErrorType class to specify the appropriate error type.
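A common pattern is to choose the error type from the kind of exception that was raised. The sketch below is one possible approach, not an SDK feature: `BusinessRuleError` and `report_failure` are hypothetical names, and the `ErrorType` class is passed in as a parameter so the snippet stays free of imports.

```python
class BusinessRuleError(Exception):
    """Hypothetical exception your own code raises for invalid business data."""


def report_failure(item, exc, error_types):
    """Report `exc` on `item`, choosing BUSINESS or SYSTEM from its class.

    `error_types` is expected to expose `BUSINESS` and `SYSTEM` members,
    as the ErrorType class does in the example above.
    """
    if isinstance(exc, BusinessRuleError):
        error_type = error_types.BUSINESS
    else:
        # Anything unexpected is treated as a system error
        error_type = error_types.SYSTEM
    item.report_error(error_type=error_type, finish_message=str(exc))
    return error_type
```

Inside an `except Exception as exc:` block, this could be called as `report_failure(item, exc, ErrorType)`.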
Datapool operations¶
Through the Maestro SDK, we can perform other operations with the Datapool in addition to checking and consuming the items to be processed.
# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")
# Checking if the Datapool is active
print(datapool.is_active())
# Checking if the Datapool is empty
print(datapool.is_empty())
# Marking the Datapool as active
datapool.activate()
# Marking the Datapool as inactive
datapool.deactivate()
// Getting the Datapool reference
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");
// Checking if the Datapool is active
Console.WriteLine(await datapool.IsActiveAsync());
// Checking if the Datapool is empty
Console.WriteLine(await datapool.IsEmptyAsync());
// Marking the Datapool as active
await datapool.ActivateAsync();
// Marking the Datapool as inactive
await datapool.DeactivateAsync();
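These checks can be combined into a simple guard before starting a consumption loop. This is a minimal sketch that assumes only the `is_active()` and `is_empty()` methods shown above; `ready_to_consume` is a hypothetical helper name.

```python
def ready_to_consume(datapool):
    """Return True when the Datapool is active and is not empty."""
    return datapool.is_active() and not datapool.is_empty()
```

A task could exit early when `ready_to_consume(datapool)` returns `False`, instead of entering the loop.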
Creating a Datapool via code¶
The Maestro SDK allows us to create the structure of a new Datapool from scratch.
Besides the basic configurations, we can define the rules to be used and also the base Schema for the items.
Schema Creation
The fields that will compose the Schema must be created using the SchemaField class.
For each field, in addition to the label and type, we can define whether the field should have a unique value (unique_id) and also whether it should be displayed in the item list in the queue (display_value).
You can also create the Datapool by passing an empty Schema if a base structure for the items is not required.
# Creating the fields that will compose the Schema: 'id', 'name', and 'price'
# Schema fields must be created using the 'SchemaField' class
product_id = SchemaField(
    label="id",            # Unique identifier for the field
    type=FieldType.TEXT,   # Field type
    unique_id=True,        # The 'id' field will be a unique value field
    display_value=True     # This field will also be displayed in the item list
)

product_name = SchemaField(
    label="name",
    type=FieldType.TEXT,
    unique_id=False,
    display_value=True
)

product_price = SchemaField(
    label="price",
    type=FieldType.DOUBLE,
    unique_id=False,
    display_value=False
)

# Fields that will be used in the Datapool Schema
schema = [product_id, product_name, product_price]

# Creating the Datapool object
datapool = DataPool(
    label="ProductsData",            # Unique identifier for the Datapool
    name="ProductsData",             # Display name
    max_auto_retry=2,                # Maximum number of retries in case of error
    max_errors_before_inactive=5,    # Maximum number of consecutive errors allowed
    item_max_processing_time=3,      # Maximum processing time for each item (in minutes)
    schema=schema                    # Fields that will compose the structure of each item ('schema' created above)
)
# Creating the Datapool structure in the Orchestrator
maestro.create_datapool(datapool)
Tip
Although the Maestro SDK allows Datapool creation via code, we recommend using the Orchestrator's web interface for more intuitive configuration. More details here.
Once the Datapool structure is created on the platform, you can use the Maestro SDK to create and consume new items.
Adding new items¶
We can add new items to the Datapool using the Maestro SDK.
Tip
This method can be useful if you want to use a Python script to populate the Datapool with the items that will be processed.
# Instantiating a new Datapool item based on the Schema that has been defined
new_item = DataPoolEntry(
    values={
        "id": "Electronic#001",
        "name": "Smartphone",
        "price": "2000"
    }
)
# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")
# Adding a new item
datapool.create_entry(new_item)
// Instantiating a new Datapool item based on the Schema that has been defined
var values = new Dictionary<string, object>
{
    { "id", "Electronic#001" },
    { "name", "Smartphone" },
    { "price", 2000 }
};
DatapoolEntry new_item = new DatapoolEntry(0, values);
// Getting the Datapool reference
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");
// Adding a new item
await datapool.CreateEntryAsync(new_item);
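When the items come from a file, a population script can first turn each row into a values dictionary. The sketch below uses only the Python standard library and is an illustration, not part of the SDK: `rows_to_values` is a hypothetical helper, and the sample rows are made up to match the `id`, `name`, and `price` fields used above. Note that the `csv` module returns every value as a string.

```python
import csv
import io


def rows_to_values(csv_text):
    """Parse CSV text into one dict per row, keyed by the header labels."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [dict(row) for row in reader]


sample = (
    "id,name,price\n"
    "Electronic#001,Smartphone,2000\n"
    "Electronic#002,Tablet,1500\n"
)

for values in rows_to_values(sample):
    # With the SDK available, each dict could be wrapped and sent, e.g.:
    # datapool.create_entry(DataPoolEntry(values=values))
    print(values)
```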
Cancelling Items¶
If, for any reason, you need to ignore a specific item that has been created, you can use the cancel functionality.
This way, a cancelled item won't be pulled for processing, but it will still appear in the Datapool's history.
Info
Only items with a PENDING
state can be cancelled.
Deleting Items¶
If you want to remove an item from the Datapool, you can use the delete functionality.
In this case, pending items will be excluded from the queue, and processed items will be removed from the history.
Info
Items with a PROCESSING
or TIMEOUT
state cannot be deleted.
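The state rules for cancelling and deleting can be summarized as two small checks. This sketch only encodes the rules stated above; it assumes the item state is available as an uppercase string such as `"PENDING"`, which may differ from how the SDK exposes it, and `can_cancel` and `can_delete` are hypothetical helper names.

```python
def can_cancel(state):
    """Only items in the PENDING state can be cancelled."""
    return state == "PENDING"


def can_delete(state):
    """Items in the PROCESSING or TIMEOUT state cannot be deleted."""
    return state not in {"PROCESSING", "TIMEOUT"}
```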