配列をFluxにする
まずは生徒一覧取得メソッドの実装として、生徒の配列からFluxを作って返します。
モックなので固定値を返してしまうのですが、検索を模しているので、1データ取得するのに100ミリ秒掛かるという想定にしました。こんな感じです。
Flux<Student> fetchStudents(String className) {
Student[] students = {
new Student(1, "Muto"),
new Student(2, "Miyoshi"),
new Student(3, "Matsui"),
new Student(28, "Mori"),
new Student(29, "Tanaka"),
new Student(30, "Yagi"),
};
return Flux.interval(Duration.ofMillis(100))
.map(i -> students[i.intValue()])
.take(students.length);
}
intervalメソッドで100ミリ秒おきに、mapメソッドで配列から生徒を取り出して、takeメソッドで配列分だけ生徒を取得したら終える、という実装です。
30人いるので、シーケンシャルに行えば、3000ミリ秒、つまり3秒掛かる処理ですね。
点数の一覧を取得するところも、実装の内容はほぼ同じです。
Flux<Score> fetchScores(int year, int id) {
final Score[] scores = {
new Score(id, "国語", 80),
new Score(id, "数学", 90),
new Score(id, "英語", 85),
new Score(id, "社会", 93),
new Score(id, "理科", 72)
};
return Flux.interval(Duration.ofMillis(100))
.map(i -> scores[i.intValue()])
.take(scores.length);
}
これもシーケンシャルに行えば、5科目 * 100ミリ秒で、500ミリ秒掛かる処理です。30人分あるので、15秒掛かる計算になります。
つまり生徒一覧と成績一覧の取得処理をすべてシーケンシャルに行うと、18秒掛かることになります。それをもっとうまく並列に処理したいというのが今回のテーマです。
生徒の結果が返ってき次第、成績を取得する
ここまでに書いた2つのメソッドを使って、生徒の一覧を取得して、それぞれの生徒ごとに成績の一覧を取得し、それをMapに変換するような処理を書いてみます。
Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
.subscribe(student -> {
fetchScores(2017, student.id)
.subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
});
System.out.println(map);
Fluxのsubscribeメソッドを使えば、値が1件戻ってくるたびにその値を使った処理を行うことができます。
そのメソッドを使って、
えーっと、、、先にMapのインスタンスを作っておいて、
subscribeの中でMapに追加していく、と、いう・・・、
完全に「素人ですかって」怒られるタイプのコードですよね、これ。いや素人ですから💢
あと、勘が良い方がは気づかれたかも知れませんが、ConcurrentModificationExceptionが発生しかねなかったりもしますね。悪い例がすぎますね。
ちなみにこのコードを実行すると、ほとんど結果が表示されずに終わってしまいます。mainスレッドが終了するからでしょうね。
であれば、その後にスリープしてしまえばいいんです。
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
いや素人にも程があるやろ💢💢 ってレベルのコードができあがりました。
これで実行させてみると・・・
{
{17 Kurosawa=[17 国語 80, 17 数学 90]
, 8 Sato=[8 国語 80, 8 数学 90, 8 英語 85, 8 社会 93, 8 理科 72]
, 11 Notsu=[11 国語 80, 11 数学 90, 11 英語 85, 11 社会 93, 11 理科 72]
, 1 Muto=[1 国語 80, 1 数学 90, 1 英語 85, 1 社会 93, 1 理科 72]
, 13 Ooga=[13 国語 80, 13 数学 90, 13 英語 85, 13 社会 93, 13 理科 72]
, 4 Nakamoto=[4 国語 80, 4 数学 90, 4 英語 85, 4 社会 93, 4 理科 72]
, 3 Matsui=[3 国語 80, 3 数学 90, 3 英語 85, 3 社会 93, 3 理科 72]
, 12 Taguchi=[12 国語 80, 12 数学 90, 12 英語 85, 12 社会 93, 12 理科 72]
, 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85]
, 10 Kikuchi=[10 国語 80, 10 数学 90, 10 英語 85, 10 社会 93, 10 理科 72]
, 5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72]
, 18 Kurashima=[18 国語 80]
, 2 Miyoshi=[2 国語 80, 2 数学 90, 2 英語 85, 2 社会 93, 2 理科 72]
, 15 Shiroi=[15 国語 80, 15 数学 90, 15 英語 85, 15 社会 93]
, 6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72]
, 14 Sugimoto=[14 国語 80, 14 数学 90, 14 英語 85, 14 社会 93, 14 理科 72]
, 7 Sugisaki=[7 国語 80, 7 数学 90, 7 英語 85, 7 社会 93, 7 理科 72]
, 9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72]
}
あぁー、どうあれ、なんか期待通りのものが取れてるじゃないですか! 勝利!!
と思ってよく見たら、18人分しか取れていません。僕的にはNakamoto, Kikuchi, Mizunoが取れていれば良いのですが。
また、15人目〜18人目の生徒は途中までしか成績が取れていません。なるほど非同期で処理してる途中でSystem.out.printlnが呼ばれて、処理が終わってしまったわけですね。
であれば、2000Lとしたsleep時間を10000Lぐらいにすれば解決しますよね!
・・・みたいな話を続けてるといい加減怒られそうなので、そろそろ真面目にやりましょう。
ログを出そう
真面目にやる前に、少し脇道に逸れて、ログの話をします。
ReactorのFluxやMonoは、どこで何が起きているか分かりにくいのを少しでも解消するためか、随所に埋め込めるlogメソッドが用意されています。
このlogメソッドを使えばコンソールに最低限の情報を出ます。ただもう少し詳しい情報が欲しくなるため、ログフォーマットを指定したlogback.xmlを用意しておきます。
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>
もちろんpom.xmlにもlogback-classicのdependencyを追加しておいてください。
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
あとは見るべき所にログを埋め込んでいきます。今回はfetchStudentsメソッドと、fetchScoresメソッドの2箇所に入れておきます。
Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
.log("fetchStudents")
.subscribe(student -> {
fetchScores(2017, student.id)
.log("fetchScores")
.subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
});
細かな挙動を把握したければもう少し違う箇所にもログを埋め込めば良いのですが、ログが多すぎても見通しが悪くなるため、この2箇所だけにしています。
これで実行して取れたログを見てみます。 # で始まっている部分は僕がつけたコメントです。
16:00:09.593 [main] INFO fetchStudents - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.601 [main] INFO fetchStudents - request(unbounded)
16:00:09.710 [parallel-1] INFO fetchStudents - onNext(1 Muto)
16:00:09.712 [parallel-1] INFO fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.712 [parallel-1] INFO fetchScores - request(unbounded)
16:00:09.804 [parallel-1] INFO fetchStudents - onNext(2 Miyoshi)
16:00:09.805 [parallel-1] INFO fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.805 [parallel-1] INFO fetchScores - request(unbounded)
16:00:09.816 [parallel-2] INFO fetchScores - onNext(1 国語 80)
16:00:09.906 [parallel-1] INFO fetchStudents - onNext(3 Matsui)
16:00:09.906 [parallel-1] INFO fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.906 [parallel-1] INFO fetchScores - request(unbounded)
16:00:09.907 [parallel-3] INFO fetchScores - onNext(2 国語 80)
16:00:09.918 [parallel-2] INFO fetchScores - onNext(1 数学 90)
16:00:10.006 [parallel-1] INFO fetchStudents - onNext(4 Nakamoto)
16:00:10.006 [parallel-1] INFO fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:10.006 [parallel-1] INFO fetchScores - request(unbounded)
ここまでの流れを確認すると、次のような流れになります。
1. まずmainスレッドで、生徒取得(fetchStudents)のsubscribe登録を行う
2. 生徒の値が返ってくると、parallel-1スレッドでsubscribeの中に書いた処理が行われる。
3. parallel-1スレッドで、成績取得(fetchScores)のsubscribe登録を行う
4. 成績の値が返ってくると、parallel-2からparallel-4までのスレッドでsubscribeの中に書いた処理が行われる。
ソースコードはとてもダメでしたが、処理自体はおおむね期待通りになっていることが分かりました。
ノンブロッキングかどうかみたいな話は、また後で詳しくやります。
ちなみに最後の方はこうなっていました。
16:00:11.509 [parallel-1] INFO fetchScores - onNext(16 英語 85)
16:00:11.511 [parallel-3] INFO fetchScores - onNext(18 国語 80)
16:00:11.512 [parallel-4] INFO fetchScores - onNext(15 社会 93)
16:00:11.512 [parallel-3] INFO fetchScores - onNext(14 理科 72)
16:00:11.512 [parallel-3] INFO fetchScores - onComplete()
16:00:11.514 [parallel-2] INFO fetchScores - onNext(17 数学 90)
16:00:11.605 [parallel-1] INFO fetchStudents - onNext(20 Yamaide)
16:00:11.606 [parallel-1] INFO fetchScores - onSubscribe(FluxTake.TakeSubscriber)
{18 Kurashima=[18 国語 80], 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85],
16:00:11.606 [parallel-1] INFO fetchScores - request(unbounded)
20人目のYamaideさんの処理を始めたところで、2000ミリ秒のsleepが終わって結果が出力された、という感じですね。