0

I have 2 collections, which buffer location update events:

     private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>();
    private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>();

There is also present in my code:

        private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor();
    private boolean mSaveDataScheduled;
    private final Object mEventsMonitor = new Object();

    private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture;
    private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor();

I add event to this colections like this:

    public void appendGeoEvent(LocationGeoEvent event) {
            synchronized (mEventsMonitor) {
                mUpdateGeoEvents.add(event);
                scheduleSaveEvents();
            }
    }

The same goes for the RSSI event

Now, the scheduleSaveEvents method looks like this:

      private void scheduleSaveEvents() {

        synchronized (mSaveDataExecutor) {
            if (!mSaveDataScheduled) {
                mSaveDataScheduled = true;
                mSaveDataExecutor.schedule(
                        new Runnable() {
                            @Override
                            public void run() {
                                synchronized (mSaveDataExecutor) {
                                    saveEvents(false);
                                    mSaveDataScheduled = false;
                                }
                            }
                        },
                        30,
                        TimeUnit.SECONDS);
            }
        }

    }

The problem is, that i need to synchronize the other method which stops the updates. It is triggered like this:

      private void scheduleStopLocationUpdates() {

        synchronized (mStopLocationUpdatesExecutor) {
            if (mScheduledStopLocationUpdatesFuture != null)
                mScheduledStopLocationUpdatesFuture.cancel(true);

            mScheduledStopLocationUpdatesFuture = mStopLocationUpdatesExecutor.schedule(
                    new Runnable() {
                        @Override
                        public void run() {
                            synchronized (mStopLocationUpdatesExecutor) {
                                stopLocationUpdates();
                                saveEvents(true);
                                cleanAllReadingsData();
                            }
                        }
                    },
                    45,
                    TimeUnit.SECONDS);
        }

    }

In the saveEvents method i do:

    private void saveEvents(boolean locationUpdatesAboutToStop) {

        synchronized (mEventsMonitor) {
            if (mUpdateGeoEvents.size() > 0 || mUpdateRSSIEvents.size() > 0) {

                 //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop

                mUpdateGeoEvents.clear();
                mUpdateRSSIEvents.clear();
            }

        }

    }

Is there a way to refactor this simplier to RxJava using Kotlin?

UPDATE

Here is my appendRSSIevents method:

    private fun appendRSSIEvent(event: LocationRSSIEvent) {
    synchronized(mEventsMonitor) {
        if (!shouldSkipRSSIData(event.nexoIdentifier)) {
            mUpdateRSSIEvents.add(event)
            acknowledgeDevice(event.nexoIdentifier)
            scheduleSaveEvents()
            startLocationUpdates()
        } else
            removeExpiredData()
    }
}

1 Answer 1

1

You can buffer the two streams of data and then combine them for saving. Also, you can use the buffer trigger to stop the updates as well.

PublishSubject<LocationGeoEvent> mUpdateGeoEventsSubject = PublishSubject.create();
PublishSubject<LocationRSSIEvent> mUpdateRSSIEventsSubject = PublishSubject.create();

public void appendGeoEvent(LocationGeoEvent event) {
  mUpdateGeoEventsSubject.onNext( event );
  triggerSave.onNext( Boolean.TRUE );
}

and the same for RSS feed.

Now we need triggers that will be used to drive the saving step.

PublishSubject<Boolean> triggerSave = PublishSubject.create();
PublishSubject<Boolean> triggerStopAndSave = PublishSubject.create();

Observable<Boolean> normalSaveTrigger = triggerSave.debounce( 30, TimeUnit.SECONDS );
Observable<Boolean> trigger = Observable.merge( normalSaveTrigger, triggerStopAndSave );

The trigger observable fires when either the normal save process fires or if we are stopping the save.

private void saveEvents(
  List<LocationGeoEvent> geo,
  List<LocationRSSIEvent> rss,
  boolean locationUpdatesAboutToStop) {

    synchronized (mEventsMonitor) {
        if (geo.size() > 0 || rss.size() > 0) {
             //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop
        }
    }
}
private void scheduleStopLocationUpdates() {
  stopLocationUpdates();
  triggerStopAndSave.onNext( Boolean.FALSE );
  cleanAllReadingsData();
}

Observable.zip( mUpdateGeoEventsSubject.buffer( trigger ),
                mUpdateRSSIEventsSubject.buffer( trigger ),
                trigger, (geo, rss, trgr) -> saveEvents(geo, rss, trgr) )
  .subscribe();

You will still need to some tuning with respect to multi-threading and safety. The first step would be to turn the various subjects into SerializedSubjects so that multiple threads can emit events.

If you want saveEvents to run on a particular scheduler, you will either need to add an intermediate data structure, a triple, to pass the parameters through observeOn() operator, or apply observeOn() operator to each of zip() arguments.

Sign up to request clarification or add additional context in comments.

11 Comments

Thanks, i will try to work with your solution, but have you tried this solution in Android Studio IDE? Something is wrong with the Observable.zip(..) part. However i am using Kotlin(this is wrong in Kotlin too). IDE says 'Cannot infer functional interface type' and underlines the (geo, rss, trgr) -> saveEvents(geo, rss, trgr) part
Oh, sorry. saveEvents should return a boolean so that there is a value to pass on.
Also i didn't mention important thing about appendRssiEvent - it looks somehow different, maybe it don't change much to your implementation. I updated my question. Also, the scheduleStopLocationUpdates is scheduled in different time than the scheduleStopLocation, to 45 seconds. So i guess it is more complications. Thank you for your help, i really appreciate it.
Also, if you can please check if this Observable.zip(..) can be converted to Kotlin, i can't convert it. The IDE underlines the .zip and says this: gist.github.com/oksett/a406c8d6e2cae3d96fa3c19535e4b138
I have no experience with Kotlin. I suspect that it does something fancy with type inference that is getting in the way.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.