UiPath Automation Framework

Overview

The Communications Mining Dispatcher Framework is a UiPath Studio template for integrating UiPath Communications Mining with RPA implementations. Its objective is to take the comments from a Communications Mining Stream and add them, one by one, into one or more UiPath Orchestrator Unique References Queues. You should define one queue for each downstream process that needs the data from a stream as input. By default, the configuration file defines two process queues (for Process A and Process B) and one generic queue, but you can configure as many as you need.

You can download the framework from here.

Unique References Queues

When creating your queues, select the “Enforce unique references” checkbox.

The template is based on the Robotic Enterprise Framework (REFramework), so it covers all the essential Best Practices in RPA Projects (flexible configuration through the Config.xlsx file, exception handling, retry mechanisms, meaningful logging).

Robotic Enterprise Framework

If you are not familiar with the REFramework, please consult the official REFramework documentation before working with the Communications Mining Dispatcher.

Starting from a REFramework project, we have encapsulated the two key operations that need to be performed when consuming data from a Communications Mining stream: fetching and advancing. Fetching is the process of getting data out of a Communications Mining stream; advancing marks the comments as read so that the same ones are not returned the next time we fetch. We'll discuss both in more detail in the next section.

By default, the framework stops the fetching and advancing cycle when it passes the end of the stream (when the stream has no more “unread” comments). However, if you prefer, you can configure it to run continuously, even when it reaches the end of the stream: simply set the ExitOnEmptyStream setting to FALSE in the Config.xlsx file. In this case it cycles indefinitely, and whenever new data becomes available in the stream, it processes it immediately, with no need to wait for the framework's next scheduled run.

The end goal is to have the comments available in a usable Orchestrator Queue, one queue item per comment, each containing the comment's data and predicted Labels and Entities. This way, the downstream automations have access to the predicted information.
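
As a rough illustration of what such a queue item might carry, here is a hypothetical Python dictionary mirroring the comment structure shown later in this page. The actual property names are defined by the dispatcher's dictionary-building workflow, so treat every field below as an assumption:

    # Hypothetical SpecificContent of one queue item; every field name here
    # is an assumption for illustration, not the framework's exact schema.
    queue_item_properties = {
        "Reference": "comment-uid-123",           # the comment's uid (unique reference)
        "Labels": ["Policy"],                     # predicted labels from the stream
        "Entities": [{"name": "policy-number",    # predicted entities, name/value pairs
                      "value": "PN-42"}],
        "UserProperties": {"channel": "email"},   # metadata attached to the comment
    }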

A communication is added to its corresponding queue only if it passes the Validation Rules. Some basic validation rules are already defined in the framework (mainly for determining to which process each item pertains), but you can add your own validation algorithms in the code. As an example, the Config.xlsx file contains separate validation settings sheets for each downstream automation process (ProcessAValidations, ProcessBValidations). Since these were configured just as examples for theoretical processes, feel free to add your own sheets and settings.

caution

Make sure you don't have multiple settings with the same name in the Config file, even if they're on different sheets; they will all be added to the same Config dictionary and override each other.
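
A minimal Python sketch of why this matters: all sheets are merged into a single dictionary, so a setting read later silently overwrites an earlier one with the same name (the sheet and setting names below are hypothetical):

    # Minimal sketch of how settings from several sheets merge into one
    # dictionary; sheet and setting names below are hypothetical.
    sheets = {
        "Settings": {"ExitOnEmptyStream": "TRUE"},
        "ProcessAValidations": {"Policy_Label": "ProcessA"},
        "ProcessBValidations": {"Policy_Label": "ProcessB"},  # duplicate name!
    }

    config = {}
    for sheet_name, settings in sheets.items():
        # dict.update overwrites existing keys: the last sheet read wins,
        # which is why duplicate names across sheets are dangerous.
        config.update(settings)

    print(config["Policy_Label"])  # "ProcessB" - ProcessA's value was lost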

In the file, we offer some examples of naming conventions for the validation settings that might be useful. The logic in the workflow that checks the validation rules follows the same conventions, so be careful to keep the two in sync if you decide to change them when implementing your own.

If the validation fails, the information is added to a Human in the Loop (HITL) queue (which also requires unique references), to be validated by a human in Action Center. You can set the name of your HITL queue in the Config file.

info

We recommend that you define a trigger on the Human in the Loop queue which starts a new Orchestration Process for each new item, creating a task in Action Center. The task contains the data retrieved from Communications Mining for the current item, and the human should validate it before sending it to its corresponding downstream automation process queue.

Streams

After successfully training a model in Communications Mining, we can create a new stream and configure the thresholds for each of the concepts that we have trained. A stream defines a collection of comments in a dataset. It enables persistent, stateful iteration through the comments, with predicted labels and entities computed using a given model version.

info

We recommend you follow the official Communications Mining Documentation and Academy for the model training steps and details on all the concepts involved.

The integration of UiPath Studio with Communications Mining consists of consuming each of the comments from the Communications Mining stream. Each communication's predictions can then be used in one or multiple downstream processes. The diagram below describes the generic UiPath and Communications Mining integration:

Our approach recommends that for n automations, n + 1 processes are configured in UiPath: n RPA processes and one Queue Feeder. A single feeder process is introduced, responsible for reading the structured communications out of a Communications Mining stream and distributing them to the relevant RPA processes via Orchestrator Queues. Any exceptions that occur due to Communications Mining's extraction can then be marked for manual human validation. The processes that take items out of the queues are standard UiPath automations which read their input from the queue items' data. The provided dispatcher framework fulfills the role of the queue feeder.

Fetch-Advance Loop

To consume comments out of a stream, our framework needs to implement the Fetch and Advance Loop as described below:

  1. Every stream has a current comment:

  2. We can fetch comments starting at this current comment. Below we're fetching 2:

  3. Every comment returned from a stream will have a sequence_id:

    {
      "comment": {
        "messages": [ ... ],
        "user_properties": { ... },
        "id": "Comment 1"
      },
      "entities": [ ... ],
      "labels": [ ... ],
      "sequence_id": "ABC123"
    }

  4. We can use this sequence_id to advance the current comment to the next one in the stream. Now, when we fetch 2, we will return comments 2 and 3:

The Communications Mining Framework implements this fetch and advance loop for you.
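
To make the loop concrete, here is a minimal Python sketch of fetching and advancing over HTTP. The endpoint paths, payloads, and response fields are assumptions made for illustration only; consult the Communications Mining API reference for the exact contract.

    # A minimal sketch of the fetch-advance loop against the HTTP API.
    # Endpoint paths, payload shapes and response fields are assumptions
    # made for illustration; check the API reference for the real contract.
    import requests

    API_BASE = "https://<your_tenant>/api/v1"                 # placeholder
    STREAM = "datasets/<owner>/<dataset>/streams/<stream>"    # placeholder
    HEADERS = {"Authorization": "Bearer <api_token>"}         # placeholder

    def fetch(size: int) -> dict:
        # Ask for up to `size` unread comments, starting at the current comment.
        resp = requests.post(f"{API_BASE}/{STREAM}/fetch",
                             headers=HEADERS, json={"size": size})
        resp.raise_for_status()
        return resp.json()

    def advance(sequence_id: str) -> None:
        # Mark every comment up to and including `sequence_id` as read,
        # so the next fetch starts after it.
        resp = requests.post(f"{API_BASE}/{STREAM}/advance",
                             headers=HEADERS, json={"sequence_id": sequence_id})
        resp.raise_for_status()

    batch = fetch(size=2)
    results = batch.get("results", [])
    for result in results:
        print(result["comment"]["id"], result["sequence_id"])
    if results:
        # Advance using the sequence_id of the last comment in the batch.
        advance(results[-1]["sequence_id"])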

Dispatcher Framework Overview

By simply modifying the necessary settings you can configure the dispatcher framework to consume comments out of your own stream, saving implementation time and ensuring best practices are followed.

Just like the REFramework, it can be used exactly as it is, as a plug-and-play solution: without any modification to the code (just by adding your settings in the configuration Excel file), it gets each of the comments from the defined stream (as objects of type CommunicationsMining.Result, a type defined in the CommunicationsMining package; see the Dependencies section) and adds their data into a corresponding Queue. Alternatively, it can be completely customized, and you can add your own logic as required (the rules for validating the Communications Mining predictions, for example).

The Communications Mining Dispatcher Framework uses the Fetch and Advance Loop approach described above; we can advance one comment at a time or one batch of comments at a time (the batch size can be set in the Config file). Remember that the Dispatcher is used as a feeder for one or multiple downstream processes, so in the Config file we also define the corresponding queue for each of these processes and the rules for adding items to the queue.

The overall steps are as follows:

  • We fetch an initial batch of comments. This will return a batch of comments from the Communications Mining stream, the sequence_id of the last item in the batch and the number of items filtered out of the current batch.

  • If no exception occurred (we successfully connected to the stream) and we're not at the end of the stream, we check whether there are items left to process in the current batch (we might even fetch batches with no comments, if filters are applied to the current stream in Communications Mining and none of the comments in the current batch matches).

  • If there are items to process, we process them one by one: depending on the Consumer RPA Process to which the current item belongs, we check the validation rules of the item's data.

  • If the item passes the checks, we add a new queue item to the relevant queue (as set in the Config Excel file). The uid of the comment is set as the reference of the queue item. If it doesn't pass the validation rules, we add a queue item to the HITL queue instead. Every queue item created contains all of the comment's entity and label predictions, for use in downstream processes.

  • Once each item in the current batch has been processed, we first advance the stream (using the sequence_id of the last item of the batch) and then we fetch a new batch from the stream.

  • If we're at the end of the stream (we retrieved no comments in the batch and there are no filtered-out comments), we know we have advanced past the end of the stream and the processing ends.
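
Condensed into pseudocode, the steps above amount to the following sketch (the injected helper functions stand in for the framework's workflows; this is not its actual code):

    # Pseudocode sketch of the dispatcher's control flow; the injected
    # helpers stand in for the framework's Fetch, Advance and Process logic.
    def run_dispatcher(fetch_batch, advance_stream, route_item,
                       exit_on_empty_stream=True):
        while True:
            # fetch_batch() -> (comments, sequence_id of the last item,
            #                   number of filtered-out comments)
            batch, last_sequence_id, filtered_count = fetch_batch()
            if not batch:
                if filtered_count == 0 and exit_on_empty_stream:
                    break                             # past the end of the stream
                if filtered_count > 0:
                    advance_stream(last_sequence_id)  # skip filtered-out comments
                continue                              # continuous mode: poll again
            for item in batch:
                route_item(item)  # valid -> process queue, invalid -> HITL queue
            advance_stream(last_sequence_id)          # mark the batch as read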

Configuration

All the settings that we need for configuring the Dispatcher are found in the Data/Config.xlsx file.

info

Make sure you add all corresponding assets to Orchestrator.

The settings & assets sheets

OrchestratorProcessAQueueName: The Queue where our Dispatcher will push VALID Comments to be processed by the RPA Consumer Process A.

OrchestratorProcessBQueueName: The Queue where our Dispatcher will push VALID Comments to be processed by the RPA Consumer Process B.

OrchestratorHITLQueueName: The Queue where our Dispatcher will push Comments that did not pass the Validation Rules defined for their corresponding Process. The HITL queue will be processed by the Human in the Loop Orchestration Process, which creates Validation Actions for each of the queue items added.

OrchestratorGeneralQueueName: The Queue where our Dispatcher will push Comments that were not categorised for a specific RPA Consumer Process.

CommunicationsMiningApiTokenCredential: The Communications Mining API Token needed for fetching from the stream and advancing within the stream, stored in a Credential Asset.

ExitOnEmptyStream: If this setting is False, the framework will run continuously, even when it reaches the end of the stream.

Each of the Process Validation Sheets

{Label}_Label: The name of the setting is the label that marks a comment as designated to be handled by the current process, followed by "_" and the "Label" keyword. Its value is the name of the downstream process. Example: Name Policy_Label, Value ProcessA.

Since the Dispatcher can populate input Queues for one or more Downstream Processes, we propose that you create a new sheet in the Config file for each of the processes, in which you define the validation rules for that Process. The naming convention for the sheet is “{ProcessName}Validations”. By default, the Config file contains two Validation Sheets, for Process A and Process B.
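
A minimal Python sketch of how the label-to-process convention resolves against the merged Config dictionary (the Policy_Label/ProcessA values mirror the documented example; Invoice_Label is hypothetical):

    # Illustrative lookup following the "{Label}_Label" naming convention.
    config = {
        "Policy_Label": "ProcessA",   # documented example mapping
        "Invoice_Label": "ProcessB",  # hypothetical second mapping
    }

    def resolve_process(predicted_labels):
        # Only one predicted label can be routed automatically; anything
        # else must go to the Human in the Loop queue.
        if len(predicted_labels) != 1:
            return None
        return config.get(f"{predicted_labels[0]}_Label")

    print(resolve_process(["Policy"]))             # "ProcessA"
    print(resolve_process(["Policy", "Invoice"]))  # None -> HITL queue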

Exception Handling

The Framework collects every Exception that occurs during the processing of each of the Transaction Items (Communications Mining Comments) into two DataTables: one for System Exceptions and another for Business Rule Exceptions.

In the End Process state you can use these tables to handle the exceptions as required by your business logic (create Excel files from them, attach them to a reporting email, etc.).

Architecture

Dependencies

We have created Custom Activities that handle the main operations performed from UiPath to integrate with Communications Mining: Fetch Stream and Advance Stream. The Framework's Transactions are of type CommunicationsMining.Result, a data type defined in the package that holds all the pieces of information defined for each comment and its corresponding predicted Labels and Entities.

For the Dispatcher Framework to load correctly, you need the CommunicationsMining package in one of your feeds. You can download it from the Marketplace: here.

States

Since the Framework is basically a REFramework, it's a State Machine with the same States. Please consult the REFramework documentation for more details on each State.

The only modification is the addition of the Advance Stream transition between the Get Transaction Data state and the Process Transaction state. In case there are no items to process in the currently fetched batch, the execution returns to the Get Transaction Data state for further advancement in the stream.

Shared Variables

The variables below are declared in the Main.xaml file and are either shared as arguments with the workflows invoked in the Framework or decide the execution flow through it:

ShouldStop (Boolean): True when the job is forcefully stopped (from Orchestrator).

TransactionItem (CommunicationsMining.Result): The Transaction Item is represented by one comment from the Communications Mining stream. We process one item at a time and add its data to the corresponding queue.

SystemException (Exception): Used during transitions between states to represent exceptions other than business exceptions.

BusinessException (BusinessRuleException): Used during transitions between states; represents a situation that does not conform to the rules of the process being automated.

TransactionNumber (Int32): Sequential counter of transaction items.

Config (Dictionary(String, Object)): Dictionary structure to store configuration data of the process (settings, constants, assets and validation properties).

RetryNumber (Int32): Used to control the number of attempts to retry the transaction processing in case of system exceptions.

TransactionData (IList(CommunicationsMining.Result)): The batch of comments retrieved from the stream by the latest Fetch.

ConsecutiveSystemExceptions (Int32): Used to control the number of consecutive system exceptions.

BusinessExceptionsDT (DataTable): Table with details on the BusinessRuleExceptions that occurred during the processing of the Transactions. One row contains info about one faulty transaction.

ApplicationExceptionsDT (DataTable): Table with details on the System Exceptions that occurred during the processing of the Transactions. One row contains info about one faulty transaction.

GlobalRetryInterval (TimeSpan): The global retry interval set by default for every Retry Scope in the Framework.

GlobalMaxAttempts (Int32): The global number of Max Attempts set by default for every Retry Scope in the Framework.

CurrentSequenceId (String): The Sequence ID retrieved by the latest Fetch of a Stream Batch. It is the Sequence ID of the last item in the Current Stream Batch.

CurrentBatchFilteredResults (Int32): The number of items that don't match the filter defined for the stream in Communications Mining and were filtered out by the latest Fetch (filtered out of the Current Fetched Batch).

CommunicationsMiningApiToken (SecureString): The API token defined in Communications Mining. Its value should be stored in a Credential Asset in Orchestrator.

CurrentBatchNumber (Int32): It's good practice to split your stream into multiple batches (to help with the performance of retrieving the data). This tells us the current batch being processed.

ShouldAdvanceTheStream (Boolean): In case there are no items to process in the current batch fetched, the execution returns to the GetTransactionData state for further advancement in the stream.

Communications Mining specific Workflows

GetNextStreamBatch: We try to get the next Stream Batch of Communications Mining comments. The Fetch Stream activity connects to Communications Mining and populates the Fetch output object with:
- the collection of results (of the size we requested)
- the Sequence ID of the current batch (the Sequence ID of the last comment in the retrieved batch)
- the number of filtered-out comments (in case we applied filters on our Communications Mining stream, the comments that don't match the filter are skipped)

The Fetch activity performs an HTTPS request to Communications Mining.

AdvanceStreamBatch: We try to advance the stream of Communications Mining comments. The Advance Stream activity connects to Communications Mining and, using the Sequence ID of one of the comments in the stream as input, marks the comments before and including the one with the given Sequence ID as read, so that the same ones are not returned the next time we fetch from the stream. If you fetch multiple times in a row without advancing the stream, you will get the same comments every time.

The Advance activity performs an HTTPS request to Communications Mining.

GetTransactionData: Gets a transaction item from an array of comments. Since there are multiple transactions, we use the argument in_TransactionNumber as an index to retrieve the correct transaction to be processed. If there are no more transactions left in the current batch, we need to advance the stream and fetch the next batch. If there are items in the batch and some are still left to process, we take the next Transaction Item in the batch. Otherwise, we flag that there are no more items left to process in the current batch and that the stream needs to be advanced. We don't set io_TransactionItem to Nothing here, as that would stop the processing of the whole framework, and there may still be items in the next batches. The STOP condition is set in the Get Transaction Data state.

CheckValidationRules: This is a basic validation algorithm example that decides whether the predictions are valid based solely on the number of labels predicted for the current item. If there is one label, the validation succeeds and we just need to get the name of the downstream process from the Config file. If there are multiple labels, the automatic validation is marked as unsuccessful.

Add your own logic for deciding the name of the Consumer Process and whether the predictions on the items are valid or need human validation. If only one label is predicted for the current item, we take the name of its Downstream (Consumer) Process from the Config file, based on that one label. In the Config file, the naming convention for the process name setting is: the label of the comment + "_" + the "Label" keyword. If the current item was predicted to have multiple labels, a human needs to decide how to proceed in the downstream automation, so the automatic validation is marked as unsuccessful and the current item is added to the Human in the Loop queue for later manual validation.

CreateDictionaryFromCommunicationsMiningItem: We need to add the information taken from Communications Mining for the current item to a queue, so we create a dictionary based on it. We use the dictionary to set the defining properties of the new queue item.

AddTransactionItemToQueue: Adds a new item to the queue. All of its properties should already be set up in the in_QueueItemProperties dictionary. Make sure that your queue has the Enforce unique references checkbox selected.

Process: The purpose of the Dispatcher is to populate the corresponding queues with the information obtained from Communications Mining for each of the items, so that they can be processed by Consumer Processes in the Downstream Automation. In this workflow, we add the current item to its corresponding queue.

Steps:
1. We create a dictionary based on the TransactionItem. We use the dictionary to set the defining properties of the new queue item.
2. Based on the information obtained from Communications Mining for the current item, we decide its corresponding Consumer Process and check the validation rules against the predicted data.
3. If the validation is successful, we add the item to the queue of the Consumer Process. If not, we add it to the Human in the Loop queue, to be validated and potentially processed by a human.

For the current Transaction:
- If a BusinessRuleException is thrown, the transaction is skipped.
- If another kind of exception occurs, the current transaction can be retried.

ExceptionsHandler: This workflow should be used as the final Exception Handler in the Framework. If the input DataTables are populated, they contain details on all the Application and/or Business Rule Exceptions that occurred during the current run of the process.

Using the Framework

Before using the Framework:

  • Make sure you configure all the required assets in Orchestrator (see the Settings and assets section) and make the necessary modifications in the Data/Config.xlsx file.
  • Make sure that the queues in which you will add the items exist in Orchestrator and they have the “Enforce unique references” checkbox selected to avoid adding duplicates to the queue and processing the same item multiple times in the downstream automations.
  • Add your own validation rules in Communications Mining/CheckValidationRules.xaml. At the moment, we only check whether the current item has multiple labels predicted. If it does, the validation fails; if not, we take the Process Name corresponding to the current item, based on its label. A sketch of this logic follows below.
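
For orientation, the default check plus one possible custom rule might look like this in Python (a sketch only; the real logic lives in the CheckValidationRules.xaml workflow, and the entity name used here is hypothetical):

    # Sketch of the default single-label check plus a hypothetical custom
    # rule; the real implementation is a XAML workflow, not Python.
    def check_validation_rules(item, config):
        labels = item["labels"]
        if len(labels) != 1:
            return False, None             # multiple labels -> HITL queue
        process_name = config.get(f"{labels[0]['name']}_Label")
        # Hypothetical custom rule: Process A items must also carry a
        # policy-number entity before they are routed automatically.
        if process_name == "ProcessA":
            entity_names = {e["name"] for e in item["entities"]}
            if "policy-number" not in entity_names:
                return False, None         # incomplete data -> HITL queue
        return True, process_name

    example = {"labels": [{"name": "Policy"}],
               "entities": [{"name": "policy-number", "value": "PN-42"}]}
    print(check_validation_rules(example, {"Policy_Label": "ProcessA"}))
    # -> (True, "ProcessA")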