はじめに

Java8 から導入されている Stream API は, 既に一定の市民権を得て, 実際の業務で活用しているプログラマも多いと思う.

Stream API の使い方については, ググれば腐るほど出てくるので, ここではあまり述べない.

本稿では Stream をより深く理解するために, 再発明を目指してみる.

Java8 のソースコードをちょっとでも見ると, Stream はそれを使う側のコードとは裏腹に, 実装はお世辞にもスマートとは言えない, 非常に煩雑なコードとなっている.

原因の一つは並列化への対応だと思う. そこで今回 Stream の真似をしてみるにあたり, 並列化はばっさり切捨てることにした.

もう一つ, Java にはオブジェクトならぬプリミティブ型というものがあって, 本物の Stream は IntStream だの何だのと, やたらごちゃごちゃしている. 煩雑なのは嫌なので, プリミティブは一切合切捨ててしまおう. ラッパークラスのおかげで, あまり困らないはず.

プリミティブは捨てるので, PredicateFunction で代用することにする.

基本的なアイディアは,

public interface Stream<T>
  {
    Optional<T> get();
  }

というメソッドを用意し, これが呼出されたとき, 終端に達していれば Optional.empty(), 終端に達していなければ要素を一つ返す, という戦略である.

実際に記述して行ってみよう.

簡易ストリームの生成

本物の Stream は, コンテナの stream() メソッドなどを通じて生成することが多いが, 標準 API を書換えるわけには行かないので, 専ら Stream インタフェースの static なメソッドで生成することにした.

iterate

まずは iterate.

Stream.java が巨大になるのを避けるため, 実装は別クラスに飛ばすことにする.

Stream.java(一部):

public interface Stream<T>
  {
    // ~略~

    /**
     * 初期要素から関数を繰返し適用することによって生成される要素から成
     * るストリームを返す.
     *
     * @param seed 初期要素
     * @param next 要素から次の要素を決定する関数
     * @return 生成したストリーム
     */
    static <X> Stream<X> iterate(
            X seed,
            Function<? super X, ? extends X> next)
      {
        return new Iterate<>(seed, next);
      }

    // ~略~

    /**
     * このストリームから要素を取得する.
     * <p>
     * ストリームが終端に達していた場合は空の Optional を返す.
     *
     * @return 取得した要素または空の Optional
     */
    Optional<T> get();
  }

Iterate.java:

package org.creasys.sstream;

import java.util.Optional;
import java.util.function.Function;

/**
 * 簡易版ストリームの要素を繰返し生成する.
 *
 * @param <T> ストリームの要素の型
 */
public class Iterate<T> implements Stream<T>
  {
    private T value;
    private Function<? super T, ? extends T> next;

    Iterate(T seed, Function<? super T, ? extends T> next)
      {
        this.value = seed;
        this.next = next;
      }

    /** 現在保持している値を返し, 次の値に更新する. */
    @Override public Optional<T> get()
      {
        T value = this.value;
        this.value = next.apply(value);
        return Optional.of(value);
      }
  }

iterate の実装は簡単である. iterate は終端がないので, 常に同じ操作を行うだけ.

この調子でどんどん書いて行ってみよう.

of(Iterable)

前述の通り, List#stream などで生成するわけには行かないので, 代わりに Stream#of を用意する. ついでに要素を列挙する of も用意する.

Stream.java(一部):

    /**
     * {@link Iterable<T>} の全要素から成るストリームを生成する.
     *
     * @param iterable 要素の提供元
     * @return 生成したストリーム
     */
    static <X> Stream<X> of(Iterable<? extends X> iterable)
      {
        return new OfIterable<X>(iterable);
      }

    /**
     * 引数に指定された要素から成るストリームを生成する.
     *
     * @param arg 要素
     * @return 生成したストリーム
     */
    @SafeVarargs
    static <X> Stream<X> of(X... arg)
      {
        // iterable の実装を利用.
        return of(Arrays.asList(arg));
      }

OfIterable.java: これも比較的簡単.

package org.creasys.sstream;

import java.util.Iterator;
import java.util.Optional;

/**
 * {@link Iterable} から生成した簡易版ストリーム.
 *
 * @param <T> ストリームの要素の型
 */
public class OfIterable<T> implements Stream<T>
  {
    private Iterator<? extends T> iter;

    OfIterable(Iterable<? extends T> iterable)
      {
        this.iter = iterable.iterator();
      }

    @Override public Optional<T> get()
      {
        return iter.hasNext()
                ? Optional.of(iter.next())
                : Optional.empty();
      }
  }

今回実装してみた Stream の生成は以上.

中間操作

ここからが Stream の醍醐味だろう.

limit

iterate を用意したので, 簡単に終わらせることができるように limit をまず書いてみる.

Stream.java(一部):

    /**
     * このストリームの要素数を size に切詰めたストリームを返す.
     * <p>
     * これは短絡中間操作である.
     *
     * @param size 最大要素数
     * @return 結果のストリーム
     */
    default Stream<T> limit(int size)
      {
        return new Limit<>(this, size);
      }

Limit.java:

package org.creasys.sstream;

import java.util.Optional;

/**
 * 簡易版ストリームの要素数を切詰めたストリーム.
 *
 * @param <T> ストリームの要素の型
 */
public class Limit<T> implements Stream<T>
  {
    private Stream<T> stream;
    private int size;

    Limit(Stream<T> stream, int size)
      {
        this.stream = stream;
        this.size = size;
      }

    /** 残り要素数をカウントダウンしながら, 取得した要素を返す. */
    @Override public Optional<T> get()
      {
        Optional<T> o = size > 0 ? stream.get() : Optional.empty();
        if (size > 0)
            size = o.isPresent() ? size - 1 : 0;
        return o;
      }
  }

前段の Stream が一度終端となれば, 念のためカウントを 0 にして, 二度と前段からの要素の取得を試みないことにした.

skip

limit とくれば当然 skip も欲しい.

Stream.java(一部):

    /**
     * このストリームの最初の size 個の要素を破棄した残りの要素で構成さ
     * れるストリームを返す.
     * <p>
     * このストリームに含まれる要素の数が size 個より少ない場合は, 空の
     * ストリームを返す.
     * <p>
     * これは中間操作である.
     *
     * @param size 破棄する要素数
     * @return 結果のストリーム
     */
    default Stream<T> skip(int size)
      {
        return new Skip<>(this, size);
      }

Skip.java:

package org.creasys.sstream;

import java.util.Optional;

/**
 * 簡易版ストリームの先頭から特定数要素を破棄する.
 *
 * @param <T> ストリームの要素の型
 */
public class Skip<T> implements Stream<T>
  {
    private Stream<T> stream;
    private int size;

    Skip(Stream<T> stream, int size)
      {
        this.stream = stream;
        this.size = size;
      }

    @Override public Optional<T> get()
      {
        Optional<T> o;
        for (; ; )
          {
            o = stream.get();
            if (size <= 0 || !o.isPresent())
                break;
            --size;
          }
        return o;
      }
  }

Limit の実装のところで「前段の…」と述べた舌の根が乾かないうちに, Skip の実装は手抜きである. size == -1 で表現するなどすればよいだけだが, あんまり if 文を書きたくないので...

filter

中間操作の花形スターの一人, filter である.

Stream.java(一部):

    /**
     * このストリームの要素のうち, 指定された述語が真となる要素から構成
     * されるストリームを返す.
     * <p>
     * これは中間操作である.
     *
     * @param predicate ストリームに含めるかどうか判定する述語
     * @return 結果のストリーム
     */
    default Stream<T> filter(Function<? super T, Boolean> predicate)
      {
        return new Filter<>(this, predicate);
      }

Filter.java: filter の実装は, Optional を駆使すると一行.

package org.creasys.sstream;

import java.util.Optional;
import java.util.function.Function;

/**
 * 簡易版ストリームのフィルタ.
 *
 * @param <T> ストリームの要素の型
 */
public class Filter<T> implements Stream<T>
  {
    private Stream<T> stream;
    private Function<? super T, Boolean> predicate;

    Filter(Stream<T> stream, Function<? super T, Boolean> predicate)
      {
        this.stream = stream;
        this.predicate = predicate;
      }

    /** 要素を繰返し取得し, 述語が真となる最初の要素を返す. */
    @Override public Optional<T> get()
      {
        Optional<T> o;
        while ((o = stream.get())
            .map(x -> !predicate.apply(x)).orElse(false));
        return o;
      }
  }

Java9 には Optional#ifPresentOrElse が導入されて便利になったそうだが, これに頼らず Optional#map を使って何とかするのが硬派.

map

中間操作のもう一人の花形スター map.

Stream.java(一部):

    /**
     * 要素に指定された関数を適用した変換結果から構成されるストリームを
     * 返す.
     * <p>
     * これは中間操作である.
     *
     * @param mapper 各要素を変換する関数
     * @return 結果のストリーム
     */
    default <R> Stream<R> map(Function<? super T, ? extends R> mapper)
      {
        return new Map<>(this, mapper);
      }

Map.java: 流れを変えない中間操作は, 非常に簡単.

package org.creasys.sstream;

import java.util.Optional;
import java.util.function.Function;

/**
 * 簡易版ストリームの要素を変換する.
 *
 * @param <T> 変換前の要素の型
 * @param <R> 変換後の要素の型
 */
public class Map<T, R> implements Stream<R>
  {
    private Stream<T> stream;
    private Function<? super T, ? extends R> mapper;

    Map(Stream<T> stream, Function<? super T, ? extends R> mapper)
      {
        this.stream = stream;
        this.mapper = mapper;
      }

    @Override public Optional<R> get()
      {
        return stream.get().map(x -> mapper.apply(x));
      }
  }

その他

本物には sort などがあるが, そういう邪道な機能は無視. sort は本質的に要素を溜込まなければならないのだから, 素直に Collection を使えばいいと思う.

そういうわけで, 中間操作は以上で終わり.

終端操作

中間操作が意外なほど簡単なのに比べて, 終端操作はなかなかバリエーションが豊富で理解が難しい. (あ, 本物の Stream API の話, ね)

最終的に得られた要素をどうするかは, それこそ様々な要求が存在するのが道理なので, なんちゃって Stream では思い切って, 悪名高い forEach と, 個人的に一番よく使う collect に絞って実装した.

forEach

使ったら負けとまで言われる forEach だが, 個人的にも最後の最後まで使うのを躊躇う forEach だが, 最終兵器として何でもできてしまうので...

Stream.java(一部):

    /**
     * このストリームの各要素に対してアクションを実行する.
     * <p>
     * これは終端操作である.
     *
     * @param consumer 要素に対して実行するアクション
     */
    default void forEach(Consumer<? super T> consumer)
      {
        new ForEach<>(this, consumer).doForEach();
      }

ForEach.java:

package org.creasys.sstream;

import java.util.function.Consumer;

/**
 * 簡易版ストリームの各要素について繰返しアクションを実行する.
 *
 * @param <T> ストリームの要素の型
 */
public class ForEach<T>
  {
    private Stream<T> stream;
    private Consumer<? super T> consumer;

    ForEach(Stream<T> stream, Consumer<? super T> consumer)
      {
        this.stream = stream;
        this.consumer = consumer;
      }

    void doForEach()
      {
        while (stream.get()
                .map(x -> { consumer.accept(x); return true; })
                .orElse(false))
            ;
      }
  }

説明の必要はないと思うが, 硬派の Optional#map を使っているとだけ言っておこう.

collect

なんだかんだ言って, 結局 Stream の最終結果を Collection に入れたくなったとき用の collect である.

最初 Stream API のリファレンスを見たとき, 正直意味不明だと感じたが, reduce 操作が不変なオブジェクトに集約する操作で, collect 操作は変更可能なオブジェクトに集約する操作だと理解すれば, リファレンスのひどい日本語訳でも読めるようにはなる.

それにしても「可変リダクション操作」という日本語はあまりにもひどいと思う. これ以上は本稿の主旨から外れるので解説は省く.

本物の Collector インタフェースには, 並列化の機能が入り込んでいるので, その簡略版をまず用意する.

Collector.java:

package org.creasys.sstream;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * 可変コンテナに入力要素を蓄積し, すべての入力要素が処理された後, 蓄積
 * された結果を最終的な表現に変換する可変リダクション操作.
 * <p>
 * Collector の仕様は次の 3 つの関数により決定される.
 * <ul>
 * <li>新しい可変コンテナの作成 (supplier())</li>
 * <li>可変コンテナへのデータ要素の取込み (accumulator())</li>
 * <li>(オプション) 可変コンテナに対する最終的な変換の実行 (finisher())</li>
 * </ul>
 *
 * @param <T> 可変リダクション操作の入力要素の型
 * @param <A> 可変リダクション操作の可変コンテナの型
 * @param <R> 可変リダクション操作の結果の型
 */
public interface Collector<T, A, R>
  {
    static <X> Collector<X, List<X>, List<X>> toList()
      {
        return new ToList<X>();
      }

    /**
     * 新しい可変コンテナを作成して返す関数.
     *
     * @return 新しい可変コンテナを作成して返す関数
     */
    Supplier<A> supplier();

    /**
     * 可変コンテナに値を折りたたむ関数.
     *
     * @return 可変コンテナに値を折りたたむ関数
     */
    BiConsumer<A, T> accumulator();

    /**
     * 可変コンテナ (中間結果) から最終結果へ変換する関数.
     *
     * @return 可変コンテナ (中間結果) から最終結果へ変換する関数
     */
    Function<A, R> finisher();
  }

ついでに, 本物では Collectors.toList となっているリスト用の Collector を返す static メソッドをここに置いた.

ToList.java: 上記 toList の実装. Collector インタフェースに必要な 3 メソッドを定義し, それぞれ関数を返すだけ.

package org.creasys.sstream;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * 簡易版ストリームからリストへの可変リダクション操作をカプセル化したク
 * ラス.
 *
 * @param <T> ストリームの要素の型
 */
public class ToList<T> implements Collector<T, List<T>, List<T>>
  {
    @Override public Supplier<List<T>> supplier()
      { return ArrayList<T>::new; }
    @Override public BiConsumer<List<T>, T> accumulator()
      { return (c, v) -> c.add(v); }
    @Override public Function<List<T>, List<T>> finisher()
      { return c -> c; }
  }

ここからが collect の本体.

Stream.java(一部):

    /**
     * このストリームに対して可変リダクション操作を実行する.
     * <p>
     * 可変リダクション操作はリダクション操作とよく似ているが,可変リダク
     * ション操作はリダクション操作と違い操作対象が変更可能なオブジェク
     * トでなければならない.
     *
     * @param collector コレクト操作内容をカプセル化したオブジェクト
     * @return 結果のオブジェクト
     */
    default <R> R collect(Collector<? super T, ?, R> collector)
      {
        return new Collect<>(this, collector).doCollect();
      }

Collect.java: collect 操作の実装.

package org.creasys.sstream;

import java.util.function.BiConsumer;

/**
 * 簡易版ストリームの可変リダクション操作を実行するクラス.
 *
 * @param <T> ストリームの要素の型
 * @param <R> 結果の型
 */
public class Collect<T, A, R>
  {
    private Stream<T> stream;
    private Collector<? super T, A, R> collector;

    Collect(Stream<T> stream, Collector<? super T, A, R> collector)
      {
        this.stream = stream;
        this.collector = collector;
      }

    R doCollect()
      {
        A container = collector.supplier().get();
        BiConsumer<A, ? super T> accumulator = collector.accumulator();
        while (stream.get()
                .map(x -> { accumulator.accept(container, x);
                            return true; })
                .orElse(false))
            ;
        return collector.finisher().apply(container);
      }
  }

最後に

簡易版ストリームの実装は以上である. 実に簡単な機能だけで実現できることが分かっていただけたと思う.

そもそもなぜこんなものを実装しようと思ったかであるが, 冒頭述べた通り, 一つには Stream への理解をより深めてもらうためであるが, もう一つ, Java8 の Stream には分岐と結合の機能がないという点が大いに不満であって, それが Java の文法の枠組みだけで実現できないものだろうかと考え始め, 実験的に実装してみるための土台が欲しくなったというのが本当の理由である.

実は頭の中では分岐も結合も出来上がっているのだが, それについては次回と言うことにする.

おまけ

Stream.java: 全体.

package org.creasys.sstream;

import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * 簡易版ストリームのインタフェース.
 *
 * @param <T> ストリームの要素の型
 */
public interface Stream<T>
  {
    /**
     * {@link Iterable<T>} の全要素から成るストリームを生成する.
     *
     * @param iterable 要素の提供元
     * @return 生成したストリーム
     */
    static <X> Stream<X> of(Iterable<? extends X> iterable)
      {
        return new OfIterable<X>(iterable);
      }

    /**
     * 引数に指定された要素から成るストリームを生成する.
     *
     * @param arg 要素
     * @return 生成したストリーム
     */
    @SafeVarargs
    static <X> Stream<X> of(X... arg)
      {
        // iterable の実装を利用.
        return of(Arrays.asList(arg));
      }

    /**
     * 初期要素から関数を繰返し適用することによって生成される要素から成
     * るストリームを返す.
     *
     * @param seed 初期要素
     * @param next 要素から次の要素を決定する関数
     * @return 生成したストリーム
     */
    static <X> Stream<X> iterate(
            X seed,
            Function<? super X, ? extends X> next)
      {
        return new Iterate<>(seed, next);
      }

    /**
     * このストリームの要素数を size に切詰めたストリームを返す.
     * <p>
     * これは短絡中間操作である.
     *
     * @param size 最大要素数
     * @return 結果のストリーム
     */
    default Stream<T> limit(int size)
      {
        return new Limit<>(this, size);
      }

    /**
     * このストリームの最初の size 個の要素を破棄した残りの要素で構成さ
     * れるストリームを返す.
     * <p>
     * このストリームに含まれる要素の数が size 個より少ない場合は, 空の
     * ストリームを返す.
     * <p>
     * これは中間操作である.
     *
     * @param size 破棄する要素数
     * @return 結果のストリーム
     */
    default Stream<T> skip(int size)
      {
        return new Skip<>(this, size);
      }

    /**
     * このストリームの要素のうち, 指定された述語が真となる要素から構成
     * されるストリームを返す.
     * <p>
     * これは中間操作である.
     *
     * @param predicate ストリームに含めるかどうか判定する述語
     * @return 結果のストリーム
     */
    default Stream<T> filter(Function<? super T, Boolean> predicate)
      {
        return new Filter<>(this, predicate);
      }

    /**
     * 要素に指定された関数を適用した変換結果から構成されるストリームを
     * 返す.
     * <p>
     * これは中間操作である.
     *
     * @param mapper 各要素を変換する関数
     * @return 結果のストリーム
     */
    default <R> Stream<R> map(Function<? super T, ? extends R> mapper)
      {
        return new Map<>(this, mapper);
      }

    /**
     * このストリームに対して可変リダクション操作を実行する.
     * <p>
     * 可変リダクション操作はリダクション操作とよく似ているが,可変リダク
     * ション操作はリダクション操作と違い操作対象が変更可能なオブジェク
     * トでなければならない.
     *
     * @param collector コレクト操作内容をカプセル化したオブジェクト
     * @return 結果のオブジェクト
     */
    default <R> R collect(Collector<? super T, ?, R> collector)
      {
        return new Collect<>(this, collector).doCollect();
      }

    /**
     * このストリームの各要素に対してアクションを実行する.
     * <p>
     * これは終端操作である.
     *
     * @param consumer 要素に対して実行するアクション
     */
    default void forEach(Consumer<? super T> consumer)
      {
        new ForEach<>(this, consumer).doForEach();
      }

    /**
     * このストリームから要素を取得する.
     * <p>
     * ストリームが終端に達していた場合は空の Optional を返す.
     *
     * @return 取得した要素または空の Optional
     */
    Optional<T> get();
  }

おまけのおまけ

Stream API は例外と相性が悪いと言う噂があるが, 単に標準の関数インタフェースが throws 宣言をしていないことを指して文句を言っている人もいるようだ.

その不満は, 自分で関数インタフェースを用意することで, ある程度解消することができると思う.

例:

package org.creasys.exp;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class Example
  {
    @FunctionalInterface
    interface SupplierWithThrows<X>
      {
        X get() throws Exception;
      }

    @FunctionalInterface
    interface RunnableWithThrows
      {
        void run() throws Exception;
      }

    private static <X> X xr(SupplierWithThrows<X> supplier)
      {
        try
          {
            return supplier.get();
          }
        catch (Exception e)
          {
            if (e instanceof RuntimeException)
                throw (RuntimeException) e;
            throw new RuntimeException(e);
          }
      }

    private static void xr(RunnableWithThrows runnable)
      {
        try
          {
            runnable.run();
          }
        catch (Exception e)
          {
            if (e instanceof RuntimeException)
                throw (RuntimeException) e;
            throw new RuntimeException(e);
          }
      }

    public static void main(String[] args)
      {
        BufferedReader br = xr(() -> new BufferedReader(
            new InputStreamReader(
                new FileInputStream("foo.txt"),
                StandardCharsets.UTF_8)));
        try
          {
            String line;
            while ((line = xr(() -> br.readLine())) != null)
                System.out.println(line);
          }
        finally
          {
            xr(() -> br.close());
          }
      }
  }

メソッド xr は, ExceptionRuntimeException に変換する機能を持つ. (eXception to Runtimeexception で xr, ね)

一見, 引数を取る関数が使えないので, 現場で使おうと思うと大量の関数インタフェースと, xr のオーバーロードを自作する羽目になるのでは, と心配する人もいるかもしれないが, それには及ばない.

x = someFunc(a)someFunc の例外宣言が嫌なら, x = xr(() -> someFunc(a)) と書けばよいだけ.

私自身, この技を仕事中にふと思いついて, 以来とても便利に多用している. お試しあれ.