知識0からのUnityShader勉強

知識0からのUnityShader勉強

UnityのShaderをメインとして、0から学んでいくブログです。

【UniRx】HotとCold #90

前回の成果

UniRxとコルーチンの変換について学んだ。

soramamenatan.hatenablog.com


今回やること

UniRxのHotとColdについて学びます。


HotとColdとは

IObservableの性質になります。
これらを理解せずにUniRxを使用すると意図せぬ挙動となる場合があります。

Cold

ColdなObservableは単体では何もしないものとなります。
Subscribeされて初めてストリームを流します。
また、それぞれのObserverに別の値を流す特徴もあります。
ほとんどのオペレーターがColdとなります。

Hot

HotなObservableは自ら値を発行するものとなります。
つまりSubscribeされなくてもストリームを流します。
また、購読している全てのObserverに同じ値を流す特徴があります。


ColdなObservable

実際のコードで具体的に見てみます。

Coldのみ
using UnityEngine;
using UniRx;
using System;

public class ColdObservable : MonoBehaviour {
    void Start() {
        ColdTimerObservable();
    }

    /// <summary>
    /// Coldの例
    /// </summary>
    private void ColdTimerObservable() {
        // ColdなObservableのみなので、発行されない
        IObservable<long> timer= Observable
                    // 1秒ごとに発行
                    .Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
                    .Select(x => {
                        Debug.Log(x);
                        return x;
                    });
    }
}

1秒ごとに値を発行したいソースコードとなります。

結果

何も発行されていません。

f:id:soramamenatan:20210123152036p:plain

これは先程も説明したとおり、Coldは単体では何もしない性質だからです。
メッセージを発行させたい場合には以下のようにします。

Coldを発行させる
/// <summary>
/// Coldの例をSubscribe
/// </summary>
private void SubscribeTimer() {
    IObservable<long> timer= Observable
                // 1秒ごとに発行
                .Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
                .Select(x => {
                    Debug.Log(x);
                    return x;
                });
    // Subscribeする
    timer.Subscribe();
}

先程の処理にSubscribeを追加したのみになります。

結果

こちらが意図したとおり、1秒ごとにメッセージを発行してくれています。

f:id:soramamenatan:20210123153036p:plain


HotなObservable

こちらも実際のコードで見てみます。

意図せぬメッセージを発行
using UnityEngine;
using UniRx;
using System;

public class HotObservable : MonoBehaviour {
    void Start() {
        MissHotObservable();
    }

    /// <summary>
    /// 意図していないメッセージ発行
    /// </summary>
    private void MissHotObservable() {
        Subject<int> subject = new Subject<int>();
        // subjectから生成されたObservableはHot
        IObservable<int> asObservable = subject.AsObservable();
        // 渡された値を発行
        IObservable<int> observable = asObservable.Select(x => {
            Debug.Log("Value : " + x);
            return x;
        });

        // 発行されない
        subject.OnNext(1);
        subject.OnNext(10);
        observable.Subscribe();
        // 発行される
        subject.OnNext(100);
    }
}

OnNextの引数を発行したいソースコードになります。

結果

Subscribe後の100しか発行されていません。

f:id:soramamenatan:20210123155726p:plain

これは、SelectがColdなので発生してしまいます。
ColdはSubscribeされるまでは何もしないので、そのColdなObservableに値を渡されても何も処理されません。

こちらの画像がイメージとして非常に分かりやすいです。

f:id:soramamenatan:20210123160019p:plain

RxのHotとColdについて - Qiita:より引用

Subscribe前にColdに渡された値も処理してほしい場合にはHot変換オペレータというものを使用します。

Hot変換オペレータを使用
/// <summary>
/// 意図通りのメッセージ発行
/// </summary>
private void SuccessHotObservable() {
    Subject<int> subject = new Subject<int>();
    // subjectから生成されたObservableはHot
    IObservable<int> asObservable = subject.AsObservable();
    // 渡された値を発行
    IConnectableObservable<int> observable = asObservable
                                                    .Select(x => {
                                                        Debug.Log("Value : " + x);
                                                        return x;
                                                    })
                                                    // Hot変換オペレータ
                                                    .Publish();
    // ストリームの稼働開始
    observable.Connect();

    // 発行されるようになる
    subject.OnNext(1);
    subject.OnNext(10);
    observable.Subscribe();
    // 発行される
    subject.OnNext(100);
}

以前のソースコードに、Publich()とConnect()を加えています。

結果

Hot変換オペレータを挟むことにより、Coldが稼働するようになります。

f:id:soramamenatan:20210127111807p:plain


PublishとConnect

PublishとConnectの内部について少し確認します。
まず、Publishメソッドを呼ぶことにより、ConnectableObservable<T>クラスのインスタンスが生成されます。

Publishの中身
/// <summary>
/// ConnectableObservable<T>のインスタンス生成
/// </summary>
public static IConnectableObservable<T> Multicast<T>(this IObservable<T> source, ISubject<T> subject)
{
    return new ConnectableObservable<T>(source, subject);
}

/// <summary>
/// Publishの呼び出し
/// </summary>
public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
{
    return source.Multicast(new Subject<T>());
}

そして、ConnectableObservable<T>クラスのConnectメソッドでSubscribeしているのでHot変換が行われます。

Connectの中身
class ConnectableObservable<T> : IConnectableObservable<T>
{
    readonly IObservable<T> source;
    readonly ISubject<T> subject;
    readonly object gate = new object();
    Connection connection;

    public ConnectableObservable(IObservable<T> source, ISubject<T> subject)
    {
        this.source = source.AsObservable();
        this.subject = subject;
    }

    /// <summary>
    /// 内部でSubscribe
    /// </summary>
    public IDisposable Connect()
    {
        lock (gate)
        {
            // don't subscribe twice
            if (connection == null)
            {
                var subscription = source.Subscribe(subject);
                connection = new Connection(this, subscription);
            }

            return connection;
        }
    }

Connectの中でSubscribeが呼ばれているので、Connectを忘れるとメッセージが発行されないので気をつけてください。


HotとColdの分岐

Hot変換を利用するときには、ストリームを分岐させたい時に使用します。
実際のコードで見てみます。

Coldの分岐
/// <summary>
/// Coldは分岐できない
/// </summary>
private void ColdBranch() {
    // 1~3を発行するストリーム
    IObservable<int> stream = Observable
                                .Range(1,3)
                                .Select(x => {
                                    Debug.Log("Cold : " + x);
                                    return x;
                                });

    stream.Subscribe();
    stream.Subscribe();
}
結果

ColdなObservableは分岐することができません。
ですので、Subscribeする度にストリームが生成されてしまいます

f:id:soramamenatan:20210127120717p:plain


Hotの分岐
/// <summary>
/// Hotは分岐できる
/// </summary>
private void HotBranch() {
    // 1~3を発行するストリーム
    IConnectableObservable<int> stream = Observable
                                            .Range(1,3)
                                            .Select(x => {
                                                Debug.Log("Hot : " + x);
                                                return x;
                                            })
                                            // Hot変換オペレータ
                                            .Publish();
    // ストリームの稼働開始
    stream.Connect();
    stream.Subscribe();
    stream.Subscribe();
}
結果

HotなObservableは分岐することができます。
ですので、Subscribeしてもストリームが生成されません

f:id:soramamenatan:20210127120956p:plain


値の流し方の違い

最初にHotは同じ値を、Coldは別の値を流すと説明しました。
その具体例を見てみます。
何かキーを押されると押された回数を通知する処理となります。

Coldなカウンター
/// <summary>
/// Coldなカウンター
/// </summary>
private void ColdCounter() {
    // 何かのキーを押された回数を通知する
    IObservable<int> counterObservable = Observable
                                    .EveryUpdate()
                                    .Where(_ => Input.anyKeyDown)
                                    .Select(_ => 1)
                                    .Scan((a, b) => a + b);

    // counterObservableの値を発行する
    IDisposable stream = Observable
                            .EveryUpdate()
                            .Where(_ => Input.anyKeyDown)
                            .Select(_ => 1)
                            .Scan((a, b) => a + b)
                            .Subscribe(subscribeNum => {
                                counterObservable.Subscribe(counter => {
                                    Debug.Log(string.Format("【Cold】{0}回目のSubscribe, Counter Value : {1}", subscribeNum, counter));
                                });
                    });
}
結果

Coldは購読時点からのカウントの値を流しています。
ですので、各Subscribe時に違う値が流れてしまっています。

f:id:soramamenatan:20210127123656p:plain


Hotなカウンター
/// <summary>
/// Hotなカウンター
/// </summary>
private void HotCounter() {
    // 何かのキーを押された回数を通知する
    IConnectableObservable<int> counterObservable = Observable
                                    .EveryUpdate()
                                    .Where(_ => Input.anyKeyDown)
                                    .Select(_ => 1)
                                    .Scan((a, b) => a + b)
                                    // Hot変換
                                    .Publish();

    // counterObservableの値を発行する
    IDisposable stream = Observable
                            .EveryUpdate()
                            .Where(_ => Input.anyKeyDown)
                            .Select(_ => 1)
                            .Scan((a, b) => a + b)
                            .Subscribe(subscribeNum => {
                                counterObservable.Subscribe(counter => {
                                    Debug.Log(string.Format("【Hot】{0}回目のSubscribe, Counter Value : {1}", subscribeNum, counter));
                                });
                    });

    // ストリーム稼働
    counterObservable.Connect();
}
結果

Hot変換をすることにより、同じ値が流れるようになりました。

f:id:soramamenatan:20210127124145p:plain


今回は以上となります。
ここまでご視聴ありがとうございました。


参考サイト様

qiita.com

qiita.com

light11.hatenadiary.com

www.slideshare.net