【UniRx】非同期処理のオペレータ #101
前回の成果
時間系のオペレータについて学んだ。
今回やること
今回は非同期処理のオペレータについてまとめます。
- 前回の成果
- 今回やること
- 処理を別スレッドで実行したい
- Start
- 複数回Subscribe
- Observableのスレッドをメインスレッドへ切り替えたい
- Observableの途中で処理スレッドを切り替えたい
- Observableを別スレッドにする
- Observableの結果をコルーチンで待ちたい
- 処理を連結させたい
- 参考サイト様
処理を別スレッドで実行したい
ToAsync
このオペレータを使用することで、引数に指定した処理を別のスレッドで行うことができます。
Startとの違いは、こちらはInvoke()のタイミングを指定できることです。
ソースコード
/// <summary> /// 処理を別スレッドで実行 /// </summary> private void ExcuteToAsync() { Debug.Log("Default ThreadId : " + Thread.CurrentThread.ManagedThreadId); Observable .ToAsync(() => { Debug.Log("ToAsync ThreadId : " + Thread.CurrentThread.ManagedThreadId); }) .Invoke() .Subscribe(x => { Debug.Log("ToAsync OnNext : " + x); }, () => { Debug.Log("ToAsync OnCompleted"); }); }
結果
Thread.CurrentThread.ManagedThreadIdは現在行われている処理のスレッドのIDを取得できるものとなります。
最初のスレッドIDとは異なるスレッドIDで処理を実行しています。
Invoke()の呼ぶタイミング
ToAsyncはInvoke()を呼んだ際にストリームが生成されます。
ですので、使用方法によっては挙動が変わってしまいます。
変数にInvoke()を定義したソースコード
/// <summary> /// ToAsync, Subscribeのみ /// </summary> private void ExcuteToAsyncSubscribe() { // 同じストリームを購読するので、ToAsync内は1回のみ呼ばれる IObservable<Unit> stream = Observable .ToAsync(() => { Debug.Log("ToAsync action called"); }) .Invoke(); stream.Subscribe(x => Debug.Log("1回目")); stream.Subscribe(x => Debug.Log("2回目")); }
結果
Invoke()を呼んだ際にストリームが生成されるので、この場合は同じストリームが購読されます。
ですので、ToAsync内のログは1回のみ呼ばれます。
Invoke()を個別に呼んだソースコード
/// <summary> /// ToAsync, InvokeとSubscribe /// </summary> private void ExcuteToAsyncInvokeSubscribe() { // ストリームがそれぞれ作られるので、今回の場合ToAsync内は2回呼ばれる // Startと同じ Func<IObservable<Unit>> stream = Observable .ToAsync(() => { Debug.Log("ToAsync action called"); }); stream.Invoke().Subscribe(x => Debug.Log("1回目")); stream.Invoke().Subscribe(x => Debug.Log("2回目")); }
結果
こちらの場合、Invoke()がそれぞれ別で呼ばれています。
なので、ストリームが2回生成されToAsync内のログが2回呼ばれます。
ToAsyncの例
Invoke()の時点でストリームが生成されるので、Subscribeをしなくてもメッセージが発行されます。
また、ToAsync内で出した結果をreturnして、メッセージとして発行することもできます。
ソースコード
/// <summary> /// ToAsyncの例 /// </summary> private void ExcuteToAsyncExample() { Debug.Log("--------Don't use Subscribe-------"); // Subscribeをしなくても呼び出される Observable .ToAsync(() => { Debug.Log("ToAsync"); }) .Invoke(); Debug.Log("--------return T-------"); // <T>なので、ToAsync内でreturn可能 Observable .ToAsync(() => { return DateTime.Now; }) .Invoke() .Subscribe(x => Debug.Log("return value : " + x)); }
結果
SubscribeをしていなくてもToAsyncが呼ばれています。
また今回の場合、DateTimeをreturnできています。
Start
このオペレータを使用することで、引数に指定した処理を別のスレッドで行うことができます。
ToAsyncとの違いは、こちらはInvoke()のタイミングを指定できないことです。
ソースコード
/// <summary> /// 処理を別スレッドで実行 /// </summary> private void ExcuteStart() { Debug.Log("Default ThreadId : " + Thread.CurrentThread.ManagedThreadId); Observable .Start(() => { Debug.Log("Start ThreadId : " + Thread.CurrentThread.ManagedThreadId); }) .Subscribe(x => { Debug.Log("Start OnNext : " + x); }, () => { Debug.Log("Start OnCompleted"); }); }
結果
Thread.CurrentThread.ManagedThreadIdは現在行われている処理のスレッドのIDを取得できるものとなります。
最初のスレッドIDとは異なるスレッドIDで処理を実行しています。
複数回Subscribe
Startは内部でInvoke()を呼んでいます。
ですので、ストリームが複数個生成されます。
ソースコード
/// <summary> /// Start, Subscribe時 /// </summary> private void ExcuteStartSubscribe() { // ストリームがそれぞれ作られるので、今回の場合Start内は2回呼ばれる // ToAsync().Invoke().Subscribe()と同じ IObservable<Unit> stream = Observable .Start(() => { Debug.Log("Start action called"); }); stream.Subscribe(x => Debug.Log("1回目")); stream.Subscribe(x => Debug.Log("2回目")); }
結果
今回の場合、ストリームが2つ生成されています。
なので、Start内の処理が2回呼ばれています。
Observableのスレッドをメインスレッドへ切り替えたい
ObserveOnMainThread
ToAsyncやStartで別のスレッドに切り替え、メインスレッドに戻したい時に使用します。
また、InstantiateやuGUI系のメソッドといったUnityAPIを使用しているものはメインスレッドでないと使用できません。
サンプルコード
/// <summary> /// メッセージのスレッドをメインスレッドに変更 /// </summary> private void ExcuteObserveOnMainThread() { Debug.Log("Default ThreadId : " + Thread.CurrentThread.ManagedThreadId); Observable // 別スレッドで実行 .Start(() => { // メインスレッドではないので、UnityAPIは呼び出すことが出来ない // Instantiate(GameObject); // Button.onClick.AddListener(Hoge); Debug.Log("Start ThreadId : " + Thread.CurrentThread.ManagedThreadId); }) // メインスレッドで実行 .ObserveOnMainThread() .Subscribe(x => { Debug.Log("ObserveOnMainThread ThreadId : " + Thread.CurrentThread.ManagedThreadId); }); }
結果
Startを使って、別スレッドに移動していますが、ObserveOnMainThreadでメインスレッドへと戻っています。
Observableの途中で処理スレッドを切り替えたい
ObserveOn
引数で指定したSchedulerにスレッドを切り替えることができます。
SubscribeOnとは違い、ObserveOnを呼んだ箇所以下の部分が別スレッドへと切り替わります。
ソースコード
/// <summary> /// Observableの途中で別スレッドにする /// </summary> private void ExcuteObserveOn() { Debug.Log("Default ThreadId : " + Thread.CurrentThread.ManagedThreadId); Observable // 別スレッドで実行 .Start(() => { Debug.Log("Start ThreadId : " + Thread.CurrentThread.ManagedThreadId); }) // メインスレッドで実行 .ObserveOnMainThread() .Do(x => Debug.Log("ObserveOnMainThread ThreadId : " + Thread.CurrentThread.ManagedThreadId)) // 別スレッドで実行 .ObserveOn(Scheduler.ThreadPool) .Subscribe(x => { Debug.Log("ObserveOn ThreadId : " + Thread.CurrentThread.ManagedThreadId); }); }
結果
SubscribeのOnNextのログが別スレッドで呼ばれています。
Observableを別スレッドにする
SubscribeOn
引数で指定したSchedulerにスレッドを切り替えることができます。
ObserveOnとは違い、Subscribe時のObservableの処理を別スレッドで行うオペレータとなります。
ソースコード
/// <summary> /// Observableを別スレッドにする /// </summary> private void ExcuteSubscribeOn() { Observable .Create<Unit>(stream => { stream.OnNext(Unit.Default); stream.OnCompleted(); // SubscribeOnにより、別スレッドで実行される Debug.Log("Create ThreadId : " + Thread.CurrentThread.ManagedThreadId); return Disposable.Empty; }) // 別スレッドで実行 .SubscribeOn(Scheduler.ThreadPool) // メインスレッドで実行 .ObserveOnMainThread() .Subscribe(x => { Debug.Log("Subscribe ThreadId : " + Thread.CurrentThread.ManagedThreadId); }); }
結果
直前でObserveOnMainThreadによりメインスレッドに処理を戻しています。
しかし、Createで生成されたストリームは別スレッドで実行されています。
Observableの結果をコルーチンで待ちたい
ToYieldInstruction
ストリームをコルーチンへと変換することができ、yeild returnと組み合わせることにより、結果を待つことができます。
ソースコード
/// <summary> /// Observableの結果をコルーチン上で待ち受ける /// </summary> private void ExcuteToYieldInstruction() { Observable // gameobjectが破棄されても裏で動くので、tokenを渡す .FromCoroutine(token => ToYieldInstructionCoroutine(token)) .Subscribe(x => { Debug.Log("ToYieldInstruction OnNext"); }, () => { Debug.Log("ToYieldInstruction OnCompleted"); }); } /// <summary> /// 1秒後に発行されるObservableをコルーチンに変換 /// </summary> /// <param name="token"></param> /// <returns></returns> private IEnumerator ToYieldInstructionCoroutine(CancellationToken token) { yield return Observable .Timer(TimeSpan.FromSeconds(1)) // コルーチンに変換 .ToYieldInstruction(token); Debug.Log("Finished Coroutine"); }
結果
結果を待ったコルーチンが終了してから、OnNextとOnCompletedが呼ばれています。
処理を連結させたい
ContinueWith
指定したストリームが完了した後、このオペレータで連結させたストリームが実行されます。
サンプルコード
/// <summary> /// 処理を連結させる /// </summary> private void ExcuteContinueWith() { Subject<string> subject = new Subject<string>(); subject.Subscribe(x => { Debug.Log("Subject OnNext"); }, () => { Debug.Log("Subject OnCompleted"); }); // subjectに連結させる IObservable<string> continueWith = subject.ContinueWith(Observable.Return("Return")); continueWith.Subscribe(x => { Debug.Log("ContinueWith OnNext"); }, () => { Debug.Log("ContinueWith OnCompleted"); }); subject.OnNext("OnNext"); subject.OnCompleted(); }
結果
subjectのOnCompletedが発行されてから、ContinueWithで連結させたストリームが発行されます。
今回は以上となります。
ここまでご視聴ありがとうございました。