wake-up-neo.net

Java 8 Streams bedingte Verarbeitung

Ich bin daran interessiert, einen Stream in zwei oder mehr Teilströme aufzuteilen und die Elemente auf unterschiedliche Weise zu verarbeiten. Eine (große) Textdatei könnte beispielsweise Zeilen des Typs A und Zeilen des Typs B enthalten. In diesem Fall möchte ich Folgendes tun:

File.lines(path)
.filter(line -> isTypeA(line))
.forEachTrue(line -> processTypeA(line))
.forEachFalse(line -> processTypeB(line))

Das Vorhergehende ist mein Versuch, die Situation zu abstrahieren. In Wirklichkeit habe ich eine sehr große Textdatei, in der jede Zeile gegen einen Regex getestet wird. Wenn die Zeile durchläuft, wird sie verarbeitet, und wenn sie abgelehnt wird, möchte ich einen Zähler aktualisieren. Diese Weiterverarbeitung für abgelehnte Zeichenfolgen verwende ich deshalb nicht einfach filter.

Gibt es eine vernünftige Möglichkeit, dies mit Streams zu tun, oder muss ich auf Schleifen zurückgreifen? (Ich möchte, dass dies auch parallel läuft, daher sind Streams meine erste Wahl).

17
gdiazc

Java 8-Streams wurden nicht für diese Art von Operationen entwickelt. Aus dem jdk :

Ein Stream sollte nur einmal bearbeitet werden (Aufruf einer Zwischen- oder Terminal-Stream-Operation). Dies schließt beispielsweise "gegabelte" Streams aus, bei denen dieselbe Quelle zwei oder mehr Pipelines oder mehrere Durchläufe desselben Streams zuführt.

Wenn Sie es im Speicher ablegen können, können Sie Collectors.partitioningBy verwenden, wenn Sie nur zwei Typen haben und einen Map<Boolean, List> verwenden. Andernfalls verwenden Sie Collectors.groupingBy

16
Cosu

Testen Sie einfach jedes Element und handeln Sie entsprechend.

lines.forEach(line -> {
    if (isTypeA(line)) processTypeA(line);
    else processTypeB(line);
});

Dieses Verhalten kann in einer Hilfsmethode ausgeblendet werden:

public static <T> Consumer<T> branch(Predicate<? super T> test, 
                                     Consumer<? super T> t, 
                                     Consumer<? super T> f) {
    return o -> {
        if (test.test(o)) t.accept(o);
        else f.accept(o);
    };
}

Dann würde die Verwendung so aussehen:

lines.forEach(branch(this::isTypeA, this::processTypeA, this::processTypeB));

Tangentiale Anmerkung

Die Files.lines()-Methode schließt die zugrunde liegende Datei nicht,. Sie müssen sie also wie folgt verwenden:

try (Stream<String> lines = Files.lines(path, encoding)) {
  lines.forEach(...);
}

Variablen vom Typ Stream werfen für mich etwas rote Flagge auf, daher ziehe ich es vor, eine BufferedReader direkt zu verwalten:

try (BufferedReader lines = Files.newBufferedReader(path, encoding)) {
    lines.lines().forEach(...);
}
11
erickson

Neben Nebenwirkungen in Verhaltensparametern wird davon abgeraten, sie sind jedoch nicht verboten, solange es keine Interferenzen gibt. Die einfachste, aber nicht sauberste Lösung besteht darin, direkt im Filter zu zählen:

AtomicInteger rejected=new AtomicInteger();
Files.lines(path)
    .filter(line -> {
        boolean accepted=isTypeA(line);
        if(!accepted) rejected.incrementAndGet();
        return accepted;
})
// chain processing of matched lines

Solange Sie alle Artikel bearbeiten, ist das Ergebnis konsistent. Nur wenn Sie einen kurzgeschlossenen Terminalbetrieb (in einem parallelen Stream) verwenden, ist das Ergebnis unvorhersehbar.

Das Aktualisieren einer atomaren Variablen ist möglicherweise nicht die effizienteste Lösung, aber im Zusammenhang mit der Verarbeitung von Zeilen aus einer Datei ist der Overhead wahrscheinlich vernachlässigbar.

Wenn Sie eine saubere, parallele Lösung suchen, besteht ein allgemeiner Ansatz darin, eine Collector zu implementieren, die die Verarbeitung von zwei Erfassungsoperationen basierend auf einer Bedingung kombinieren kann. Dies erfordert, dass Sie die Downstream-Operation als Collector ausdrücken können, die meisten Stream-Operationen können jedoch auch als Collector ausgedrückt werden (und der Trend geht dahin, dass alle Operationen auf diese Weise ausgedrückt werden können, dh Java 9 fügt die aktuell fehlenden filtering und flatMapping .

Sie benötigen einen Paar-Typ, um zwei Ergebnisse zu speichern

class Pair<A,B> {
    final A a;
    final B b;
    Pair(A a, B b) {
        this.a=a;
        this.b=b;
    }
}

die Implementierung des Kombinationskollektors wird aussehen

public static <T, A1, A2, R1, R2> Collector<T, ?, Pair<R1,R2>> conditional(
        Predicate<? super T> predicate,
        Collector<T, A1, R1> whenTrue, Collector<T, A2, R2> whenFalse) {
    Supplier<A1> s1=whenTrue.supplier();
    Supplier<A2> s2=whenFalse.supplier();
    BiConsumer<A1, T> a1=whenTrue.accumulator();
    BiConsumer<A2, T> a2=whenFalse.accumulator();
    BinaryOperator<A1> c1=whenTrue.combiner();
    BinaryOperator<A2> c2=whenFalse.combiner();
    Function<A1,R1> f1=whenTrue.finisher();
    Function<A2,R2> f2=whenFalse.finisher();
    return Collector.of(
        ()->new Pair<>(s1.get(), s2.get()),
        (p,t)->{
            if(predicate.test(t)) a1.accept(p.a, t); else a2.accept(p.b, t);
        },
        (p1,p2)->new Pair<>(c1.apply(p1.a, p2.a), c2.apply(p1.b, p2.b)),
        p -> new Pair<>(f1.apply(p.a), f2.apply(p.b)));
}

und kann zum Beispiel zum Sammeln von übereinstimmenden Elementen in einer Liste und zum Zählen der Nichtübereinstimmung verwendet werden:

Pair<List<String>, Long> p = Files.lines(path)
  .collect(conditional(line -> isTypeA(line), Collectors.toList(), Collectors.counting()));
List<String> matching=p.a;
long nonMatching=p.b;

Der Collector ist parallel anpassbar und ermöglicht beliebig komplexe Delegatenkollektoren. Beachten Sie jedoch, dass bei der aktuellen Implementierung der von Files.lines zurückgegebene Stream bei der parallelen Verarbeitung möglicherweise nicht so gut funktioniert. Vergleichen Sie mit Größenpolitik in ihrem Spliterator " . Verbesserungen sind für die Java 9-Version geplant.

5
Holger

Die Art und Weise, wie ich damit umgehen würde, ist, das Ganze überhaupt nicht aufzuteilen, sondern eher zu schreiben

Files.lines(path)
   .map(line -> {
      if (condition(line)) {
        return doThingA(line);
      } else {
        return doThingB(line);
      }
   })...

Details variieren je nachdem, was Sie genau tun möchten und wie Sie dies planen.

2
Louis Wasserman

Hier ist ein Ansatz (der die Vorsichtshinweise über das Erzwingen der bedingten Verarbeitung in einen Stream ignoriert), der ein Prädikat und einen Consumer in einen einzigen Prädikat mit Nebeneffekt umschließt:

public static class StreamProc {

    public static <T> Predicate<T> process( Predicate<T> condition, Consumer<T> operation ) {
        Predicate<T> p = t -> { operation.accept(t); return false; };
        return (t) -> condition.test(t) ? p.test(t) : true;
    }

}

Dann filtern Sie den Stream:

someStream
    .filter( StreamProc.process( cond1, op1 ) )
    .filter( StreamProc.process( cond2, op2 ) )
    ...
    .collect( ... )

Elemente, die im Stream verbleiben, wurden noch nicht verarbeitet.

Zum Beispiel sieht eine typische Dateisystemdurchquerung mit externer Iteration so aus

File[] files = dir.listFiles();
for ( File f : files ) {
    if ( f.isDirectory() ) {
        this.processDir( f );
    } else if ( f.isFile() ) {
        this.processFile( f );
    } else {
        this.processErr( f );
    }
}

Mit Streams und interner Iteration wird dies

Arrays.stream( dir.listFiles() )
    .filter( StreamProc.process( f -> f.isDirectory(), this::processDir ) )
    .filter( StreamProc.process( f -> f.isFile(), this::processFile ) )
    .forEach( f -> this::processErr );

Ich möchte, dass Stream die Prozessmethode direkt implementiert. Dann könnten wir haben

Arrays.stream( dir.listFiles() )
    .process( f -> f.isDirectory(), this::processDir ) )
    .process( f -> f.isFile(), this::processFile ) )
    .forEach( f -> this::processErr );

Gedanken?

1
tom

Das kannst du einfach tun

Counter counter = new Counter();
File.lines(path)
    .forEach(line -> {
        if (isTypeA(line)) {
            processTypeA(line);
        }
        else {
            counter.increment();
        }
    });

Nicht sehr funktional, aber es tut dies auf ähnliche Weise wie Ihr Beispiel. Wenn parallel, müssen Counter.increment() und processTypeA() natürlich threadsicher sein.

1
JB Nizet

Es scheint, dass Sie in der Realität jede Zeile bearbeiten möchten, sie jedoch je nach Bedingung (Typ) anders verarbeiten.

Ich denke, dass dies eine mehr oder weniger funktionale Art der Implementierung ist:

public static void main(String[] args) {
    Arrays.stream(new int[] {1,2,3,4}).map(i -> processor(i).get()).forEach(System.out::println);
}

static Supplier<Integer> processor(int i) {
    return tellType(i) ? () -> processTypeA(i) : () -> processTypeB(i);
}

static boolean tellType(int i) {
    return i % 2 == 0;
}

static int processTypeA(int i) {
    return i * 100;
}

static int processTypeB(int i) {
    return i * 10;
}
0
Oleg Mikheev