rx java - How to compose properly timer Observables in ReactiveX? -
the method startpositioninfopolling executes polling every 2 seconds of positioninfo remote side , updating ui.
private subscription mpollingtimersubscription; private observable<long> mpollingtimerobservable = observable.timer(0, 2, timeunit.seconds); private void startpositioninfopolling() { logger.trace("..."); mpollingtimersubscription = mpollingtimerobservable .flatmap(new func1<long, observable<positioninfo>>() { @override public observable<positioninfo> call(long ticker) { logger.debug("xxx ticker = {}", ticker); return mmediarendererclient.createpositioninfoobservable(); } }) .subscribeon(androidschedulers.mainthread()) .subscribe(new action1<positioninfo>() { @override public void call(final positioninfo positioninfo) { logger.debug("xxx positioninfo = {}", positioninfo); mmusicmediatrackdetailsfragment.updateview(updatedpositioninfo); } }); } the method stoppositioninfopolling stops ui updates.
private void stoppositioninfopolling() { logger.trace("..."); mpollingtimersubscription.unsubscribe(); } i change code less periodically (e.g. every 20s) fetches remote data , periodically (e.g. every 1s) updates ui extrapolated value.
my first solution using rxjava looked like:
private subscription mpollingtimersubscription, mupdatingtimersubscription; private observable<long> mpollingtimerobservable = observable.timer(0, 20, timeunit.seconds); private observable<long> mupdatingtimerobservable = observable.timer(0, 1, timeunit.seconds); private void startpositioninfopolling() { logger.trace("..."); mpollingtimersubscription = mpollingtimerobservable .subscribe(new action1<long>() { @override public void call(long ticker) { logger.debug("xxx ticker = {}", ticker); mmediarendererclient.createpositioninfoobservable() .retry(2) .subscribe(new action1<positioninfo>() { @override public void call(final positioninfo positioninfo) { logger.debug("xxx positioninfo = {}", positioninfo); mupdatingtimersubscription = mupdatingtimerobservable .take(20, timeunit.seconds) .subscribeon(androidschedulers.mainthread()) .subscribe(new action1<long>() { @override public void call(long ticker) { logger.debug("xxx ticker = {}", ticker); string updatedreltime = modelutil.totimestring(modelutil.fromtimestring(positioninfo.getreltime()) + ticker); positioninfo updatedpositioninfo = new positioninfo(positioninfo, updatedreltime, positioninfo.getabstime()); logger.debug("xxx positioninfo = {}", updatedpositioninfo); mmusicmediatrackdetailsfragment.updateview(updatedpositioninfo); } }); } }); } }); } private void stoppositioninfopolling() { logger.trace("..."); mpollingtimersubscription.unsubscribe(); mupdatingtimersubscription.unsubscribe(); } could me transforming code less callback looking like?! feel flatmap key, mind still not thinking reactively ;-)
also problem mpollingtimersubscription.unsubscribe(); not unsubscribing/cancelling/stopping mupdatingtimerobservable, subscription being maintained.
thanks in advance comments.
update: @hello_world reducing original complexity ;-)
private subscription mpollingtimersubscription, mupdatingtimersubscription; private observable<long> mpollingtimerobservable = observable.timer(0, 20, timeunit.seconds); private observable<long> mupdatingtimerobservable = observable.timer(0, 1, timeunit.seconds); private void startpositioninfopolling() { logger.trace("..."); mpollingtimersubscription = mpollingtimerobservable .flatmap(new func1<long, observable<positioninfo>>() { @override public observable<positioninfo> call(long ticker) { logger.debug("xxx ticker = {}", ticker); return mmediarendererclient.createpositioninfoobservable(); } }) .retry(2) .subscribe(new action1<positioninfo>() { @override public void call(final positioninfo positioninfo) { logger.debug("xxx positioninfo = {}", positioninfo); mupdatingtimersubscription = mupdatingtimerobservable .take(20, timeunit.seconds) .subscribeon(androidschedulers.mainthread()) .subscribe(new action1<long>() { @override public void call(long ticker) { logger.debug("xxx ticker = {}", ticker); string updatedreltime = modelutil.totimestring(modelutil.fromtimestring(positioninfo.getreltime()) + ticker); positioninfo updatedpositioninfo = new positioninfo(positioninfo, updatedreltime, positioninfo.getabstime()); logger.debug("xxx positioninfo = {}", updatedpositioninfo); mmusicmediatrackdetailsfragment.updateview(updatedpositioninfo); } }); } }); } private void stoppositioninfopolling() { logger.trace("..."); mpollingtimersubscription.unsubscribe(); mupdatingtimersubscription.unsubscribe(); }
i can't comment, i'll post answer.
why don't try using timer operator?
observable .timer(delay, interval, timeunit.seconds) .subscribe(/* ... */); also, have few other variants of timer use.
maybe these -
learning rxjava (for android) example checkout slides around middle. shows how use timer.
try code:
mpollingtimerobservable .flatmap(new func1<long, observable<positioninfo>>() { @override public observable<positioninfo> call(long long) { logger.debug("xxx ticker = {}", ticker); return mmediarendererclient.createpositioninfoobservable(); } }) .retry(2) .subscribe(new action1<positioninfo>() { @override public void call(final positioninfo positioninfo) { logger.debug("xxx positioninfo = {}", positioninfo); mupdatingtimersubscription = mupdatingtimerobservable .take(20, timeunit.seconds) .subscribeon(androidschedulers.mainthread()) .subscribe(new action1<long>() { @override public void call(long ticker) { logger.debug("xxx ticker = {}", ticker); string updatedreltime = modelutil.totimestring(modelutil.fromtimestring(positioninfo.getreltime()) + ticker); positioninfo updatedpositioninfo = new positioninfo(positioninfo, updatedreltime, positioninfo.getabstime()); logger.debug("xxx positioninfo = {}", updatedpositioninfo); mmusicmediatrackdetailsfragment.updateview(updatedpositioninfo); } }); } }); update
i've thought of way make simple. had whole zip concept wrong. :p bad! anyway, here new 'flattened' code
mpollingtimerobservable .flatmap(new func1<long, observable<positioninfo>>() { @override public observable<positioninfo> call(long long) { logger.debug("xxx ticker = {}", ticker); return mmediarendererclient.createpositioninfoobservable(); } }) .retry(2) .zipwith(mupdatingtimerobservable, new func2<positioninfo, long, positioninfo>() { @override public positioninfo call(positioninfo p, long l) { return p; } }) .take(20) .subscribeon(androidschedulers.mainthread()) .subscribe(/* ... */); i haven't had time test this, think should work. let me know if not!
Comments
Post a Comment