Async Method Calls

posted Feb 4, 2013, 5:55 AM by Eric Patrick   [ updated Nov 3, 2014, 9:56 AM ]

Background

There are many use cases where QBO3 needs to invoke an operation that may take minutes, hours, or days to complete. Examples include:
  • ImportFile.Import: batch imports may process rows of data one-by-one, completing the import process many minutes after Import is called
  • Contact.SCRA: a military search may take several minutes before QDS posts search results back to the requesting system
  • Valuation.LPSAppraisal: an appraisal order via a third party will typically take days before being completed
In such situations, it would be useful for a workflow step (or any other code) to register a callback to know when the long-running operation is complete.

Wiring

The AbstractObject class includes three methods relevant to async callbacks:
  • BeginInvoke(operation, parameters, callback)
  • EndInvoke(operation, parameters)
  • EndInvoke(operation, parameters, callback)
BeginInvoke works just like Invoke (in fact, it calls Invoke), but it also 'pushes' a callback to a queue. More precisely, it inserts a row representing the callback into the ObjectQueue table, with no QueuedDate set. When BeginInvoke returns, that row sits in the database until some method explicitly releases it.

EndInvoke is used to 'pop' a callback from a queue. More precisely, it will update the corresponding ObjectQueue record's QueuedDate to DateTime.Now, allowing the Queue Processor to process the row.

A key consideration is how AbstractObject identifies the 'corresponding' ObjectQueue record. When pushing a callback, three key fields are determined:
  • ObjectQueue: {ClassName}/{Operation}?ID={ID}
  • Object: {ClassName}
  • ObjectID: {ID}
The EndInvoke method searches the ObjectQueue table for records matching these fields, and processes any it finds.

Examples:
  • ImportFile/Import?ID=106
  • Contact/SCRA?ID=7842
  • Valuation/LPSAppraisal?ID=5524
Normally, your async code should simply call EndInvoke(operation, parameters). Advanced use cases may have classes proxy the completion of async calls (e.g. QDS hooks), which requires the EndInvoke(operation, parameters, callback) overload.
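The push/pop behavior described above can be modeled in a few lines. This is a minimal in-memory sketch, not the QBO3 implementation: QueueRow and CallbackQueue are hypothetical names, a List stands in for the ObjectQueue table, and Pop takes the original operation name because the source does not show how EndInvoke maps an 'end' operation (e.g. EndImport) back to the operation that was begun (e.g. Import).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one row of the ObjectQueue table.
public class QueueRow
{
    public string Callback;       // the method to run later, e.g. "DecisionStep/Complete?ID=12"
    public string ObjectQueue;    // {ClassName}/{Operation}?ID={ID}
    public string Object;         // {ClassName}
    public long ObjectID;         // {ID}
    public DateTime? QueuedDate;  // null = parked; set = released to the Queue Processor
}

// Hypothetical in-memory model of the push/pop semantics (assumption, not QBO3 code).
public class CallbackQueue
{
    private readonly List<QueueRow> rows = new List<QueueRow>();

    public static string Key(string className, string operation, long id)
    {
        return string.Format("{0}/{1}?ID={2}", className, operation, id);
    }

    // 'Push': record the callback with no QueuedDate, so nothing processes it yet.
    public QueueRow Push(string className, string operation, long id, string callback)
    {
        var row = new QueueRow
        {
            Callback = callback,
            ObjectQueue = Key(className, operation, id),
            Object = className,
            ObjectID = id,
            QueuedDate = null
        };
        rows.Add(row);
        return row;
    }

    // 'Pop': stamp QueuedDate on every parked row whose three key fields match.
    public List<QueueRow> Pop(string className, string operation, long id)
    {
        string key = Key(className, operation, id);
        var matches = rows
            .Where(r => r.QueuedDate == null
                     && r.Object == className
                     && r.ObjectID == id
                     && r.ObjectQueue == key)
            .ToList();
        foreach (var row in matches)
            row.QueuedDate = DateTime.Now;
        return matches;
    }
}
```

In this model, pushing a callback for Contact/SCRA?ID=7842 and then popping with ("Contact", "SCRA", 7842) releases exactly that row; popping with any other ID, or popping a second time, releases nothing.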

Invoking an Async Method

To invoke an async method, call BeginInvoke(operation, parameters, callback), where the callback is a MethodSignature. For example, the DecisionStep class invokes any async operation with a callback to DecisionStep.Complete, as follows:

if (item.Configuration.IsAsync(DecisionStepTemplate.Operation))
{
    item.BeginInvoke(DecisionStepTemplate.Operation, parameters, new MethodSignature(string.Format("DecisionStep/Complete?ID={0}", ID)));
    ...
    Status = "Pending";
}
else
{
    item.Invoke(DecisionStepTemplate.Operation, parameters);
    ...
    Complete();
}
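The callback string passed above follows the same {ClassName}/{Operation}?ID={ID} shape used for the ObjectQueue field. As a rough illustration of how such a string breaks down, here is a small parser. ParsedSignature is a hypothetical helper written for this page; it is not the QBO3 MethodSignature class, whose internals are not shown here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (assumption): splits "Class/Operation?name=value&..." into parts.
public class ParsedSignature
{
    public string ClassName;
    public string Operation;
    public Dictionary<string, string> Parameters = new Dictionary<string, string>();

    public static ParsedSignature Parse(string signature)
    {
        var result = new ParsedSignature();

        // Separate the {ClassName}/{Operation} path from the optional query string.
        string path = signature;
        string query = "";
        int q = signature.IndexOf('?');
        if (q >= 0)
        {
            path = signature.Substring(0, q);
            query = signature.Substring(q + 1);
        }

        string[] parts = path.Split('/');
        if (parts.Length != 2 || parts[0].Length == 0 || parts[1].Length == 0)
            throw new FormatException("Expected {ClassName}/{Operation}: " + signature);
        result.ClassName = parts[0];
        result.Operation = parts[1];

        // Each name=value pair becomes one parameter; a bare name gets an empty value.
        foreach (string pair in query.Split(new[] { '&' }, StringSplitOptions.RemoveEmptyEntries))
        {
            int eq = pair.IndexOf('=');
            string name = eq < 0 ? pair : pair.Substring(0, eq);
            string value = eq < 0 ? "" : pair.Substring(eq + 1);
            result.Parameters[Uri.UnescapeDataString(name)] = Uri.UnescapeDataString(value);
        }
        return result;
    }
}
```

Parsing "DecisionStep/Complete?ID=5" with this helper yields ClassName "DecisionStep", Operation "Complete", and a single parameter ID with the value "5".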

Designing an Async Method

Any QBO3 generic method you implement that may be a long-running transaction can be wired to handle async callbacks as follows:
  • Add Async=true to the method's DbMethodAttribute
  • Create a corresponding 'end' method
  • When the work is finished, invoke the 'end' method by calling EndInvoke(endMethod, parameters) rather than calling it directly

Example

The ImportFile class's Import method uses an ImportFileQueueEngine plugin to queue ImportFileQueue records for processing. Once all ImportFileQueue records have been processed, the Import method can be considered complete.

[DbVoidMethod(Async=true)]
public void Import(IDictionary<string, object> parameters)
{
    ...
    ImportStart = DateTime.Now;
    ...
    bool complete = ImportEngine.Import(parameters);
    if (complete)
        EndInvoke("EndImport", string.Format("ID={0}", ID).ToProperties());
    else
    {
        Status = "Pending";
        Save();
    }
}

// Called by ImportFileQueue whenever a row is done processing
public bool CompletionCheck()
{
    using (IDataReader reader = InvokeDataReader("QueueCompletionCheck", new Dictionary<string, object>() { { "ID", ID } }))
    {
        while (reader.Read())
        {
            SetProperties(reader);
            if (ImportComplete.HasValue)
            {
                EndInvoke("EndImport", string.Format("ID={0}", ID).ToProperties());
                return true;
            }
        }
    }
    return false;
}

// Called by AbstractObject's EndInvoke
[DbVoidMethod]
public void EndImport(IDictionary<string, object> parameters)
{
    foreach (Int64 id in IdList(parameters))
    {
        ImportFileObject import = new ImportFileObject(User);
        import.Select(id);
        import.EndImport();
    }
}

// Called by the late-bound EndImport above
public AbstractObject EndImport()
{
    Status = "Complete";
    if (!ImportComplete.HasValue)
        ImportComplete = DateTime.Now;
    Save();
    return this;
}

Notes:
  • The Import method's DbMethodAttribute (DbVoidMethod) is marked as Async
  • EndImport is never called directly; the code calls EndInvoke("EndImport", parameters) instead
  • EndImport is decorated with a DbMethodAttribute (DbVoidMethod), but does not need to be marked as Async (when we call EndImport, we're done)
  • AbstractObject.BeginInvoke and EndInvoke handle pushing and popping the callback; the ImportFile code shown here never touches the ObjectQueue table directly
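The "signal completion when the last row finishes" pattern in the example can be illustrated without a database. The sketch below is an assumption-laden simplification: BatchTracker is a hypothetical class, it tracks pending rows in memory instead of running a QueueCompletionCheck statement, and the onComplete delegate merely plays the role of EndInvoke("EndImport", ...). It is not thread-safe as written.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an import whose rows finish one at a time.
public class BatchTracker
{
    private readonly HashSet<long> pending;
    private readonly Action onComplete;  // plays the role of EndInvoke("EndImport", ...)

    public DateTime? Completed { get; private set; }

    public BatchTracker(IEnumerable<long> rowIds, Action onComplete)
    {
        pending = new HashSet<long>(rowIds);
        this.onComplete = onComplete;
    }

    // Called whenever a row is done processing; loosely mirrors CompletionCheck.
    // Returns true once no rows remain pending.
    public bool RowDone(long rowId)
    {
        pending.Remove(rowId);
        if (pending.Count > 0)
            return false;

        // Guard so the completion callback fires at most once,
        // similar in spirit to the ImportComplete.HasValue check in EndImport().
        if (!Completed.HasValue)
        {
            Completed = DateTime.Now;
            onComplete();
        }
        return true;
    }
}
```

With three rows tracked, the first two calls to RowDone return false and the third returns true and fires the callback; reporting a row again afterwards still returns true but does not fire the callback a second time.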

Example: Extending Decision to Support Async Processing

Decision processing is a good example of the need for long-running operations. To process a batch of Decisions correctly with ImportFileQueue, an async method must be called. Decision.Process is a poor candidate for this use case, because it is already designed as a multi-threaded operation against multiple Decision IDs. Thus, we introduce a new ProcessAsync method, with a matching EndProcessAsync.

Following the execution trail, we have:
  • ImportFileQueue.Process calls Decision.BeginInvoke("ProcessAsync", {ID=X}, "ImportFileQueue/EndProcess?ID=Y")
  • Decision.BeginInvoke will
    • create an ObjectQueue record where:
      • ClassName = ImportFileQueue
      • Operation = EndProcess
      • Attributes = {ID=Y}
      • ObjectQueue = Decision/ProcessAsync?ID=X
      • Object = Decision
      • ObjectID = X
    • call Decision/ProcessAsync
Later, on another thread (perhaps months later!), Decision.Process will evaluate DecisionSteps, and eventually call:
  • Decision.Complete(), which calls Decision.EndInvoke("EndProcessAsync", {ID=X}), which in turn
    • invokes Decision/EndProcessAsync
    • pops any ObjectQueue records matching:
      • Object = Decision
      • ObjectID = X
      • ObjectQueue = Decision/ProcessAsync?ID=X
    • finds ImportFileQueue/EndProcess?ID=Y (based on matching Object/ObjectID/ObjectQueue to Decision/X/Decision/ProcessAsync?ID=X)
      • processes the ObjectQueue record (releases it into the regular queues)
  • ImportFileQueue/EndProcess?ID=Y is now eligible to be processed by the regular queues




