谷本 心 in せろ部屋

はてなダイアリーから引っ越してきました

[Java]Stream APIをつくろう

最近、上司からのパワハラではなく、部下からの「部下ハラ」というものがあると聞きますが、
それって要するにバックプレッシャー型ハラスメントでありリアクティブであるよなと思うこの頃ですが、
皆さんいかがお過ごしでしょうか。


さて、順調な滑り出しからスタートした本エントリーは Java Advent Calenda 2015 の12日目になります。


前日は最年少(?)OpenJDKコミッタの [twitter:@bitter_fox] さんによる
DeprecatedがJDK9で変わるかもしれない件(JEP 277: Enhanced Deprecation)
明日は日本唯一のJava Championの [twitter:@skrb] さんのエントリーです。
なかなかエラいところに挟まれてしまったなという感じです!


では本題、
今日のテーマは「Stream APIをつくろう」です。


Stream APIって、使う側がよくフォーカスされるのですが、
作る側がどうなっているのか、あるいはその使いどころはどこなのか、というのはあまり情報がないように思います。


今日はその作る側にフォーカスしたいと思います。
少し長いエントリーですが、ゆっくりとおつきあいください。

おしながき

1. なぜStream APIを作りたいのか
2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream として扱う
3. そもそもStream APIってどうやって作るの?
4. お題:AWS S3にあるテキストファイルをまとめて Stream として扱う
5. まとめ、そして次回予告

1. なぜSteam APIを作りたいのか

もともとのきっかけは、AWSのS3上にある大量のログファイルを
elasticsearchに流し込む処理を書いたことでした。


ファイルサイズが膨大なのはもちろんのこと、ファイル数自体も大量ですから、
処理対象となるファイルをリストアップするだけでも大変という状況です。
なのでファイルリストを少しずつ作りつつ、
ファイルを開いて少し読んでは流し込むという、逐次処理をしなくてはいけません。


Java7までなら、うんたらHandlerを渡して処理をグルグル回すところですが
Java8なので、せっかくだからStream APIを使うことにしてみました。


ただ先にも書いたように、Stream APIを提供する側の処理はあまり情報がなかったため
[twitter:@bitter_fox]さんや[twitter:@making]さん、[twitter:@backpaper0]さんに色々と質問しながら勉強しました。
皆さん、ありがとうございました。今日はその辺りをまとめたいと思います。


ちなみに言葉の定義としてはStream APIを作るんじゃなくて
Stream APIのProducerを作るっていう方が正しいとは思うんですが
ぱっと見での分かりやすさのためにこうしました。

2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream として扱う

まず手始めに、ローカルファイルにあるファイルを処理する方法から考えます。
特定のディレクトリの直下にテキストファイルが大量にあり、
ファイル名などは別に表示しなくても良いから、
テキストファイルを読み込んで、一行ずつ処理したいというシチュエーションです。

Files.listとFiles.linesをどう組み合わせるの?

ディレクトリ内にあるファイルの一覧を Stream として取得するのは Files.list メソッド、
ファイルを読み込んで Stream として取得するのは Files.lines メソッドが使えます。
では、これを組み合わせる時は、どうしたら良いんでしょうか。


変換はmapメソッドだったよなーと思いつつ、こんな事をやると。

Stream<Stream<String>> stream = Files.list(path)
        .map(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
stream.forEach(System.out::println);

Stream> になり、恐ろしく期待外れの結果になります。

java.util.stream.ReferencePipeline$Head@421faab1
java.util.stream.ReferencePipeline$Head@2b71fc7e
java.util.stream.ReferencePipeline$Head@5ce65a89


ではどうするかというと、そう、みんな大好きflatMapです。

try (Stream<String> stream = Files.list(path)
        .flatMap(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

こうすれば無事に結果をStreamとして取り出すことができます。


出力例はこんな感じ。

This is file1.
Hello world.
This is file 2.
This is file3.
Good bye world.

ちなみにFiles.listで作ったStreamはきちんとcloseする必要があるので
try-with-resourcesを使うのがオススメです。

flatMapは毎回closeを呼び出してくれる

ちなみにflatMapがスグレモノなところは、
flatMapメソッドの中で作ったStreamを、毎回必ずcloseしてくれるところです。


上の例ではFiles.linesを利用しているので、別にcloseする必要はないのですが、
たとえばBufferedReaderやInputStreamなどを作った場合などには、closeする必要があります。
そういう時は、flatMapメソッド内で作るStreamにonCloseを書いておけば、毎回呼び出されるのです。

try (Stream<String> stream = Files.list(path)
        .flatMap(s -> {
            try {
                BufferedReader reader = Files.newBufferedReader(s);
                return reader.lines().onClose(() -> { // (1)
                    try {
                        reader.close(); // (2)
                        System.out.println("closed."); // (3)
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

(1) でStreamを作ったところにonCloseを書いて
(2) でBufferedReaderをcloseしています。

This is file1.
Hello world.
closed.
This is file 2.
closed.
This is file3.
Good bye world.
closed.

(3) で書いた「closed.」が3回表示され、きちんと毎回closeされていることが分かります。

この章のまとめ

1. Streamの要素からStreamに変換する時は、flatMapだ!
2. flatMapで作ったStreamは、きちんとcloseしてくれるぞ!

3. そもそもStream APIってどうやって作るの?

前の章ではflatMapが便利だという知見を得ました。
次は、そもそもStream APIってどうやって作れば良いのかを考えます。


恥ずかしながら全然知らなかったので、twitterで聞いてみたところ
[twitter:@bitter_fox]さんと [twitter:@making]さんから、
Iteratorを作って、SpliteratorにしてStreamSupporに渡すと良いと教わりました。


この件については、BufferedReader.linesの実装が分かりやすすぎるので引用したいと思います。

public Stream<String> lines() {
    Iterator<String> iter = new Iterator<String>() { // (1)
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false); // (2)
}

(1) Iteratorを実装して
(2) Spliteratorsを使ってSpliteratorに変換し、StreamSupportを使ってStreamに変換しています。
これがStreamを作る時の、ひとつの王道のようですね。

この章のまとめ

1. ルークよ、Iteratorを使え

4. お題:AWS S3にあるテキストファイルをまとめて Stream として扱う

Streamのつくりかたが分かったところで、
次は、AWS S3にあるテキストファイルを処理する方法を考えます。

と、その前に、
AWSになじみのない方がここで読んでしまうことを防ぐために説明したいのですが、
この章の目的は、AWS S3の操作がしたいのではなく、
Java標準APIでStream APIを作れないようなパターンで、どのように操作できるかを考えることです。


なのでAWSだけでなく、たとえばDBや外部Webサーバなどでも同じパターンが使えますので、
AWSはあくまで一例として捉えてもらえればと思います。

今回利用するクラス

まず簡単に、AWS S3からファイルを取ってくる処理を実装するために必要なクラスを紹介します。


ちなみにAWS S3とはファイルストレージのサービスで、
ブラウザからのファイルアップロード / ダウンロードができる他にも
コマンドラインツール(AWS CLI)や、AWS SDKAWS SDK for Javaなど)から操作ができます。
今回はAWS SDK for Javaを使うことにします


それで、使うクラスは以下の通りです。

  • AmazonS3Client
    • AWS S3にアクセスするためのクライアント。通信するくせにclose不要なやつ。
  • S3ObjectSummary
    • S3のファイル情報を持つクラス。Javaで言うPathやFileクラスに相当する。
  • ObjectListing
    • ファイル一覧取得結果のメタ情報を持つクラス。ここから List を取り出すことができる。


あんまり多くないですね。

AWS S3へのアクセスを試す

まず簡単に、AWS S3にJavaからアクセスする方法を書いておきます。
型などがはっきり分かるよう、Stream APIを使わずに書きます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (1)
List<S3ObjectSummary> list = listing.getObjectSummaries(); // (2)

for (S3ObjectSummary s : list) {
    S3Object object = client.getObject(s.getBucketName(), s.getKey()); // (3)
    try (InputStream content = object.getObjectContent(); // (4)
         BufferedReader reader = new BufferedReader(new InputStreamReader(content))) {
        String line = reader.readLine();
        while (line != null) {
            System.out.println(line); // (5)
            line = reader.readLine();
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

(1)「cero.ninja.public」という名前のバケットにある「advent2015java」というフォルダから情報を取り
(2) ファイル情報をListとして取り出して
(3) ファイルの中身を読み込んで
(4) InputStreamを取り出して
(5) 標準出力に表示しています。


簡単でしょう?
ただ、実は (1) のメソッド呼び出しは最大1000件までしか取得できないという制限があります。
1001件以降のデータを取るためには、別のメソッドを使う必要があります。

List<S3ObjectSummary> allList = new ArrayList<>(); // (6)

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (7)
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    allList.addAll(list); // (8)
    listing = client.listNextBatchOfObjects(listing); // (9)
    list = listing.getObjectSummaries();
}

(6) Listを別に用意しておいて
(7) 最初の1000件を読み込み
(8) ファイル情報を (6) のListに入れながら
(9) 次の1000件を読む、を繰り返す


こんな風になります。
ただこれがたとえば10万件、100万件となってくると、Listがメモリを過剰に消費しますし
そもそも実際の処理を始めるまでのオーバーヘッドが大きくなりすぎます。


じゃぁwhileの中で処理しちゃえばいいやんか、という話になってきます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java");
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    list.stream().forEach(this::doSomething); // (10)
    listing = client.listNextBatchOfObjects(listing);
    list = listing.getObjectSummaries();
}

(10) 取得した1000件を使ってすぐに処理を始めます。


これはこれで正解だと思いますしシンプルで良いのですが、
「S3から情報を取るクラスに処理を渡す」よりも
「S3から情報を取ってStreamにする」ほうが、何かと取り回しが楽なのですよね。

AWSへのアクセスをStreamにしよう

ということで、ここまで学んだ知識を利用して、
AWS S3にあるテキストファイルをまとめて Stream にする方法を考えます。


まずはファイル一覧を取ってくる部分を、Iteratorにします。

public class S3Iterator implements Iterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    ObjectListing listing;
    List<S3ObjectSummary> cache; // (1)

    public S3Iterator(String bucketName, String key) {
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    void load() {
        if (cache != null && cache.size() > 0) {
            return;
        }

        if (listing == null) {
            listing = client.listObjects(bucketName, key); // (2)
        } else {
            listing = client.listNextBatchOfObjects(listing); // (3)
        }

        cache = listing.getObjectSummaries(); // (4)
    }

    @Override
    public boolean hasNext() {
        if (cache != null && cache.size() > 0) {
            return true;
        }

        load();
        return cache.size() > 0;
    }

    @Override
    public InputStream next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        S3ObjectSummary summary = cache.remove(0); // (5)
        return client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent(); // (6)
    }
}

(1) 読み込んだ1000件のファイル情報を保持しておくcacheを作っておき
(2) 初回はlistObjectsを使って読み込み
(3) 2回目以降はlistNextBatchOfObjectsを使って読み込みます。
(4) 読み込んだファイル情報はcacheに入れておき
(5) removeしながら取り出します。
(6) また、S3ObjectSummaryのStreamにはせず、InputStreamにしてから返します。
このようにすれば、Iteratorの外側でAmazonS3Clientを利用する必要がなくなるためです。


そして、このIteratorからStreamを作り、さらにflatMapを使ってStreamに集約しましょう。

Stream<InputStream> fileStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
        new S3Iterator("cero.ninja.public", "advent2015java"), Spliterator.NONNULL), false); // (7)

Stream<String> stream = fileStream.flatMap(in -> { // (8)
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    return reader.lines() // (9)
            .onClose(() -> { // (10)
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });
});

stream.forEach(System.out::println); // (11)

(7) IteratorからSpliteratorを作り、そこからStreamに変換します。これでファイル一覧が Stream になります。
(8) InputStreamから読み込んだ内容を集約するために、flatMapを使って
(9) BufferedReader.linesで作ったStreamを集約します。
(10) ここでonCloseを書いておくと、BufferedReaderが毎回きちんとcloseされます。flatMapさまさま。
(11) 最後に標準出力への書き出しをしていますが、もちろんこの Stream はどのように扱うこともできます。


という感じで、AWS S3上にあるテキストファイルを Stram にして
処理することができるようになりました。

gzipやzipだったらどうするの?

ちなみに大量のテキストファイルが存在する場合、たとえばそれがWebサーバのログだったりすると
たいていにおいてgzip圧縮されているかと思います。
そういう時は、読み込む時にGZIPInputStreamを使うと良いでしょう。


先に出した例とあまり変わらず、GZIPInputStreamを挟んだだけの形となります。

Stream<String> stream = fileStream.flatMap(in -> {
    BufferedReader reader;
    try {
        reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8)); // (12)
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    return reader.lines()
            .onClose(() -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
});

(12) のところにGZIPInputStreamを挟みました。


では、読み込む対象が、複数のファイルを持つようなzipファイルだった場合はどうすれば良いでしょうか。
実はこれが割と難敵で、zipで処理を回せるようなIteratorをもう一つ作る必要があります。


ZipInputStreamにStreamを返すようなAPIを作ってくれれば良いのにと思うんですが
(ZipFileやFileSystemsならStreamを作りやすいんですが、ZipInputStreamからは作れない!)
そのあたりは自前で少し実装する必要があります。
さすがにエントリーが長くなってきたので、今回は割愛します。

Spliteratorを直接作っても良い

[twitter:@backpaper0]さんから、Iteratorを実装するよりも
AbstractSpliteratorを実装したSpliteratorを作ったほうが楽だよという知見をもらったので
それを使ったサンプルも掲載しておきます。

public class S3Spliterator extends Spliterators.AbstractSpliterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    List<S3ObjectSummary> cache;
    ObjectListing listing;

    public S3Spliterator(String bucketName, String key) {
        super(Long.MAX_VALUE, 0);
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    @Override
    public boolean tryAdvance(Consumer<? super InputStream> action) {
        if (cache == null) {
            listing = client.listObjects(bucketName, key);
            cache = listing.getObjectSummaries();
        } else if (cache.isEmpty()) {
            listing = client.listNextBatchOfObjects(listing);
            cache = listing.getObjectSummaries();
        }

        if (cache.isEmpty()) {
            return false;
        }

        S3ObjectSummary summary = cache.remove(0);
        S3ObjectInputStream in = client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent();
        action.accept(in);
        return true;

    }
}

tryAdvanceメソッドをひとつ作れば良いだけですので、
hasNextやnextを実装する時のように、状態がどうなるかよく分からなくなって混乱することもありません。
確かに、Streamを作ることだけが目的となる場合は、こちらを使った方が楽になりそうですね。


念のため、これを使った処理も書いておきます。

Stream<InputStream> fileStream = StreamSupport.stream(
        new S3Spliterator("cero.ninja.public", "advent2015java"), false);

Stream<String> stream = fileStream.flatMap(in -> { // 以下略

StreamSupportに直接渡すだけで済む、ということですね。

この章のまとめ

1. 「1000件ずつ読み出して処理」みたいなやつは、IteratorにしてStreamにすれば処理しやすいよ!
2. AbstractSpliteratorを継承すると、Spliteratorを作りやすいよ!

5. まとめ、そして次回予告

では今回のまとめです。
今回のエントリーでは、Streamを作るところと、
その使いどころとしてAWS S3上の大量ファイルを例に挙げて説明しました。


Iteratorを作り、Spliteratorにして、そこからStreamを作る、
また、Stream.flatMapを使って複数リソースをシームレスに処理することができました。


Stream.flatMapを使うときちんとリソースのcloseができるのは、便利さがじわじわ効いてくる系のやつで、
たとえばStreamの代わりにIteratorを取り回そうとすると、リソースのcloseタイミングで困ることになります。
そういう点でも、Streamというのは取り回しやすい形なのかなと思います。


さて、長くなってきたので、今回のエントリーはここまでにしたいと思います。


でも実はもう一つ、言及したかったテーマがあります。
それが「Hot ProducerとCold Producer」というお話です。


JavaのStream APIは、どちらかと言えば「既に存在するListやMap」などに対する処理を
簡潔に書くところがクローズアップされているように思います。
これはCold Producer、あるいはCold Streamと呼ばれているそうです。


一方で、HTTPリクエストや、JMS、AMQP、あるいはTwitterのstreamなど、
いわゆるイベントを受け取るようなもの、あるいは無限Streamになるようなものは
Hot Producer、Hot Streamと呼ばれます。


JavaではHot Producerを扱う場合には、Listenerを実装する形が一般的ですが
Streamとして処理することもできるはずです。


Reactive StreamsがJava9に入るとか、バックプレッシャー型がどうしたと言われている中で、
このHot ProducerをStream APIを使って処理するような考え方が、
Javaにおいても、今後、重要になってくるはずです。
次回は、その辺りを考えてみたいと思います。


Stay metal,
See you!