【UniRx】Observableを分岐させるオペレータ #98
前回の成果
メッセージを合成するオペレータについて学んだ。
今回やること
今回はObservableを分岐させるオペレータについてまとめます。
いわゆるHot変換になりますので、そちらに関しては以下を参考にしてみてください。
- 前回の成果
- 今回やること
- Observableを枝分かれさせたい
- Connectの位置
- Observableを枝分かれ點せつつ、初期値を指定させたい
- ObserverがあればConnect、なければDispose
- Publish().RefCount()を省略したい
- Observableを分岐させ、最後の値のみキャッシュ
- Observableを分岐させ、全ての値をキャッシュ
- Observableを枝分かれさせるときにSubjectを指定したい
- 参考サイト様
Observableを枝分かれさせたい
Publish
Publishを使用することでストリームをHot変換することができます。
Observableを枝分かれさせたい
/// <summary> /// Observableを枝分かれ /// </summary> private void ExcutePublishBranch() { Debug.Log("--------Cold--------"); ExcutePublishCold(); Debug.Log("--------Hot---------"); ExcutePublishHot(); } /// <summary> /// Coldなので、Observableが分岐できない /// </summary> private void ExcutePublishCold() { IObservable<int> stream = Observable .Range(1, 3) .Do(x => Debug.Log("Cold Value : " + x)); stream.Subscribe(); stream.Subscribe(); } /// <summary> /// Hotなので、Observableが分岐する /// </summary> private void ExcutePublishHot() { IConnectableObservable<int> stream = Observable .Range(1, 3) .Do(x => Debug.Log("Hot Value : " + x)) // Hot変換 .Publish(); stream.Subscribe(); stream.Subscribe(); // ストリームを稼働 stream.Connect(); }
結果
Publishをしていない方は、2回目のSubscribe時にストリームが生成されてしまっています。
Publishをしている方は、ストリームが分岐されるのでSubscribeをしても新たにストリームが生成されていません。
Connectの位置
Publishをした際に、ストリームを稼働させるためにConnectを呼びます。
Connectを呼ぶ位置によっては意図せぬ挙動をするので注意してください。
サブスクリプション前にConnect
/// <summary> /// Observableを枝分かれ、コルーチン版 /// サブスクリプション前にConnect /// </summary> private IEnumerator ExcutePublishCoroutineBad() { TimeSpan time = TimeSpan.FromSeconds(1); IConnectableObservable<long> stream = Observable .Interval(time) .Publish(); // ここでConnectを呼ぶと、1回目のSubscribeと2回目のSubscribeがズレる可能性がある stream.Connect(); stream.Subscribe(x => { Debug.Log("First Value : " + x); }); yield return new WaitForSeconds(2); stream.Subscribe(x => { Debug.Log(string.Format("<color=blue>Second Value : {0}</color>", x)); }); }
結果
コルーチンで時間を止めているので、止めている間のSubscribeが取れていません。
ですので、Second Valueには0のメッセージが来ていません。
サブスクリプション後にConnect
/// <summary> /// Observableを枝分かれ、コルーチン版 /// サブスクリプション後にConnect /// </summary> private IEnumerator ExcutePublishCoroutineGood() { TimeSpan time = TimeSpan.FromSeconds(1); IConnectableObservable<long> stream = Observable .Interval(time) .Publish(); stream.Subscribe(x => { Debug.Log("First Value : " + x); }); yield return new WaitForSeconds(2); stream.Subscribe(x => { Debug.Log(string.Format("<color=blue>Second Value : {0}</color>", x)); }); // Connectは全てのサブスクリプションが終わってから呼び出すと // コルーチンやスレッドでズレることは無くなる stream.Connect(); }
結果
Connectを呼ぶタイミングを全てのサブスクリプション後に呼ぶと意図せず呼び出されないことが無くなります。
Observableを枝分かれ點せつつ、初期値を指定させたい
Publish<T>
Publishに引数を指定することにより、初期値を指定することができます。
Observableを枝分かれ點せつつ、初期値を指定
/// <summary> /// Observableを枝分かれ點せつつ、初期値を指定 /// Multicast(new BehaviorSubject<T>(value))と同義 /// </summary> private void ExcutePublishArgument() { var stream = Observable .Range(1, 3) .Publish(100) .Subscribe(x => { Debug.Log("Publish Argument : " + x); }); }
結果
ObserverがあればConnect、なければDispose
RefCount
このオペレータを使用することで、自動でConnectとDisposeを呼んでくれます。
失敗例
/// <summary> /// ObserverがあればConnect、なければDispose /// Connectだと、Subscribeしたストリームを消してもDisposeが呼ばれない /// </summary> private void ExcuteRefCountBad() { TimeSpan time = TimeSpan.FromSeconds(1); // カウントダウンするストリーム IConnectableObservable<long> stream = Observable .Interval(time) .Do(x => { Debug.Log("Publish : " + x); }) .Publish(); stream.Connect(); IDisposable subscription = stream .Subscribe(i => { Debug.Log("Subscription : " + i); }); // 何かキーが押されたらDispose this.UpdateAsObservable() .Where(_ => Input.anyKeyDown) .Subscribe(_ => { Debug.Log("<color=red>Call Dispose</color>"); subscription.Dispose(); }); }
結果
Connectを使用して、Disposeを行おうとしている例になります。
Subscription側はDisposeを呼ばれたので、Subscribeされなくなっています。
ですが、元のストリームは変わらずメッセージを発行し続けています。
成功例
/// <summary> /// ObserverがあればConnect、なければDispose /// RefCountは自動で行ってくれる /// </summary> private void ExcuteRefCountGood() { TimeSpan time = TimeSpan.FromSeconds(1); // カウントダウンするストリーム IObservable<long> stream = Observable .Interval(time) .Do(x => { Debug.Log("Publish : " + x); }) .Publish() // ObserverがあればConnect、なければDispose .RefCount(); IDisposable subscription = stream .Subscribe(i => { Debug.Log("Subscription : " + i); }); // 何かキーが押されたらDispose this.UpdateAsObservable() .Where(_ => Input.anyKeyDown) .Subscribe(_ => { Debug.Log("<color=red>Call Dispose</color>"); subscription.Dispose(); }); }
結果
こちらはDisposeが呼ばれた後、元のストリームもメッセージが発行されなくなっています。
Publish().RefCount()を省略したい
Share
Shareを使用することで、Publish().RefCount()を省略することができます。
Publish().RefCount()を省略
/// <summary> /// Publish().RefCount()を省略する /// </summary> private void ExcuteShare() { TimeSpan time = TimeSpan.FromSeconds(1); IObservable<long> stream = Observable .Interval(time) .Do(x => { Debug.Log("Publish : " + x); }) // Publish().RefCount()を省略する .Share(); IDisposable subscription = stream .Subscribe(i => { Debug.Log("Subscription : " + i); }); this.UpdateAsObservable() .Where(_ => Input.anyKeyDown) .Subscribe(_ => { Debug.Log("<color=red>Call Dispose</color>"); subscription.Dispose(); }); }
結果
同じなので省略します。
Observableを分岐させ、最後の値のみキャッシュ
PublishLast
Hot変換したストリームの最後の値のみキャッシュすることができます。
最後の値のみキャッシュ
/// <summary> /// Observableを分岐させ、最後の値のみキャッシュ /// </summary> private void ExcutePublishLast() { Debug.Log("--------PublishLast--------"); IObservable<int> stream = Observable .Range(1, 5) .Do(x => { Debug.Log("Publishing : " + x); }) // 最後の値のみキャッシュ .PublishLast() .RefCount(); IDisposable subscription = stream .Subscribe(x => { Debug.Log("Subscription : " + x); }); subscription.Dispose(); // 比較用 Debug.Log("--------Publish-------"); IObservable<int> stream2 = Observable .Range(1, 5) .Do(x => { Debug.Log("Publishing : " + x); }) .Share(); IDisposable subscription2 = stream2 .Subscribe(x => { Debug.Log("Subscription : " + x); }); subscription2.Dispose(); }
結果
Publishと比較しています。
PublishLastでは、最後の5のみキャッシュされ発行されています。
Observableを分岐させ、全ての値をキャッシュ
Replay
こちらは全ての値をキャッシュします。
またConnectの位置等により、取得できなかったものもメッセージ発行されます。
全ての値をキャッシュ
/// <summary> /// Observableを分岐させ、全ての値をキャッシュ /// </summary> private IEnumerator ExcuteReplay() { TimeSpan time = TimeSpan.FromSeconds(1); var stream = Observable .Interval(time) .Take(3) .Publish(); stream.Connect(); var replayStream = stream.Replay(); replayStream.Connect(); replayStream.RefCount(); stream.Subscribe(x => { Debug.Log("First Value : " + x); }).AddTo(this); yield return new WaitForSeconds(2); stream.Subscribe(x => { Debug.Log(string.Format("<color=blue>Second Value : {0}</color>", x)); }); // 実行が遅れて取得できなかった値もReplayだと出る replayStream.Subscribe(x => { Debug.Log(string.Format("<color=red>Replay Value : {0}</color>", x)); }); }
結果
Connectの呼び出しタイミングにより、ScondValueでは呼び出されていない0があります。
ですが、Replayの方では0が呼び出されています。
Observableを枝分かれさせるときにSubjectを指定したい
Multicast
このオペレータを使用するとSubjectを指定しながらHot変換をすることができます。
他のオペレータの内部でMulticastを使用しているので、例のみ挙げソースコードは省略します。
オペレータ名 | Multicast |
---|---|
Publish() | Multicast(new Subject<T>) |
PublishLast() | Multicast(new AsyncSubject<T>) |
Replay() | Multicast(new ReplaySubject<T>) |
今回は以上となります。
ここまでご視聴ありがとうございました。