parallel.pool.DataQueue
Send and listen for data between client and workers
Description
A DataQueue enables asynchronous sending of data or messages from workers back to the client in a parallel pool while a computation is carried out. For example, you can get intermediate values and an indication of the progress of the computation.
To send data from a parallel pool worker back to the client, first construct a DataQueue in the client. Pass this DataQueue into a parfor-loop or other parallel language construct, such as spmd. From the workers, call send to send data back to the client. At the client, register a function to be called each time data is received by using afterEach.
You can call send from the worker or client that created the DataQueue, if required. You can construct the queue on the workers and send it back to the client to enable communication in the reverse direction. However, you cannot send a queue from one worker to another. To transfer data between workers, use spmd, spmdSend, or spmdReceive instead.
Unlike all other handle objects, DataQueue and PollableDataQueue instances do remain connected when they are sent to workers.
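The workflow described above (construct the queue on the client, pass it into a parallel construct, call send on the workers, and register a callback with afterEach) might look like the following minimal sketch when used with spmd. It assumes a parallel pool is available and uses spmdIndex, which requires R2022b or later; in earlier releases labindex plays the same role.

```matlab
% Minimal sketch, assuming an open (or automatically created) parallel pool.
q = parallel.pool.DataQueue;   % construct the queue on the client
afterEach(q, @disp);           % client-side callback, called once per message

spmd
    % Each worker sends its own index back to the client.
    send(q, spmdIndex);
end
```

The order in which the indices are displayed depends on when each worker reaches the send call, so it can differ between runs.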
Creation
Syntax
q = parallel.pool.DataQueue
Description
q = parallel.pool.DataQueue creates an object that can be used to send or listen for messages (or data) from different workers. Create the DataQueue on the worker or client where you want to receive the data.
Properties
QueueLength - Number of items currently held on the queue
zero or positive integer
This property is read-only.
The number of items of data waiting to be removed from the queue, specified as a zero or positive integer. The value is 0 or a positive integer on the worker or client that created the DataQueue instance. If the client creates the DataQueue instance, the value is 0 on all workers. If a worker creates the DataQueue, the value is 0 on the client and all other workers.
Object Functions
Examples
Send a Message in a parfor-Loop, and Dispatch the Message on the Queue
Construct a DataQueue, and call afterEach.
q = parallel.pool.DataQueue;
afterEach(q, @disp);
Start a parfor-loop, and send a message. The pending message is passed to the afterEach function, in this example @disp.
parfor i = 1:3
    send(q, i);
end
     1
     2
     3
For more details on listening for data using a DataQueue, see afterEach.
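Messages are not limited to scalars. As a minimal sketch, the callback registered with afterEach can be an anonymous function that unpacks a structured message. The field names iter and value below are hypothetical and chosen only for illustration.

```matlab
% Minimal sketch: send a struct per iteration and format it on the client.
q = parallel.pool.DataQueue;

% Assumed message layout: a struct with the hypothetical fields 'iter' and 'value'.
afterEach(q, @(msg) fprintf('Iteration %d: value %.3f\n', msg.iter, msg.value));

parfor i = 1:4
    % Package the iteration index and a computed value into one message.
    send(q, struct('iter', i, 'value', sqrt(i)));
end
```

Because iterations run on different workers, the lines can be printed in any order.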
Find Length of DataQueue
When you send a message to a DataQueue object, the message waits in the queue until it is processed by a listener. Each message adds 1 to the queue length. In this example, you use the QueueLength property to find the length of a DataQueue object.
When a client or worker creates a DataQueue object, any messages that are sent to the queue are held in the memory of that client or worker. If the client creates a DataQueue object, the QueueLength property on all workers is 0. In this example, you create a DataQueue object on the client, and send data from a worker.
First, create a parallel pool with one worker.
parpool(1);
Starting parallel pool (parpool) using the 'local' profile ...
Connected to the parallel pool (number of workers: 1).
Then, create a DataQueue.
q = parallel.pool.DataQueue
q =
  DataQueue with properties:
    QueueLength: 0
A newly created DataQueue has an empty queue. You can use parfor to find q.QueueLength on the worker. Find the queue length on the client, and the queue length on the worker.
fprintf('On the client: %i\n', q.QueueLength)
On the client: 0
parfor i = 1
    fprintf('On the worker: %i\n', q.QueueLength)
end
On the worker: 0
As the queue is empty, the QueueLength is 0 for both the client and the worker. Next, send a message to the queue from the worker. Then, use the QueueLength property to find the length of the queue.
% Send a message first
parfor i = 1
    send(q,'A message');
end
% Find the length
fprintf('On the client: %i\n', q.QueueLength)
On the client: 1
parfor i = 1
    fprintf('On the worker: %i\n', q.QueueLength)
end
On the worker: 0
The QueueLength property is 1 on the client, and 0 on the worker. Create a listener to process the queue by immediately displaying the data.
el = afterEach(q, @disp);
Wait until the queue is empty, then delete the listener.
while q.QueueLength > 0
    pause(0.1);
end
delete(el);
Use the QueueLength property to find the length of the queue.
fprintf('On the client: %i\n', q.QueueLength)
On the client: 0
QueueLength is 0 because the queue processing is complete.
Use a DataQueue Object and parfor to Update a Wait Bar
In this example, you use a DataQueue to update a wait bar with the progress of a parfor-loop.
When you create a parfor-loop, you offload each iteration to workers in a parallel pool. Information is only returned from the workers when the parfor-loop completes. You can use a DataQueue to update a wait bar at the end of each iteration.
When you update a wait bar with the progress of your parfor-loop, the client must record information about how many iterations remain.
Tip
If you are creating new parallel code and want to monitor the progress of your code, consider using a parfeval workflow. For more information, see Update User Interface Asynchronously Using afterEach and afterAll.
The helper function parforWaitbar, defined at the end of this example, updates a wait bar. The function uses persistent to store information about the number of remaining iterations.
Use waitbar to create a wait bar, w.
w = waitbar(0,'Please wait ...');
Create a DataQueue, D. Then use afterEach to run parforWaitbar after messages are sent to the DataQueue.
% Create DataQueue and listener
D = parallel.pool.DataQueue;
afterEach(D,@parforWaitbar);
Set the number of iterations for your parfor-loop, N. Use the wait bar w and the number of iterations N to initialize the function parforWaitbar.
At the end of each iteration of the parfor-loop, the client runs parforWaitbar and incrementally updates the wait bar.
N = 100;
parforWaitbar(w,N)
The function parforWaitbar uses persistent variables to store the number of completed iterations on the client. No information is required from the workers.
Run a parfor-loop with N iterations. For this example, use pause and rand to simulate some work. After each iteration, use send to send a message to the DataQueue. When a message is sent to the DataQueue, the wait bar updates. Because no information is required from the workers, send an empty message to avoid unnecessary data transfer.
After the parfor-loop completes, use delete to close the wait bar.
parfor i = 1:N
    pause(rand)
    send(D,[]);
end
delete(w);
Define the helper function parforWaitbar. When you run parforWaitbar with two input arguments, the function initializes three persistent variables (count, h, and N). When you run parforWaitbar with one input argument, the wait bar updates.
function parforWaitbar(waitbarHandle,iterations)
    persistent count h N

    if nargin == 2
        % Initialize
        count = 0;
        h = waitbarHandle;
        N = iterations;
    else
        % Update the waitbar
        % Check whether the handle is a reference to a deleted object
        if isvalid(h)
            count = count + 1;
            waitbar(count / N,h);
        end
    end
end
Plot During Parameter Sweep with parfeval
This example shows how to perform a parallel parameter sweep with parfeval and send results back during computations with a DataQueue object.
parfeval does not block MATLAB, so you can continue working while computations take place.
The example performs a parameter sweep on the Lorenz system of ordinary differential equations, on the parameters σ and ρ, and shows the chaotic nature of this system.
Create Parameter Grid
Define the range of parameters that you want to explore in the parameter sweep.
gridSize = 40;
sigma = linspace(5, 45, gridSize);
rho = linspace(50, 100, gridSize);
beta = 8/3;
Create a 2-D grid of parameters by using the meshgrid function.
[rho,sigma] = meshgrid(rho,sigma);
Create a figure object, and set 'Visible' to true so that it opens in a new window, outside of the live script. To visualize the results of the parameter sweep, create a surface plot. Note that initializing the Z component of the surface with NaN creates an empty plot.
figure('Visible',true);
surface = surf(rho,sigma,NaN(size(sigma)));
xlabel('\rho','Interpreter','Tex')
ylabel('\sigma','Interpreter','Tex')
Set Up Parallel Environment
Create a pool of parallel workers by using the parpool function.
parpool;
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to the parallel pool (number of workers: 6).
To send data from the workers, create a DataQueue object. Set up a function that updates the surface plot each time a worker sends data by using the afterEach function. The updatePlot function is a supporting function defined at the end of the example.
Q = parallel.pool.DataQueue;
afterEach(Q,@(data) updatePlot(surface,data));
Perform Parallel Parameter Sweep
After you define the parameters, you can perform the parallel parameter sweep.
parfeval works more efficiently when you distribute the workload. To distribute the workload, group the parameters to explore into partitions. For this example, split into uniform partitions of size step by using the colon operator (:). The resulting array partitions contains the boundaries of the partitions. Note that you must add the end point of the last partition.
step = 100;
partitions = [1:step:numel(sigma), numel(sigma)+1]
partitions = 1×17
     1   101   201   301   401   501   601   701   801   901  1001  1101  1201  1301  1401  1501  1601
For best performance, try to split into partitions that are:
Large enough that the computation time is large compared to the overhead of scheduling the partition.
Small enough that there are enough partitions to keep all workers busy.
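One way to balance these two criteria is to derive the partition size from the number of workers. The following is only a sketch of such a heuristic: the variables numPoints, numWorkers, and partitionsPerWorker are introduced here for illustration, and the choice of four partitions per worker is an assumption, not a recommendation from this example.

```matlab
% Hypothetical sizing heuristic, not part of the original example.
numPoints = 1600;          % total parameter combinations (40-by-40 grid)
numWorkers = 6;            % assumed pool size; in practice, query pool.NumWorkers
partitionsPerWorker = 4;   % assumption: a few partitions per worker

% Choose a step so that each worker receives several partitions.
step = ceil(numPoints/(numWorkers*partitionsPerWorker));

% Partition boundaries, including the end point of the last partition.
partitions = [1:step:numPoints, numPoints+1];
numPartitions = numel(partitions) - 1;
fprintf('step = %d, number of partitions = %d\n', step, numPartitions);
```

With these assumed values the last partition is shorter than the others, which is harmless because each partition is defined only by its boundaries.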
To represent function executions on parallel workers and hold their results, use future objects.
f(1:numel(partitions)-1) = parallel.FevalFuture;
Offload computations to parallel workers by using the parfeval function. parameterSweep is a helper function defined at the end of this script that solves the Lorenz system on a partition of the parameters to explore. It has one output argument, so you must specify 1 as the number of outputs in parfeval.
for ii = 1:numel(partitions)-1
    f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
end
parfeval does not block MATLAB, so you can continue working while computations take place. The workers compute in parallel and send intermediate results through the DataQueue as soon as they become available.
If you want to block MATLAB until parfeval completes, use the wait function on the future objects. Using the wait function is useful when subsequent code depends on the completion of parfeval.
wait(f);
After parfeval finishes the computations, wait finishes and you can execute more code. For example, plot the contour of the resulting surface. Use the fetchOutputs function to retrieve the results stored in the future objects.
results = reshape(fetchOutputs(f),gridSize,[]);
contourf(rho,sigma,results)
xlabel('\rho','Interpreter','Tex')
ylabel('\sigma','Interpreter','Tex')
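Stripped of the Lorenz system and the plotting, the pattern used in this example (intermediate values through a DataQueue, final value through fetchOutputs) can be sketched as follows. The helper reportSquares is hypothetical and exists only to illustrate the pattern; the sketch assumes a parallel pool is available.

```matlab
% Minimal sketch; reportSquares is a hypothetical helper for illustration.
q = parallel.pool.DataQueue;
afterEach(q, @(x) fprintf('Intermediate value: %g\n', x));

f = parfeval(@reportSquares, 1, 5, q);  % one output; inputs are n = 5 and the queue
wait(f);                                % block until the worker finishes
total = fetchOutputs(f);                % final result returned by the worker

function total = reportSquares(n, q)
    % Runs on a worker: send each square to the client, return their sum.
    total = 0;
    for k = 1:n
        send(q, k^2);
        total = total + k^2;
    end
end
```

The intermediate values arrive through the queue while the future is still running, whereas total is available only after the future completes.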
If your parameter sweep needs more computational resources and you have access to a cluster, you can scale up your parfeval computations. For more information, see Scale Up from Desktop to Cluster.
Define Helper Functions
Define a helper function that solves the Lorenz system on a partition of the parameters to explore. Send intermediate results to the MATLAB client by using the send function on the DataQueue object.
function results = parameterSweep(first,last,sigma,rho,beta,Q)
    results = zeros(last-first,1);
    for ii = first:last-1
        lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
        [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
        result = a(end,3);
        send(Q,[ii,result]);
        results(ii-first+1) = result;
    end
end
Define another helper function that updates the surface plot when new data arrives.
function updatePlot(surface,data)
    surface.ZData(data(1)) = data(2);
    drawnow('limitrate');
end
Version History
Introduced in R2017a