知識0からのUnityShader勉強

知識0からのUnityShader勉強

UnityのShaderをメインとして、0から学んでいくブログです。

【UniRx】Observableを合成するオペレータ #95

前回の成果

メッセージのフィルタリングをするオペレータについて学んだ。

soramamenatan.hatenablog.com


今回やること

今回はObservableを合成するオペレータについてまとめます。


複数のObservableから一番早く値が流れてきたものを流す

Amb
/// <summary>
/// 複数のObservableから一番早く値が流れてきたものを流す
/// </summary>
private void ExcuteAmb() {
    Observable
        .Amb(
            // 流れない
            Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => "3s"),
            // 流れない
            Observable.Timer(TimeSpan.FromSeconds(2)).Select(_ => "2s"),
            // 流れる
            Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => "1s"))
        .Subscribe(x => {
            Debug.Log("Amb onNext : " + x);
        }, () => {
            Debug.Log("Amb onCompleted");
        });
    // 値が流れてきたら、onCompleted
}
結果

f:id:soramamenatan:20210313130058p:plain


合成元のObservable全てに1回以上値が流れたら、合成後に値を流す

Zip
/// <summary>
/// 合成元のObservable全てに1回以上値が流れたら、合成後に値を流す
/// 同じObservableに複数の値が流れていた場合、先に流れた値が優先される
/// </summary>
private void ExcuteZip() {
    Subject<int> subjectFirst = new Subject<int>();
    Subject<int> subjectSecond = new Subject<int>();

    Observable
        .Zip(subjectFirst, subjectSecond)
        .Subscribe(x => {
            int index = 0;
            foreach (int item in x) {
                index++;
                Debug.Log(string.Format("Zip onNext : {0}番目のonNext : {1}", index, item));
            }
        }, () => {
            Debug.Log("Zip onCompleted");
        });

    // 流れない
    subjectFirst.OnNext(1);
    // 流れる Firstは1が流れる
    subjectSecond.OnNext(10);
    // 流れない
    subjectSecond.OnNext(100);
    // 流れない
    subjectSecond.OnNext(1000);
    // 流れる Secondは100が流れる
    subjectFirst.OnNext(10000);

    // onCompleted呼ばれない
    subjectFirst.OnCompleted();
    // onCompleted呼ばれる
    subjectSecond.OnCompleted();
}
結果

f:id:soramamenatan:20210313130223p:plain


合成元のObservable全てに1回以上値が流れたら、合成後に値を流す

ZipLatest
/// <summary>
/// 合成元のObservable全てに1回以上値が流れたら、合成後に値を流す
/// 同じObservableに複数の値が流れていた場合、後に流れた値が優先される
/// </summary>
private void ExcuteZipLatest()
{
    Subject<int> subjectFirst = new Subject<int>();
    Subject<int> subjectSecond = new Subject<int>();

    Observable
        .ZipLatest(subjectFirst, subjectSecond)
        .Subscribe(x => {
            int index = 0;
            foreach (int item in x) {
                index++;
                Debug.Log(string.Format("ZipLatest onNext : {0}番目のonNext : {1}", index, item));
            }
        }, () => {
            Debug.Log("ZipLatest onCompleted");
        });

    // 流れない
    subjectFirst.OnNext(1);
    // 流れる Firstは1が流れる
    subjectSecond.OnNext(10);
    // 流れない
    subjectSecond.OnNext(100);
    // 流れない
    subjectSecond.OnNext(1000);
    // 流れる Secondは1000が流れる
    subjectFirst.OnNext(10000);

    // onCompleted呼ばれない
    subjectFirst.OnCompleted();
    // onCompleted呼ばれる
    subjectSecond.OnCompleted();
}
結果

f:id:soramamenatan:20210313130525p:plain


合成元のObservableに値が流れたら、他のObservableの最後に発行した値を流す

CombineLatest
/// <summary>
/// 合成元のObservableに値が流れたら、他のObservableの最後に発行した値を流す
/// 1回も流れていないObservableがあれば流さない
/// </summary>
private void ExcuteCombineLatest() {
    Subject<int> subjectFirst = new Subject<int>();
    Subject<int> subjectSecond = new Subject<int>();

    Observable
        .CombineLatest(subjectFirst, subjectSecond)
        .Subscribe(x => {
            int index = 0;
            foreach (int item in x) {
                index++;
                Debug.Log(string.Format("CombineLatest onNext : {0}番目のonNext : {1}", index, item));
            }
        }, () => {
            Debug.Log("CombineLatest onCompleted");
        });

    // 流れない
    subjectFirst.OnNext(1);
    // 流れる Firstは1が流れる
    subjectSecond.OnNext(10);
    // 流れる Firstは1が流れる
    subjectSecond.OnNext(100);
    // 流れる Firstは1が流れる
    subjectSecond.OnNext(1000);
    // 流れる Secondは1000が流れる
    subjectFirst.OnNext(10000);

    // onCompleted呼ばれない
    subjectFirst.OnCompleted();
    // onCompleted呼ばれる
    subjectSecond.OnCompleted();
}
結果

f:id:soramamenatan:20210313130559p:plain


1つのObservableをメインとし、メインに値が流れてきたらサブのObservableと合成する

WithLatestFrom
/// <summary>
/// 1つのObservableをメインとし、メインに値が流れてきたらサブのObservableと合成する
/// サブのObservableは一番最後に流れてきた値
/// </summary>
private void ExcuteWithLatestFrom() {
    Subject<int> subjectMain = new Subject<int>();
    Subject<int> subjectSub = new Subject<int>();

    Observable
        .WithLatestFrom(subjectMain, subjectSub, (mainValue, subValue) => mainValue + subValue)
        .Subscribe(x => {
            Debug.Log("WithLatestFrom onNext : " + x);
        }, () => {
            Debug.Log("WithLatestFrom onCompleted");
        });

    // 流れない
    subjectMain.OnNext(1);
    // 流れない
    subjectSub.OnNext(10);
    // 流れない
    subjectSub.OnNext(100);
    // 流れる 1100
    subjectMain.OnNext(1000);
    // 流れる 10100
    subjectMain.OnNext(10000);

    // onCompleted呼ばれる
    subjectMain.OnCompleted();
}
結果

f:id:soramamenatan:20210313130636p:plain


複数のObservableを1つにまとめる

Merge
/// <summary>
/// 複数のObservableを1つにまとめる
/// </summary>
private void ExcuteMerge() {
    Subject<int> subjectFirst = new Subject<int>();
    Subject<int> subjectSecond = new Subject<int>();

    Observable
        .Merge(subjectFirst, subjectSecond)
        .Subscribe(x => {
            Debug.Log("Merge onNext : " + x);
        }, () => {
            Debug.Log("Merge onCompleted");
        });

    // 流れる
    subjectFirst.OnNext(1);
    // 流れる
    subjectSecond.OnNext(10);
    // 流れる
    subjectFirst.OnNext(100);

    // onCompleted呼ばれない
    subjectFirst.OnCompleted();
    // onCompleted呼ばれる
    subjectSecond.OnCompleted();
}
結果

f:id:soramamenatan:20210313130704p:plain


複数のObservableを直列で流す

Concat
/// <summary>
/// 複数のObservableを直列で流す
/// onCompleted発行時に次のObservableを購読
/// </summary>
private void ExcuteConcat() {
    Subject<string> subjectFirst = new Subject<string>();
    Subject<string> subjectSecond = new Subject<string>();

    Observable
        .Concat(subjectFirst, subjectSecond)
        .Subscribe(x => {
            Debug.Log("Concat onNext : " + x);
        }, () => {
            Debug.Log("Concat onCompleted");
        });

    // 流れる
    subjectFirst.OnNext("First 1回目");
    // 流れない
    subjectSecond.OnNext("Second 1回目");
    // onCompletedは呼ばれないが、subjectSecondが購読される
    subjectFirst.OnCompleted();
    // 流れない
    subjectFirst.OnNext("First 2回目");
    // 流れる
    subjectSecond.OnNext("Second 2回目");
    // onCompleted呼ばれる
    subjectSecond.OnCompleted();
}
結果

f:id:soramamenatan:20210313130731p:plain


Observableの値を使って、別のObservableを制作して合成する

SelectMany
/// <summary>
/// Observableの値を使って、別のObservableを制作して合成する
/// SelectとMarge合わせたイメージ
/// </summary>
private void ExcuteSelectMany()
{
    Subject<int> subject = new Subject<int>();

    // 0,1,2を発行する
    IObservable<int> stream = Observable.Range(0,3);

    // 合成
    subject
        .SelectMany(x => stream.Select(y => x + y))
        .Subscribe(x => {
            Debug.Log("SelectMany onNext : " + x);
        }, () => {
            Debug.Log("SelectMany onCompleted");
        });

    subject.OnNext(10);
    subject.OnNext(100);
    subject.OnCompleted();
}
結果

f:id:soramamenatan:20210313130807p:plain


今回は以上となります。
ここまでご視聴ありがとうございました。


参考サイト様

qiita.com

nobollel-tech.hatenablog.com