rx java - How to compose properly timer Observables in ReactiveX? -


the method startpositioninfopolling executes polling every 2 seconds of positioninfo remote side , updating ui.

private subscription mpollingtimersubscription; private observable<long> mpollingtimerobservable = observable.timer(0, 2, timeunit.seconds);  private void startpositioninfopolling() {     logger.trace("...");     mpollingtimersubscription = mpollingtimerobservable             .flatmap(new func1<long, observable<positioninfo>>() {                 @override                 public observable<positioninfo> call(long ticker) {                     logger.debug("xxx ticker = {}", ticker);                     return mmediarendererclient.createpositioninfoobservable();                 }             })             .subscribeon(androidschedulers.mainthread())             .subscribe(new action1<positioninfo>() {                 @override                 public void call(final positioninfo positioninfo) {                     logger.debug("xxx positioninfo = {}", positioninfo);                     mmusicmediatrackdetailsfragment.updateview(updatedpositioninfo);                 }             }); } 

the method stoppositioninfopolling stops ui updates.

private void stoppositioninfopolling() {     logger.trace("...");     mpollingtimersubscription.unsubscribe(); } 

i change code less periodically (e.g. every 20s) fetches remote data , periodically (e.g. every 1s) updates ui extrapolated value.

my first solution using rxjava looked like:

private subscription mpollingtimersubscription, mupdatingtimersubscription; private observable<long> mpollingtimerobservable = observable.timer(0, 20, timeunit.seconds); private observable<long> mupdatingtimerobservable = observable.timer(0, 1, timeunit.seconds);  private void startpositioninfopolling() {     logger.trace("...");     mpollingtimersubscription = mpollingtimerobservable             .subscribe(new action1<long>() {                 @override                 public void call(long ticker) {                     logger.debug("xxx ticker = {}", ticker);                     mmediarendererclient.createpositioninfoobservable()                             .retry(2)                             .subscribe(new action1<positioninfo>() {                                 @override                                 public void call(final positioninfo positioninfo) {                                     logger.debug("xxx positioninfo = {}", positioninfo);                                     mupdatingtimersubscription = mupdatingtimerobservable                                             .take(20, timeunit.seconds)                                             .subscribeon(androidschedulers.mainthread())                                             .subscribe(new action1<long>() {                                                 @override                                                 public void call(long ticker) {                                                     logger.debug("xxx ticker = {}", ticker);                                                     string updatedreltime = modelutil.totimestring(modelutil.fromtimestring(positioninfo.getreltime()) + ticker);                                                     positioninfo updatedpositioninfo = new positioninfo(positioninfo, updatedreltime, positioninfo.getabstime());                                                     logger.debug("xxx positioninfo = {}", updatedpositioninfo);                                                     mmusicmediatrackdetailsfragment.updateview(updatedpositioninfo);                                                 }                                             });                                 }                             });                 }             }); }  private void stoppositioninfopolling() {     logger.trace("...");     mpollingtimersubscription.unsubscribe();     mupdatingtimersubscription.unsubscribe(); } 

could me transforming code less callback looking like?! feel flatmap key, mind still not thinking reactively ;-)

also problem mpollingtimersubscription.unsubscribe(); not unsubscribing/cancelling/stopping mupdatingtimerobservable, subscription being maintained.

thanks in advance comments.

update: @hello_world reducing original complexity ;-)

private subscription mpollingtimersubscription, mupdatingtimersubscription; private observable<long> mpollingtimerobservable = observable.timer(0, 20, timeunit.seconds); private observable<long> mupdatingtimerobservable = observable.timer(0, 1, timeunit.seconds);  private void startpositioninfopolling() {     logger.trace("...");     mpollingtimersubscription = mpollingtimerobservable             .flatmap(new func1<long, observable<positioninfo>>() {                 @override                 public observable<positioninfo> call(long ticker) {                     logger.debug("xxx ticker = {}", ticker);                     return mmediarendererclient.createpositioninfoobservable();                 }             })             .retry(2)             .subscribe(new action1<positioninfo>() {                 @override                 public void call(final positioninfo positioninfo) {                     logger.debug("xxx positioninfo = {}", positioninfo);                     mupdatingtimersubscription = mupdatingtimerobservable                             .take(20, timeunit.seconds)                             .subscribeon(androidschedulers.mainthread())                             .subscribe(new action1<long>() {                                 @override                                 public void call(long ticker) {                                     logger.debug("xxx ticker = {}", ticker);                                     string updatedreltime = modelutil.totimestring(modelutil.fromtimestring(positioninfo.getreltime()) + ticker);                                     positioninfo updatedpositioninfo = new positioninfo(positioninfo, updatedreltime, positioninfo.getabstime());                                     logger.debug("xxx positioninfo = {}", updatedpositioninfo);                                     mmusicmediatrackdetailsfragment.updateview(updatedpositioninfo);                                 }                             });                 }             }); }  private void stoppositioninfopolling() {     logger.trace("...");     mpollingtimersubscription.unsubscribe();     mupdatingtimersubscription.unsubscribe(); } 

i can't comment, i'll post answer.

why don't try using timer operator?

observable     .timer(delay, interval, timeunit.seconds)     .subscribe(/*  ...  */); 

also, have few other variants of timer use.

maybe these -

try code:

mpollingtimerobservable     .flatmap(new func1<long, observable<positioninfo>>() {         @override         public observable<positioninfo> call(long long) {             logger.debug("xxx ticker = {}", ticker);             return mmediarendererclient.createpositioninfoobservable();         }     })     .retry(2)     .subscribe(new action1<positioninfo>() {         @override         public void call(final positioninfo positioninfo) {             logger.debug("xxx positioninfo = {}", positioninfo);             mupdatingtimersubscription = mupdatingtimerobservable                 .take(20, timeunit.seconds)                 .subscribeon(androidschedulers.mainthread())                 .subscribe(new action1<long>() {                     @override                     public void call(long ticker) {                         logger.debug("xxx ticker = {}", ticker);                         string updatedreltime = modelutil.totimestring(modelutil.fromtimestring(positioninfo.getreltime()) + ticker);                         positioninfo updatedpositioninfo = new positioninfo(positioninfo, updatedreltime, positioninfo.getabstime());                         logger.debug("xxx positioninfo = {}", updatedpositioninfo);                         mmusicmediatrackdetailsfragment.updateview(updatedpositioninfo);                     }                 });         }     }); 

update

i've thought of way make simple. had whole zip concept wrong. :p bad! anyway, here new 'flattened' code

mpollingtimerobservable     .flatmap(new func1<long, observable<positioninfo>>() {         @override         public observable<positioninfo> call(long long) {             logger.debug("xxx ticker = {}", ticker);             return mmediarendererclient.createpositioninfoobservable();         }     })     .retry(2)     .zipwith(mupdatingtimerobservable, new func2<positioninfo, long, positioninfo>() {             @override             public positioninfo call(positioninfo p, long l) {                 return p;             }         })     .take(20)     .subscribeon(androidschedulers.mainthread())     .subscribe(/* ... */); 

i haven't had time test this, think should work. let me know if not!


Comments

Popular posts from this blog

toolbar - How to add link to user registration inside toobar in admin joomla 3 custom component -

linux - disk space limitation when creating war file -

How to provide Authorization & Authentication using Asp.net, C#? -