谷本 心 in せろ部屋

はてなダイアリーから引っ越してきました

Spring Boot 1.xから2.0に変更した時に問題が起きた箇所のメモ。

Spring Boot (1.3) + Cloud (Brixton) で、デモ用にECサイトのサンプルを作っていました。
https://github.com/cero-t/e-commerce-example


これをSpring Boot (2.0) + Cloud (Finchley) のマイルストーン版に置き換えてみたところ、問題が出るとは予想してたけど、思った以上に問題が起きたので、その解決策のメモ。

spring-boot-actuatorだけじゃだめ、spring-boot-starter-actuatorを使う。

actuatorを使いたいのでspring-boot-actuatorをdependenciesに入れていました。1.5.9まではこれでも良かったんですが、2.0からは構成が変わり、このままだと依存するbeanが見つからないというエラーになりました。きちんとspring-boot-starter-actuatorを入れましょう。

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-actuator</artifactId>
		</dependency>

              ↓

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>

最初からstarter使ってるわ、という人には関係ない話です。

Spring Cloud Streamが動かないのでスナップショット版(か、M6以降)を使う

今日(2018年1月)時点でMavenリポジトリに公開されているSpring Cloudのバージョンは「Finchley.M5」なのですが、このバージョンではSpring Cloud Streamが動きません。どうも@EnableBindingアノテーションで指定したクラスのBeanが生成されないようで、当該のBeanが見つからないというエラーになります。
Spring Cloud Finchleyのスナップショット版を試してみたところ普通に動いたので、ちょっとしたバグのようです。

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Finchley.M5</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

              ↓

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Finchley.BUILD-SNAPSHOT</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

JAP(Hibernate)の@GeneratedValueのポリシーが変わった?

データアクセスにはSpring Data JPAを使っていたのですが、ここでも問題が起きました。起動時に次のようなスタックトレースが出て、起動できなくなってしまいます。

org.hibernate.tool.schema.spi.SchemaManagementException: Schema-validation: missing sequence [hibernate_sequence]
	at org.hibernate.tool.schema.internal.AbstractSchemaValidator.validateSequence(AbstractSchemaValidator.java:184)
	at org.hibernate.tool.schema.internal.AbstractSchemaValidator.performValidation(AbstractSchemaValidator.java:100)
	at org.hibernate.tool.schema.internal.AbstractSchemaValidator.doValidation(AbstractSchemaValidator.java:68)
	at org.hibernate.tool.schema.spi.SchemaManagementToolCoordinator.performDatabaseAction(SchemaManagementToolCoordinator.java:191)
	at org.hibernate.tool.schema.spi.SchemaManagementToolCoordinator.process(SchemaManagementToolCoordinator.java:72)
(略)

@Entityアノテーションをつけたクラスに@GeneratedValueアノテーションを付けて、DB側で発行した連番を使うようにしていたのですが、この部分で急にHibernateがエラーを出すようになったみたいです。


元々、Spring DataのCrudRepositoryを使いたいだけで、JPAHibernate)の機能は全く使っていなかったので、Spring Data JPAをやめて、Spring Data JDBCに乗り換えました。


参考)
https://qiita.com/sndr/items/88827f06a14fcff71249


Spring Data JDBCは、少し「気が利かない」印象ですね。たとえば、テーブル名やカラム名のキャメルケース⇔スネークケースの変換を自分で書かなきゃいけないとか、@Id アノテーションは org.springframework.data.annotation.Id を使わなきゃいけなくて javax.persistence.Id を使ってると動かないとか。ただそういう所さえ気をつければ、書いた通りに動く素直なフレームワークという印象です。今後も使うかどうかは分かりません。

もうちょっとやりたかったんだけど

ホントは @RequestMapping を @GetMapping に置き換えたり、WebClientってブロッキングでも使えるんだっけみたいなところをもうちょっと検証したかったんですけど、とにかく起動してテストを通すだけで年が明けてしまいました。無念。

EXレイヤーの無限コンボが12/15のアップデートでどう変わったか #FEXL

前回、無限コンボについてブログを書いた後にFEXLがアップデートされ、無限コンボへの対策が取られました。アップデートで、どう変わったのか詳しく見てみます。
前回のエントリー → http://d.hatena.ne.jp/cero-t/20171214/1513251274

地上でのダッシュ小Pチェーンのループ

元々できたダッシュ小P-大P-ダッシュ小P-... というタイプの無限は簡単にはできなくなりました。手元で調べた限りですが、小Pの発生が総じて1F遅くなったようです(3F→4F)
ただ前回のエントリーで予想した通り「持続当てになる」パターンの無限は、いくつかできてしまいました。
https://twitter.com/twitter/statuses/941631711133044736 (スカロ、背中から)
https://twitter.com/twitter/statuses/942042937143078914 (カイリ、目押しも交えて)


これ、どうするのでしょうね。完全に無限をなくそうとすると、次の選択肢ぐらいしか思いつかないのですが。
1. 持続当てできなくする(←難しいし、悲しい)
2. チェーンコンボの硬直を増やす(←ありだと思うけど、違和感に繋がるかも)
3. チェーンコンボから大攻撃を出すと必ずダウン(←いやそれだとHADES強くなりすぎでしょ)
4. イージーコンビネーション以外のチェーンをなくす(←諸々、根本的な見直しになってしまう)


今回のアップデートと同じように、いくつかの持続当てについて細かな調整だけするというのは、イタチゴッコであって根本解決になりません。また、たとえばヒット数が増えればのけぞりが小さくなる、という類の調整は様々なルールをシンプルにしてきたこのシリーズらしくないと思います。
もちろんあくまでこの話の前提は「完全に無限をなくそうとすると」なので、無限が残っても良いと考えれば、この限りではないです。

空中でのダッシュ小P拾いのループ

これも調整されて、できなくなりました。前回のエントリーでは「浮きを高くするか低くするしかない」と書きましたが、浮きは低くなる方向に調整されました。それでも浮いている相手にチェーンコンボを入れることはできるので、とても絶妙な調整です。
今回の調整のおかげで「高さ」を「リソース」と捉えるようになり、どう消費し、どう稼ぐかみたいな事を考えるようになりました。


ほくとの即死コンボでこんな感じです。連昇激で打ち上げ、J大Pでも高さを維持し、小Pでいったん高度を下げて、残りの技を当てる、という感じです。
https://twitter.com/twitter/statuses/941742592772812800


ちょうど良い塩梅の高さ調整だと思いました。

スパコンのループ中にゲージが溜まるか?

空中コンボの浮きが調整されたので、浮かせた後にゲージを1本分(50)溜めるのが難しくなりました。
今のところ、自分で確認できたゲージ溜め最大は、アレンの空中コンボです。
https://www.youtube.com/watch?v=nCiyijkm2PQ


強氣による強化がなしだと、33溜まります。
・トリプルブレイク(1)-J大P(8)-ダッシュ小P(1)-下中P(5)-下中K(5)-ライジングドラゴン(13)-トリプルブレイク-


強氣を選べばゲージ増加量を増やせます。
インフィニティのデッキにゲージ10%アップの強氣が3つ付いてるので、1.3倍になって(端数は切り捨て)40溜まります。
上にURLを張った動画のコンボがこれで、11ループのコンボとなっています。
・トリプルブレイク(1)-J大P(10)-ダッシュ小P(1)-下中P(6)-下中K(6)-ライジングドラゴン(16)-トリプルブレイク-


もし強氣のカスタマイズができるようになり、仮にゲージアップを5つ積むと1.5倍で、44溜まることになります。
・トリプルブレイク(1)-J大P(12)-ダッシュ小P(1)-下中P(7)-下中K(7)-ライジングドラゴン(16)-トリプルブレイク-


ゲージが44も溜まれば17ループのコンボができることになるので、無限でないにせよ、ラウンドの開始から終了までずっとコンボを継続できるはずです。ちょっとゲージが溜まりすぎですが、強氣による恩恵だと思えばアリの範疇ですかね?

当たり判定が残ったまま落ちてくる必殺技

このタイプの無限は今のところありません。
ただ今後、そういうタイプの必殺技が出てきた時には、空中ヒット時には「きりもみヒット」するだけでなく「浮く力が弱い」という対策を取ることで、無限コンボを防ぐことができるようになりますね。


たとえばロッソのヴェズーヴィオの怒りで言うと、1段目が地上ヒットすれば相手が浮いて2段目が入るけど、1段目が空中ヒットすると浮きが低くなって2段目が空振りしてしまう、という感じの調整ができるということです。
あとエリアも、ジャクソンキックが空中ヒットすると浮きが低くなり、ポップアップニーは入らないけどパーティションブレイクなら入る、みたいな調整をされたら良いんじゃないかなと思います。

最後に

12/15のアップデートにより、安易な無限がなくなったのはとても良かったと思います。ただ同時にツイート数なども減ったことを考えると、もしかすると無限があったほうが話題にはなりやすかったのかも知れませんよねw
ただ、コンボを作る立場としては、アップデート後のほうが断然良いです。


それでも、持続当てを使ったループコンボや、強氣を使ってゲージを溜めるコンボ、また、今回のエントリーでは紹介していない、少し特殊なテクニックを使ったループコンボなどは残っています。
この辺りを完全に潰しに行くのか、潰すのはほどほどにするのか、今後のバージョンアップや次回のβテストなどがもしあれば、よく注目したいと思います。


・・・ただ僕としては無限対策云々よりも、スパコンのカス当たりから拾えるコンボを、復活させて欲しいのですけどね!

EXレイヤーの無限コンボが12/15のアップデートでどう変わったか #FEXL

注意
このエントリーはβ版について書いたものであり、リリース版では無限コンボができないよう対策を取られています。リリース版のFIGHTING EX LAYERのコンボについては、攻略サイトをご確認ください。


前回、無限コンボについてブログを書いた後にFEXLがアップデートされ、無限コンボへの対策が取られました。アップデートで、どう変わったのか詳しく見てみます。
前回のエントリー → http://d.hatena.ne.jp/cero-t/20171214/1513251274

地上でのダッシュ小Pチェーンのループ

元々できたダッシュ小P-大P-ダッシュ小P-... というタイプの無限は簡単にはできなくなりました。手元で調べた限りですが、小Pの発生が総じて1F遅くなったようです(3F→4F)
ただ前回のエントリーで予想した通り「持続当てになる」パターンの無限は、いくつかできてしまいました。
https://twitter.com/twitter/statuses/941631711133044736 (スカロ、背中から)
https://twitter.com/twitter/statuses/942042937143078914 (カイリ、目押しも交えて)


これ、どうするのでしょうね。完全に無限をなくそうとすると、次の選択肢ぐらいしか思いつかないのですが。
1. 持続当てできなくする(←難しいし、悲しい)
2. チェーンコンボの硬直を増やす(←ありだと思うけど、違和感に繋がるかも)
3. チェーンコンボから大攻撃を出すと必ずダウン(←いやそれだとHADES強くなりすぎでしょ)
4. イージーコンビネーション以外のチェーンをなくす(←諸々、根本的な見直しになってしまう)


今回のアップデートと同じように、いくつかの持続当てについて細かな調整だけするというのは、イタチゴッコであって根本解決になりません。また、たとえばヒット数が増えればのけぞりが小さくなる、という類の調整は様々なルールをシンプルにしてきたこのシリーズらしくないと思います。
もちろんあくまでこの話の前提は「完全に無限をなくそうとすると」なので、無限が残っても良いと考えれば、この限りではないです。

空中でのダッシュ小P拾いのループ

これも調整されて、できなくなりました。前回のエントリーでは「浮きを高くするか低くするしかない」と書きましたが、浮きは低くなる方向に調整されました。それでも浮いている相手にチェーンコンボを入れることはできるので、とても絶妙な調整です。
今回の調整のおかげで「高さ」を「リソース」と捉えるようになり、どう消費し、どう稼ぐかみたいな事を考えるようになりました。


ほくとの即死コンボでこんな感じです。連昇激で打ち上げ、J大Pでも高さを維持し、小Pでいったん高度を下げて、残りの技を当てる、という感じです。
https://twitter.com/twitter/statuses/941742592772812800


ちょうど良い塩梅の高さ調整だと思いました。

スパコンのループ中にゲージが溜まるか?

空中コンボの浮きが調整されたので、浮かせた後にゲージを1本分(50)溜めるのが難しくなりました。
今のところ、自分で確認できたゲージ溜め最大は、アレンの空中コンボです。
https://www.youtube.com/watch?v=nCiyijkm2PQ


強氣による強化がなしだと、33溜まります。
・トリプルブレイク(1)-J大P(8)-ダッシュ小P(1)-下中P(5)-下中K(5)-ライジングドラゴン(13)-トリプルブレイク-


強氣を選べばゲージ増加量を増やせます。
インフィニティのデッキにゲージ10%アップの強氣が3つ付いてるので、1.3倍になって(端数は切り捨て)40溜まります。
上にURLを張った動画のコンボがこれで、11ループのコンボとなっています。
・トリプルブレイク(1)-J大P(10)-ダッシュ小P(1)-下中P(6)-下中K(6)-ライジングドラゴン(16)-トリプルブレイク-


もし強氣のカスタマイズができるようになり、仮にゲージアップを5つ積むと1.5倍で、44溜まることになります。
・トリプルブレイク(1)-J大P(12)-ダッシュ小P(1)-下中P(7)-下中K(7)-ライジングドラゴン(16)-トリプルブレイク-


ゲージが44も溜まれば17ループのコンボができることになるので、無限でないにせよ、ラウンドの開始から終了までずっとコンボを継続できるはずです。ちょっとゲージが溜まりすぎですが、強氣による恩恵だと思えばアリの範疇ですかね?

当たり判定が残ったまま落ちてくる必殺技

このタイプの無限は今のところありません。
ただ今後、そういうタイプの必殺技が出てきた時には、空中ヒット時には「きりもみヒット」するだけでなく「浮く力が弱い」という対策を取ることで、無限コンボを防ぐことができるようになりますね。


たとえばロッソのヴェズーヴィオの怒りで言うと、1段目が地上ヒットすれば相手が浮いて2段目が入るけど、1段目が空中ヒットすると浮きが低くなって2段目が空振りしてしまう、という感じの調整ができるということです。
あとエリアも、ジャクソンキックが空中ヒットすると浮きが低くなり、ポップアップニーは入らないけどパーティションブレイクなら入る、みたいな調整をされたら良いんじゃないかなと思います。

最後に

12/15のアップデートにより、安易な無限がなくなったのはとても良かったと思います。ただ同時にツイート数なども減ったことを考えると、もしかすると無限があったほうが話題にはなりやすかったのかも知れませんよねw
ただ、コンボを作る立場としては、アップデート後のほうが断然良いです。


それでも、持続当てを使ったループコンボや、強氣を使ってゲージを溜めるコンボ、また、今回のエントリーでは紹介していない、少し特殊なテクニックを使ったループコンボなどは残っています。
この辺りを完全に潰しに行くのか、潰すのはほどほどにするのか、今後のバージョンアップや次回のβテストなどがもしあれば、よく注目したいと思います。


・・・ただ僕としては無限対策云々よりも、スパコンのカス当たりから拾えるコンボを、復活させて欲しいのですけどね!

ReactorでN+1問題な処理を実装してみた話

最近、格ゲーのツイートが増えてる [twitter:@cero_t] です。前のエントリーに書いた「18年ぶりに出る続編」のβテストがついに始まりまして、最近は夜な夜なコンボをやるなどしています。
シビアな反応が要求される格闘ゲームにおいて、継続して勝ち続けるためにはどうしても反射神経が必要となり、機械のような反射神経、つまり「反応装置」にならなくてはいけません、そうだから今日のテーマは「Reactor」なのです、、、みたいな流れを考えたんですが、どうにも苦しいですよね。ろくにスベることもできない中ですが、Javaアドベントカレンダー15日目が始まりました。
https://qiita.com/advent-calendar/2017/java


さて、Project ReactorはReactiveなノンブロッキング処理を書くためのライブラリです。最近はSpring 5.0などでも全面的に利用されているため話題になりがちです。今日はこれを勉強しながら、業務にありそうなケースを実装してみます。
なお、同期とか非同期とかブロッキングとかノンブロッキングとかReactiveとかの言葉の定義は、この際、置いておきます。実装を見て、雰囲気で掴んでください。

目次

0. はじめに
1. ノンブロッキング処理を体感する
2. Reactorらしいコードを書く
3. ノンブロッキングなら、シングルスレッドでも早いのか?
4. ブロッキング処理の場合はどうなる?
5. ブロッキング処理も、マルチスレッドで高速化


0〜2までは、Reactor初心者が苦慮してコードを書いていった話。
2〜5は、ノンブロッキング処理やブロッキング処理を、それぞれシングルスレッド、マルチスレッドで試してみた話です。
長いエントリーなので、興味がある部分を拾い読みしてもらえればと思います。

0. はじめに

まずは事前の準備や、目的などを説明します。

事前にやること

Reactorについて完全に素人だったので、まずは入門のスライドを読んでから、ハンズオン(チュートリアル)をやりました。


Spring 5に備えるリアクティブプログラミング入門
https://www.slideshare.net/TakuyaIwatsuka/spring-5


Reactive Webアプリケーション - そしてSpring 5へ #jjug_ccc #ccc_ef3
https://www.slideshare.net/makingx/reactive-web-spring-5-jjugccc-cccef3


Lite Rx API Hands-on(チュートリアル
https://github.com/reactor/lite-rx-api-hands-on/


なんとなく、Java5世代の非同期脳で、MonoがFutureのようなもの、FluxがList版のFutureのようなもの、という理解をしました。実際にはJava8で追加されたCompletableFutureのようなもののようですが、それはおいとくとして。
とりあえずこれらでReactorの概要をざっくり掴んだのですが、とにかく機能が多すぎて、全く覚えきれません。Stream APIをもう一度学びなおしてるような気持ちです。

今回の目的

今回は、いわゆる「N+1問題」をReactorで実装するとどうなるか、という検証をしてみます。
テーブルAから一覧データをN件持ってきて、次にそれに関連するデータをテーブルBから持ってくるという処理において、クエリがN+1回発生してしまうせいで遅い、というアレです。SQLを工夫すればクエリ1回で済むやろっていうコメントは、本題からズレるので★1です!


今回は「生徒一覧」を取得したうえで、生徒の「成績一覧」を検索することを想定します。
また検索対象は、RDBMSのようなブロッキング処理しかできないデータソースの場合と、何かしらイイ感じのノンブロッキングなデータソースの場合とを、それぞれ想定して比較します。


コードで言うと、次のようなイメージです。

生徒一覧の取得
Flux<Student> fetchStudents(String className)

点数一覧の取得
Flux<Score> fetchScores(int year, int id)

取り出したい形
Map<Student, List<Score>>


生徒の一覧を取り、それぞれの生徒の(2017年の)成績を取り、それをMono/FluxではなくMapやListの形にして返すというものです。
なぜわざわざMapに変換するんだ、なぜここでブロックしてしまうんだ、というツッコミを受けそうですが、あくまでも「全ての結果が揃ってから返す」けど、中の処理を並列にすることで、レスポンスを早くしたいというケースを想定しました。課題設定に口出し無用。

1. ノンブロッキング処理を体感する

まずは手探りでノンブロッキングなコードを書く所までをやります。
この章で紹介するソースコードは次のURLにあります。
https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample1.java

配列をFluxにする

まずは生徒一覧取得メソッドの実装として、生徒の配列からFluxを作って返します。
モックなので固定値を返してしまうのですが、検索を模しているので、1データ取得するのに100ミリ秒掛かるという想定にしました。こんな感じです。

Flux<Student> fetchStudents(String className) {
    Student[] students = {
            new Student(1, "Muto"),
            new Student(2, "Miyoshi"),
            new Student(3, "Matsui"),
            // 略
            new Student(28, "Mori"),
            new Student(29, "Tanaka"),
            new Student(30, "Yagi"),
    };

    return Flux.interval(Duration.ofMillis(100))
            .map(i -> students[i.intValue()])
            .take(students.length);
}

intervalメソッドで100ミリ秒おきに、mapメソッドで配列から生徒を取り出して、takeメソッドで配列分だけ生徒を取得したら終える、という実装です。
30人いるので、シーケンシャルに行えば、3000ミリ秒、つまり3秒掛かる処理ですね。


点数の一覧を取得するところも、実装の内容はほぼ同じです。

Flux<Score> fetchScores(int year, int id) {
    final Score[] scores = {
            new Score(id, "国語", 80),
            new Score(id, "数学", 90),
            new Score(id, "英語", 85),
            new Score(id, "社会", 93),
            new Score(id, "理科", 72)
    };

    return Flux.interval(Duration.ofMillis(100))
            .map(i -> scores[i.intValue()])
            .take(scores.length);
}

これもシーケンシャルに行えば、5科目 * 100ミリ秒で、500ミリ秒掛かる処理です。30人分あるので、15秒掛かる計算になります。
つまり生徒一覧と成績一覧の取得処理をすべてシーケンシャルに行うと、18秒掛かることになります。それをもっとうまく並列に処理したいというのが今回のテーマです。

生徒の結果が返ってき次第、成績を取得する

ここまでに書いた2つのメソッドを使って、生徒の一覧を取得して、それぞれの生徒ごとに成績の一覧を取得し、それをMapに変換するような処理を書いてみます。

Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
        .subscribe(student -> {
            fetchScores(2017, student.id)
                    .subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
        });

System.out.println(map);

Fluxのsubscribeメソッドを使えば、値が1件戻ってくるたびにその値を使った処理を行うことができます。
そのメソッドを使って、
えーっと、、、先にMapのインスタンスを作っておいて、
subscribeの中でMapに追加していく、と、いう・・・、
完全に「素人ですかって」怒られるタイプのコードですよね、これ。いや素人ですから💢
あと、勘が良い方がは気づかれたかも知れませんが、ConcurrentModificationExceptionが発生しかねなかったりもしますね。悪い例がすぎますね。


ちなみにこのコードを実行すると、ほとんど結果が表示されずに終わってしまいます。mainスレッドが終了するからでしょうね。
であれば、その後にスリープしてしまえばいいんです。

try {
    Thread.sleep(2000L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

いや素人にも程があるやろ💢💢 ってレベルのコードができあがりました。


これで実行させてみると・・・

{
{17 Kurosawa=[17 国語 80, 17 数学 90]
, 8 Sato=[8 国語 80, 8 数学 90, 8 英語 85, 8 社会 93, 8 理科 72]
, 11 Notsu=[11 国語 80, 11 数学 90, 11 英語 85, 11 社会 93, 11 理科 72]
, 1 Muto=[1 国語 80, 1 数学 90, 1 英語 85, 1 社会 93, 1 理科 72]
, 13 Ooga=[13 国語 80, 13 数学 90, 13 英語 85, 13 社会 93, 13 理科 72]
, 4 Nakamoto=[4 国語 80, 4 数学 90, 4 英語 85, 4 社会 93, 4 理科 72]
, 3 Matsui=[3 国語 80, 3 数学 90, 3 英語 85, 3 社会 93, 3 理科 72]
, 12 Taguchi=[12 国語 80, 12 数学 90, 12 英語 85, 12 社会 93, 12 理科 72]
, 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85]
, 10 Kikuchi=[10 国語 80, 10 数学 90, 10 英語 85, 10 社会 93, 10 理科 72]
, 5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72]
, 18 Kurashima=[18 国語 80]
, 2 Miyoshi=[2 国語 80, 2 数学 90, 2 英語 85, 2 社会 93, 2 理科 72]
, 15 Shiroi=[15 国語 80, 15 数学 90, 15 英語 85, 15 社会 93]
, 6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72]
, 14 Sugimoto=[14 国語 80, 14 数学 90, 14 英語 85, 14 社会 93, 14 理科 72]
, 7 Sugisaki=[7 国語 80, 7 数学 90, 7 英語 85, 7 社会 93, 7 理科 72]
, 9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72]
}

あぁー、どうあれ、なんか期待通りのものが取れてるじゃないですか! 勝利!!


と思ってよく見たら、18人分しか取れていません。僕的にはNakamoto, Kikuchi, Mizunoが取れていれば良いのですが。
また、15人目〜18人目の生徒は途中までしか成績が取れていません。なるほど非同期で処理してる途中でSystem.out.printlnが呼ばれて、処理が終わってしまったわけですね。


であれば、2000Lとしたsleep時間を10000Lぐらいにすれば解決しますよね!
・・・みたいな話を続けてるといい加減怒られそうなので、そろそろ真面目にやりましょう。

ログを出そう

真面目にやる前に、少し脇道に逸れて、ログの話をします。
ReactorのFluxやMonoは、どこで何が起きているか分かりにくいのを少しでも解消するためか、随所に埋め込めるlogメソッドが用意されています。


このlogメソッドを使えばコンソールに最低限の情報を出ます。ただもう少し詳しい情報が欲しくなるため、ログフォーマットを指定したlogback.xmlを用意しておきます。

<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="debug">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>


もちろんpom.xmlにもlogback-classicのdependencyを追加しておいてください。

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>


あとは見るべき所にログを埋め込んでいきます。今回はfetchStudentsメソッドと、fetchScoresメソッドの2箇所に入れておきます。

Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
        .log("fetchStudents")
        .subscribe(student -> {
            fetchScores(2017, student.id)
                    .log("fetchScores")
                    .subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
        });

細かな挙動を把握したければもう少し違う箇所にもログを埋め込めば良いのですが、ログが多すぎても見通しが悪くなるため、この2箇所だけにしています。


これで実行して取れたログを見てみます。 # で始まっている部分は僕がつけたコメントです。

# まずは生徒取得処理の、onSubscribeとrequestが呼ばれる。
16:00:09.593 [main] INFO  fetchStudents - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.601 [main] INFO  fetchStudents - request(unbounded)

# 生徒取得処理のonNextが呼ばれて、1人目のMutoさんの値が取得できる。
16:00:09.710 [parallel-1] INFO  fetchStudents - onNext(1 Muto)

# Mutoさんの成績取得処理のためにonSubscribeとrequestが呼ばれる。
16:00:09.712 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.712 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの値が取得でき、成績の取得が始まる。
16:00:09.804 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
16:00:09.805 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.805 [parallel-1] INFO  fetchScores - request(unbounded)

# 1人目のMutoさんの国語の成績が取れる。これは、先ほどまでとは別のスレッドで行われる。
16:00:09.816 [parallel-2] INFO  fetchScores - onNext(1 国語 80)

# 3人目のMatsuiさんの値が取得でき、成績の取得が始まる。
16:00:09.906 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
16:00:09.906 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.906 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの国語の成績がまた別スレッドで取得できる。
16:00:09.907 [parallel-3] INFO  fetchScores - onNext(2 国語 80)

# 1人目のMutoさんの数学の成績が、国語が取れた時と同じスレッドで取得できる。
16:00:09.918 [parallel-2] INFO  fetchScores - onNext(1 数学 90)

# 4人目のNakamotoさんの値が取得でき、成績の取得が始まる。
16:00:10.006 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
16:00:10.006 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:10.006 [parallel-1] INFO  fetchScores - request(unbounded)

ここまでの流れを確認すると、次のような流れになります。


1. まずmainスレッドで、生徒取得(fetchStudents)のsubscribe登録を行う
2. 生徒の値が返ってくると、parallel-1スレッドでsubscribeの中に書いた処理が行われる。
3. parallel-1スレッドで、成績取得(fetchScores)のsubscribe登録を行う
4. 成績の値が返ってくると、parallel-2からparallel-4までのスレッドでsubscribeの中に書いた処理が行われる。


ソースコードはとてもダメでしたが、処理自体はおおむね期待通りになっていることが分かりました。
ノンブロッキングかどうかみたいな話は、また後で詳しくやります。


ちなみに最後の方はこうなっていました。

16:00:11.509 [parallel-1] INFO  fetchScores - onNext(16 英語 85)
16:00:11.511 [parallel-3] INFO  fetchScores - onNext(18 国語 80)
16:00:11.512 [parallel-4] INFO  fetchScores - onNext(15 社会 93)
16:00:11.512 [parallel-3] INFO  fetchScores - onNext(14 理科 72)
16:00:11.512 [parallel-3] INFO  fetchScores - onComplete()
16:00:11.514 [parallel-2] INFO  fetchScores - onNext(17 数学 90)
16:00:11.605 [parallel-1] INFO  fetchStudents - onNext(20 Yamaide)
16:00:11.606 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
{18 Kurashima=[18 国語 80], 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85], # (以降、省略)
16:00:11.606 [parallel-1] INFO  fetchScores - request(unbounded)

20人目のYamaideさんの処理を始めたところで、2000ミリ秒のsleepが終わって結果が出力された、という感じですね。

2. Reactorらしいコードを書く

上のようなコードを書いたあと、どうするのが正解なのかよく分からないなという気持ちになり [twitter:@making] さんに質問をしたところ、コードを添削して諸々教えてくれました。それが今回このエントリーを書くきっかけにもなったのです。


Reactorらしい修正をしたソースコードは次のURLにあります。
https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample2.java

subscribeではなくflatMapを使うが良い

Stream APIにおいて、foreachでListやMapに値を追加していくのが悪手であることは皆さんご存知だと思いますが、それはReactorのAPIでも変わりありません。subscribeの中で外部の変数に作用すると、処理の見通しが悪くなります。


今回の目的を実現するコードは、次のように修正できます。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .flatMap(student -> fetchScores(2017, student.id)
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

subscribeではなくflatMapを使って結果を変換します。fetchScoresで取得した Flux をcollectListで Mono> にして、それを一度Tuple2にするという形です。
いやこれ、自分じゃ思いつかない流れですが・・・。


その後、collectMapでTuple2からMapを作り、blockを使って待ち受けて、Monoではない通常のMapを取得しました。
これを実行すると、きちんと30人分の成績を取得することができました。

配列からFluxを作る別の方法

ところで、配列からFluxを作るところも、map/takeを使うのではなく、次のような形で書く方法を教えてもらいました。

return Flux.interval(Duration.ofMillis(100))
        .zipWith(Flux.fromArray(students))
        .map(Tuple2::getT2);

こんな書き方、チュートリアルにはなかったよ!
まぁでも、なるほどですね。

ログも見てみよう

この書き方ではどのような順で処理が行われるのか、ログを見て確認をします。
上のコードに、ログ出力部分を加えます。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

ログを入れるのは、やはりこの2箇所です。


出力されたログは次の通りです。

# まずは生徒取得処理の、onSubscribeとrequestが呼ばれる。
15:35:16.944 [main] INFO  fetchStudents - onSubscribe(FluxMap.MapSubscriber)
15:35:16.947 [main] INFO  fetchStudents - request(256)

# 1人目のMutoさんの処理
15:35:17.073 [parallel-1] INFO  fetchStudents - onNext(1 Muto)
15:35:17.083 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.084 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの処理
15:35:17.170 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
15:35:17.170 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.170 [parallel-1] INFO  fetchScores - request(unbounded)

# ここで1人目のMutoさんの国語の成績が返ってきた
15:35:17.187 [parallel-2] INFO  fetchScores - onNext(1 国語 80)

# 3人目のMatsuiさんの処理
15:35:17.270 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
15:35:17.270 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.270 [parallel-1] INFO  fetchScores - request(unbounded)

# 次は2人目のMiyoshiさんの国語の成績と、1人目のMutoさんの数学の成績が返ってきた
15:35:17.273 [parallel-3] INFO  fetchScores - onNext(2 国語 80)
15:35:17.285 [parallel-2] INFO  fetchScores - onNext(1 数学 90)

# 4人目のNakamotoさんの処理
15:35:17.368 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
15:35:17.369 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.370 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの数学と、3人目のMatsuiさんの国語が返ってきた
15:35:17.374 [parallel-3] INFO  fetchScores - onNext(2 数学 90)
15:35:17.374 [parallel-4] INFO  fetchScores - onNext(3 国語 80)

流れ的には元のものと同じですね。


またログの最後の部分を見ると、きちんと30人分の成績を受信しきってから、表示をしていました。

15:35:20.174 [parallel-1] INFO  fetchScores - onNext(28 社会 93)
15:35:20.174 [parallel-4] INFO  fetchScores - onComplete()
15:35:20.270 [parallel-3] INFO  fetchScores - onNext(30 英語 85)
15:35:20.270 [parallel-2] INFO  fetchScores - onNext(29 社会 93)
15:35:20.276 [parallel-1] INFO  fetchScores - onNext(28 理科 72)
15:35:20.276 [parallel-1] INFO  fetchScores - onComplete()
15:35:20.371 [parallel-2] INFO  fetchScores - onNext(29 理科 72)
15:35:20.371 [parallel-3] INFO  fetchScores - onNext(30 社会 93)
15:35:20.371 [parallel-2] INFO  fetchScores - onComplete()
15:35:20.471 [parallel-3] INFO  fetchScores - onNext(30 理科 72)
15:35:20.471 [parallel-3] INFO  fetchScores - onComplete()
{22 Shintani=[22 国語 80, 22 数学 90, 22 英語 85, 22 社会 93, 22 理科 72], # (以降、省略)

また、ログのタイムスタンプから、およそ3秒半ほどですべての処理が終わっていることが分かります。つまり、シーケンシャルに行えば18秒掛かる処理を、うまく並行させて3秒半で終わらせているのです。
この3秒半というのは、30人分の情報を取得するのに掛かる3秒 + 最後の1人の成績を取りきるのに掛かる0.5秒と、よく一致しています。
ノンブロッキングな感じで、いいじゃないですか。

3. ノンブロッキングなら、シングルスレッドでも早いのか?

ここまでの処理を見て、ノンブロッキングと非同期(マルチスレッド)の違いがよく分からなくなった方もいるかも知れません。
上で「ノンブロッキングな感じ」と書きましたが、果たしてそのおかげで早かったのか、マルチスレッドの恩恵で早くなったのか、分かりにくいところがあります。


では、シングルスレッドにしてみればどうなるでしょうか。見てみましょう。
シングルスレッドにしたソースコードは次のURLから取得できます。
https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample3.java

publishOnでスレッドを制御

処理をシングルスレッドにする場合は、Fluxを生成している所でpublishOnメソッドを用いて、スレッドの作成ポリシーを決めます。ここではSchedulers.single() を用いてシングルスレッドで実行することにします。

return Flux.interval(Duration.ofMillis(100))
        .publishOn(Schedulers.single())
        .zipWith(Flux.fromArray(students))
        .map(Tuple2::getT2);

publishOn(Schedulers.single()) の処理を挟んだだけですね。
これをfetchStudents、fetshScoresのそれぞれで行います。

実行してみた

それではシングルスレッドになっているか、実行してログを確認してみましょう。

16:21:49.073 [main] INFO  fetchStudents - onSubscribe(FluxMap.MapSubscriber)
16:21:49.080 [main] INFO  fetchStudents - request(256)
16:21:49.197 [single-1] INFO  fetchStudents - onNext(1 Muto)
16:21:49.202 [single-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
16:21:49.202 [single-1] INFO  fetchScores - request(unbounded)
16:21:49.296 [single-1] INFO  fetchStudents - onNext(2 Miyoshi)
# (省略)
16:21:52.501 [single-1] INFO  fetchScores - onNext(29 理科 72)
16:21:52.501 [single-1] INFO  fetchScores - onComplete()
16:21:52.501 [single-1] INFO  fetchScores - onNext(30 社会 93)
16:21:52.603 [single-1] INFO  fetchScores - onNext(30 理科 72)
16:21:52.604 [single-1] INFO  fetchScores - onComplete()
{6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72], # (以降、省略)

一部抜粋ですが、すべての処理が「single-1」スレッドで行われていました。


シングルスレッドで行われていたにも関わらず、処理は3.5秒程度で完了しています。これで「100ミリ秒待つ」というのがノンブロッキングで行われていることが、なんとなく実感できました。

4. ブロッキング処理の場合はどうなる?

さて、ここまでは「ノンブロッキング処理は早くていいね、18秒掛かる処理が3.5秒になったよ」みたいな話でした。
しかし、もしRDBMSのような、ブロッキング処理のあるデータソースから情報を取得しなくてはいけなくなった場合は、どうなるのでしょうか。それを模したコードを書いて検証してみます。


ブロッキング処理にしたソースコードは、ここにあります。
https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample4.java

Thread.sleepはブロッキング処理

ブロッキング処理を行うためには、Fluxを作成する処理の中で、Thread.sleepを行うのが良いです。
生徒や成績のFluxを行う処理を、次のように変更します。

Flux<Student> studentFlux = Flux.create(sink -> {
    for (Student student : students) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sink.next(student);
    }
    sink.complete();
});
return studentFlux;

Flux.createメソッドで、sinkに値を入れていれていく形です。ここでThread.sleepを使うことで(先程までのFlux.intervalと異なり)このFluxの作成処理がブロッキング処理となるんです。ほぅ。

実行してログで確認

これまでの流れ通り、実行してログを確認してみましょう。

# mainスレッドでfetchStudentsの呼び出し
16:38:37.457 [main] INFO  fetchStudents - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:37.461 [main] INFO  fetchStudents - request(256)

# 1人目のMutoさんの処理をmainメソッドで行い、成績取得もすべてmainメソッド
16:38:37.570 [main] INFO  fetchStudents - onNext(1 Muto)
16:38:37.574 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:37.575 [main] INFO  fetchScores - request(unbounded)
16:38:37.677 [main] INFO  fetchScores - onNext(1 国語 80)
16:38:37.781 [main] INFO  fetchScores - onNext(1 数学 90)
16:38:37.882 [main] INFO  fetchScores - onNext(1 英語 85)
16:38:37.983 [main] INFO  fetchScores - onNext(1 社会 93)
16:38:38.085 [main] INFO  fetchScores - onNext(1 理科 72)
16:38:38.087 [main] INFO  fetchScores - onComplete()
16:38:38.087 [main] INFO  fetchStudents - request(1)

# 2人目のMiyoshiさんの処理もとにかくmainメソッド
16:38:38.193 [main] INFO  fetchStudents - onNext(2 Miyoshi)
16:38:38.195 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:38.195 [main] INFO  fetchScores - request(unbounded)
16:38:38.296 [main] INFO  fetchScores - onNext(2 国語 80)
16:38:38.401 [main] INFO  fetchScores - onNext(2 数学 90)
16:38:38.503 [main] INFO  fetchScores - onNext(2 英語 85)
16:38:38.606 [main] INFO  fetchScores - onNext(2 社会 93)
16:38:38.709 [main] INFO  fetchScores - onNext(2 理科 72)
16:38:38.709 [main] INFO  fetchScores - onComplete()
16:38:38.709 [main] INFO  fetchStudents - request(1)

# 3人目以降も同様
16:38:38.812 [main] INFO  fetchStudents - onNext(3 Matsui)
# (省略)
16:38:54.864 [main] INFO  fetchStudents - request(1)
16:38:54.967 [main] INFO  fetchStudents - onNext(29 Tanaka)
16:38:54.968 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:54.968 [main] INFO  fetchScores - request(unbounded)
16:38:55.072 [main] INFO  fetchScores - onNext(29 国語 80)
16:38:55.176 [main] INFO  fetchScores - onNext(29 数学 90)
16:38:55.281 [main] INFO  fetchScores - onNext(29 英語 85)
16:38:55.386 [main] INFO  fetchScores - onNext(29 社会 93)
16:38:55.488 [main] INFO  fetchScores - onNext(29 理科 72)
16:38:55.488 [main] INFO  fetchScores - onComplete()
16:38:55.488 [main] INFO  fetchStudents - request(1)
16:38:55.592 [main] INFO  fetchStudents - onNext(30 Yagi)
16:38:55.592 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:55.592 [main] INFO  fetchScores - request(unbounded)
16:38:55.697 [main] INFO  fetchScores - onNext(30 国語 80)
16:38:55.798 [main] INFO  fetchScores - onNext(30 数学 90)
16:38:55.902 [main] INFO  fetchScores - onNext(30 英語 85)
16:38:56.004 [main] INFO  fetchScores - onNext(30 社会 93)
16:38:56.110 [main] INFO  fetchScores - onNext(30 理科 72)
16:38:56.110 [main] INFO  fetchScores - onComplete()
16:38:56.110 [main] INFO  fetchStudents - request(1)
16:38:56.110 [main] INFO  fetchStudents - onComplete()
{5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72], # (以降、省略)

すべての処理がmainスレッドで行われており、時間も18.5秒掛かっています。これはすべての処理がシーケンシャルに行われれば18秒である、という計算と一致します。
いかにReactorを使っていようとも、途中にRDBMSへのJDBCドライバー経由でのアクセスなど、ブロッキング処理が入るとこのようになってしまうのです。

5. ブロッキング処理も、マルチスレッドで高速化

ブロッキング処理を使うと、Reactorを使う意味がないのでしょうか、というとそういうわけでもありません。
Reactorを使って、ブロッキング処理であっても、マルチスレッド処理を書くことができます。


ブロッキング処理をマルチスレッド化したソースコードは、次の場所にあります。
https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample5.java

subscribeOnでスレッドを制御

先の章ではFluxを生成する際にpublishOnメソッドを用いてスレッドの制御をしましたが、それと同様に、subscibeする側でsubscribeOnメソッドを用いることでも、スレッドを制御することができるようになります。シングルスレッドで生成されたものを、マルチスレッドで分担して処理するという形になります。


次のように、fetchStudents、fetchScoresの直後にsubscribeOnを渡します。ここで渡しているSchedulers.elastic()は、必要なだけスレッドを起こすというものです。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .subscribeOn(Schedulers.elastic())
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .subscribeOn(Schedulers.elastic())
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

これだけでマルチスレッド化ができます。元の流れと大きく変わっていないところがポイントですね。


それでは、実行してログを見てみましょう。

17:35:03.769 [main] INFO  fetchStudents - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:03.773 [main] INFO  fetchStudents - request(256)
17:35:03.883 [elastic-2] INFO  fetchStudents - onNext(1 Muto)
17:35:03.897 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:03.898 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.001 [elastic-2] INFO  fetchStudents - onNext(2 Miyoshi)
17:35:04.001 [elastic-3] INFO  fetchScores - onNext(1 国語 80)
17:35:04.001 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:04.001 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.103 [elastic-3] INFO  fetchScores - onNext(1 数学 90)
17:35:04.103 [elastic-2] INFO  fetchStudents - onNext(3 Matsui)
17:35:04.103 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:04.103 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.103 [elastic-4] INFO  fetchScores - onNext(2 国語 80)
17:35:04.204 [elastic-3] INFO  fetchScores - onNext(1 英語 85)
17:35:04.204 [elastic-4] INFO  fetchScores - onNext(2 数学 90)
17:35:04.204 [elastic-2] INFO  fetchStudents - onNext(4 Nakamoto)
17:35:04.205 [elastic-5] INFO  fetchScores - onNext(3 国語 80)
# (省略)
17:35:07.131 [elastic-5] INFO  fetchScores - onComplete()
17:35:07.136 [elastic-6] INFO  fetchScores - onNext(28 社会 93)
17:35:07.136 [elastic-7] INFO  fetchScores - onNext(29 英語 85)
17:35:07.137 [elastic-8] INFO  fetchScores - onNext(30 数学 90)
17:35:07.241 [elastic-7] INFO  fetchScores - onNext(29 社会 93)
17:35:07.241 [elastic-6] INFO  fetchScores - onNext(28 理科 72)
17:35:07.241 [elastic-8] INFO  fetchScores - onNext(30 英語 85)
17:35:07.242 [elastic-6] INFO  fetchScores - onComplete()
17:35:07.344 [elastic-8] INFO  fetchScores - onNext(30 社会 93)
17:35:07.344 [elastic-7] INFO  fetchScores - onNext(29 理科 72)
17:35:07.345 [elastic-7] INFO  fetchScores - onComplete()
17:35:07.449 [elastic-8] INFO  fetchScores - onNext(30 理科 72)
17:35:07.450 [elastic-8] INFO  fetchScores - onComplete()
{30 Yagi=[30 国語 80, 30 数学 90, 30 英語 85, 30 社会 93, 30 理科 72], # (以降、省略)

詳細な説明は割愛しますが、ノンブロッキングで行っていたときと同じような処理の流れとなりました。処理は3.7秒で、ノンブロッキングのときと大差はありません。また、このログで見えているだけでも「elastic-8」まであり、8スレッド使っていることが分かります。

elastic vs parallel

上の例では8スレッド使っていましたが、Fluxを生成する際のsleep時間などを少し調整すると、優に数十スレッドを使ってしまいました。これはこれで、スレッドを使いすぎる問題が起きかねません。


もう少し加減してスレッドを使って欲しい場合には、Schedulers.elastic()ではなく、Schedulers.parallel()を使います。こちらはスレッド数をCPU数分までに制限します(ただし最低は4)

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .subscribeOn(Schedulers.parallel())
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .subscribeOn(Schedulers.parallel())
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);


これを実行した結果、こうなりました。

17:40:42.309 [main] INFO  fetchStudents - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.312 [main] INFO  fetchStudents - request(256)
17:40:42.427 [parallel-1] INFO  fetchStudents - onNext(1 Muto)
17:40:42.438 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.438 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.542 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
17:40:42.542 [parallel-2] INFO  fetchScores - onNext(1 国語 80)
17:40:42.542 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.542 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.646 [parallel-2] INFO  fetchScores - onNext(1 数学 90)
17:40:42.646 [parallel-3] INFO  fetchScores - onNext(2 国語 80)
17:40:42.646 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
17:40:42.648 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.648 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.751 [parallel-2] INFO  fetchScores - onNext(1 英語 85)
17:40:42.751 [parallel-3] INFO  fetchScores - onNext(2 数学 90)
17:40:42.751 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
17:40:42.751 [parallel-4] INFO  fetchScores - onNext(3 国語 80)
# (省略)
17:40:47.023 [parallel-1] INFO  fetchScores - onComplete()
17:40:47.125 [parallel-1] INFO  fetchScores - onNext(16 国語 80)
17:40:47.228 [parallel-1] INFO  fetchScores - onNext(16 数学 90)
17:40:47.333 [parallel-1] INFO  fetchScores - onNext(16 英語 85)
17:40:47.437 [parallel-1] INFO  fetchScores - onNext(16 社会 93)
17:40:47.542 [parallel-1] INFO  fetchScores - onNext(16 理科 72)
17:40:47.543 [parallel-1] INFO  fetchScores - onComplete()
17:40:47.648 [parallel-1] INFO  fetchScores - onNext(20 国語 80)
17:40:47.750 [parallel-1] INFO  fetchScores - onNext(20 数学 90)
17:40:47.853 [parallel-1] INFO  fetchScores - onNext(20 英語 85)
17:40:47.955 [parallel-1] INFO  fetchScores - onNext(20 社会 93)
17:40:48.060 [parallel-1] INFO  fetchScores - onNext(20 理科 72)
17:40:48.060 [parallel-1] INFO  fetchScores - onComplete()
17:40:48.163 [parallel-1] INFO  fetchScores - onNext(24 国語 80)
17:40:48.264 [parallel-1] INFO  fetchScores - onNext(24 数学 90)
17:40:48.365 [parallel-1] INFO  fetchScores - onNext(24 英語 85)
17:40:48.468 [parallel-1] INFO  fetchScores - onNext(24 社会 93)
17:40:48.569 [parallel-1] INFO  fetchScores - onNext(24 理科 72)
17:40:48.569 [parallel-1] INFO  fetchScores - onComplete()
17:40:48.671 [parallel-1] INFO  fetchScores - onNext(28 国語 80)
17:40:48.775 [parallel-1] INFO  fetchScores - onNext(28 数学 90)
17:40:48.879 [parallel-1] INFO  fetchScores - onNext(28 英語 85)
17:40:48.985 [parallel-1] INFO  fetchScores - onNext(28 社会 93)
17:40:49.087 [parallel-1] INFO  fetchScores - onNext(28 理科 72)
17:40:49.087 [parallel-1] INFO  fetchScores - onComplete()
{9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72], 1 # (以降、省略)

スレッド名が parallel-1 から parallel-4 までの4スレッドになりました。4スレッド使ってできる範囲で処理を分担し、最後は残った処理を1スレッドで片付けている、という形です。処理全体には7秒弱掛かっており、スレッド生成し放題のelasticに比べれば2倍近く時間が掛かっていますが、単一スレッドで行うよりもレスポンスは半分以下に短縮されています。


もちろんこの処理はReactorを使わずともJava標準のExecutorServiceなり何なりを使っても同じことができます。ただ、ノンブロッキングな処理も扱えるReactorと、統一した書き方ができるところがメリットの一つになると思います。いや、Reactorでブロッキング処理を扱うようなものを書けること自体、おまけみたいなものかも知れませんが。


ちなみにelasticやparallel以外にも、ExecutorServiceを渡すなどすることもできますが、shutdownのタイミングなど考えると管理が面倒なので、普通にelasticかparallelを使っておくのが良いと私は思っています。

まとめ

Reactorを使って、ノンブロッキング処理、ブロッキング処理を、それぞれどのように扱うかを確認しました。


特にデータアクセスや、マイクロサービス呼び出しのような待ち時間の多いシチュエーションにおいて、うまくノンブロッキング処理にできれば、レスポンスタイムを短縮できるんじゃないかなと思います。
もちろん、その分データソース側に負荷が掛かるため、データソース側が十分にスケールアウトできることが前提となりますが、現代であればそういう環境は手に入りやすいため、取り組む価値があると見ています。


今回のケースがReactorの最適なユースケースだというわけではないですが、業務において、このような使い方もできるという一例として、参考にしてもらえばと思います!
Enjoy, Reactor!

EXレイヤーの無限コンボについて #FEXL

※このエントリーはβ版のものであり、すでに古い内容です。最新の攻略情報やコンボ情報は、FIGHTING EX LAYER Guideを参照してください。
https://fexl.tokyo


大前提として、僕は無限コンボがとても嫌いで、調整で取り除くべきだという立場です。理由は、最多段や最大ダメージを組み立てる楽しみがなくなるから。
この後の話はそういう立場での意見表明なので、そもそも無限は悪くないじゃんって人とは、また別の場所で殴り合いましょう。


で、発売初日からみんな試してるEXレイヤーの無限コンボ。
正直みんながやりすぎるせいで、このゲームの印象が「ダッシュ小Pから無限コンボ入れるゲームだよね」になって、今後のセールスに影響するんじゃないかって心配するぐらい。
まぁそう言う僕も「いやこれ地上だけじゃなくて、空中でもできるのでは、なんか浮き低いし」っていうSAI-RECチームの会話から、対空での無限コンボをササっと撮ったりしたんですけども。


このエントリーは、そういう無限コンボをなくしてくださいという話です。

EXの無限コンボは3種類

過去のEXシリーズにおける無限コンボの可能性は、大きく分けてこの3つに分類できる。

1. 当たり判定を残したまま落ちてくる必殺技が、空中ヒットしても同等の性質を持つ場合
2. 有利フレームが取れる技を当てた後に密着する場合
3. スパコンのループ中にゲージが溜まる場合

1. 当たり判定が残ったまま落ちてくる必殺技

1は、EXの豪鬼と、2PLUSのロッソ。
https://www.youtube.com/watch?v=UB8ch4ilyqE&t=3m6s (ロッソ)


豪鬼は続編で落下時の当たり判定をなくすことにより対策された。正直、豪鬼は竜巻から拾えるキャラなので、この調整が「豪鬼らしさ」を失った面もあるとは思う。
一方でロッソは明らかな調整ミスか見落としだと思う。ヴェズーヴィオの怒りは空中ヒット時にきりもみにすべきだった。エリアのジャクソンキックはそういう調整がされているのに。
同じような無限の可能性はハヤテの朧月にもあるけど、当たり方や隙の大きさの都合で今のところ無限には至ってない。ただ危険だとは思っていて、無限にならないのはたまたまだと思っている。


EXレイヤーでは、落下時の判定残りは凄く気を使ってるように見える。
スパコンから当たり判定を残しにくくなったのは、過去作のコンボ人としてはちょっと残念だけど、納得する部分もある。今作は空中コンボをチェーンコンボにしてダメージを高くしやすい分、打ち上げスパコンのダメージを抑えるという調整をしているように見える。なので、ファイヤーフォースとか気錬射とか蒼魂弾とかスカロエナジーみたいなダメージあるスパコンから安易に拾われたら困る、と考えれば納得できる。


話が逸れたけど、必殺技から拾える場合は、見落とさずに調整してくれれば大丈夫だと思う。

2. 有利フレームを取ったまま密着

2は、EXの豪鬼と、2PLUSのサガット春麗、シャドウ
https://www.youtube.com/watch?v=I9JtCuYihiM&t=2m43s豪鬼
https://www.youtube.com/watch?v=LZOaASBaaYg&t=2m36s (シャドウ)


「他の技を当てた後に空刃脚を出せる」というパターンは、常に無限コンボの可能性が伴うと思ってる。2PLUSのプルムも無限になりかねないけど、たまたま見つかってない可能性も大いにある。
EXの豪鬼が良い例であるように、たとえヒット時の距離を調整して無限にならないようにしても、コンボ職人は「途中の技を空振り」したり「持続当てになる連続技」を見つけて有利フレームを獲得して、近づいたりする方法を編み出したりする。


EXレイヤーの地上ダッシュループも同じで、正直、最初にダッシュチェーンコンボがあると聞いた時点で「そんなん無限コンボじゃん」と思ってたぐらい、システム的には当たり前に無限コンボができる。いまは1フレ繋ぎ、2フレ繋ぎでとどまってるけど、そのうち「大Pが持続当たりする」パターンが見つかって、もっとコンボ猶予が増えて誰でもできるようになる可能性がある。そう考えると、ダッシュを1〜2フレ調整したところで意味がないと思う。
あと、もしエリアが出てきて、小Pからレバー入れ大Pがチェーンで繋がったりするとまた小Pに戻れるので、個別の調整が必要になる。やっぱりチェーンのない世界感で作ってきたゲームに、チェーンを入れるというのが無理筋な気がする。


そう考えると、チェーンコンボはダウンを奪わない限り完全五分とか、イージーコンビネーション以外のチェーンはなしとするとか、イージーコンビネーションのPとKを交互に行き来できる程度にするとか、なんかもっと大胆な調整が必要だと思う。
ここで言ってるイージーコンビネーションとは、「開発者がPとKで出せる技を決められるので、有利フレームが取れるものは選ばない」という枠組みでのチェーンコンボのこと。

3. スパコンのループ中にゲージが溜まる

3は、例で言うと2PLUSの七瀬。
https://www.youtube.com/watch?v=8ZIOOguoYRE&t=2m56s


EXレイヤーでも、浮かし技からチェーンやダッシュチェーンを挟んで1ゲージ回復して、また打ち上げコンボにつなげるパターンがいくつも見つかっている。強氣でゲージアップを積めば、さらにやりやすくなるかもしれない。
このあたりは(少なくとも空中コンボの無限がなくなる前提で)ゲージ調整で何とかなるし、コンボ職人がβテスト期間中に頑張るべきやつだと思う。

新しいタイプの無限空中コンボ

浮いてる相手にダッシュ小P。もっと簡単なルートもある。
https://twitter.com/twitter/statuses/940622418745294848


空中相手に対して通常技を刻む無限コンボは、EXシリーズとしては全く新しいタイプの無限と言える。これは浮きが中途半端ゆえに起きてる。
過去のEXシリーズは技を刻んでいると徐々に相手が高く浮いていくので拾えなくなっていたのだけど、今作はそこまで浮かない。恐らく打ち上げ系のスパコンからチェーンコンボを入れたいための調整だと思う。
しかしだからと言って、バーチャシリーズほど落ちていくわけでもないので、結果、コパンや下中Pで浮かし直して無限コンボができてしまう。
もちろん「ダッシュで距離を詰めることができる」のも一因だけど、本質的には「浮きの高さ」の問題。


もっと浮くようにするか、もっと落ちるようにするか、どっちかしかないと思う。浮かし技からいっぱい技を入れて浮かし直すコンボにロマンがあると言うなら、やっぱり「もっと落ちる」方向じゃないかなぁ。なんだったら、多少、バウンドを拾えてもいいと思う。もうEX/レイヤーの世界観ではないけども。

なんて言ってる間にアップデート

そんな事を考えてモヤモヤしてたら、無限コンボの改善をしたアップデートが行われるらしい。
https://twitter.com/arika_co_jp/status/941248365643186176


良かった良かった。
いや、ほんと無限のせいでβ版のコンボを調べる気力をロストしてたので、良かった。

Goで格闘ゲームのマクロを実装してみた

このところ、夜な夜な格ゲーの動画を撮ってはYoutubeにアップしては、SRK(shoryuken.com)に取り上げられて承認欲求を満たしている [twitter:@cero_t] です。SRKは世界最大の格ゲーコミュニティだよ!
http://shoryuken.com/?s=Shin+Tanimoto


このエントリーは Go2 Advent Calendar 2017 の7日目です。
https://qiita.com/advent-calendar/2017/go2

はじめに

まえがき

学生時代、僕は格闘ゲームの連続技(コンボ)の研究を嗜んでおりまして、学校に行くぐらいならゲーセンを行く、試験勉強するぐらいならコンボの研究をする、卒論を書くぐらいならコンボビデオを撮るという、クズみたいな学生生活を送っておりました。当時、いまは亡き「ゲーメスト」というアーケードゲーム雑誌で体験ライターとして編集部に泊まり込んで記事を書くような体験もしました。
その頃よくプレイしていたストリートファイター系のゲームが、なんと18年の歳月を経て新作を出すというのです。これは万難を排してやらねばならぬと相成ったのですが、当時20歳前後だった僕も先日ついに40歳を迎えたわけで、さすがにもうあの頃のようには手が動きません。
しかし、いまの僕には、あの頃の自分にはなかったプログラミング能力があります。人間が手作業で行っていることを、機械に行わせるのが仕事です。そう、格闘ゲームも機械にやらせてしまえば良いじゃないか、というのが今回のテーマになります。


※なお、ゲームのネット対戦やオンラインランキングでマクロを使うのはマナー違反です。決してやらないでください。

目的

プレステ3やプレステ4、Xboxなどで使えるマクロを作ります。実装にはGoを使いました。

用意するもの

ゲーム機とPCをつなぐ、何かしらのデバイスが必要になります。今回はConsole Tuner社のTitan Oneという機器を利用します。
https://www.consoletuner.com/products/titan-one/


これはUSBゲームコントローラーを様々なゲーム機で使えるようにするための変換器です。しかしただの変換機能だけでなく、PCと繋ぐことでマクロ機能やプログラミングを組んだりできるほか、API自体もDLLとして公開されているので、自作アプリからも利用することができます。今回の用途にはベストです。


Titan OneとDLLを使って、次のような流れで操作を流し込みます。

ゲーム機 <-(USB)- Titan One <-(USB)- Windows(DLL <- 自作アプリケーション)

なんとなくイメージが湧くでしょうか?

なぜアプリを自作するのか?

先ほど「マクロ機能やプログラミング環境が用意されている」と書いた通り、用意された環境を利用すれば、別にアプリを自作するまでもなくマクロを実行することができます。ただ、付属のプログラミング環境ではミリ秒単位の制御しかできず、格闘ゲームの60fps(framer per second)の制御をするには不便でした。格闘ゲーマーは物事をフレーム単位で考えていて、ミリ秒単位で考えるわけではないのです。
なので、フレーム単位で扱いやすくするための、簡単なアプリを自作することにしました。

なぜGoなのか?

今回のアプリは、配布が容易であることと、Mac/Windowsのいずれでも開発できるという点でGoを選びました。
僕はJavaエンジニアなので、最初はJavaでJNAを使って実装しました。Javaでも問題なく動くものができたのですが、ただちょっと配布するのが面倒だなと思い、かと言ってC#にするとMacでの開発環境が微妙なので、Goを選ぶに至りました。もちろんそんな消去法的な選択だけでなく、ちょっとGoを勉強してみたかったという背景もあります。

DLL呼び出しの実装

それではGoで実装していきます。僕はGo初心者で体系的に学んだこともないので、ここがおかしいよとか、ここもっとこうした方が良いよというのがあれば、教えてもらえると凄く嬉しいです。

DLLの読み込み

まずTitan Oneが提供しているDLL(gcdapi.dll)を読み込む部分を作ります。
goでDLLを読み込むにはsyscall.LoadDLLを使います。

dll, _err := syscall.LoadDLL("gcdapi.dll")
if _err != nil {
	log.Fatal("Error loading gcdapi.dll", _err)
}

これでDLLの読み込みができます。


ここで「Failed to load gcdapi.dll: %1 is not a valid Win32 application.」というエラーに悩まされました。
原因は「gcdapi.dllが32bit」だったことです。64bit環境でコンパイル/実行しようとするとエラーになるので、32bit環境向けにコンパイルする必要があります。

set GOARCH=386

これでOK。
ここで1時間ほどハマっていました。

DLL内の関数呼び出し

DLLにある関数を呼び出すには、読み込んだdllのFindProc関数を使います。

gcdapi_Load, _err := dll.FindProc("gcdapi_Load")
if _err != nil {
	log.Fatal("cannot find gcdapi_Load", _err)
}

result, _, _err := gcdapi_Load.Call()
if result == 0 {
	log.Fatal("gcdapi cannot be loaded")
}

これでgcdapi_Loadという関数を実行できます。
DLL内にある他の関数も、同じ要領で先に読み込んでおきます。


また、関数に引数を渡す必要がある場合は、uintptr型のポインタとして渡します。
次の例は、someFunction関数にarg1という値を渡す例です。

result, _, _ := someFunction.Call(uintptr(arg1))

戻り値はresultに入ります。

配列や構造体を引数に渡す

引数に配列や構造体を渡す場合は、先に参照をポインタに変換してから、さらにuintptrに変換します。
正直、参照がどうなってるのかもはや僕には理解できないのですが、こうすれば動くということだけ確認しました。

func write(inputs *[36]int8) bool {
	result, _, _ := procWrite.Call(uintptr(unsafe.Pointer(inputs)))
	// 略
	return true
}

ちなみにここで配列ではなくスライスを渡していたために上手く動かず、2時間ほどハマりました。初学者はとにかくハマって時間をロストしますね。


また構造体を引数にする場合は、DLL側に用意された構造体と同じ名前、同じ型の構造体をGo側に用意します。
次の例ではGCAPI_REPORTという構造体を引数に渡しています。

type GCAPI_REPORT struct {
	console       uint8
	controller    uint8
	led           [4]uint8
	rumble        [2]uint8
	battery_level uint8
	input         [30]GCAPI_INPUT
}

type GCAPI_INPUT struct {
	value      int8
	prev_value int8
	press_tv   uint32
}

func read() bool {
	var report GCAPI_REPORT
	result, _, _ := procRead.Call(uintptr(unsafe.Pointer(&report)))
	log.Println(report)
	// 略
	retun true
}

構造体は恐ろしいぐらいにハマらずに値が入りました。簡単です。最高です。

MacでもWindowsでも実行したい

ちなみにここまで使ってきたsyscall.LoadDLLや、その戻り値であるsyscall.DLL、syscall.ProcなどはWindows環境でしか利用できず、Mac環境でコンパイルしようとするとエラーになってしまいます。
せめてMacではDLL呼び出し部分だけモックにして、アプリ全体としては動くようにしておきたいものです。そういうのもきちんと用意されていました。


同じパッケージ内に、同名の関数を定義した別ソースファイル(モック用関数群)を作り、そのファイルの先頭にこんなコメントを入れます。

// +build !windows

これでWindowsの環境以外でビルドした場合には、モック用関数群のファイルが利用されるようになります。
・・・ちょっと何を言ってるのか分からない感じなので、実物を見て頂いたほうが早いと思います。


Windows用のソースコードがこれ。
https://github.com/cero-t/cero-macro.t1/blob/master/gcapi/gcapi_dll_windows.go


Windows以外用のソースコードはこれです。
https://github.com/cero-t/cero-macro.t1/blob/master/gcapi/gcapi_dll_other.go


いいですね、簡単です。

次に入力側を作る

DLLの呼び出しが上手く行ったので、次は呼び出し側の設計/実装です。

懐かしのコマンド

20世紀に格闘ゲームをやっていた人にとって、「236p」という文字列は波動拳コマンドにしか見えません。これはテンキー配列の「下 右下 右」に相当するもので、最後のpがパンチです。当時のパソコン通信やインターネットで頻繁に用いられた記法です。同様に、昇龍拳は「623p」、竜巻旋風脚は「214k」となります。今回はこの表記をします。
また、パンチの小中大はそれぞれl, m, hで表現し、たとえば小パンチは「lp」、中キックは「mk」、しゃがみ大キックは「2,hk」とします。これは海外の格闘ゲーマーが用いている表記方法です。


この表記法を使い、たとえば次のようなテキストを受け取ることを想定します。

2,mk 8
3 1
6,lp 1

これはレバー下と中キックを8フレーム、右下に1フレーム、右と小パンチを1フレーム、というコマンドです。中足波動拳ですね。
このようなコマンドを受け取って、上で説明した関数の呼び出しを行うようにします。

入力コマンドの変換

格闘ゲームの入力は精度が命ですから、Go側の処理に時間が掛かってコマンドが不安定になってしまっては意味がありません。そのため、上に書いたテキストをあらかじめパースして配列などに変換しておき、その後にまとめてDLL経由でコマンドを流し込む処理を行うようにします。変換とDLL呼び出しを逐次行っていると、想定しない処理遅延が発生する可能性があるためです。


テキストをパースした後、次のような構造体に変換します。

type State struct {
	buttons [36]int8
	frames  uint16
}

buttonsが押すべきボタンの一覧です。配列のそれぞれがボタンやレバーに対応しており、押さない時は0、押した時は100という値が入ります。


変換は地味なコードなので割愛します。ここでやってます。
https://github.com/cero-t/cero-macro.t1/blob/master/processor/converter.go

正確な60fpsのコマンド実行

続いて、コマンド実行の処理です。State構造体の配列をきっちりと指定したフレームで呼び出す必要があります。
ちなみに格闘ゲームではよく「1/60秒」という表現が使われて、「光速は遅すぎる」など話題になることがありますが、少なくとも手元環境のプレステ3で確認したところ、1フレームは「1/60秒」ではなく「1/59.94秒」でした。


そのあたりも踏まえて、次のようなコードを書きました。ボタンを押した後に、そのフレーム数分(たとえば5フレームなら、約 5 * 16.6ms = 83ms)だけ待つというコードです。
ただしこの処理自体の実行時間も考慮する必要があるため、現在時刻など利用して、フレームレートを保つようにしています。

// フレームレート (frame per 1000 seconds)
const frameRate uint64 = 59940

func Process(states *[]State) {
	var totalFrames uint64
	start := time.Now().UnixNano()

	for _, state := range *states {
		// ボタンを押す
		gcapi.Push(&state.buttons)

		// フレーム数分だけsleepする
		totalFrames += uint64(state.frames)
		sleepTime := start + int64(totalFrames*1000*1000*1000*1000/frameRate) - time.Now().UnixNano()
		time.Sleep(time.Duration(sleepTime))
	}
}

正確に59.94fpsを保つためにわざわざアプリを自作したので、この処理が一番のキモと言えますね。


この関数を含む処理全体はここにあります。
https://github.com/cero-t/cero-macro.t1/blob/master/processor/processor.go

UIはどうする?

最後にUIです。
GoのUIをどうすべきかは議論があるようですが、「クロスプラットフォームで使えて、皆が慣れ親しんでいるUI」と言えば、コマンドラインCUI)とHTMLの2つでしょう。

まずはCUI

テキストファイルを読み込んでその中に書かれた操作コマンドを実行するCUIを作ります。
こんな感じで、実行時の引数にファイル名を指定します。

$ go run macro.go combo.txt

これでcombo.txtに書かれた内容が、ゲーム機に送られるのです。


このmacro.txtという渋い名前のファイルの中身は、こんな感じです。

9 34
hp 20
2,mk 8
6 1
2 1
3,hp 1

ジャンプ大パンチ 中足 昇龍拳、という感じです。

続いてGUI

もう一つのUIが、HTTPサーバとして起動し、画面で入力されたコマンドを実行するGUIです。実行時の引数にファイル名を指定しなければHTTPサーバとして立ち上がります。

$ go run macro.go
Server started at [::]:8080

それでlocalhostの(あるいは別のPCからIPアドレスを指定して)8080ポートにアクセスすれば、こんな画面が出てきます。

テキストボックスにコマンドを入力して「run」を押せば実行できる形です。
マクロで動かす以外にも簡単な操作をリモートでできるよう、他のボタンも置いておきました。「LP」ボタンを押すと、「lp 2」というマクロ(小パンチを2フレーム押す)が送られます。


なおHTMLファイルの使い方ですが、Goでリソースをバンドルするのがちょっと面倒だったので、ソースコード内にヒアドキュメントで書いてしまいました。

func formHTML() string {
	return `
<!DOCTYPE html>
<html>
<head>
(略)
</body>
</html>
`
}

なおViewフレームワークには超軽量で有名な「Vanilla JS」を用いているため、この1ファイルだけでViewのレンダリングやサーバとの通信処理なども実装できています。
http://vanilla-js.com/
はい。


HTTPクライアント、サーバ関連のソース全体はこちらです。
https://github.com/cero-t/cero-macro.t1/blob/master/http/form-html.go
https://github.com/cero-t/cero-macro.t1/blob/master/http/http.go

実際、試してみたところ・・・!!

これで画面からDLLまですべて繋がったので、いざ実機で挑戦!


おぉ、動く動く! キャラが動くよ!


・・・あれ、コンボが途切れるぞ?
なんか、コンボが不安定だぞ?


そうなんです、どうやらTitan Oneは処理遅延が10ms程度あり、その遅延も安定しないため、格闘ゲームの1フレームを安定させて動かすのが難しいようなのです。
ここまで書いてきてこのオチはなかなか酷いのですが、別にこれで終わりというわけではありません。
より遅延が少ないデバイスが出てきて、それをAPI経由で叩けるようになれば、また再挑戦したいと思います。


あれ、この手元にあるデバイスは、何だろう??


来年には、API経由で叩けるようになるのかなぁ。

PS3のメモリーカードアダプターをPCに繋いで読み書きする

久々の投稿がゲームの、しかもニッチすぎる話題でアレですけど、備忘録として。

やりたいこと

1. PCに保存しているPS1のメモリーカードのデータを、PS1のメモリーカードに書き込む。
2. PS1のメモリーカードのデータを、PS3にコピーする。
3. PS3でPS1のゲームを起動して、メモリーカードのデータを読み込む。

背景

PS3では、PS3内に保存された仮想メモリーカードデータをUSBメモリ経由でPCにコピーできる。ただこのファイルは暗号化されているため、好き勝手できるわけではない。海外のエンジニアが復号ツールを作って公開しており、それを使えばメモリーカードのデータを閲覧することはできるのだけど、逆の暗号化ツールは作っていないため、任意のデータをPS3仮想メモリーカードデータとして保存することができない。


要するに、USBメモリ経由で、
PS3 → PC はできるけど
PC → PS3 はできない
ということ。


ここで、PS3用のメモリーカードアダプターを使えば、任意のデータを読み書きできるようになるので、今回紹介するのはその方法。

用意するもの

1. PS3メモリーカードアダプター。プレミアがついて中古でも4000円ぐらいする。
2. Windows、今回は7の64bit版で。別にMac + Parallelsとかでも大丈夫。

手順

1. Windowsメモリーカードアダプターのデバイスドライバをインストール。
このサイトに書いてあることがすべて。
http://kazzx2.web.fc2.com/64bit_edit.html
ただ「64bit OS版 メモカアダプタドライバ」のURLが変わってるので注意。
http://www.axfc.net/uploader/so/1918620?key=DDR_KAZZ


2. MCRWwinを起動して、メモリーカードアダプターにメモリーカードを挿し、認識されればOK。


3. MCRWwinの「File(VM)」→「Read(File->VM)」を選択して、
 PCに保存しているPS1のメモリーカードのデータを指定する。
 拡張子は .mem が指定されるけど、128KBのファイルならまず読める。


4. メモリーカードへの書き込みが終わったら、メモリーカードアダプターごとPS3に接続


5. 起動後のメニュー画面からメモリーカードの管理を選んで、PS1のメモリーカードからPS3仮想メモリーカードにコピー。
http://manuals.playstation.net/document/jp/ps3/current/game/mcsavedata.html