Post date: Feb 04, 2013 1:55:4 PM
Background
There are many uses cases where QBO3 needs to invoke an operation that may take minutes, hours or days to complete. Examples include:
ImportFile.Import: batch imports may process rows of data one-by-one, completing the import process many minutes after Import is called
Contact.SCRA: a military search may take several minutes before QDS posts search results back to the requesting system
Valuation.LPSAppraisal: an appraisal order via a third party will typically take days before being completed
In such situations, it would be useful for a workflow step (or any other code) to register a callback to know when the long-running operation is complete.
Wiring
The AbstractObject class includes three methods relevent to async callbacks:
BeginInvoke(operation, parameters, callback)
EndInvoke(operation, parameters)
EndInvoke(operation, parameters, callback)
BeginInvoke works just like Invoke (in fact, it calls Invoke), but it will 'push' a callback to a queue. More precisely, it will insert a row matching the callback into the ObjectQueue table, without any QueuedDate. At the end of BeginInvoke, there is a database record waiting to be processed explicitly by some method.
EndInvoke is used to 'pop' a callback from a queue. More precisely, it will update the corresponding ObjectQueue record's QueuedDate to DateTime.Now, allowing the Queue Processor to process the row.
A key considering is how AbstractObject identifies the 'corresponding' ObjectQueue record. When pushing a callback, three key fields are determined:
ObjectQueue: {ClassName}/{Operation}?ID={ID}
Object: {ClassName}
ObjectID: {ID}
The EndInvoke method will search the ObjectQueue table for matching records, and process any found.
Examples:
ImportFile/Import?ID=106
Contact/SCRA?ID=7842
Valuation/LPSAppraisal?ID=5524
Normally, your async code should simply call EndInvoke(operation, parameters). Advanced use cases may have classes proxy completion of async calls (e.g. QDS hooks), which require using the EndInvoke(operation, parameters, callback) override.
Invoking an Async Method
To invoke an async method, you should call BeginInvoke(operation, parameter, callback), where the callback is a MethodSignature. For example, the DecisionStep class will invoke any async operations with a callback to DecisionStep.Complete, as follows:
if (item.Configuration.IsAsync(DecisionStepTemplate.Operation))
{
item.BeginInvoke(DecisionStepTemplate.Operation, parameters, new MethodSignature(string.Format("DecisionStep/Complete?ID={0}", ID)));
...
Status = "Pending";
}
else
{
item.Invoke(DecisionStepTemplate.Operation, parameters);
...
Complete();
}
Designing an Async Method
Any QBO3 generic method you implement that may be a long-running transaction can be wired to handle async callbacks as follows:
Add Async=true to the method's DbMethodAttribute
Create a corresponding 'end' method
Invoke the corresponding 'end' method by calling EndInvoke(endMethod, properties)
Example
The ImportFile class' Import method uses an ImportFileQueueEngine plugin to queue ImportFileQueue records for processing. When all ImportFileQueue records have been processed, the Import method can be considered to be complete.
[DbVoidMethod(Async=true)]
public void Import(IDictionary<string, object> parameters)
{
...
ImportStart = DateTime.Now;
...
bool complete = ImportEngine.Import(parameters);
if (complete)
EndInvoke("EndImport", string.Format("ID={0}", ID).ToProperties());
else
{
Status = "Pending";
Save();
}
}
// Called by ImportFileQueue whenever a row is done processing
public bool CompletionCheck()
{
using (IDataReader reader = InvokeDataReader("QueueCompletionCheck", new Dictionary<string, object>() { { "ID", ID } }))
{
while (reader.Read())
{
SetProperties(reader);
if (ImportComplete.HasValue)
{
EndInvoke("EndImport", string.Format("ID={0}", ID).ToProperties());
return true;
}
}
}
return false;
}
// Called by AbstractObject's EndInvoke
[DbVoidMethod]
public void EndImport(IDictionary<string, object> parameters)
{
foreach (Int64 id in IdList(parameters))
{
ImportFileObject import = new ImportFileObject(User);
import.Select(id);
import.EndImport();
}
}
// Called by the late-bound EndImport above
public AbstractObject EndImport()
{
Status = "Complete";
if (!ImportComplete.HasValue)
ImportComplete = DateTime.Now;
Save();
return this;
}
Notes:
The Import method's DbMethodAttribute (DbVoidMethod) is marked as Async
Instead of calling the EndImport method directly, EndInvoke("EndImport", parameters) is called instead
EndImport implements a DbMethodAttribute (DbVoidMethod), but does not need to be marked as async (when we call EndImport, we're done)
AbstractObject.BeginInvoke and EndInvoke are called
Example: Extending Decision to Support Async Processing
Decision processing is a perfect example of the need for long-running operations. To process a batch of Decisions correctly with ImportFileQueue, an async method must be called. Decision.Process is a poor candidate for this use case, because it is already designed for a multi-threaded operation against multiple Decision IDs. Thus, we introduce a new ProcessAsync method, with a matching EndProcessAsync.
Following the execution trail, we have:
ImportFileQueue.Process
calls Decision.BeginInvoke("ProcessAsync", {ID=X}, "ImportFileQueue/EndProcess?ID=Y")
Decision.BeginInvoke will
create an ObjectQueue record where:
ClassName = ImportFileQueue
Operation = EndProcess
Attributes = {ID=Y)
ObjectQueue = Decision/ProcessAsync?ID=X
Object=Decision
ObjectID=X
call Decision/ProcessAsync
Later, on another thread (perhaps months later!) Decision.Process will evaluate DecisionSteps, and eventually call:
Decision.Complete()
calls Decision.EndInvoke("EndProcessAsync", {ID=X})
invoke Decision/EndProcessAsync
pops any ObjectQueue records matching:
Object=Decision
ObjectID=X
ObjectQueue = Decision/ProcessAsync?ID=X
finds ImportFileQueue/EndProcess?ID=Y (based on matching Object/ObjectID/ObjectQueue to Decision/X/Decision.ProcessAsync?ID=X)
processes the ObjectQueue record (releases it into the regular queues)
ImportFileQueue/EndProcess?ID=Y is now eligible to be processed by regular queues