![]() | RxJavaリアクティブプログラミング (CodeZine BOOKS) 新品価格 |

RxJavaでinterval・take・scanを使う
の続編
次は単純なjust interalでなく
Flowable.createでロジックを書いた場合
・コード
Flowablefcreate = Flowable.create(emitter -> {
String [] datas = {"Hello", "masterka"};
for (String data : datas) {
if (emitter.isCancelled()) {
return;
}
emitter.onNext(data);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
fcreate
.observeOn(Schedulers.computation())
.subscribe(new Subscriber(){
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
this.subscription.request(1L);
}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
Log.d("create", threadName+" "+s);
this.subscription.request(1L);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
Log.d("create", threadName+" COMPLETE");
}
});
・onNext
onNextでsubscription.request
呼ばないと止まる
・onSubscribe
以下にするとonNextでrequestしなくてよくなる
this.subscription.request(Long.MAX_VALUE);
・androidMainThread
Observable.subscribeOn(AndroidSchedulers.mainThread());
・別スレッド
別々のFloawableをsubscribeして
AndroidSchedulers.mainThread()
すると待ちが発生
Scheduler.single()
も同様かな
これが普通のRxJavaなのかしら
まだまだ続くよ