parallel.pool.DataQueue

Send and listen for data between client and workers

    Description

    A DataQueue enables asynchronous sending of data or messages from workers back to the client in a parallel pool while a computation is carried out. For example, you can get intermediate values and an indication of the progress of the computation.

    To send data from a parallel pool worker back to the client, first construct a DataQueue in the client. Pass this DataQueue into a parfor-loop or other parallel language construct, such as spmd. From the workers, call send to send data back to the client. At the client, register a function to be called each time data is received by using afterEach.

    • You can call send from the worker or client that created the DataQueue, if required.

    • You can construct the queue on the workers and send it back to the client to enable communication in the reverse direction. However, you cannot send a queue from one worker to another. To transfer data between workers, use spmd, spmdSend, or spmdReceive instead.

    • Unlike all other handle objects, DataQueue and PollableDataQueue instances do remain connected when they are sent to workers.
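
    Because a queue cannot be passed from one worker to another, worker-to-worker transfer goes through spmd. The following is a minimal sketch of that alternative, not part of the original page. It assumes R2022b or later (where spmdSend, spmdReceive, and spmdIndex are available) and a pool with at least two workers; the pool size of 2 and the variable name received are illustrative choices.

```matlab
% Sketch: worker 1 sends a matrix directly to worker 2 inside an spmd block.
% Assumes R2022b or later and a pool with at least two workers.
pool = gcp('nocreate');
if isempty(pool)
    pool = parpool(2);   % pool size of 2 is an assumption for this sketch
end

spmd
    received = [];
    if spmdIndex == 1
        spmdSend(magic(3), 2);        % send to the worker with index 2
    elseif spmdIndex == 2
        received = spmdReceive(1);    % block until data arrives from worker 1
    end
end

% On the client, received is a Composite; index it to read worker 2's copy.
disp(received{2})
```

    Workers with an index above 2 skip both branches, so the sketch also runs on larger pools.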

    Creation

    Description

    q = parallel.pool.DataQueue creates an object that can be used to send or listen for messages (or data) from different workers. Create the DataQueue on the worker or client where you want to receive the data.

    Properties

    QueueLength

    This property is read-only.

    The number of items of data waiting to be removed from the queue, specified as zero or a positive integer. The value is 0 or a positive integer on the worker or client that created the DataQueue instance. If the client creates the DataQueue instance, the value is 0 on all workers. If a worker creates the DataQueue, the value is 0 on the client and all other workers.

    Object Functions

    afterEach Define a function to call when new data is received on a DataQueue
    send Send data from worker to client using a data queue

    Examples

    Construct a DataQueue, and call afterEach.

    q = parallel.pool.DataQueue;
    afterEach(q, @disp);

    Start a parfor-loop, and send a message. The pending message is passed to the afterEach function, in this example @disp.

    parfor i = 1:3
        send(q, i);
    end;

    1
    2
    3

    For more details on listening for data using a DataQueue, see afterEach.
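
    The function registered with afterEach does not have to be disp. As a hedged sketch, the listener below is an anonymous function that formats a two-element message; the message layout [index, value] and the use of i^2 as a stand-in result are assumptions made for illustration.

```matlab
% Sketch: send an [index, value] pair and format it on the client.
q = parallel.pool.DataQueue;

% The callback receives exactly what the worker passed to send.
afterEach(q, @(msg) fprintf('Iteration %d produced %g\n', msg(1), msg(2)));

parfor i = 1:4
    send(q, [i, i^2]);   % i^2 stands in for a real intermediate result
end
```

    The order in which the lines print depends on how the iterations are scheduled, so it is not necessarily 1 through 4.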

    When you send a message to a DataQueue object, the message waits in the queue until it is processed by a listener. Each message adds 1 to the queue length. In this example, you use the QueueLength property to find the length of a DataQueue object.

    When a client or worker creates a DataQueue object, any messages that are sent to the queue are held in the memory of that client or worker. If the client creates a DataQueue object, the QueueLength property on all workers is 0. In this example, you create a DataQueue object on the client, and send data from a worker.

    First, create a parallel pool with one worker.

    parpool(1);
    Starting parallel pool (parpool) using the 'local' profile ... Connected to the parallel pool (number of workers: 1).

    Then, create a DataQueue.

    q = parallel.pool.DataQueue

    q =
      DataQueue with properties:

        QueueLength: 0

    A newly created DataQueue has an empty queue. You can use parfor to find q.QueueLength on the worker. Find the queue length on the client, and the queue length on the worker.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0

    As the queue is empty, the QueueLength is 0 for both the client and the worker. Next, send a message to the queue from the worker. Then, use the QueueLength property to find the length of the queue.

    % Send a message first
    parfor i = 1
        send(q,'A message');
    end
    % Find the length
    fprintf('On the client: %i\n', q.QueueLength)

    On the client: 1

    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end

    On the worker: 0

    The QueueLength property is 1 on the client, and 0 on the worker. Create a listener to process the queue by immediately displaying the data.

    el = afterEach(q, @disp);

    Wait until the queue is empty, then delete the listener.

    while q.QueueLength > 0
        pause(0.1);
    end
    delete(el);

    Use the QueueLength property to find the length of the queue.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0

    QueueLength is 0 because the queue processing is complete.

    In this example, you use a DataQueue to update a wait bar with the progress of a parfor-loop.

    When you create a parfor-loop, you offload each iteration to workers in a parallel pool. Information is only returned from the workers when the parfor-loop completes. You can use a DataQueue to update a wait bar at the end of each iteration.

    When you update a wait bar with the progress of your parfor-loop, the client must record information about how many iterations remain.

    Tip

    If you are creating new parallel code and want to monitor the progress of your code, consider using a parfeval workflow. For more information, see Update User Interface Asynchronously Using afterEach and afterAll.
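
    As a rough sketch of the parfeval workflow that the tip refers to, the code below drives a wait bar from the state of the futures themselves, so no DataQueue or persistent variable is needed. It is an illustration under assumptions, not the code from the referenced page: the task count, the matrix size, and the use of norm(rand(n)) as stand-in work are all hypothetical.

```matlab
% Sketch: monitor parfeval futures with a wait bar, without a DataQueue.
N = 20;                                  % number of tasks (illustrative)
h = waitbar(0, 'Please wait ...');

% Submit N tasks; norm(rand(n)) stands in for real work.
f(1:N) = parallel.FevalFuture;
for idx = 1:N
    f(idx) = parfeval(@(n) norm(rand(n)), 1, 200);
end

% After each future finishes, set the bar to the fraction of finished futures.
updateWaitbar = @(~) waitbar(mean({f.State} == "finished"), h);
updateFutures = afterEach(f, updateWaitbar, 0);

% When every update has run, close the wait bar.
afterAll(updateFutures, @(~) delete(h), 0);
```

    The client stays responsive while the futures run; call wait(f) if later code depends on the results.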

    The helper function parforWaitbar, defined at the end of this example, updates a wait bar. The function uses persistent to store information about the number of remaining iterations.

    Use waitbar to create a wait bar, w.

    w = waitbar(0,'Please wait ...');

    Create a DataQueue, D. Then use afterEach to run parforWaitbar after messages are sent to the DataQueue.

    % Create DataQueue and listener
    D = parallel.pool.DataQueue;
    afterEach(D,@parforWaitbar);

    Set the number of iterations for your parfor-loop, N. Use the wait bar w and the number of iterations N to initialize the function parforWaitbar.

    At the end of each iteration of the parfor-loop, the client runs parforWaitbar and incrementally updates the wait bar.

    N = 100;
    parforWaitbar(w,N)

    The function parforWaitbar uses persistent variables to store the number of completed iterations on the client. No information is required from the workers.

    Run a parfor-loop with N iterations. For this example, use pause and rand to simulate some work. After each iteration, use send to send a message to the DataQueue. When a message is sent to the DataQueue, the wait bar updates. Because no information is required from the workers, send an empty message to avoid unnecessary data transfer.

    After the parfor-loop completes, use delete to close the wait bar.

    parfor i = 1:N
        pause(rand)
        send(D,[]);
    end
    delete(w);

    Define the helper function parforWaitbar. When you run parforWaitbar with two input arguments, the function initializes three persistent variables (count, h, and N). When you run parforWaitbar with one input argument, the wait bar updates.

    function parforWaitbar(waitbarHandle,iterations)
        persistent count h N

        if nargin == 2
            % Initialize
            count = 0;
            h = waitbarHandle;
            N = iterations;
        else
            % Update the waitbar
            % Check whether the handle is a reference to a deleted object
            if isvalid(h)
                count = count + 1;
                waitbar(count / N,h);
            end
        end
    end

    This example shows how to perform a parallel parameter sweep with parfeval and send results back during computations with a DataQueue object.

    parfeval does not block MATLAB, so you can continue working while computations take place.

    The example performs a parameter sweep on the Lorenz system of ordinary differential equations, on the parameters σ and ρ, and shows the chaotic nature of this system.

    \[
    \begin{aligned}
    \frac{dx}{dt} &= \sigma (y - x) \\
    \frac{dy}{dt} &= x (\rho - z) - y \\
    \frac{dz}{dt} &= x y - \beta z
    \end{aligned}
    \]
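
    Before running the sweep, it can help to solve the system once on the client. The sketch below uses the same right-hand side, time span, and initial condition as the parameterSweep helper defined at the end of this example. The parameter values σ = 10 and ρ = 28 are the classic textbook choice and are an assumption here; they are not taken from the sweep grid.

```matlab
% Sketch: solve the Lorenz system once for a single parameter set.
sigma = 10; rho = 28; beta = 8/3;   % classic values, assumed for illustration

% State vector a = [x; y; z].
lorenzSystem = @(t,a) [sigma*(a(2) - a(1)); ...
                       a(1)*(rho - a(3)) - a(2); ...
                       a(1)*a(2) - beta*a(3)];

[t,a] = ode45(lorenzSystem, [0 100], [1 1 1]);

% Final value of z, the quantity that the sweep records for each grid point.
zFinal = a(end,3);
fprintf('z(100) = %g\n', zFinal);
```

    Because the system is chaotic, the final value is sensitive to the parameters and to solver tolerances, which is why the sweep produces such an irregular surface.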

    Create Parameter Grid

    Define the range of parameters that you want to explore in the parameter sweep.

    gridSize = 40;
    sigma = linspace(5, 45, gridSize);
    rho = linspace(50, 100, gridSize);
    beta = 8/3;

    Create a 2-D grid of parameters by using the meshgrid function.

    [rho,sigma] = meshgrid(rho,sigma);

    Create a figure object, and set 'Visible' to true so that it opens in a new window, outside of the live script. To visualize the results of the parameter sweep, create a surface plot. Note that initializing the Z component of the surface with NaN creates an empty plot.

    figure('Visible',true);
    surface = surf(rho,sigma,NaN(size(sigma)));
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    Set Up Parallel Environment

    Create a pool of parallel workers by using the parpool function.

    parpool;
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to the parallel pool (number of workers: 6).

    To send data from the workers, create a DataQueue object. Set up a function that updates the surface plot each time a worker sends data by using the afterEach function. The updatePlot function is a supporting function defined at the end of the example.

    Q = parallel.pool.DataQueue;
    afterEach(Q,@(data) updatePlot(surface,data));

    Perform Parallel Parameter Sweep

    After you define the parameters, you can perform the parallel parameter sweep.

    parfeval works more efficiently when you distribute the workload. To distribute the workload, group the parameters to explore into partitions. For this example, split into uniform partitions of size step by using the colon operator (:). The resulting array partitions contains the boundaries of the partitions. Note that you must add the end point of the last partition.

    step = 100;
    partitions = [1:step:numel(sigma), numel(sigma)+1]

    partitions = 1×17

        1 101 201 301 401 501 601 701 801 901 1001 1101 1201 1301 1401 1501 1601

    For best performance, try to split into partitions that are:

    • Large enough that the computation time is large compared to the overhead of scheduling the partition.

    • Small enough that there are enough partitions to keep all workers busy.
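
    One way to balance these two criteria is to derive the partition size from the pool size rather than hard-coding it. The sketch below is only an illustration: the figure of four partitions per worker is an assumed rule of thumb, not a documented recommendation, and the variable names numPoints, numWorkers, and partitionsPerWorker are hypothetical.

```matlab
% Sketch: derive a partition size from the pool size instead of hard-coding it.
numPoints = 1600;            % numel(sigma) for the 40-by-40 grid in this example
numWorkers = 6;              % pool size shown in the example output
partitionsPerWorker = 4;     % assumed heuristic, not a documented recommendation

step = ceil(numPoints / (numWorkers * partitionsPerWorker));
partitions = [1:step:numPoints, numPoints+1];

fprintf('step = %d, number of partitions = %d\n', step, numel(partitions)-1);
```

    With these values the sketch gives a step of 67 and 24 partitions, so each of the 6 workers receives about four tasks. In a live session, numWorkers could be read from the NumWorkers property of the current pool instead.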

    To represent function executions on parallel workers and hold their results, use future objects.

    f(1:numel(partitions)-1) = parallel.FevalFuture;

    Offload computations to parallel workers by using the parfeval function. parameterSweep is a helper function defined at the end of this script that solves the Lorenz system on a partition of the parameters to explore. It has one output argument, so you must specify 1 as the number of outputs in parfeval.

    for ii = 1:numel(partitions)-1
        f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
    end

    parfeval does not block MATLAB, so you can continue working while computations take place. The workers compute in parallel and send intermediate results through the DataQueue as soon as they become available.

    If you want to block MATLAB until parfeval completes, use the wait function on the future objects. Using the wait function is useful when subsequent code depends on the completion of parfeval.

    wait(f);

    After parfeval finishes the computations, wait finishes and you can execute more code. For example, plot the contour of the resulting surface. Use the fetchOutputs function to retrieve the results stored in the future objects.

    results = reshape(fetchOutputs(f),gridSize,[]);
    contourf(rho,sigma,results)
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    If your parameter sweep needs more computational resources and you have access to a cluster, you can scale up your parfeval computations. For more information, see Scale Up from Desktop to Cluster.

    Define Helper Functions

    Define a helper function that solves the Lorenz system on a partition of the parameters to explore. Send intermediate results to the MATLAB client by using the send function on the DataQueue object.

    function results = parameterSweep(first,last,sigma,rho,beta,Q)
        results = zeros(last-first,1);
        for ii = first:last-1
            lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
            [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
            result = a(end,3);
            send(Q,[ii,result]);
            results(ii-first+1) = result;
        end
    end

    Define another helper function that updates the surface plot when new data arrives.

    function updatePlot(surface,data)
        surface.ZData(data(1)) = data(2);
        drawnow('limitrate');
    end

    Version History

    Introduced in R2017a