
Scale Up with Parallel Jobs and Tasks

Since R2024a

This example shows how to use parallel jobs and tasks to scale up your computations to hundreds of workers on a large cluster.

You can scale up an existing parfor workflow beyond parallel pool limits by converting the parfor-loop into multiple tasks for an independent job. This example converts the parfor workflow in the Run parfor-Loops Without a Parallel Pool example to a job and tasks workflow.

This example recreates the updated version of the ARGESIM benchmark CP2 Monte Carlo study [1] published by Jammer et al. [2]. In the Monte Carlo study, you simulate a spring-mass system with different, randomly sampled damping factors using jobs and tasks.
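
For reference, the parfor workflow being converted looks roughly like the following sketch. This sketch is based on the Run parfor-Loops Without a Parallel Pool example and might differ in detail; it uses the simulation parameters and the massSpringODE helper function defined later in this example.

y_sum = zeros(numel(t_interval),1);
parfor n = 1:nReps
    f = @(t,y) massSpringODE(t,y,d(n));
    [~,yOut] = ode45(f,t_interval,y0);
    y_sum = y_sum + yOut(:,1); % reduction variable
end
meanY = y_sum./nReps;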

Create Cluster Object and Job

Create the cluster object and display the number of workers available in the cluster. HPCProfile is a profile for a MATLAB® Job Scheduler cluster. Replace the HPCProfile profile with your own cluster profile.

cluster = parcluster("HPCProfile");
maxNumWorkers = cluster.NumWorkers;
fprintf("Number of workers available: %d",maxNumWorkers)
Number of workers available: 496

Create an independent job using the cluster object.

job = createJob(cluster);

Define Simulation Parameters

period = [0 2];
h = 0.001; % time step
t_interval = period(1):h:period(2);

Set the number of iterations.

nReps = 10000000;

Initialize the random number generator and create an array of damping coefficients sampled from a uniform distribution between 800 and 1200.

rng(0);
a = 800;
b = 1200;
d = (b-a).*rand(nReps,1) + a;

Modify to Job and Task Workflow

To change the parfor workflow into a jobs and tasks workflow, convert the main body of the parfor-loop into a function that takes in a vector of damping coefficients and returns the sum of the mass-spring motion.

function y_sum = taskFcn(d)

Define the simulation parameters on the worker. To reduce data transfer overheads, specify the time interval and any other constant parameters directly on the workers instead of transferring them to the workers as input arguments.

period = [0 2];
h = 0.001; 
y0 = [0 0.1]; 
t_interval = period(1):h:period(2);

Initialize the results variable for the reduction operation.

y_sum = zeros(numel(t_interval),1);

To reduce scheduling overheads, partition the iterations into groups for each task instead of scheduling a task for each iteration. Use a for-loop to iterate through this task's set of damping coefficients. Use a reduction variable to compute the sum of the motion at each time point.

for n = 1:length(d)
    f = @(t,y) massSpringODE(t,y,d(n));
    [~,yOut] = ode45(f,t_interval,y0);
    y_sum = y_sum + yOut(:,1);
end

You can use a job's ValueStore when the combined size of all the results is large, or if the client must process interim results while the job is running. Otherwise, if your results data is small, you can send the results back to the client using the task's OutputArguments property, as this example does. A sketch of the ValueStore approach follows the function definition.

end
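
If you use the ValueStore approach instead, a minimal sketch of the code at the end of taskFcn could look like the following. This sketch is not part of this example and assumes a key derived from the task ID; the client code that retrieves the results would change accordingly.

% Minimal sketch: store the partial sum in the job's ValueStore
% instead of returning it as an output argument.
task = getCurrentTask;
store = getCurrentValueStore;
store("y_sum_" + task.ID) = y_sum;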

Prepare Input Data for Tasks

To help reduce overheads when you schedule multiple tasks for a job, partition the iterations into groups for each task. Try to partition the iterations into groups that are:

  • Large enough that the computation time is large compared to the overhead of scheduling the partition.

  • Small enough that there are enough tasks to keep all workers busy.

  • Decreasing in size in the last sets of tasks to keep as many workers busy as possible.

The partitionIterations helper function, attached to this example, uses the number of iterations and desired maximum number of workers to divide the iterations into appropriately sized groups and returns a cell array where each cell corresponds to one group of iteration indices. The partitionIterations function allocates larger groups to the initial tasks and progressively smaller groups to later tasks for a balanced workload distribution.

taskGroups = partitionIterations(nReps,maxNumWorkers);
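
The following is an illustrative sketch of one possible partitioning scheme, not the attached partitionIterations helper itself. It creates equal-size groups of roughly four tasks per worker; the attached helper additionally makes the later groups progressively smaller.

function groups = examplePartition(nReps,maxNumWorkers)
% Illustrative sketch of iteration partitioning. Create equal-size groups of
% iteration indices so that there are about four tasks per worker.
groupSize = max(1,floor(nReps/(4*maxNumWorkers)));
starts = 1:groupSize:nReps;
groups = cell(1,numel(starts));
for k = 1:numel(starts)
    groups{k} = starts(k):min(starts(k)+groupSize-1,nReps);
end
end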

After you specify the iteration indices for each task, use the cellfun function to extract the damping coefficients corresponding to each task group into a cell array.

dampingCoeffs = cellfun(@(ind) {d(ind)},taskGroups,UniformOutput=false);

Create Tasks and Submit Job

Use a single call to create multiple tasks for the job. Each task executes the taskFcn function with the corresponding set of input arguments sourced from the dampingCoeffs cell array. Instruct the workers to return one output argument for each task.

tasks = createTask(job,@taskFcn,1,dampingCoeffs);

Submit the job to run on the cluster.

submit(job);

If you want to block the MATLAB client until the job completes, use the wait function on the job object. The wait function is useful when subsequent code depends on the completion of the job.

wait(job);

Access Results

After the job completes, you can retrieve the results from all the tasks using the fetchOutputs function.

results = fetchOutputs(job);

The fetchOutputs function returns a cell array, where each element is the output of one task. Convert the cell array into a numeric matrix, sum the partial results from all tasks at each time point, and divide by the number of iterations to compute the mean motion.

y_sum = sum(cell2mat(results'),2);
meanY = y_sum./nReps;

Plot the mean response of the system against time.

plot(t_interval,meanY)
title("ODE Solution of Mass-Spring System")
xlabel("Time")
ylabel("Motion")
grid on

Display the job duration.

jobDuration = job.RunningDuration
jobDuration = duration
   00:07:32

Compare Computational Speedup

Compare the computational speedup of the jobs and tasks workflow with the speedup of running the parfor-loop on a parallel pool and directly on a cluster.

Use the timeExecution helper function attached to this example to measure the execution time of the parfor workflow on the client, on a parallel pool with 496 workers, and directly on a cluster with 496 workers available. Convert the job duration into seconds.

[serialTime,hpcPoolTime,hpcClusterTime] = timeExecution("HPCProfile",maxNumWorkers);
jobsAndTaskTime = double(seconds(jobDuration));
elapsedTimes = [serialTime hpcPoolTime hpcClusterTime jobsAndTaskTime];
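
A rough sketch of how you can implement such a timing comparison follows; the attached timeExecution helper may differ, and runMonteCarlo is a hypothetical function that contains the Monte Carlo parfor-loop from the referenced example.

% Rough sketch only -- the attached timeExecution helper may differ.
cluster = parcluster("HPCProfile");

tic
runMonteCarlo();     % parfor-loop on the client (no pool open, automatic pool creation disabled)
serialTime = toc;

pool = parpool(cluster,maxNumWorkers);
tic
runMonteCarlo();     % parfor-loop runs on the parallel pool
hpcPoolTime = toc;
delete(pool)

opts = parforOptions(cluster);
tic
runMonteCarlo(opts); % parfor-loop runs directly on the cluster using parforOptions
hpcClusterTime = toc;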

Calculate the computational speedup and create a bar chart comparing the speedup of each workflow. The chart shows that the jobs and tasks workflow achieves a similar speedup to running the parfor-loop on a parallel pool and a larger speedup than running the parfor-loop directly on the cluster.

speedUp = elapsedTimes(1)./elapsedTimes;
x = ["parfor Client","parfor Pool","parfor Cluster","Jobs and Tasks"];
bar(x,speedUp);
xlabel("Workflow")
ylabel("Computational Speedup")

Helper Functions

This helper function defines the mass-spring system ODE that the solver uses. You can rewrite the second-order differential equation that describes the spring-mass system (eq1) as a system of first-order ODEs (eq2) that you can solve with the ode45 solver.

$d\dot{x}(t) + kx(t) + m\ddot{x}(t) = 0$, the differential equation of the mass-spring-damper system. (eq1)

$\dot{y}_1 = y_2, \qquad \dot{y}_2 = -\dfrac{d\,y_2 + k\,y_1}{m}$ (eq2)

function dy = massSpringODE(t,y0,d)
k = 9000; % spring stiffness (N/m)
m = 450; % mass (kg)

dy = zeros(2,1);
dy(1) = y0(2);
dy(2) = -(d*y0(2)+k*y0(1))/m;
end

References

[1] Breitenecker, Felix, Gerhard Höfinger, Thorsten Pawletta, Sven Pawletta, and Rene Fink. "ARGESIM Benchmark on Parallel and Distributed Simulation." Simulation News Europe SNE 17, no. 1 (2007): 53-56.

[2] Jammer, David, Peter Junglas, and Sven Pawletta. “Solving ARGESIM Benchmark CP2 ’Parallel and Distributed Simulation’ with Open MPI/GSL and Matlab PCT - Monte Carlo and PDE Case Studies.” SNE Simulation Notes Europe 32, no. 4 (December 2022): 211–20. https://doi.org/10.11128/sne.32.bncp2.10625.
