【UniRx】その他のオペレータ #104
前回の成果
Observableが完了したときに処理を行うオペレータについてまとめた。
今回やること
その他のオペレータについてまとめます。
- 前回の成果
- 今回やること
- 購読時に指定したメッセージを最初の値として発行する
- 発行されたOnNextをキャッシュし、OnCompleted時にArrayとして1つのメッセージに変換
- 発行されたOnNextをキャッシュし、OnCompleted時にListとして1つのメッセージに変換
- メッセージの加工はせずに、ストリームのメッセージを用いて処理を行う
- メッセージの処理を行い、一番最後にUnitを流す
- 参考サイト様
購読時に指定したメッセージを最初の値として発行する
StartWith
購読時に引数に指定したメッセージを最初の値として発行します。
値だけではなく、関数も引数に指定することができます。
サンプルコード
/// <summary> /// Subscribe時に初期値を流す /// </summary> private void ExcuteStartWith() { Observable .Range(0, 3) .Select(x => x.ToString()) // Rangeより先に呼ばれる .StartWith("Red", "Green", "Blue") .Subscribe(x => { Debug.Log("StartWith OnNext : " + x); }, () => { Debug.Log("StartWith OnCompleted"); }); }
結果
先に呼ばれるはずのRangeの値より先に、引数で指定した"Red", "Green", "Blue"が呼ばれています。
その後、Rangeの値が発行されます。
発行されたOnNextをキャッシュし、OnCompleted時にArrayとして1つのメッセージに変換
ToArray
発行されたOnNextの値を全てキャッシュします。
OnCompleted時に、キャッシュされた値を1つのOnNextのArrayとして変換して、発行します。
サンプルコード
/// <summary> /// 発行されたOnNextをキャッシュし、OnCompleted時にArrayとして1つのメッセージに変換 /// </summary> private void ExcuteToArray() { Observable .Range(0, 3) .ToArray() .Subscribe(x => { Debug.Log("ToArray OnNext : " + x); foreach (var value in x) { Debug.Log("Value : " + value); } }, () => { Debug.Log("ToArray OnCompleted"); }); }
結果
Arrayに変換されています。
また、Arrayの中身がOnNextでキャッシュしたRangeの値となっています。
発行されたOnNextをキャッシュし、OnCompleted時にListとして1つのメッセージに変換
ToList
発行されたOnNextの値を全てキャッシュします。 OnCompleted時に、キャッシュされた値を1つのOnNextのListとして変換して、発行します。
サンプルコード
/// <summary> /// 発行されたOnNextをキャッシュし、OnCompleted時にListとして1つのメッセージに変換 /// </summary> private void ExcuteToList() { Observable .Range(0, 3) .ToList() .Subscribe(x => { Debug.Log("ToList OnNext : " + x); foreach (var value in x) { Debug.Log("Value : " + value); } }, () => { Debug.Log("ToList OnCompleted"); }); }
結果
Listに変換されています。 また、Listの中身がOnNextでキャッシュしたRangeの値となっています。
メッセージの加工はせずに、ストリームのメッセージを用いて処理を行う
Do〇〇
Do〇〇系のオペレータが該当します。
それぞれの呼ばれるタイミングは以下になります。
オペレータ名 | 呼ばれるタイミング |
---|---|
Do | OnNext |
DoOnCompleted | OnCompleted |
DoOnError | OnError |
DoOnSubscribe | Subscribe |
DoOnCancel | Dispose |
DoOnTerminate | OnCompletedとOnError |
ソースコード
/// <summary> /// Do系のメッセージ /// ストリームのメッセージを用いて処理を行う /// メッセージ自体の加工は行わない /// </summary> private void ExcuteDoOperators() { Observable .Range(0, 1) // OnNext時に呼ばれる .Do (x => Debug.Log("Call Do")) // OnCompleted時に呼ばれる .DoOnCompleted(() => Debug.Log("Call DoOnCompleted")) // 購読されたときに呼ばれる .DoOnSubscribe(() => Debug.Log("Call DoOnSubscribe")) // OnCompleted時かOnError時に呼ばれる .DoOnTerminate(() => Debug.Log("Call DoOnTerminate")) .Subscribe(); Subject<int> cancelSubject = new Subject<int>(); IDisposable dispose = cancelSubject // Dispose時に呼ばれる .DoOnCancel(() => Debug.Log("Call DoOnCancel")) .Subscribe(); dispose.Dispose(); Subject<int> errorSubject = new Subject<int>(); errorSubject // OnError時に呼ばれる .DoOnError(e => Debug.Log("Call DoOnError")) .Subscribe(); errorSubject.OnError(new Exception()); }
結果
上記に記載した表のタイミングで各オペレータが呼ばれています。
メッセージの処理を行い、一番最後にUnitを流す
ForEachAsync
OnNextが流れてきたときに処理を挟みたいけど、最終的なストリームにはOnNextとOnCompletedが1回だけ同時に呼ばれるようにしたい場合に使用します。
主にファイルのロード等の非同期処理に使用します。
また、行っていることはDo()とLast()とAsUnitObservable()を組み合わせたものと同じになります。
ソースコード
/// <summary> /// メッセージの処理を行い、一番最後にUnitを流す /// </summary> private void ExcuteForEachAsync() { Observable .Range(0, 3) .ForEachAsync(x => Debug.Log("ForEachAsync : " + x)) .Subscribe(x => { Debug.Log("Subscribe OnNext : " + x); }, () => { Debug.Log("Subscribe OnCompleted"); }); Debug.Log("--------Same--------"); // ForEachAsyncを使用しない場合 Observable .Range(0, 3) .Do(x => Debug.Log("Do : " + x)) .Last() .AsUnitObservable() .Subscribe(x => { Debug.Log("Subscribe OnNext : " + x); }, () => { Debug.Log("Subscribe OnCompleted"); }); }
結果
最終的なストリームには、OnNextとOnCompletedが1回しか流れていません。
また、ForEachAsyncを使用した場合とDo()Last()AsUnitObservable()を組み合わせた場合でも結果が同じです。
今回は以上となります。
ここまでご視聴ありがとうございました。