Saltar a contenido

Datapool

La funcionalidad de Datapool se puede utilizar como una forma de gestionar el procesamiento por lotes.

En esta sección, verás más detalles sobre cómo interactuar con esta característica de BotCity Maestro a través de tu código de automatización.

Tip

Consulta más detalles sobre cómo crear un Datapool y agregar nuevos elementos a través de la interfaz de Maestro accediendo a este enlace.

Consumir elementos de un Datapool

El primer paso es obtener la referencia del Datapool a través de su identificador único (etiqueta).

Después de obtener la referencia del Datapool, podemos usar un bucle repetitivo para verificar mientras haya elementos que se deban procesar.

# Consuming the next available item and reporting the finishing state at the end
datapool = maestro.get_datapool(label="Items-To-Process")

while datapool.has_next():
    # Fetch the next Datapool item
    item = datapool.next(task_id=execution.task_id)
    if item is None:
        # Item could be None if another process consumed it before
        break

    try:
        # Processing item...

        # Finishing as 'DONE' after processing
        item.report_done(finish_message="Processed successfully!")

    except Exception:
        # Finishing item processing as 'ERROR'
        item.report_error(finish_message="Processing failed.")
// Consuming the next available item and reporting the finishing state at the end
Datapool datapool = await maestro.GetDatapoolAsync("Items-To-Process");

while (await dataPool.HasNextAsync()) {
    // Fetch the next Datapool item
    DatapoolEntry item = await datapool.NextAsync(execution.TaskId);
    if (item == null) {
        // Item could be 'null' if another process consumed it before
        break;
    }

    try {
        // Processing item...

        // Finishing as 'DONE' after processing
        await item.ReportDoneAsync();

    } catch (Exception ex) {
        // Finishing item processing as 'ERROR'
        await item.ReportErrorAsync();
    }
}

Warning

Recuerda siempre incluir en el código el informe sobre el estado de finalización del elemento que se procesó.

Esto es extremadamente importante para que los estados de los elementos se actualicen dentro del Datapool en Maestro.

Manipulación de elementos de Datapool

Además de informar el estado de finalización de un elemento, podemos realizar otras operaciones en el código.

Accediendo a los datos

Es posible obtener información sobre el elemento, así como datos específicos basados en el Esquema que se creó.

# Fetch the next Datapool item
item = datapool.next(task_id=execution.task_id)

# Using the get_entry() method, we can also obtain an item through its ID
item = datapool.get_entry(entry_id="<ENTRY_ID>")

# Getting the value of some specific field of the item
item_data = item["data-label"]

# Using the get_value() method will have the same effect
item_data = item.get_value("data-label")
// Fetch the next Datapool item
DatapoolEntry item = await datapool.NextAsync(execution.TaskId);

// Getting the value of some specific field of the item
string item_data = await item.GetValueAsync("data-label");

Reportando el estado de finalización

Reportar el estado de finalización de un elemento es esencial para que se contabilicen correctamente.

Cada elemento procesado puede finalizarse con un estado de DONE o ERROR.

En ambos casos, es posible pasar un mensaje de finalización. En el caso de errores, también es posible especificar el tipo de error.

# Fetch the next Datapool item
item = datapool.next(task_id=execution.task_id)

# Finishing as 'DONE' after processing
item.report_done(finish_message="Processed successfully!")

# Finishing item processing, indicating a system error
item.report_error(error_type=ErrorType.SYSTEM, finish_message="System unavailable.")

# Finishing item processing, indicating a business error
item.report_error(error_type=ErrorType.BUSINESS, finish_message="Invalid data.")

Info

Por defecto, todos los errores se considerarán de tipo SYSTEM si no se especifican en el informe.

Utiliza la clase ErrorType para especificar el tipo de error apropiado.

// Fetch the next Datapool item
DatapoolEntry item = await datapool.NextAsync(execution.TaskId);

// Finishing as 'DONE' after processing
await item.ReportDoneAsync();

// Finishing item processing as 'ERROR'
await item.ReportErrorAsync();

Operaciones de Datapool

A través del SDK de Maestro, podemos realizar otras operaciones con el Datapool además de verificar y consumir los elementos que se deben procesar.

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Checking if the Datapool is active
print(datapool.is_active())

# Checking if the Datapool is empty
print(datapool.is_empty())

# Marking the Datapool as active
datapool.activate()

# Marking the Datapool as inactive
datapool.deactivate()
// Getting the Datapool reference
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");

// Checking if the Datapool is active
Console.WriteLine(await datapool.IsActiveAsync());

// Checking if the Datapool is empty
Console.WriteLine(await datapool.IsEmptyAsync());

// Marking the Datapool as active
await datapool.ActivateAsync();

// Marking the Datapool as inactive
await datapool.DeactivateAsync();

Creación de un Datapool mediante código

El Maestro SDK nos permite crear la estructura de un nuevo Datapool desde cero.

Además de las configuraciones básicas, podemos definir las reglas que se utilizarán y también el Schema base de los elementos.

Creación del Schema

Los campos que compondrán el Schema deben crearse a través de la clase SchemaField.

Para cada campo, además de la label y el type, podemos definir si el campo debe tener un valor único (unique_id) y también si debe mostrarse en la lista de elementos en la cola (display_value).

También puedes crear el Datapool pasando un Schema vacío, en caso de que no sea necesario utilizar una estructura base para los elementos.

# Creating the fields that will compose the Schema: 'id', 'name', and 'price'
# Schema fields must be created using the 'SchemaField' class

product_id = SchemaField(
    label="id",           # Unique identifier for the field
    type=FieldType.TEXT,  # Field type
    unique_id=True,       # The 'id' field will be a unique value field
    display_value=True    # This field will also be displayed in the item list
)

product_name = SchemaField(
    label="name",
    type=FieldType.TEXT,
    unique_id=False,
    display_value=True
)

product_price = SchemaField(
    label="price",
    type=FieldType.DOUBLE,
    unique_id=False,
    display_value=False
)

# Fields that will be used in the Datapool Schema
schema = [product_id, product_name, product_price]

# Creating the Datapool object
datapool = DataPool(
    label="ProductsData",   # Unique identifier for the Datapool
    name="ProductsData",    # Display name
    max_auto_retry=2,       # Maximum number of retries in case of error
    max_errors_before_inactive=5,   # Maximum number of consecutive errors allowed
    item_max_processing_time=3,     # Maximum processing time for each item (in minutes)
    schema=schema   # Fields that will compose the structure of each item ('schema' created above)
)

# Creating the Datapool structure in the Orchestrator
maestro.create_datapool(datapool)
// Not yet implemented

Tip

Aunque el Maestro SDK permite la creación de un Datapool mediante código, recomendamos que utilice la interfaz web del Orquestador para configurar de forma más intuitiva. Más detalles aquí.

Una vez que la estructura del Datapool está creada en la plataforma, puedes usar el Maestro SDK para la creación y el consumo de nuevos elementos.

Agregar nuevos elementos

Podemos agregar nuevos elementos al Datapool utilizando el SDK de Maestro.

Tip

Este método puede ser útil si deseas utilizar un script de Python para poblar el Datapool con los elementos que se procesarán.

# Instantiating a new Datapool item based on the Schema that has been defined
new_item = DataPoolEntry(
    values={
        "id": "Electronic#001"
        "name": "Smartphone",
        "price": "2000"
    }
)

# Getting the Datapool reference
datapool = maestro.get_datapool("ProductsData")

# Adding a new item
datapool.create_entry(new_item)
// Instantiating a new Datapool item based on the Schema that has been defined
var values = new Dictionary<string, object>
{
    { "id", "Electronic#001" },
    { "name", "Smartphone" },
    { "price", 2000 }
};
DatapoolEntry new_item = new DatapoolEntry(0,values);

// Getting the Datapool reference
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");

// Adding a new item
await datapool.CreateEntryAsync(new_item);

Cancelación de elementos

Si por alguna razón es necesario ignorar un elemento específico que ha sido creado, podemos usar la función de cancelar.

De este modo, un elemento cancelado no se tomará para procesamiento, pero seguirá apareciendo en el historial del Datapool.

Info

Solamente los elementos con estado PENDING pueden ser cancelados.

# Cancelling a pending item in the queue
datapool.cancel_entry(
    entry_id="<ENTRY_ID>",
    finish_message="Item with missing data"
)
// Not yet implemented

Eliminación de elementos

Si deseas eliminar un elemento del Datapool, puedes usar la función de eliminar.

En este caso, los elementos pendientes se eliminarán de la cola y los elementos ya procesados se eliminarán del historial.

Info

Los elementos que están en estado PROCESSING o TIMEOUT no se pueden eliminar.

# Removing item from the queue
datapool.delete_entry(entry_id="<ENTRY_ID>")
// Not yet implemented