【UniRx】HotとCold #90
前回の成果
UniRxとコルーチンの変換について学んだ。
今回やること
UniRxのHotとColdについて学びます。
HotとColdとは
IObservableの性質になります。
これらを理解せずにUniRxを使用すると意図せぬ挙動となる場合があります。
Cold
ColdなObservableは単体では何もしないものとなります。
Subscribeされて初めてストリームを流します。
また、それぞれのObserverに別の値を流す特徴もあります。
ほとんどのオペレーターがColdとなります。
Hot
HotなObservableは自ら値を発行するものとなります。
つまりSubscribeされなくてもストリームを流します。
また、購読している全てのObserverに同じ値を流す特徴があります。
ColdなObservable
実際のコードで具体的に見てみます。
Coldのみ
using UnityEngine; using UniRx; using System; public class ColdObservable : MonoBehaviour { void Start() { ColdTimerObservable(); } /// <summary> /// Coldの例 /// </summary> private void ColdTimerObservable() { // ColdなObservableのみなので、発行されない IObservable<long> timer= Observable // 1秒ごとに発行 .Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)) .Select(x => { Debug.Log(x); return x; }); } }
1秒ごとに値を発行したいソースコードとなります。
結果
何も発行されていません。
これは先程も説明したとおり、Coldは単体では何もしない性質だからです。
メッセージを発行させたい場合には以下のようにします。
Coldを発行させる
/// <summary> /// Coldの例をSubscribe /// </summary> private void SubscribeTimer() { IObservable<long> timer= Observable // 1秒ごとに発行 .Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)) .Select(x => { Debug.Log(x); return x; }); // Subscribeする timer.Subscribe(); }
先程の処理にSubscribeを追加したのみになります。
結果
こちらが意図したとおり、1秒ごとにメッセージを発行してくれています。
HotなObservable
こちらも実際のコードで見てみます。
意図せぬメッセージを発行
using UnityEngine; using UniRx; using System; public class HotObservable : MonoBehaviour { void Start() { MissHotObservable(); } /// <summary> /// 意図していないメッセージ発行 /// </summary> private void MissHotObservable() { Subject<int> subject = new Subject<int>(); // subjectから生成されたObservableはHot IObservable<int> asObservable = subject.AsObservable(); // 渡された値を発行 IObservable<int> observable = asObservable.Select(x => { Debug.Log("Value : " + x); return x; }); // 発行されない subject.OnNext(1); subject.OnNext(10); observable.Subscribe(); // 発行される subject.OnNext(100); } }
OnNextの引数を発行したいソースコードになります。
結果
Subscribe後の100しか発行されていません。
これは、SelectがColdなので発生してしまいます。
ColdはSubscribeされるまでは何もしないので、そのColdなObservableに値を渡されても何も処理されません。
こちらの画像がイメージとして非常に分かりやすいです。
Subscribe前にColdに渡された値も処理してほしい場合にはHot変換オペレータというものを使用します。
Hot変換オペレータを使用
/// <summary> /// 意図通りのメッセージ発行 /// </summary> private void SuccessHotObservable() { Subject<int> subject = new Subject<int>(); // subjectから生成されたObservableはHot IObservable<int> asObservable = subject.AsObservable(); // 渡された値を発行 IConnectableObservable<int> observable = asObservable .Select(x => { Debug.Log("Value : " + x); return x; }) // Hot変換オペレータ .Publish(); // ストリームの稼働開始 observable.Connect(); // 発行されるようになる subject.OnNext(1); subject.OnNext(10); observable.Subscribe(); // 発行される subject.OnNext(100); }
以前のソースコードに、Publich()とConnect()を加えています。
結果
Hot変換オペレータを挟むことにより、Coldが稼働するようになります。
PublishとConnect
PublishとConnectの内部について少し確認します。
まず、Publishメソッドを呼ぶことにより、ConnectableObservable<T>クラスのインスタンスが生成されます。
Publishの中身
/// <summary> /// ConnectableObservable<T>のインスタンス生成 /// </summary> public static IConnectableObservable<T> Multicast<T>(this IObservable<T> source, ISubject<T> subject) { return new ConnectableObservable<T>(source, subject); } /// <summary> /// Publishの呼び出し /// </summary> public static IConnectableObservable<T> Publish<T>(this IObservable<T> source) { return source.Multicast(new Subject<T>()); }
そして、ConnectableObservable<T>クラスのConnectメソッドでSubscribeしているのでHot変換が行われます。
Connectの中身
class ConnectableObservable<T> : IConnectableObservable<T> { readonly IObservable<T> source; readonly ISubject<T> subject; readonly object gate = new object(); Connection connection; public ConnectableObservable(IObservable<T> source, ISubject<T> subject) { this.source = source.AsObservable(); this.subject = subject; } /// <summary> /// 内部でSubscribe /// </summary> public IDisposable Connect() { lock (gate) { // don't subscribe twice if (connection == null) { var subscription = source.Subscribe(subject); connection = new Connection(this, subscription); } return connection; } }
Connectの中でSubscribeが呼ばれているので、Connectを忘れるとメッセージが発行されないので気をつけてください。
HotとColdの分岐
Hot変換を利用するときには、ストリームを分岐させたい時に使用します。
実際のコードで見てみます。
Coldの分岐
/// <summary> /// Coldは分岐できない /// </summary> private void ColdBranch() { // 1~3を発行するストリーム IObservable<int> stream = Observable .Range(1,3) .Select(x => { Debug.Log("Cold : " + x); return x; }); stream.Subscribe(); stream.Subscribe(); }
結果
ColdなObservableは分岐することができません。
ですので、Subscribeする度にストリームが生成されてしまいます。
Hotの分岐
/// <summary> /// Hotは分岐できる /// </summary> private void HotBranch() { // 1~3を発行するストリーム IConnectableObservable<int> stream = Observable .Range(1,3) .Select(x => { Debug.Log("Hot : " + x); return x; }) // Hot変換オペレータ .Publish(); // ストリームの稼働開始 stream.Connect(); stream.Subscribe(); stream.Subscribe(); }
結果
HotなObservableは分岐することができます。
ですので、Subscribeしてもストリームが生成されません。
値の流し方の違い
最初にHotは同じ値を、Coldは別の値を流すと説明しました。
その具体例を見てみます。
何かキーを押されると押された回数を通知する処理となります。
Coldなカウンター
/// <summary> /// Coldなカウンター /// </summary> private void ColdCounter() { // 何かのキーを押された回数を通知する IObservable<int> counterObservable = Observable .EveryUpdate() .Where(_ => Input.anyKeyDown) .Select(_ => 1) .Scan((a, b) => a + b); // counterObservableの値を発行する IDisposable stream = Observable .EveryUpdate() .Where(_ => Input.anyKeyDown) .Select(_ => 1) .Scan((a, b) => a + b) .Subscribe(subscribeNum => { counterObservable.Subscribe(counter => { Debug.Log(string.Format("【Cold】{0}回目のSubscribe, Counter Value : {1}", subscribeNum, counter)); }); }); }
結果
Coldは購読時点からのカウントの値を流しています。
ですので、各Subscribe時に違う値が流れてしまっています。
Hotなカウンター
/// <summary> /// Hotなカウンター /// </summary> private void HotCounter() { // 何かのキーを押された回数を通知する IConnectableObservable<int> counterObservable = Observable .EveryUpdate() .Where(_ => Input.anyKeyDown) .Select(_ => 1) .Scan((a, b) => a + b) // Hot変換 .Publish(); // counterObservableの値を発行する IDisposable stream = Observable .EveryUpdate() .Where(_ => Input.anyKeyDown) .Select(_ => 1) .Scan((a, b) => a + b) .Subscribe(subscribeNum => { counterObservable.Subscribe(counter => { Debug.Log(string.Format("【Hot】{0}回目のSubscribe, Counter Value : {1}", subscribeNum, counter)); }); }); // ストリーム稼働 counterObservable.Connect(); }
結果
Hot変換をすることにより、同じ値が流れるようになりました。
今回は以上となります。
ここまでご視聴ありがとうございました。
参考サイト様
www.slideshare.net