Saltar a contenido

Datapool

La funcionalidad del Datapool puede utilizarse para gestionar una cola de ítems en gran volumen que necesitan ser procesados y monitoreados de forma granular.

En esta sección encontrarás más detalles sobre cómo interactuar con este recurso del Orquestador BotCity a través del SDK.

Orquestador BotCity

Puedes utilizar la funcionalidad del Datapool directamente en la plataforma del Orquestador BotCity.

Consulta más en:

Crear un Datapool

Puedes crear la estructura de un nuevo Datapool y realizar las configuraciones a través del SDK.

Necesitas la siguiente información:

  • Label: Identificador único del Datapool.
  • Nombre: Nombre descriptivo para el reconocimiento del Datapool.
  • Reprocesamiento: El número máximo de intentos de reprocesamiento en caso de error en el ítem.
  • Abortar en caso de error: El número máximo de errores consecutivos hasta inactivar la cola.
  • Tiempo de procesamiento: El tiempo en minutos esperado para el procesamiento del ítem.
  • Schema: Lista de campos que componen el Datapool, definidos con la clase SchemaField:
    • label: Identificador del campo.
    • type: Tipo de dato, definido por FieldType: TEXT, INTEGER o DOUBLE.
    • unique_id: Define como ID único: True o False.
    • display_value: Define como visible para los usuarios: True o False.

Crear a través de la interfaz del Orquestador

Aunque el Maestro SDK permite la creación de un Datapool por código, recomendamos que utilices la interfaz web del Orquestador para realizar las configuraciones de forma más intuitiva y detallada sobre cada parámetro necesario.

Consulta más en:

# Ejemplo de Schema con campos: 'id', 'name' y 'price'

product_id = SchemaField(
    label="id",
    type=FieldType.TEXT,
    unique_id=True,
    display_value=True
)

product_name = SchemaField(
    label="name",
    type=FieldType.TEXT,
    unique_id=False,
    display_value=True
)

product_price = SchemaField(
    label="price",
    type=FieldType.DOUBLE,
    unique_id=False,
    display_value=False
)

# Lista con los campos
schema = [product_id, product_name, product_price]

# Creando el objeto del Datapool
datapool = DataPool(
    label="ProductsData",
    name="ProductsData",
    max_auto_retry=2,
    max_errors_before_inactive=5,
    item_max_processing_time=3,
    schema=schema
)

# Creando la estructura del Datapool en el Orquestador
maestro.create_datapool(datapool)
// Aún no implementado

Operaciones con el Datapool

Puedes realizar algunas operaciones con el Datapool creado, entre ellas:

  • Obtener: Devuelve la referencia del Datapool.
    • Label: Parámetro de identificación del Datapool.
  • Está activo: Verifica si el Datapool obtenido está activo: True o False.
  • Está vacío: Verifica si no hay ítems pendientes en el Datapool obtenido: True o False.
  • Tiene siguiente: Verifica si hay un próximo ítem pendiente en la cola del Datapool obtenido: True o False.
  • Activar: Activa el Datapool obtenido.
  • Desactivar: Desactiva el Datapool obtenido.

Ejemplos de uso:

# Obteniendo la referencia del Datapool
datapool = maestro.get_datapool("ProductsData")

# Verificando si el Datapool está activo
print(datapool.is_active())

# Verificando si el Datapool está vacío de ítems pendientes
print(datapool.is_empty())

# Verificando si hay un próximo ítem pendiente
print(datapool.has_next())

# Marcando el Datapool como activo
datapool.activate()

# Marcando el Datapool como inactivo
datapool.deactivate()
// Obteniendo la referencia del Datapool
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");

// Verificando si el Datapool está activo
Console.WriteLine(await datapool.IsActiveAsync());

// Verificando si el Datapool está vacío
Console.WriteLine(await datapool.IsEmptyAsync());

// Marcando el Datapool como activo
await datapool.ActivateAsync();

// Marcando el Datapool como inactivo
await datapool.DeactivateAsync();

Operaciones con los ítems

Con el Datapool creado, puedes realizar operaciones con los ítems. A continuación se presentan las acciones posibles.

Agregar nuevos ítems

Puedes agregar ítems directamente a través de una automatización o script utilizando el SDK.

Necesitas la siguiente información:

  • Label: Referencia del Datapool creado.
  • Ítem: Valores de entrada de un nuevo ítem, definidos con la clase DataPoolEntry:
    • values: Estructura de clave y valor conforme a los campos creados para el Datapool de referencia.

Consejo

Puedes crear un script que ingrese valores en el Datapool y otro que consuma y procese esos datos.

Ejemplo de entrada de un ítem:

# Instanciando un nuevo ítem del Datapool basado en el Schema que fue definido
new_item = DataPoolEntry(
    values={
        "id": "Electronic#001"
        "name": "Smartphone",
        "price": "2000"
    }
)

# Obteniendo la referencia del Datapool
datapool = maestro.get_datapool("ProductsData")

# Agregando un nuevo ítem
datapool.create_entry(new_item)
// Instanciando un nuevo ítem del Datapool basado en el Schema que fue definido
var values = new Dictionary<string, object>
{
    { "id", "Electronic#001" },
    { "name", "Smartphone" },
    { "price", 2000 }
};
DatapoolEntry new_item = new DatapoolEntry(0, values);

// Obteniendo la referencia del Datapool
Datapool datapool = await maestro.GetDatapoolAsync("ProductsData");

// Agregando un nuevo ítem
await datapool.CreateEntryAsync(new_item);

Cancelar ítems

Puedes cancelar ítems que están pendientes de procesamiento en la cola. Esto significa que el ítem no será procesado, pero se mantiene en el historial del Datapool.

Necesitas la siguiente información:

  • Label: Referencia del Datapool.
  • ID del Ítem: Identificador único del ítem.
  • Mensaje de finalización: (opcional) Mensaje de cancelación.

Estado del ítem

Solo los ítems que tienen el estado PENDIENTE pueden ser cancelados.

Ejemplo de cancelación de un ítem:

# Obteniendo la referencia del Datapool
datapool = maestro.get_datapool("ProductsData")

# Cancelando ítem que está pendiente en la cola
datapool.cancel_entry(
    entry_id="<ENTRY_ID>",
    finish_message="Ítem con datos faltantes"
)
// Aún no implementado

Eliminar ítems

Puedes eliminar ítems que están en la cola del Datapool.

Necesitas la siguiente información:

  • Label: Referencia del Datapool.
  • ID del Ítem: Identificador único del ítem.

Estado del ítem

Los ítems que están en estado PROCESANDO o TIEMPO ESGOTADO no pueden ser eliminados.

# Obteniendo la referencia del Datapool
datapool = maestro.get_datapool("ProductsData")

# Eliminando ítem de la cola
datapool.delete_entry(entry_id="<ENTRY_ID>")
// Aún no implementado

Obtener ítems

Puedes obtener la referencia de los ítems para su procesamiento o para verificar sus valores.

Necesitas la siguiente información:

  • Label: Referencia del Datapool.
  • ID del Ítem: Identificador único del ítem.

¡Atención!

El método next() cambia el estado de un ítem de PENDIENTE a PROCESANDO.

# Obteniendo la referencia del Datapool
datapool = maestro.get_datapool("ProductsData")

# Obtiene el próximo ítem del Datapool
item_1 = datapool.next(task_id=<TASK_ID>)

# A través del método get_entry() también podemos obtener un ítem por su ID
item_2 = datapool.get_entry(entry_id="<ENTRY_ID>")
// Obtiene el próximo ítem del Datapool
DatapoolEntry item = await datapool.NextAsync(<TASK_ID>);

// Obteniendo el valor de algún campo específico del ítem
string item_data = await item.GetValueAsync("data-label");

Acceder a los valores de los ítems

Tras obtener la referencia del ítem, puedes acceder a sus valores basándote en el Schema que fue creado.

Necesitas la siguiente información:

  • Ítem: Referencia del ítem obtenido.
  • Label: Identificador del campo definido en el Schema.
# Obteniendo el valor de algún campo específico del ítem
item_data = item["data-label"]

# Utilizar el método get_value() tendrá el mismo efecto
item_data = item.get_value("data-label")
// Obtiene el próximo ítem del Datapool
DatapoolEntry item = await datapool.NextAsync(<TASK_ID>);

// Obteniendo el valor de algún campo específico del ítem
string item_data = await item.GetValueAsync("data-label");

Reportar el estado de finalización

Reportar el estado de finalización de un ítem es esencial para que sean contabilizados de forma correcta.

Cada ítem procesado puede finalizarse con un estado de COMPLETADO o ERROR.

Necesitas la siguiente información:

  • Ítem: Referencia del ítem obtenido.
  • Éxito: Reporta el estado de completado.
  • Error: Reporta el estado de error, definiendo el tipo:
    • ErrorType.SYSTEM: indica un error de sistema.
    • ErrorType.BUSINESS: indica un error de negocio.
  • Mensaje: (opcional) valor personalizado para la finalización.

Tipo de error

Por defecto, todo error se considerará de tipo SYSTEM cuando no sea especificado en el reporte.

Los ítems reportados con errores de tipo BUSINESS no serán considerados para los escenarios de auto-retry y abortar en caso de errores. Para estos escenarios, solo se considerarán los ítems con error de tipo SYSTEM.

# Obtiene el próximo ítem disponible del Datapool
item = datapool.next(task_id=<TASK_ID>)

# Finalizando como 'COMPLETADO' tras el procesamiento
item.report_done(finish_message="¡Procesado con éxito!")

# Finalizando el procesamiento del ítem indicando un error de sistema
item.report_error(error_type=ErrorType.SYSTEM, finish_message="Sistema no disponible.")

# Finalizando el procesamiento del ítem indicando un error de negocio
item.report_error(error_type=ErrorType.BUSINESS, finish_message="Datos inválidos.")
// Obtiene el próximo ítem del Datapool
DatapoolEntry item = await datapool.NextAsync(<TASK_ID>);

// Finalizando como 'COMPLETADO' tras el procesamiento
await item.ReportDoneAsync();

// Finalizando el procesamiento del ítem como 'ERROR'
await item.ReportErrorAsync();

Sugerencia de estructura de consumo del Datapool

El Datapool puede consumirse de diversas formas, utilizando los métodos de operaciones de ítems.

Aquí un ejemplo de implementación para el consumo de una secuencia de ítems en una única tarea:

# Consumiendo el próximo ítem disponible y reportando el estado de finalización al final
datapool = maestro.get_datapool(label="Items-To-Process")

while datapool.has_next():
    # Obtiene el próximo ítem del Datapool
    item = datapool.next(task_id=<TASK_ID>)
    if item is None:
        # El ítem podría ser 'None' si otro proceso lo consumió antes
        break

    try:
        # Procesando el ítem...

        # Finalizando como 'COMPLETADO' tras el procesamiento
        item.report_done(finish_message="¡Procesado con éxito!")

    except Exception:
        # Finalizando el procesamiento del ítem como 'ERROR'
        item.report_error(finish_message="Fallo en el procesamiento.")
// Consumiendo el próximo ítem disponible y reportando el estado de finalización al final
Datapool datapool = await maestro.GetDatapoolAsync("Items-To-Process");

while (await dataPool.HasNextAsync()) {
    // Obtiene el próximo ítem del Datapool
    DatapoolEntry item = await datapool.NextAsync(<TASK_ID>);
    if (item == null) {
        // El ítem podría ser 'null' si otro proceso lo consumió antes
        break;
    }

    try {
        // Procesando el ítem...

        // Finalizando como 'COMPLETADO' tras el procesamiento
        await item.ReportDoneAsync();

    } catch (Exception ex) {
        // Finalizando el procesamiento del ítem como 'ERROR'
        await item.ReportErrorAsync();
    }
}

Aviso

Recuerda siempre incluir en el código el reporte referente al estado de finalización de cada ítem que fue procesado.

Esto es extremadamente importante para que los estados de los ítems sean actualizados dentro del Datapool en el Orquestador BotCity.