How to create multithreading in D365 batch jobs and process records in parallel
Approach for Multithreading is of 3 types:
I will be taking an example of a restaurant kitchen to explain the three types of multithreading in D365 batch jobs.
Individual Task Modeling: 🟢
Think of each dish ordered by customers as a work item. In this approach, you assign one chef to cook one dish at a time. If there are only a few dishes to make, this works well because each chef focuses on one dish without getting overwhelmed. However, if the restaurant is very busy, having one chef for each dish can lead to chaos in the kitchen due to too many chefs working at the same time.
Batch Bundling: 🟢🟢
In this scenario, you group several dishes into a batch and assign it to a chef to cook. For example, a chef might be responsible for making all the appetizers, another chef for all the main courses, and so on. This can make things more organized, but the problem arises when one batch takes much longer to prepare than others. In this case, some chefs might be waiting around with nothing to do while others are still busy with their batches.
Top Picking: 🟢🟢🟢
Here, chefs don't have pre-assigned dishes or batches. Instead, they take the next available dish to cook from the order list as soon as they are free. This keeps all chefs active and reduces waiting times because as soon as they finish cooking one dish, they immediately start with the next one that needs to be prepared. This method ensures a steady flow of dishes being cooked and served.
Now for we will do code for Top Picking, first of all we need to create 5 classes for doing Top Picking multithreading in D365 batch jobs, First 3 will be the classes for bundling the task to be done
Contract 1 for multithreading in D365 batch jobs:
[DataContract]
public class LSMultiThreadingCreateTasksContract
{
int numberOfThreads;
[DataMember,SysOperationLabel(literalStr("Number of threads"))]
public int parmNumberOfThreads(int _numberOfThreads = numberOfThreads)
{
numberOfThreads = _numberOfThreads;
return numberOfThreads;
}
}
End of Contract 1 for multithreading in D365 batch jobs:
Controller 1 for multithreading in D365 batch jobs:
public class LSMultiThreadingCreateTasksController extends SysOperationServiceController
{
protected void new()
{
super(classStr(LSMultiThreadingCreateTasksService),
methodStr(LSMultiThreadingCreateTasksService, process), SysOperationExecutionMode::Synchronous);
}
public ClassDescription defaultCaption()
{
return "Create multithreading tasks by DynamicsCommunity101";
}
public static LSMultiThreadingCreateTasksController construct(SysOperationExecutionMode _executionMode = SysOperationExecutionMode::Synchronous)
{
LSMultiThreadingCreateTasksController controller = new LSMultiThreadingCreateTasksController();
controller.parmExecutionMode(_executionMode);
return controller;
}
public static void main(Args _args)
{
LSMultiThreadingCreateTasksController LSMultiThreadingCreateTasksController = LSMultiThreadingCreateTasksController::construct();
LSMultiThreadingCreateTasksController.parmArgs(_args);
LSMultiThreadingCreateTasksController.startOperation();
}
}
End of Controller 1 for multithreading in D365 batch jobs:
Service 1 for multithreading in D365 batch jobs:
public class LSMultiThreadingCreateTasksService extends SysOperationServiceBase
{
public void process(LSMultiThreadingCreateTasksContract _contract)
{
BatchHeader batchHeader;
SysOperationServiceController controller;
LSStagingTable stagingTable;
int totalNumberOfTasksNeeded = _contract.parmNumberOfThreads();
int threadCount;
Args args = new Args();
select count(RecId) from stagingTable where
stagingTable.ProcessingStatus == LSProcessingStatus::ToBeProcessed;
if (stagingTable.RecId > 0)
{
update_recordset stagingTable
setting ProcessingStatus = LSProcessingStatus::InProcessing
where stagingTable.ProcessingStatus == LSProcessingStatus::ToBeProcessed;
batchHeader = BatchHeader::construct(); //if you want to create a new batch header
//batchHeader = this.getCurrentBatchHeader(); //if you want to use same batch job
//Creating tasks
for(threadCount = 1; threadCount <= totalNumberOfTasksNeeded; threadCount++)
{
controller = LSMultiThreadingGetWorkItemController::construct();
batchHeader.addTask(controller);
}
batchHeader.save();
}
}
}
End of Service 1 for multithreading in D365 batch jobs:
Controller 2 for multithreading in D365 batch jobs:
public class LSMultiThreadingGetWorkItemController extends SysOperationServiceController
{
protected void new()
{
super(classStr(LSMultiThreadingGetWorkItemService),
methodStr(LSMultiThreadingGetWorkItemService, process), SysOperationExecutionMode::Synchronous);
}
public ClassDescription defaultCaption()
{
return "Get work item";
}
public static LSMultiThreadingGetWorkItemController construct(SysOperationExecutionMode _executionMode = SysOperationExecutionMode::Synchronous)
{
LSMultiThreadingGetWorkItemController controller = new LSMultiThreadingGetWorkItemController();
controller.parmExecutionMode(_executionMode);
return controller;
}
public static void main(Args _args)
{
LSMultiThreadingGetWorkItemController LSmultiThreadingGetWorkItemController = LSMultiThreadingGetWorkItemController::construct();
LSMultiThreadingGetWorkItemController.parmArgs(_args);
LSMultiThreadingGetWorkItemController.startOperation();
}
}
End of Controller 2 for multithreading in D365 batch jobs:
Service 2 for multithreading in D365 batch jobs:
public class LSMultiThreadingGetWorkItemService extends SysOperationServiceBase
{
public void process()
{
LSStagingTable stagingTable;
stagingTable.readPast(true);
do
{
try
{
ttsBegin;
select pessimisticlock firstOnly stagingTable
where stagingTable.ProcessingStatus == LSProcessingStatus::InProcessing;
if (stagingTable)
{
//ProcessStagingTable::processStagingTableRecord(stagingTable);
sleep(5000);
stagingTable.ProcessingStatus = LSProcessingStatus::Processed;
stagingTable.update();
}
ttscommit;
}
catch
{
//revert data
ttsabort;
//log errors
}
}
while (stagingTable);
}
}
End of Service 2 for multithreading in D365 batch jobs:
In-depth references
YouTube reference