wake-up-neo.net

Wann sollte ich einen CompletionService über einen ExecutorService verwenden?

Ich habe CompletionService gerade in diesem Blogbeitrag gefunden. Dies zeigt jedoch nicht wirklich die Vorteile von CompletionService gegenüber einem Standard-ExecutorService. Derselbe Code kann mit beiden geschrieben werden. Wann ist ein CompletionService sinnvoll?

Können Sie ein kurzes Codebeispiel geben, um es kristallklar zu machen? Dieses Codebeispiel zeigt beispielsweise nur, wo ein CompletionService nicht benötigt wird (= äquivalent zu ExecutorService).

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
67
ripper234

Nachdem Sie die auszuführenden Aufgaben übermittelt haben, müssen Sie mit ExecutorService manuell Code eingeben, um die Ergebnisse der Aufgaben effizient zu erhalten. 

Mit CompletionService ist dies ziemlich automatisiert. Der Unterschied ist nicht sehr deutlich in dem von Ihnen vorgelegten Code, da Sie nur eine Aufgabe übergeben. Stellen Sie sich jedoch vor, Sie haben eine Liste der zu übergebenden Aufgaben. In dem folgenden Beispiel werden mehrere Aufgaben an CompletionService übermittelt. Anstatt herauszufinden, welche Aufgabe abgeschlossen ist (um die Ergebnisse zu erhalten), fordert es die CompletionService-Instanz auf, die Ergebnisse zurückzugeben, sobald sie verfügbar sind.

public class CompletionServiceTest {

        class CalcResult {
             long result ;

             CalcResult(long l) {
                 result = l;
             }
        }

        class CallableTask implements Callable<CalcResult> {
            String taskName ;
            long  input1 ;
            int input2 ;

            CallableTask(String name , long v1 , int v2 ) {
                taskName = name;
                input1 = v1;
                input2 = v2 ;
            }

            public CalcResult call() throws Exception {
                System.out.println(" Task " + taskName + " Started -----");
                for(int i=0;i<input2 ;i++) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        System.out.println(" Task " + taskName + " Interrupted !! ");
                        e.printStackTrace();
                    }
                    input1 += i;
                }
                System.out.println(" Task " + taskName + " Completed @@@@@@");
                return new CalcResult(input1) ;
            }

        }

        public void test(){
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);

            int submittedTasks = 5;
            for (int i=0;i< submittedTasks;i++) {
                taskCompletionService.submit(new CallableTask (
                        String.valueOf(i), 
                            (i * 10), 
                            ((i * 10) + 10  )
                        ));
               System.out.println("Task " + String.valueOf(i) + "subitted");
            }
            for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
                try {
                    System.out.println("trying to take from Completion service");
                    Future<CalcResult> result = taskCompletionService.take();
                    System.out.println("result for a task availble in queue.Trying to get()");
                    // above call blocks till atleast one task is completed and results availble for it
                    // but we dont have to worry which one

                    // process the result here by doing result.get()
                    CalcResult l = result.get();
                    System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));

                } catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }
            }
        }
    }
86
Bhaskar

Viele Details auslassen:

  • ExecutorService = eingehende Warteschlange + Arbeitsthreads
  • CompletionService = eingehende Warteschlange + Arbeitsthreads + Ausgabewarteschlange 
135
Alex Miller

Ich denke, der Javadoc beantwortet am besten die Frage, wann CompletionService in einer Weise nützlich ist, die ExecutorService nicht ist.

Ein Dienst, der die Erzeugung neuer asynchroner Aufgaben vom Verbrauch der Ergebnisse abgeschlossener Aufgaben abkoppelt.

Grundsätzlich ermöglicht diese Schnittstelle einem Programm Produzenten, die Aufgaben erstellen und einreichen (und sogar die Ergebnisse dieser Einreichungen prüfen), ohne andere Verbraucher über die Ergebnisse dieser Aufgaben zu kennen. In der Zwischenzeit könnten Verbraucher, die die CompletionService kennen, poll für oder take Ergebnisse erhalten, ohne sich der Produzenten bewusst zu sein, die die Aufgaben übermitteln.

Für das Protokoll und ich könnte falsch sein, weil es ziemlich spät ist, aber ich bin ziemlich sicher, dass der Beispielcode in diesem Blogpost einen Speicherverlust verursacht. Ich bin nicht sicher, wie ein Blogger erwartet, dass ein aktiver Consumer Ergebnisse aus der internen Warteschlange der Variable ExecutorCompletionService entnimmt.

10
Tim Bender

Grundsätzlich verwenden Sie eine CompletionService, wenn Sie mehrere Tasks parallel ausführen möchten und dann in ihrer Abschlussreihenfolge mit ihnen arbeiten. Wenn ich also 5 Aufträge ausführte, gibt mir die CompletionService den ersten Auftrag, der abgeschlossen wird. Das Beispiel, bei dem es nur eine einzige Aufgabe gibt, verleiht einer Executor keinen zusätzlichen Wert, abgesehen von der Möglichkeit, eine Callable zu übergeben.

9

Erstens, wenn wir keine Prozessorzeit verschwenden wollen, werden wir keine Zeit dafür verwenden

while (!future.isDone()) {
        // Do some work...
}

Wir müssen verwenden

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);

Das Schlechte an diesem Code ist, dass ExecutorService heruntergefahren wird. Wenn wir weiter damit arbeiten wollen (d. H. Wir haben rekursive Aufgabenerstellung), gibt es zwei Alternativen: invokeAll oder ExecutorService.

invokeAll wartet, bis alle Aufgaben abgeschlossen sind. ExecutorService gibt uns die Möglichkeit, die Ergebnisse einzeln zu übernehmen oder abzufragen.

Und ein rekursives Beispiel:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

while (Tasks.size() > 0) {
    for (final Task task : Tasks) {
        completionService.submit(new Callable<String>() {   
            @Override
            public String call() throws Exception {
                return DoTask(task);
            }
        });
    } 

    try {                   
        int taskNum = Tasks.size();
        Tasks.clear();
        for (int i = 0; i < taskNum; ++i) {
            Result result = completionService.take().get();
            if (result != null)
                Tasks.add(result.toTask());
        }           
    } catch (InterruptedException e) {
    //  error :(
    } catch (ExecutionException e) {
    //  error :(
    }
}
4
Sklavit

Sehen Sie sich zur Laufzeit selbst um, versuchen Sie, beide Lösungen (Executorservice und Completionservice) zu implementieren, und Sie werden sehen, wie sich diese verhalten, und es wird klarer, wann die eine oder die andere verwendet werden soll. Hier ist ein Beispiel Wenn Sie möchten http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html

1
marcocast

Angenommen, Sie haben 5 lange laufende Task (aufrufbare Task), und Sie haben diese Task an den ausführenden Dienst übergeben. Stellen Sie sich nun vor, Sie möchten nicht auf den Wettbewerb aller 5 Aufgaben warten, stattdessen möchten Sie eine Verarbeitung dieser Aufgaben durchführen, wenn eine Aufgabe abgeschlossen ist. Jetzt können Sie dies tun, indem Sie entweder Abfragelogik für zukünftige Objekte schreiben oder diese API verwenden. 

1
Bikas Katwal

es gibt noch einen weiteren Vorteil der Verwendung von completionservice: Performance

wenn Sie future.get() anrufen, warten Sie mit:

von Java.util.concurrent.CompletableFuture

  private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }

wenn Sie eine langwierige Aufgabe haben, ist dies ein Desaster für die Leistung.

mit completionservice wird das Ergebnis nach Fertigstellung der Task in eine Warteschlange aufgenommen, und Sie können die Warteschlange mit geringerer Leistung abfragen.

completionservice erreicht dies durch Verwendung einer Wrap-Task mit einem done-Hook. 

Java.util.concurrent.ExecutorCompletionService

    private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
0
宏杰李

Wenn der Task-Produzent nicht an den Ergebnissen interessiert ist und eine andere Komponente dafür verantwortlich ist, die Ergebnisse der durch den Executor-Service ausgeführten asynchronen Task zu verarbeiten, sollten Sie CompletionService verwenden. Es hilft Ihnen, den Ergebnisprozessor vom Aufgabenhersteller zu trennen. Siehe Beispiel http://www.zoftino.com/Java-concurrency-executors-framework-tutorial

0
Arnav Rao
package com.barcap.test.test00;

import Java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorCompletest00 {

    public static void main(String[] args) {

        ExecutorService exc= Executors.newFixedThreadPool( 10 );
        ExecutorCompletionService executorCompletionService= new ExecutorCompletionService( exc );

        for (int i=1;i<10;i++){
            Task00 task00= new Task00( i );
            executorCompletionService.submit( task00 );
        }
        for (int i=1;i<20;i++){
            try {
                Future<Integer> future= (Future <Integer>) executorCompletionService.take();
                Integer inttest=future.get();
                System.out.println(" the result of completion service is "+inttest);

               break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

================================================ =====

package com.barcap.test.test00;

import Java.util.*;
import Java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorServ00 {

    public static void main(String[] args) {
        ExecutorService executorService=Executors.newFixedThreadPool( 9 );
        List<Future> futList= new ArrayList <>(  );
        for (int i=1;i<10;i++) {
           Future result= executorService.submit( new Task00( i ) );
           futList.add( result );
        }

         for (Future<Integer> futureEach :futList ){
             try {
              Integer inm=   futureEach.get();

                 System.out.println("the result of future executorservice is "+inm);
                 break;
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }
    }
}

================================================ =========

package com.barcap.test.test00;

import Java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class Task00 implements Callable<Integer> {

    int i;

    public Task00(int i) {
        this.i = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println(" the current thread is "+Thread.currentThread().getName()  +" the result should be "+i);
        int sleepforsec=100000/i;
         Thread.sleep( sleepforsec );
        System.out.println(" the task complted for "+Thread.currentThread().getName()  +" the result should be "+i);



        return i;
    }
}

================================================ ===================

unterschied der Protokolle für den Executor Completion Service: Der aktuelle Thread ist Pool-1-Thread-1. Das Ergebnis sollte 1 sein. Der aktuelle Thread ist Pool-1-Thread-2. Das Ergebnis sollte 2 sein. Der aktuelle Thread ist Pool-1-Thread-. 3 Das Ergebnis sollte 3 sein. Der aktuelle Thread ist Pool-1-Thread-4. Das Ergebnis sollte 4 sein. Der aktuelle Thread ist Pool-1-Thread-6. Das Ergebnis sollte 6 sein. Der aktuelle Thread ist Pool-1-Thread-5 Das Ergebnis sollte 5 sein. Der aktuelle Thread ist Pool-1-Thread-7. Das Ergebnis sollte 7 sein. Der aktuelle Thread ist Pool-1-Thread-9. Das Ergebnis sollte 9 sein. Der aktuelle Thread ist Pool-1-Thread-8. Das Ergebnis sollte sein be 8 die aufgabe erledigt für pool-1-thread-9 das ergebnis sollte 9 sein das ergebnis ist 9 die aufgabe erledigt für pool-1-thread-8 das ergebnis sollte 8 die aufgabe erledigt für pool-1-thread-7 das sein Das Ergebnis sollte 7 sein. Die Aufgabe für Pool-1-Thread-6 ist erledigt. Das Ergebnis sollte 6 sein. Die Aufgabe für Pool-1-Thread-5 ist erledigt. Das Ergebnis sollte 5 sein. Die Aufgabe für Pool-1-Thread-4 ist erledigt sei 4 die aufgabe erledigt für pool-1-thread-3 sollte das Ergebnis 3 sein

die Aufgabe wurde für Pool-1-Thread-2 abgeschlossen. Das Ergebnis sollte 2 sein

der aktuelle Thread ist Pool-1-Thread-1. Das Ergebnis sollte 1 sein. Der aktuelle Thread ist Pool-1-Thread-3. Das Ergebnis sollte 3 sein. Der aktuelle Thread ist Pool-1-Thread-2. Das Ergebnis sollte 2 sein Thread ist Pool-1-Thread-5 Das Ergebnis sollte 5 sein. Der aktuelle Thread ist Pool-1-Thread-4. Das Ergebnis sollte 4 sein. Der aktuelle Thread ist Pool-1-Thread-6. Das Ergebnis sollte 6 sein. Der aktuelle Thread ist pool-1-thread-7 das ergebnis sollte 7 sein der aktuelle thread ist pool-1-thread-8 das ergebnis sollte 8 sein der aktuelle thread ist pool-1-thread-9 das ergebnis sollte 9 sein die aufgabe erfüllt für pool- 1-thread-9 Das Ergebnis sollte 9 sein. Die Aufgabe wurde für Pool-1-thread-8 erledigt. Das Ergebnis sollte 8 sein. Die Aufgabe wurde für Pool-1-thread-7 erledigt. Das Ergebnis sollte 7 sein. Die Aufgabe wurde für Pool-1- erledigt. thread-6 das Ergebnis sollte 6 sein die Aufgabe für Pool-1-thread-5 das Ergebnis sollte 5 sein die Aufgabe für Pool-1-thread-4 das Ergebnis sollte 4 sein die Aufgabe für Pool-1-thread- 3 das ergebnis sollte 3 die aufgabe sein, die für po erledigt wurde ol-1-thread-2 das ergebnis sollte 2 sein die aufgabe erledigt für pool-1-thread-1 das ergebnis sollte 1 sein das ergebnis von future ist 1

================================================ =====

für executorservice ist das ergebnis erst verfügbar, wenn alle aufgaben erledigt sind.

executor completionservice liefert jedes verfügbare Ergebnis.

0
Prasad