知識0からのUnityShader勉強

知識0からのUnityShader勉強

UnityのShaderをメインとして、0から学んでいくブログです。

【UniRx】Observableを分岐させるオペレータ #98

前回の成果

メッセージを合成するオペレータについて学んだ。

soramamenatan.hatenablog.com


今回やること

今回はObservableを分岐させるオペレータについてまとめます。
いわゆるHot変換になりますので、そちらに関しては以下を参考にしてみてください。

soramamenatan.hatenablog.com


Observableを枝分かれさせたい

Publish

Publishを使用することでストリームをHot変換することができます。

Observableを枝分かれさせたい
/// <summary>
/// Observableを枝分かれ
/// </summary>
private void ExcutePublishBranch() {
    Debug.Log("--------Cold--------");
    ExcutePublishCold();
    Debug.Log("--------Hot---------");
    ExcutePublishHot();
}

/// <summary>
/// Coldなので、Observableが分岐できない
/// </summary>
private void ExcutePublishCold() {
    IObservable<int> stream = Observable
                                    .Range(1, 3)
                                    .Do(x => Debug.Log("Cold Value : " + x));
    stream.Subscribe();
    stream.Subscribe();
}


/// <summary>
/// Hotなので、Observableが分岐する
/// </summary>
private void ExcutePublishHot() {
    IConnectableObservable<int> stream = Observable
                                    .Range(1, 3)
                                    .Do(x => Debug.Log("Hot Value : " + x))
                                    // Hot変換
                                    .Publish();
    stream.Subscribe();
    stream.Subscribe();
    // ストリームを稼働
    stream.Connect();
}
結果

Publishをしていない方は、2回目のSubscribe時にストリームが生成されてしまっています。
Publishをしている方は、ストリームが分岐されるのでSubscribeをしても新たにストリームが生成されていません。

f:id:soramamenatan:20210410171012p:plain

Connectの位置

Publishをした際に、ストリームを稼働させるためにConnectを呼びます。
Connectを呼ぶ位置によっては意図せぬ挙動をするので注意してください。

サブスクリプション前にConnect
/// <summary>
/// Observableを枝分かれ、コルーチン版
/// サブスクリプション前にConnect
/// </summary>
private IEnumerator ExcutePublishCoroutineBad() {
    TimeSpan time = TimeSpan.FromSeconds(1);
    IConnectableObservable<long> stream = Observable
                                            .Interval(time)
                                            .Publish();
    // ここでConnectを呼ぶと、1回目のSubscribeと2回目のSubscribeがズレる可能性がある
    stream.Connect();
    stream.Subscribe(x => {
        Debug.Log("First Value :  " + x);
    });
    yield return new WaitForSeconds(2);
    stream.Subscribe(x => {
        Debug.Log(string.Format("<color=blue>Second Value : {0}</color>", x));
    });
}
結果

コルーチンで時間を止めているので、止めている間のSubscribeが取れていません。
ですので、Second Valueには0のメッセージが来ていません。

f:id:soramamenatan:20210410171713p:plain


サブスクリプション後にConnect
/// <summary>
/// Observableを枝分かれ、コルーチン版
/// サブスクリプション後にConnect
/// </summary>
private IEnumerator ExcutePublishCoroutineGood() {
    TimeSpan time = TimeSpan.FromSeconds(1);
    IConnectableObservable<long> stream = Observable
                                            .Interval(time)
                                            .Publish();
    stream.Subscribe(x => {
        Debug.Log("First Value :  " + x);
    });
    yield return new WaitForSeconds(2);
    stream.Subscribe(x => {
        Debug.Log(string.Format("<color=blue>Second Value : {0}</color>", x));
    });
    // Connectは全てのサブスクリプションが終わってから呼び出すと
    // コルーチンやスレッドでズレることは無くなる
    stream.Connect();
}
結果

Connectを呼ぶタイミングを全てのサブスクリプション後に呼ぶと意図せず呼び出されないことが無くなります。

f:id:soramamenatan:20210410172139p:plain


Observableを枝分かれ點せつつ、初期値を指定させたい

Publish<T>

Publishに引数を指定することにより、初期値を指定することができます。

Observableを枝分かれ點せつつ、初期値を指定
/// <summary>
/// Observableを枝分かれ點せつつ、初期値を指定
/// Multicast(new BehaviorSubject<T>(value))と同義
/// </summary>
private void ExcutePublishArgument() {
    var stream = Observable
                        .Range(1, 3)
                        .Publish(100)
                        .Subscribe(x => {
                            Debug.Log("Publish Argument : " + x);
                        });
}
結果

f:id:soramamenatan:20210410171459p:plain


ObserverがあればConnect、なければDispose

RefCount

このオペレータを使用することで、自動でConnectとDisposeを呼んでくれます。

失敗例
/// <summary>
/// ObserverがあればConnect、なければDispose
/// Connectだと、Subscribeしたストリームを消してもDisposeが呼ばれない
/// </summary>
private void ExcuteRefCountBad() {
    TimeSpan time = TimeSpan.FromSeconds(1);
    // カウントダウンするストリーム
    IConnectableObservable<long> stream = Observable
                                    .Interval(time)
                                    .Do(x => {
                                        Debug.Log("Publish : " + x);
                                    })
                                    .Publish();
    stream.Connect();

    IDisposable subscription = stream
                                    .Subscribe(i => {
                                        Debug.Log("Subscription : " + i);
                                    });

    // 何かキーが押されたらDispose
    this.UpdateAsObservable()
        .Where(_ => Input.anyKeyDown)
        .Subscribe(_ => {
            Debug.Log("<color=red>Call Dispose</color>");
            subscription.Dispose();
        });
}
結果

Connectを使用して、Disposeを行おうとしている例になります。
Subscription側はDisposeを呼ばれたので、Subscribeされなくなっています。
ですが、元のストリームは変わらずメッセージを発行し続けています。

f:id:soramamenatan:20210410172529p:plain


成功例
/// <summary>
/// ObserverがあればConnect、なければDispose
/// RefCountは自動で行ってくれる
/// </summary>
private void ExcuteRefCountGood() {
    TimeSpan time = TimeSpan.FromSeconds(1);
    // カウントダウンするストリーム
    IObservable<long> stream = Observable
                                    .Interval(time)
                                    .Do(x => {
                                        Debug.Log("Publish : " + x);
                                    })
                                    .Publish()
                                    // ObserverがあればConnect、なければDispose
                                    .RefCount();

    IDisposable subscription = stream
                                    .Subscribe(i => {
                                        Debug.Log("Subscription : " + i);
                                    });

    // 何かキーが押されたらDispose
    this.UpdateAsObservable()
        .Where(_ => Input.anyKeyDown)
        .Subscribe(_ => {
            Debug.Log("<color=red>Call Dispose</color>");
            subscription.Dispose();
        });
}
結果

こちらはDisposeが呼ばれた後、元のストリームもメッセージが発行されなくなっています。

f:id:soramamenatan:20210410172739p:plain


Publish().RefCount()を省略したい

Share

Shareを使用することで、Publish().RefCount()を省略することができます。

Publish().RefCount()を省略
/// <summary>
/// Publish().RefCount()を省略する
/// </summary>
private void ExcuteShare()
{
    TimeSpan time = TimeSpan.FromSeconds(1);
    IObservable<long> stream = Observable
                                    .Interval(time)
                                    .Do(x => {
                                        Debug.Log("Publish : " + x);
                                    })
                                    // Publish().RefCount()を省略する
                                    .Share();

    IDisposable subscription = stream
                                    .Subscribe(i => {
                                        Debug.Log("Subscription : " + i);
                                    });

    this.UpdateAsObservable()
        .Where(_ => Input.anyKeyDown)
        .Subscribe(_ => {
            Debug.Log("<color=red>Call Dispose</color>");
            subscription.Dispose();
        });
}
結果

同じなので省略します。


Observableを分岐させ、最後の値のみキャッシュ

PublishLast

Hot変換したストリームの最後の値のみキャッシュすることができます。

最後の値のみキャッシュ
/// <summary>
/// Observableを分岐させ、最後の値のみキャッシュ
/// </summary>
private void ExcutePublishLast()
{
    Debug.Log("--------PublishLast--------");
    IObservable<int> stream = Observable
                                .Range(1, 5)
                                .Do(x => {
                                    Debug.Log("Publishing : " + x);
                                })
                                // 最後の値のみキャッシュ
                                .PublishLast()
                                .RefCount();


    IDisposable subscription = stream
                                .Subscribe(x => {
                                    Debug.Log("Subscription : " + x);
                                });
    subscription.Dispose();

    // 比較用
    Debug.Log("--------Publish-------");
    IObservable<int> stream2 = Observable
                                    .Range(1, 5)
                                    .Do(x => {
                                        Debug.Log("Publishing : " + x);
                                    })
                                    .Share();

    IDisposable subscription2 = stream2
                                    .Subscribe(x => {
                                        Debug.Log("Subscription : " + x);
                                    });
    subscription2.Dispose();
}
結果

Publishと比較しています。
PublishLastでは、最後の5のみキャッシュされ発行されています。

f:id:soramamenatan:20210410173205p:plain


Observableを分岐させ、全ての値をキャッシュ

Replay

こちらは全ての値をキャッシュします。
またConnectの位置等により、取得できなかったものもメッセージ発行されます。

全ての値をキャッシュ
/// <summary>
/// Observableを分岐させ、全ての値をキャッシュ
/// </summary>
private IEnumerator ExcuteReplay()
{
    TimeSpan time = TimeSpan.FromSeconds(1);
    var stream = Observable
                    .Interval(time)
                    .Take(3)
                    .Publish();
    stream.Connect();
    var replayStream = stream.Replay();
    replayStream.Connect();
    replayStream.RefCount();
    stream.Subscribe(x => {
        Debug.Log("First Value :  " + x);
    }).AddTo(this);
    yield return new WaitForSeconds(2);
    stream.Subscribe(x => {
        Debug.Log(string.Format("<color=blue>Second Value : {0}</color>", x));
    });
    // 実行が遅れて取得できなかった値もReplayだと出る
    replayStream.Subscribe(x => {
        Debug.Log(string.Format("<color=red>Replay Value : {0}</color>", x));
    });
}
結果

Connectの呼び出しタイミングにより、ScondValueでは呼び出されていない0があります。
ですが、Replayの方では0が呼び出されています。

f:id:soramamenatan:20210410173444p:plain


Observableを枝分かれさせるときにSubjectを指定したい

Multicast

このオペレータを使用するとSubjectを指定しながらHot変換をすることができます。

他のオペレータの内部でMulticastを使用しているので、例のみ挙げソースコードは省略します。

オペレータ名 Multicast
Publish() Multicast(new Subject<T>)
PublishLast() Multicast(new AsyncSubject<T>)
Replay() Multicast(new ReplaySubject<T>)


今回は以上となります。
ここまでご視聴ありがとうございました。


参考サイト様

introtorx.com