Ich habe eine große Datei, die eine Liste von Elementen enthält.
Ich möchte einen Stapel von Elementen erstellen und eine HTTP-Anforderung mit diesem Stapel erstellen (alle Elemente werden als Parameter in der HTTP-Anforderung benötigt). Ich kann das sehr leicht mit einer for
-Schleife machen, aber als Java 8-Liebhaber möchte ich versuchen, dies mit dem Stream-Framework von Java 8 zu schreiben (und die Vorteile der langsamen Verarbeitung nutzen).
Beispiel:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Ich möchte etwas eine lange Zeile von lazyFileStream.group(500).map(processBatch).collect(toList())
machen
Was wäre der beste Weg, dies zu tun?
Sie könnten dies mit jOOλ tun, einer Bibliothek, die Java 8-Streams für Single-Threaded-Sequence-Stream-Anwendungsfälle erweitert:
Seq.seq(lazyFileStream) // Seq<String>
.zipWithIndex() // Seq<Tuple2<String, Long>>
.groupBy(Tuple -> Tuple.v2 / 500) // Map<Long, List<String>>
.forEach((index, batch) -> {
process(batch);
});
Hinter den Kulissen ist zipWithIndex()
nur:
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
final Iterator<T> it = stream.iterator();
class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
long index;
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Tuple2<T, Long> next() {
return Tuple(it.next(), index++);
}
}
return seq(new ZipWithIndex());
}
... während groupBy()
API-Komfort für:
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
return collect(Collectors.groupingBy(classifier));
}
(Haftungsausschluss: Ich arbeite für die Firma hinter jOOλ)
Der Vollständigkeit halber sei hier eine Guava -Lösung angegeben.
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
In der Frage ist die Sammlung verfügbar, so dass kein Stream benötigt wird und er als geschrieben werden kann.
Iterables.partition(data, batchSize).forEach(this::process);
Auch eine reine Java-8-Implementierung ist möglich:
int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
.mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
.forEach(batch -> process(batch));
Beachten Sie, dass es im Gegensatz zu JOOl gut parallel arbeiten kann (vorausgesetzt, Ihr data
ist eine Direktzugriffsliste).
Pure Java 8-Lösung:
Wir können einen benutzerdefinierten Collector erstellen, um dies elegant zu erledigen. Dazu werden batch size
und Consumer
für die Verarbeitung jedes Stapels benötigt:
import Java.util.ArrayList;
import Java.util.Collections;
import Java.util.List;
import Java.util.Set;
import Java.util.function.*;
import Java.util.stream.Collector;
import static Java.util.Objects.requireNonNull;
/**
* Collects elements in the stream and calls the supplied batch processor
* after the configured batch size is reached.
*
* In case of a parallel stream, the batch processor may be called with
* elements less than the batch size.
*
* The elements are not kept in memory, and the final result will be an
* empty list.
*
* @param <T> Type of the elements being collected
*/
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
/**
* Constructs the batch collector
*
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
*/
BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
batchProcessor = requireNonNull(batchProcessor);
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
}
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<T>, T> accumulator() {
return (ts, t) -> {
ts.add(t);
if (ts.size() >= batchSize) {
batchProcessor.accept(ts);
ts.clear();
}
};
}
public BinaryOperator<List<T>> combiner() {
return (ts, ots) -> {
// process each parallel list without checking for batch size
// avoids adding all elements of one to another
// can be modified if a strict batching mode is required
batchProcessor.accept(ts);
batchProcessor.accept(ots);
return Collections.emptyList();
};
}
public Function<List<T>, List<T>> finisher() {
return ts -> {
batchProcessor.accept(ts);
return Collections.emptyList();
};
}
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
Erstellen Sie dann optional eine Hilfsprogrammklasse:
import Java.util.List;
import Java.util.function.Consumer;
import Java.util.stream.Collector;
public class StreamUtils {
/**
* Creates a new batch collector
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
* @param <T> the type of elements being processed
* @return a batch collector instance
*/
public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
return new BatchCollector<T>(batchSize, batchProcessor);
}
}
Verwendungsbeispiel:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
Ich habe meinen Code auch auf GitHub gepostet, wenn jemand einen Blick darauf werfen möchte:
Ich habe einen benutzerdefinierten Spliterator für Szenarien wie diesen geschrieben. Es werden Listen mit einer bestimmten Größe aus dem Eingabestrom gefüllt. Der Vorteil dieses Ansatzes ist, dass er eine verzögerte Verarbeitung ausführt und mit anderen Stream-Funktionen arbeitet.
public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
return batchSize <= 0
? Stream.of(stream.collect(Collectors.toList()))
: StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}
private static class BatchSpliterator<E> implements Spliterator<List<E>> {
private final Spliterator<E> base;
private final int batchSize;
public BatchSpliterator(Spliterator<E> base, int batchSize) {
this.base = base;
this.batchSize = batchSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<E>> action) {
final List<E> batch = new ArrayList<>(batchSize);
for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
;
if (batch.isEmpty())
return false;
action.accept(batch);
return true;
}
@Override
public Spliterator<List<E>> trySplit() {
if (base.estimateSize() <= batchSize)
return null;
final Spliterator<E> splitBase = this.base.trySplit();
return splitBase == null ? null
: new BatchSpliterator<>(splitBase, batchSize);
}
@Override
public long estimateSize() {
final double baseSize = base.estimateSize();
return baseSize == 0 ? 0
: (long) Math.ceil(baseSize / (double) batchSize);
}
@Override
public int characteristics() {
return base.characteristics();
}
}
Sie können auch RxJava verwenden:
Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));
oder
Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();
oder
Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
Sie könnten auch einen Blick auf cyclops-reagieren werfen, ich bin der Autor dieser Bibliothek. Es implementiert die jOOλ-Schnittstelle (und durch die Erweiterung JDK 8 Streams), konzentriert sich jedoch im Gegensatz zu JDK 8 Parallel Streams auf asynchrone Operationen (z. B. potenziell blockierende Async-E/A-Aufrufe). JDK Parallel Streams hingegen konzentrieren sich auf Datenparallelität für CPU-gebundene Operationen. Es funktioniert durch das Verwalten von Aggregaten von Future-basierten Aufgaben unter der Haube, bietet jedoch eine standardisierte erweiterte Stream-API für Endbenutzer.
Dieser Beispielcode kann Ihnen beim Einstieg helfen
LazyFutureStream.parallelCommonBuilder()
.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();
Es gibt ein Tutorial zum Stapeln hier
Und ein allgemeineres Tutorial hier
Um Ihren eigenen Thread-Pool zu verwenden (der wahrscheinlich besser für das Blockieren von E/A geeignet ist), können Sie mit der Verarbeitung beginnen
LazyReact reactor = new LazyReact(40);
reactor.react(data)
.grouped(BATCH_SIZE)
.map(this::process)
.run();
Wir hatten ein ähnliches Problem zu lösen. Wir wollten einen Stream nehmen, der größer als der Systemspeicher war (durch alle Objekte in einer Datenbank iteriert) und die Reihenfolge so gut wie möglich randomisieren. Wir dachten, es wäre in Ordnung, 10.000 Elemente zu puffern und zufällig zu ordnen.
Das Ziel war eine Funktion, die einen Stream aufnahm.
Von den hier vorgeschlagenen Lösungen scheint es eine Reihe von Optionen zu geben:
Unser Instinkt bestand ursprünglich darin, einen benutzerdefinierten Collector zu verwenden. Die kundenspezifische Kollektorlösung ist sehr gut und wir haben sie fast benutzt.
Hier ist eine Lösung, die schummelt, indem sie die Tatsache verwendet, dass Stream
s Ihnen eine Iterator
geben kann, die Sie als eine Fluchtluke verwenden können, um etwas zu tun, das Streams nicht unterstützen. Die Variable Iterator
wird mit einem anderen Bit von Java 8 StreamSupport
sorcery in einen Stream zurück konvertiert.
/**
* An iterator which returns batches of items taken from another iterator
*/
public class BatchingIterator<T> implements Iterator<List<T>> {
/**
* Given a stream, convert it to a stream of batches no greater than the
* batchSize.
* @param originalStream to convert
* @param batchSize maximum size of a batch
* @param <T> type of items in the stream
* @return a stream of batches taken sequentially from the original stream
*/
public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
}
private static <T> Stream<T> asStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator,ORDERED),
false);
}
private int batchSize;
private List<T> currentBatch;
private Iterator<T> sourceIterator;
public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
this.batchSize = batchSize;
this.sourceIterator = sourceIterator;
}
@Override
public boolean hasNext() {
prepareNextBatch();
return currentBatch!=null && !currentBatch.isEmpty();
}
@Override
public List<T> next() {
return currentBatch;
}
private void prepareNextBatch() {
currentBatch = new ArrayList<>(batchSize);
while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
currentBatch.add(sourceIterator.next());
}
}
}
Ein einfaches Beispiel für die Verwendung dieses Befehls würde folgendermaßen aussehen:
@Test
public void getsBatches() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
.forEach(System.out::println);
}
Die obigen Drucke
[A, B, C]
[D, E, F]
Für unseren Anwendungsfall wollten wir die Stapel mischen und dann als Stream beibehalten - es sah so aus:
@Test
public void howScramblingCouldBeDone() {
BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
// the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
.map(list -> {
Collections.shuffle(list); return list; })
.flatMap(List::stream)
.forEach(System.out::println);
}
Dies gibt etwas aus (es ist randomisiert, also jedes Mal anders)
A
C
B
E
D
F
Die geheime Soße hier ist, dass es immer einen Stream gibt, also können Sie entweder einen Stapel von Stapeln bearbeiten oder für jeden Stapel etwas tun und dann flatMap
zurück zu einem Stream. Noch besser ist, dass alle oben genannten nur als endgültige forEach
oder collect
oder andere abschließende AusdrückePULLdie Daten durch den Stream laufen.
Es stellt sich heraus, dass iterator
eine spezielle Art von Beendigungsoperation in einem Stream ist und nicht bewirkt, dass der gesamte Stream läuft und in den Speicher gelangt! Vielen Dank an die Java 8-Jungs für ein brillantes Design!
Ein reines Java 8-Beispiel, das auch mit parallelen Streams funktioniert.
Wie benutzt man:
Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));
Die Methodendeklaration und Implementierung:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
List<ElementType> newBatch = new ArrayList<>(batchSize);
stream.forEach(element -> {
List<ElementType> fullBatch;
synchronized (newBatch)
{
if (newBatch.size() < batchSize)
{
newBatch.add(element);
return;
}
else
{
fullBatch = new ArrayList<>(newBatch);
newBatch.clear();
newBatch.add(element);
}
}
batchProcessor.accept(fullBatch);
});
if (newBatch.size() > 0)
batchProcessor.accept(new ArrayList<>(newBatch));
}
Mit Java 8
und com.google.common.collect.Lists
können Sie Folgendes tun:
public class BatchProcessingUtil {
public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
List<List<T>> batches = Lists.partition(data, batchSize);
return batches.stream()
.map(processFunction) // Send each batch to the process function
.flatMap(Collection::stream) // flat results to gather them in 1 stream
.collect(Collectors.toList());
}
}
Hier ist T
der Typ der Elemente in der Eingabeliste und U
der Typ der Elemente in der Ausgabeliste
Und Sie können es so verwenden:
List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
userKeys,
10, // Batch Size
partialKeys -> service.getUsers(partialKeys)
);
Einfaches Beispiel mit Spliterator
// read file into stream, try-with-resources
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
//skip header
Spliterator<String> split = stream.skip(1).spliterator();
Chunker<String> chunker = new Chunker<String>();
while(true) {
boolean more = split.tryAdvance(chunker::doSomething);
if (!more) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class Chunker<T> {
int ct = 0;
public void doSomething(T line) {
System.out.println(ct++ + " " + line.toString());
if (ct % 100 == 0) {
System.out.println("====================chunk=====================");
}
}
}
Bruces Antwort ist umfassender, aber ich suchte nach etwas Schnellem und Dreckigem, um eine Reihe von Dateien zu verarbeiten.
Sie können Apache.commons verwenden:
ListUtils.partition(ListOfLines, 500).stream()
.map(partition -> processBatch(partition)
.collect(Collectors.toList());
dies ist eine reine Java-Lösung, die träge bewertet wird.
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}