MATLAB Answers

How to use persistent variables inside parfor method of a custom object class?

26 views (last 30 days)
There is a similar entry on using persistent variables with spmd/parfor (https://www.mathworks.com/matlabcentral/answers/233648-spmd-and-persistent-variables), but the solution doesn't appear to work when applied to object classes.
Here's a very simplified example where I'd like the persistent variable inside my object class to persist across parfor iterations called by another method in the class:
classdef example
methods(Static)
function output = parforFun(n)
output = cell(n,1);
parfor x = 1:n
output{x} = example.persistentFun;
% Do something with output (not shown here)
end
output = cell2mat(output); % Ideally, every index of "output" is identical
end
function output = persistentFun
% Return the persistent variable myConstant
persistent myConstant
if isempty(myConstant)
myConstant = rand; % Define the term. Real code involves calling web service.
end
output = myConstant;
end
end
end
Why do this, you ask? In my real application, "example" is a web service class I created. I'm getting an authentication token to a web service in persistentFun so I don't want each worker to ask for its own authentication token. ***Importantly, I can't move the persistentFun call before the parfor and then distribute the token, because the token expires periodically and may expire while my parfor is running. Therefore, I want to call PersistentFun inside the parfor and not before, then let persistentFun determine when to request a new token and distribute to all workers. Also, I'd like to avoid the clunky hack of writing the token to a file and then fetching it, if possible.
Back to the example....
Now call output = example.parforFun(5) and see if the value of output is different for every index, which would indicate each parfor is using a different value for persistent myConstant.
output = example.parforFun(5)
output =
0.350625385914416
1.91633571411975
1.85656023059334
1.17686837315114
4.30311363531808
You can see that the persistent "myConstant" changes between parfor calls when I would like for it to be constant. Of course, if you set a large enough input value (approaching or greater than number of workers) then you start to see repeating values, which indicates that each worker is starting to get called more than once and is retrieving its persistent variable.
By the way, the idea of putting into a separate file (i.e. creating a @example folder, adding to path, and putting everything in there) and using "addAttachedFiles(gcp,{'persistentFun.m'})" doesn't seem to resolve this issue.
I'm not seeing an obvious way to do this without writing the token to a file and then reading, but I'd like to avoid this. Any ideas?
Thanks,
Paul S

  2 Comments

Walter Roberson
Walter Roberson on 3 Sep 2019
I suggest using parfeval() to dispatch workers and parfEvalOnAll to update the tokens.
Paul Shoemaker
Paul Shoemaker on 3 Sep 2019
Walter,
Could you please elaborate just a bit on this recommendation? I don't want you to write my code for me, but I've never used either of these and I'm struggling to get something workable despite researching them and fiddling with my code.
It sounds like you're proposing replacing parfor with a FOR-loop containing a parfeval() and then calling parfEvalOnAll either within the same FOR-loop or putting the parfEvalOnAll within the persistentFun method. Am I completely off-base?
Thanks

Sign in to comment.

Accepted Answer

Walter Roberson
Walter Roberson on 3 Sep 2019
Outline:
Initialize token
set up a timer to refresh the token
loop, parfeval() as many jobs as there are workers. They should get allocated one per worker, but that is not technically guaranteed
set up a foreach() that stores the result received and if there are more iterations to be done, parfeval() another iteration with the current token (which is being refreshed from time to time by the timer)
wait for all the iterations to be done, and shut down
The timer interval has to be such that if a job were started immediately before the timer fired, that the previous token will still be valid for the duration of the job -- so it has to update no more than (hard expiration duration minus maximum job duration.)

  2 Comments

Walter Roberson
Walter Roberson on 3 Sep 2019
There is a way that a client can update on an active worker: if you create a data queue or pollable queue;
The trick is that you can create a queue before a parallel construct, and it will be copied over to the workers. The first thing you do on the worker is then to create a queue on the worker, and use the first queue to send the second queue back to the client. You now have an per-worker endpoint on the client that you can use to send an updated token to the worker. I think a pollable queue would make the most sense to construct on the worker, as the worker could then periodically check to see if there is anything in the queue and if so use it to update the local token.
Paul Shoemaker
Paul Shoemaker on 4 Sep 2019
Walter, your "Outline" entry appears to basically be what I came up with as well. Thanks for clarifying.
Your second idea about creating a pollable queue is pretty neat. I need to study this more and test out how it works in my use case (including speed).

Sign in to comment.

More Answers (3)

Matt J
Matt J on 3 Sep 2019
Edited: Matt J on 3 Sep 2019
Here's one way to do it, sort of along the lines that Walter proposed, but with externally scoped variables instead of persistent variables,
function test
n=10;
Token=rand;
updateExecuted=false(1,n);
tic;
for i = 1:n
future(i) = parfeval(@LongTask,1);
if rand>0.7%Simulate token updates at random times
updateToken();
updateExecuted(i)=true;
end
end
toc; %Elapsed time is 0.026488 seconds.
tic;
for i = 1:n
[completedIdx,value] = fetchNext(future);
output(completedIdx) = value;
end
toc; %Elapsed time is 5.022906 seconds.
output,
updateExecuted,
function t=LongTask
t=Token;
pause(5); %simulate some fake time-consuming step
end
function updateToken
Token=rand;
end
end
Executing this on (12 workers), we see that the output of the workers remains constant between the points where token updates occurred.
>> test
output =
0.9303 0.9303 0.9303 0.9303 0.9303 0.9750 0.9750 0.7657 0.7657 0.7657
updateExecuted =
1×10 logical array
0 0 0 0 1 0 1 0 0 0

  6 Comments

Show 3 older comments
Matt J
Matt J on 4 Sep 2019
I guess it's also possible the value is cloned to the worker when parFeval is issued... I really don't know.
But I think in the scenario described by the OP, a decision is made at some point by the worker to accept a token as valid. So, some desynchronization with the host copy of the token must inevitably be tolerated.
Paul Shoemaker
Paul Shoemaker on 4 Sep 2019
Walter, you are correct on describing the shortcoming of Matt's approach; it will not solve the token expiring during parfeval and causing several of the worker jobs to return invalid/null responses.
I got something to work that might scratch the itch: use a for loop and call a fresh token every iteration and setup an Fevalfuture on each worker. When the worker queue fills up, execute/fetch the results of those Fevalfutures and then fill up the queue with the next batch and repeat.
This keeps the token retrieval within the FOR loop, and while there's still chance of the token timing out, I accounted for this by adding a buffer to the token refresh so that it refreshes slightly more quickly than is needed to ensure it doesn't expire during runs.
Here's my code:
classdef example
methods(Static)
function output = parforFun(n)
output = nan(n,1); % Instantiate output
pool = gcp;
numWorkers = pool.NumWorkers; % Num workers needed to determine when to fetch results
F = cell(1,n);
for x = 1:n
token = example.persistentFun; % Get fresh authentication token
F{x} = parfeval(@example.doSomething,1,token);
if rem(x,numWorkers)==0
% Fetch results everytime the worker queue fills up
startIdx = x - numWorkers + 1;
endIdx = x;
output(startIdx:endIdx) = fetchOutputs([F{startIdx:endIdx}]);
elseif x==n
% Execute the last bit of iterations if n is not an
% even multiple of numWorkers
count = rem(x,numWorkers);
startIdx = x - count + 1;
endIdx = x;
output(startIdx:endIdx) = fetchOutputs([F{startIdx:endIdx}]);
end
end
end
function output = persistentFun
% Return the persistent variable myConstant
persistent myConstant
if isempty(myConstant)
myConstant = rand; % Define the term. Real code involves calling web service.
end
output = myConstant;
end
function output = doSomething(token)
% Workhorse function that does the number crunching
pause(0.25); % Simulate time to do some math
output = token; % No work here.
end
end
end

Sign in to comment.


Paul Shoemaker
Paul Shoemaker on 4 Sep 2019
Thanks for the great info all around here.
To clarify on the use case, here's a bit more about what's going on:
An authentication token is required to access a web service. This authentication token is good for 30min, after which I must fetch a new one. In my actual example, I have several methods that need access to this authentication token to access various features from the web service, so I made a method that manages the token for all of these interactions (in above example, I just called it persistentFun).
When it's first called, the persistent token is empty so I fetch one from the web service. After that, it gets stored to the persistent variable along with a timestamp. In subsequent calls, the timestamp is checked and if it's still less than 30min (actually, I set to 25min for buffer) then it servces up the persistent token. If older, it goes and fetches a new one from the web server. Obviously, when running in parallel, I don't want to fetch 6, 8, or 12 keys at once (one for each worker) since that takes time to ping the server and is wasteful. It might even get me a slap on the wrist from the web service provider.
In a real use-case, this particular parfor function would run maybe 10,000 times over the course of 1-3hrs, which is well beyond the life of a valid token.
Thanks for all the great input here. It's been very helpful! I will continue to test out ideas and let you know where I end up.
Paul S

  0 Comments

Sign in to comment.


Paul Shoemaker
Paul Shoemaker on 15 Sep 2019
Hey folks,
I ended up going with Walter's first approach using FOR loop with parfeval within, very close to what I posted above, but with a new multiplier term.
classdef example
methods(Static)
function output = parforFun(n)
output = nan(n,1); % Instantiate output
pool = gcp;
multiplier = 10;
numWorkers = multiplier * pool.NumWorkers; % Num workers needed to determine when to fetch results
F = cell(1,n);
for x = 1:n
token = example.persistentFun; % Get fresh authentication token
F{x} = parfeval(@example.doSomething,1,token);
if rem(x,numWorkers)==0
% Fetch results everytime the worker queue fills up
startIdx = x - numWorkers + 1;
endIdx = x;
output(startIdx:endIdx) = fetchOutputs([F{startIdx:endIdx}]);
elseif x==n
% Execute the last bit of iterations if n is not an
% even multiple of numWorkers
count = rem(x,numWorkers);
startIdx = x - count + 1;
endIdx = x;
output(startIdx:endIdx) = fetchOutputs([F{startIdx:endIdx}]);
end
end
end
function output = persistentFun
% Return the persistent variable myConstant
persistent myConstant
if isempty(myConstant)
myConstant = rand; % Define the term. Real code involves calling web service.
end
output = myConstant;
end
function output = doSomething(token)
% Workhorse function that does the number crunching
pause(0.25); % Simulate time to do some math
output = token; % No work here.
end
end
end
I found that parfeval was noticeably slower than parfor, so I minimized this by having parfeval fetch results when each worker has 10 jobs (multiplier) instead of when they have a single job. I set the token refresh rate to be 5min faster than what is needed, so as long as 10 jobs / worker can complete in that timeframe (they easily can in my case) then this satisfies my needs here. In the end, the slower parfeval ended up being comprable in speed as implemented above.
I liked the idea of using a pollable queue, but the parfeval approach was more straightforward and still met my needs.
Thanks all,
Paul S

  0 Comments

Sign in to comment.

Sign in to answer this question.

Products


Release

R2019a