知識0からのUnityShader勉強

知識0からのUnityShader勉強

UnityのShaderをメインとして、0から学んでいくブログです。

【UniRx】非同期処理のオペレータ #101

前回の成果

時間系のオペレータについて学んだ。

soramamenatan.hatenablog.com


今回やること

今回は非同期処理のオペレータについてまとめます。



処理を別スレッドで実行したい

ToAsync

このオペレータを使用することで、引数に指定した処理を別のスレッドで行うことができます。
Startとの違いは、こちらはInvoke()のタイミングを指定できることです。

ソースコード
/// <summary>
/// 処理を別スレッドで実行
/// </summary>
private void ExcuteToAsync() {
    Debug.Log("Default ThreadId : " + Thread.CurrentThread.ManagedThreadId);
    Observable
        .ToAsync(() => {
            Debug.Log("ToAsync ThreadId : " + Thread.CurrentThread.ManagedThreadId);
        })
        .Invoke()
        .Subscribe(x => {
            Debug.Log("ToAsync OnNext : " + x);
        }, () => {
            Debug.Log("ToAsync OnCompleted");
        });
}
結果

Thread.CurrentThread.ManagedThreadIdは現在行われている処理のスレッドのIDを取得できるものとなります。
最初のスレッドIDとは異なるスレッドIDで処理を実行しています。

f:id:soramamenatan:20210501143124p:plain


Invoke()の呼ぶタイミング

ToAsyncはInvoke()を呼んだ際にストリームが生成されます。
ですので、使用方法によっては挙動が変わってしまいます。

変数にInvoke()を定義したソースコード
/// <summary>
/// ToAsync, Subscribeのみ
/// </summary>
private void ExcuteToAsyncSubscribe() {
    // 同じストリームを購読するので、ToAsync内は1回のみ呼ばれる
    IObservable<Unit> stream = Observable
                                    .ToAsync(() => {
                                        Debug.Log("ToAsync action called");
                                    })
                                    .Invoke();
    stream.Subscribe(x => Debug.Log("1回目"));
    stream.Subscribe(x => Debug.Log("2回目"));
}
結果

Invoke()を呼んだ際にストリームが生成されるので、この場合は同じストリームが購読されます。
ですので、ToAsync内のログは1回のみ呼ばれます。

f:id:soramamenatan:20210501143753p:plain

Invoke()を個別に呼んだソースコード
/// <summary>
/// ToAsync, InvokeとSubscribe
/// </summary>
private void ExcuteToAsyncInvokeSubscribe() {
    // ストリームがそれぞれ作られるので、今回の場合ToAsync内は2回呼ばれる
    // Startと同じ
    Func<IObservable<Unit>> stream = Observable
                                        .ToAsync(() => {
                                            Debug.Log("ToAsync action called");
                                        });
    stream.Invoke().Subscribe(x => Debug.Log("1回目"));
    stream.Invoke().Subscribe(x => Debug.Log("2回目"));
}
結果

こちらの場合、Invoke()がそれぞれ別で呼ばれています。
なので、ストリームが2回生成されToAsync内のログが2回呼ばれます。

f:id:soramamenatan:20210501144209p:plain


ToAsyncの例

Invoke()の時点でストリームが生成されるので、Subscribeをしなくてもメッセージが発行されます。
また、ToAsync内で出した結果をreturnして、メッセージとして発行することもできます。

ソースコード
/// <summary>
/// ToAsyncの例
/// </summary>
private void ExcuteToAsyncExample() {
    Debug.Log("--------Don't use Subscribe-------");
    // Subscribeをしなくても呼び出される
    Observable
        .ToAsync(() => {
            Debug.Log("ToAsync");
        })
        .Invoke();

    Debug.Log("--------return T-------");
    // <T>なので、ToAsync内でreturn可能
    Observable
        .ToAsync(() => {
            return DateTime.Now;
        })
        .Invoke()
        .Subscribe(x => Debug.Log("return value : " + x));
}


結果

SubscribeをしていなくてもToAsyncが呼ばれています。
また今回の場合、DateTimeをreturnできています。

f:id:soramamenatan:20210501144900p:plain


Start

このオペレータを使用することで、引数に指定した処理を別のスレッドで行うことができます。
ToAsyncとの違いは、こちらはInvoke()のタイミングを指定できないことです。

ソースコード
/// <summary>
/// 処理を別スレッドで実行
/// </summary>
private void ExcuteStart() {
    Debug.Log("Default ThreadId : " + Thread.CurrentThread.ManagedThreadId);
    Observable
        .Start(() => {
            Debug.Log("Start ThreadId : " + Thread.CurrentThread.ManagedThreadId);
        })
        .Subscribe(x => {
            Debug.Log("Start OnNext : " + x);
        }, () => {
            Debug.Log("Start OnCompleted");
        });
}
結果

Thread.CurrentThread.ManagedThreadIdは現在行われている処理のスレッドのIDを取得できるものとなります。
最初のスレッドIDとは異なるスレッドIDで処理を実行しています。

f:id:soramamenatan:20210501145311p:plain


複数回Subscribe

Startは内部でInvoke()を呼んでいます。
ですので、ストリームが複数個生成されます。

ソースコード
/// <summary>
/// Start, Subscribe時
/// </summary>
private void ExcuteStartSubscribe() {
    // ストリームがそれぞれ作られるので、今回の場合Start内は2回呼ばれる
    // ToAsync().Invoke().Subscribe()と同じ
    IObservable<Unit> stream = Observable
                                    .Start(() => {
                                        Debug.Log("Start action called");
                                    });
    stream.Subscribe(x => Debug.Log("1回目"));
    stream.Subscribe(x => Debug.Log("2回目"));
}
結果

今回の場合、ストリームが2つ生成されています。
なので、Start内の処理が2回呼ばれています。

f:id:soramamenatan:20210501145924p:plain


Observableのスレッドをメインスレッドへ切り替えたい

ObserveOnMainThread

ToAsyncやStartで別のスレッドに切り替え、メインスレッドに戻したい時に使用します。
また、InstantiateやuGUI系のメソッドといったUnityAPIを使用しているものはメインスレッドでないと使用できません。

サンプルコード
/// <summary>
/// メッセージのスレッドをメインスレッドに変更
/// </summary>
private void ExcuteObserveOnMainThread() {

    Debug.Log("Default ThreadId : " + Thread.CurrentThread.ManagedThreadId);
    Observable
        // 別スレッドで実行
        .Start(() => {
            // メインスレッドではないので、UnityAPIは呼び出すことが出来ない
            // Instantiate(GameObject);
            // Button.onClick.AddListener(Hoge);
            Debug.Log("Start ThreadId : " + Thread.CurrentThread.ManagedThreadId);
        })
        // メインスレッドで実行
        .ObserveOnMainThread()
        .Subscribe(x => {
            Debug.Log("ObserveOnMainThread ThreadId : " + Thread.CurrentThread.ManagedThreadId);
        });
}
結果

Startを使って、別スレッドに移動していますが、ObserveOnMainThreadでメインスレッドへと戻っています。

f:id:soramamenatan:20210501150958p:plain


Observableの途中で処理スレッドを切り替えたい

ObserveOn

引数で指定したSchedulerにスレッドを切り替えることができます。
SubscribeOnとは違い、ObserveOnを呼んだ箇所以下の部分が別スレッドへと切り替わります。

ソースコード
/// <summary>
/// Observableの途中で別スレッドにする
/// </summary>
private void ExcuteObserveOn() {
    Debug.Log("Default ThreadId : " + Thread.CurrentThread.ManagedThreadId);
    Observable
        // 別スレッドで実行
        .Start(() => {
            Debug.Log("Start ThreadId : " + Thread.CurrentThread.ManagedThreadId);
        })
        // メインスレッドで実行
        .ObserveOnMainThread()
        .Do(x => Debug.Log("ObserveOnMainThread ThreadId : " + Thread.CurrentThread.ManagedThreadId))
        // 別スレッドで実行
        .ObserveOn(Scheduler.ThreadPool)
        .Subscribe(x => {
            Debug.Log("ObserveOn ThreadId : " + Thread.CurrentThread.ManagedThreadId);
        });
}
結果

SubscribeのOnNextのログが別スレッドで呼ばれています。

f:id:soramamenatan:20210501151733p:plain


Observableを別スレッドにする

SubscribeOn

引数で指定したSchedulerにスレッドを切り替えることができます。
ObserveOnとは違い、Subscribe時のObservableの処理を別スレッドで行うオペレータとなります。

ソースコード
/// <summary>
/// Observableを別スレッドにする
/// </summary>
private void ExcuteSubscribeOn() {
    Observable
        .Create<Unit>(stream => {
            stream.OnNext(Unit.Default);
            stream.OnCompleted();
            // SubscribeOnにより、別スレッドで実行される
            Debug.Log("Create ThreadId : " + Thread.CurrentThread.ManagedThreadId);
            return Disposable.Empty;
        })
        // 別スレッドで実行
        .SubscribeOn(Scheduler.ThreadPool)
        // メインスレッドで実行
        .ObserveOnMainThread()
        .Subscribe(x => {
            Debug.Log("Subscribe ThreadId : " + Thread.CurrentThread.ManagedThreadId);
        });
}
結果

直前でObserveOnMainThreadによりメインスレッドに処理を戻しています。
しかし、Createで生成されたストリームは別スレッドで実行されています。

f:id:soramamenatan:20210501152257p:plain


Observableの結果をコルーチンで待ちたい

ToYieldInstruction

ストリームをコルーチンへと変換することができ、yeild returnと組み合わせることにより、結果を待つことができます。

ソースコード
/// <summary>
/// Observableの結果をコルーチン上で待ち受ける
/// </summary>
private void ExcuteToYieldInstruction() {
    Observable
        // gameobjectが破棄されても裏で動くので、tokenを渡す
        .FromCoroutine(token => ToYieldInstructionCoroutine(token))
        .Subscribe(x => {
            Debug.Log("ToYieldInstruction OnNext");
        }, () => {
            Debug.Log("ToYieldInstruction OnCompleted");
        });
}

/// <summary>
/// 1秒後に発行されるObservableをコルーチンに変換
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
private IEnumerator ToYieldInstructionCoroutine(CancellationToken token) {
    yield return Observable
                        .Timer(TimeSpan.FromSeconds(1))
                        // コルーチンに変換
                        .ToYieldInstruction(token);
    Debug.Log("Finished Coroutine");
}
結果

結果を待ったコルーチンが終了してから、OnNextとOnCompletedが呼ばれています。

f:id:soramamenatan:20210501152622p:plain


処理を連結させたい

ContinueWith

指定したストリームが完了した後、このオペレータで連結させたストリームが実行されます。

サンプルコード
/// <summary>
/// 処理を連結させる
/// </summary>
private void ExcuteContinueWith() {
    Subject<string> subject = new Subject<string>();
    subject.Subscribe(x => {
            Debug.Log("Subject OnNext");
        }, () => {
            Debug.Log("Subject OnCompleted");
        });

    // subjectに連結させる
    IObservable<string> continueWith = subject.ContinueWith(Observable.Return("Return"));
    continueWith.Subscribe(x => {
            Debug.Log("ContinueWith OnNext");
        }, () => {
            Debug.Log("ContinueWith OnCompleted");
        });

    subject.OnNext("OnNext");
    subject.OnCompleted();
}
結果

subjectのOnCompletedが発行されてから、ContinueWithで連結させたストリームが発行されます。

f:id:soramamenatan:20210501153012p:plain


今回は以上となります。
ここまでご視聴ありがとうございました。


参考サイト様

light11.hatenadiary.com

www.programmersought.com