Geschicktes Cachen von Stream-Inhalten

Ich habe eine Klasse, die einen Stream<X> von Elementen liefern können soll:

interface Data { Stream<X> getStream(); }

Dabei ist nicht festgelegt, wo dieser Stream herkommt. Er kann von einer List<X> stammen, die direkt in dieser Klasse liegt:

class DataWithList implements Data {
    List<X> list;
    Stream<X> getStream() { return list.stream(); }
}

In einem anderen Fall ist die Quelle aber über einen Supplier wegabstrahiert. Ganz grob also sowas:

class DataWithSupplier implements Data {
    Supplier<Stream<X>> supplier;
    Stream<X> getStream() { return supplier.get(); }
}

Wobei der „Supplier“ natürlich auch eine List sein könnte. Aber das weiß man nicht. Er könnte auch „kompliziert“ sein, und den Stream aufwändig zusammenbauen müssen.

Deswegen habe ich in einem umnachteten Moment gedacht: Joa, ich bau’ da mal so eine Art Cache ein:

class DataWithCachedSupplier implements Data {
    Supplier<Stream<X>> supplier;
    List<X> cached;
    Stream<X> getStream() { 
        if (cached != null) return cached.stream();
        cached = new ArrayList<X>();
        return supplier.get().map(x -> {
            cached.add(x);
            return x;
        });
    }
}

Zugegeben :pensive: ich muss schon SEHR umnachtet gewesen sein: Das macht natürlich gar keinen Sinn. Wenn jemand

data.getStream().limit(5).forEach(...);
data.getStream().forEach(...); 

macht, kommen beim zweiten mal nur die 5 gecachten Elemente an…

Man könnte nun natürlich direkt sagen: Was auch immer das für ein Supplier ist, ich pack’ erstmal alles, was dem seine Streams liefern, in eine List, und die Sache hat sich erledigt. Aber ich hatte gehofft, dass man das caching irgendwie geschickt „on the fly“ machen könnte.

Hat jemand eine Idee?

Mal ohne gross nachgedacht zu haben. Drei Punkte

• Warum Caching überhaupt?

• Könnte man nicht etwas mit
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#generate-java.util.function.Supplier-
bauen?

• Hast du mal über FRP (Functional Reactive Programming) nachgedacht? RxJava zum Beispiel. (https://stackoverflow.com/questions/30216979/difference-between-java-8-streams-and-rxjava-observables)
Ich hab nämlich so ein Gefühl, dass es bei dem Caching um eben die Probleme geht, die man versucht mit FRP zu lösen.

1 „Gefällt mir“

1.

Aus zwei Gründen: Erstens könnte sich jemand den gleichen Stream mehrfach abholen wollen. (Dabei gibt es viele andere Caveats, die aber etwas vage sind. Grob haben sie damit zu tun, dass auch der Fall berücksichtigt werden sollte/müßte, dass der Stream-Inhalt gar nicht komplett in den Speicher passt. Das ist erstmal recht akademisch, aber ich hatte eben auch einen “Soft cache” eingeführt…). Das entscheidende ist, dass die Quelle des Streams theoretisch auch eine Datei (oder sogar eine Netzwerkverbindung) sein könnte, und die Performance da dann schon eine Rolle spielt. Zweitens soll es explizit möglich sein, eine solche Klasse zu cachen. Also in eine Instanz zu verwandeln, die explizit die Daten nicht nur als Stream, sondern als List anbietet.

2.

Nö. Das ist ein infinite Stream, der nicht sinnvoll von einer beliebigen Quelle gespeist werden kann. Wenn ein Iterator dahinter steht, ist alles OK.

3.

Ich kann nicht mit Sicherheit sagen, dass das nicht relevant ist, aber das Bauchgefühl sagt: Eher nicht. Es geht wirklich um eine Datenverarbeitungspipeline, und die Klasse, die den Stream anbietet, ist ““Ein Datensatz””. Man kann es sich grob wie eine “Poor Man Version” von http://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html vorstellen…

http://reactivex.io/RxJava/javadoc/io/reactivex/subjects/ReplaySubject.html

Wenn ich mir dort das Perlendiagramm anschaue, dann kommt das doch an das geforderte relativ nah dran.

Das ReplaySubject ist der Datensatz und wird eben mit den Datenpunkten gefüttert.
Jeder Observer subscribed eben dieses Subject und bekommt alle Datenpunkte von beginn an, die bereits vorhanden sind, sowie die restlichen, sobald diese zur Verfügung stehen und noch Interesse besteht.

Zumindest kann man sich mal anschauen, wie das intern gelöst ist.

Vielleicht ist da ja noch eine Idee vorhanden um das auf Java-Streams zu übertragen.

OK, von RxJava hatte ich schon gehört, und auch schon “die” Beispiele gesehen (bei denen alles ganz einfach aussieht), aber das sieht jetzt aus, als müßte man da mehr Zeit investieren, um sich reinzufräsen.

Ganz high-level ist die Beschreibung eines möglichen Lösungsansatzes ja vielleicht nicht sooo schwer:

  • Erstelle einen Stream, der (wie ursprünglich gedacht) das map(x -> cache.add(x)) macht

  • Von diesem Zeitpunkt an wird immer, wenn der Stream angefordert wird, sowas gemacht wie

    Stream<X> cached = cache.stream();
    Stream<X> original = supplier.get();
    original.skip(cache.size());
    return Streams.concat(cached, original);
    

Das schwierige ist, dass streams nun mal sehr intransparent sind. Man weiß nicht, was der “Konsument” damit macht, wie viele Elemente er schon gelesen hat, und ob er noch weiterlesen wird. D.h. das oben skizzierte könnte auf tausende Arten krachen. Nahe liegend ist: Der ursprüngliche Konsument liest auf einmal weiter, und es landen neue Elemente im Cache. Von “echter” Thread-safety mal ganz abgesehen.

Im Moment habe ich das implizite caching mal rausgenommen. Man bekommt immer einen Stream, und wenn man was gecacht haben will, kann man CachedData c = data.cache() aufrufen…

Ich hab nochmal etwas nachgedacht. Aber nichts ausprobiert, von daher mal ohne Gewähr.

Es gibt die Möglichkeit über einen Iterator einen Stream zu erstellen. (https://stackoverflow.com/questions/24511052/how-to-convert-an-iterator-to-a-stream)

Jetzt stellt sich die Frage, kann man einen Iterator erstellen, der seine Daten aus einem gemeinsamen Cache bezieht und falls er soweit iteriert hat, dass er am Ende des Caches angekommen ist, diesem weitere Elemente hinzufügt?
Ich denke ja, bin aber etwas zu faul, das gerade zu implementieren.
Vorteil ist eben, dass ein Iterator nur iteriert. Keine ConcurrentModificationException, wie bei Streams und Listen.
Das ganze müsste weitgehend Lazy bleiben.
Das erweitern, bzw. cachen müsste man synchronizieren, was mal ein Nachteil ist.
Parallel-Streams würde ich auch erst einmal kritisch sehen.

Kurz und Pseudo-Code, StreamSupport.stream(new Itearator(refToCache));

Falls ich das richtig verstanden habe, würde damit “nur” die Verantwortung dafür, neue Elemente einzufügen, vom map(x->cache.add(x)) in den Iterator verschoben. (Übrigens werden die meisten Streams da schon aus Iteratoren erstellt). Das könnte vielleicht helfen, weil man bei einem Custom Iterator doch etwas mehr Kontrolle über das Verhalten hat.

Dass man da an einigen Stellen synchronized brauchen wird, ist klar, bzw. zumindest wird der Cache eine Threadsichere Liste sein müssen. Abgesehen von diesen Fragen könnte das Muster da dann aber vielleicht sogar recht einfach sein, und ggf. könnten alle Iteratoren auf einen gemeinsamen Cache zugreifen.

Aber das

Stream<X> original = supplier.get();
original.skip(cache.size());

(für den Teil, der noch nicht gecacht ist) müßte da noch eingeflochten werden…

Werd’ da wohl (jetzt am WE) nochmal drüber nachdenken müssen