How to checkpoint an Observable returned in mapMany? #126

Closed
@dcapwell

I have the following use case:
For each file in an FTP directory:
  read the file line by line
  filter out comments
  parse each line into a Java object
  serialize that object
  send the objects in batches to another component
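
The per-file steps listed above can be sketched in plain Java, without RxJava, to make the data flow concrete. This is only an illustration: `LinePipeline`, the `#` comment prefix, and the UTF-8 encoding step are assumptions standing in for the issue's COMMENT_FILTER, PARSE_LINE, and SERIALIZE_EVENT, whose definitions are not shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LinePipeline {
    /** Assumed rule: a line is a comment if it starts with '#' after trimming. */
    static boolean isComment(String line) {
        return line.trim().startsWith("#");
    }

    /** Stand-in for "parse, then serialize": trims the line and encodes it as UTF-8. */
    static byte[] parseAndSerialize(String line) {
        return line.trim().getBytes(StandardCharsets.UTF_8);
    }

    /** Drops comment lines and groups the serialized records into batches of at most batchSize. */
    static List<List<byte[]>> toBatches(Iterable<String> lines, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<byte[]>> batches = new ArrayList<List<byte[]>>();
        List<byte[]> current = new ArrayList<byte[]>();
        for (String line : lines) {
            if (isComment(line)) {
                continue;
            }
            current.add(parseAndSerialize(line));
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<byte[]>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // flush the final, possibly partial, batch
        }
        return batches;
    }
}
```

Each inner list corresponds to one batch that would be handed to the downstream component.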

I also want to make sure I don't process the same files over and over, so I first filter all files against a checkpoint mechanism and, once all the contents of a file have been sent, add that file to the checkpoint.
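
For readers unfamiliar with the checkpoint idea, here is a minimal in-memory sketch of such a mechanism. Only `hasProcessed(path)` appears in the issue; the class name `InMemoryCheckpointer` and the method `markProcessed` are hypothetical, and a real checkpointer would presumably persist its state somewhere durable.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory checkpoint store (illustration only; state is lost on restart). */
final class InMemoryCheckpointer {
    // Thread-safe set, since files may be checkpointed from different threads.
    private final Set<String> processed =
        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    /** Mirrors the hasProcessed(path) call used in the issue. */
    boolean hasProcessed(String path) {
        return processed.contains(path);
    }

    /** Hypothetical name: records that every record of the file has been sent. */
    void markProcessed(String path) {
        processed.add(path);
    }
}
```

The file listing is filtered with `!hasProcessed(path)` before reading, and `markProcessed(path)` is called only after the last record of that file has gone out.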

I have tried to replicate the logic in RxJava the following way:

Observable.from(client.list(basePath, true)) // recursively get all files under the path
          .filter(new Func1<FtpClient.FileStats, Boolean>() {
            @Override
            public Boolean call(final FtpClient.FileStats fileStats) {
              // keep only non-null, non-empty files that have not been checkpointed yet
              return fileStats != null && fileStats.getSize() > 0 && !checkpointer.hasProcessed(fileStats.getPath());
            }
          })
          .mapMany(new Func1<FtpClient.FileStats, Observable<? extends byte[]>>() {
            @Override
            public Observable<? extends byte[]> call(final FtpClient.FileStats path) {
              final String location = path.getPath();
              final Observable<byte[]> obs = Observable.from(client.getIterable(location)) // opens the file and returns Iterable<String>
                  .filter(COMMENT_FILTER) // remove comment line
                  .map(PARSE_LINE) // convert from String to Event
                  .map(SERIALIZE_EVENT); // convert from Event to byte[]
              obs.subscribe(new CheckpointObserver(checkpointer, location)); // when done processing, checkpoint file
              return obs;
            }
          })
          .subscribe(new EventSender(cp));

The above code causes the following error to be passed to onError:

java.lang.IllegalStateException: Can not set subscription more than once.
    at rx.util.AtomicObservableSubscription.wrap(AtomicObservableSubscription.java:58) ~[rxjava-core-0.5.0.jar:na]
    at rx.operators.OperationFilter$Filter.call(OperationFilter.java:48) [rxjava-core-0.5.0.jar:na]
    at rx.operators.OperationFilter$Filter.call(OperationFilter.java:36) [rxjava-core-0.5.0.jar:na]

But this works:

Observable.from(client.list(basePath, true))
          .filter(new Func1<FtpClient.FileStats, Boolean>() {
            @Override
            public Boolean call(final FtpClient.FileStats fileStats) {
              // keep only non-null, non-empty files that have not been checkpointed yet
              return fileStats != null && fileStats.getSize() > 0 && !checkpointer.hasProcessed(fileStats.getPath());
            }
          })
          .mapMany(new Func1<FtpClient.FileStats, Observable<? extends byte[]>>() {
            @Override
            public Observable<? extends byte[]> call(final FtpClient.FileStats path) {
              final String location = path.getPath();
              final Observable<byte[]> obs = Observable.from(client.getIterable(location))
                  .filter(COMMENT_FILTER) // remove comment line
                  .map(PARSE_LINE) // convert from String to Event
                  .map(SERIALIZE_EVENT); // convert from Event to byte[]
              obs.subscribe(new EventSender(cp));
              obs.subscribe(new CheckpointObserver(checkpointer, location)); // when done processing, checkpoint file
              return obs;
            }
          })
          .subscribe(new Observer<byte[]>() {
            @Override
            public void onCompleted() {
              // no-op, just trigger execution
            }

            @Override
            public void onError(final Exception e) {
              // no-op, just trigger execution
            }

            @Override
            public void onNext(final byte[] args) {
              // no-op, just trigger execution
            }
          });

Two questions: is this a bug in Rx, and what's a better way to handle this? (#16 looks like it would really help replace the EventSender, since all it really does is buffer events before sending them.)
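
One possible way to avoid subscribing to the inner Observable twice is to attach the checkpoint step to the line source itself, so that it fires when the file's lines run out. The sketch below uses plain Java only. `OnCompleteIterable` is a hypothetical helper, not part of RxJava, and whether `Observable.from(Iterable)` in rxjava-core 0.5.0 drives the iterator in a way that suits this has not been verified here.

```java
import java.util.Iterator;

/** Wraps an Iterable and runs a callback when an iteration reaches the end. */
final class OnCompleteIterable<T> implements Iterable<T> {
    private final Iterable<T> source;
    private final Runnable onComplete;

    OnCompleteIterable(Iterable<T> source, Runnable onComplete) {
        this.source = source;
        this.onComplete = onComplete;
    }

    @Override
    public Iterator<T> iterator() {
        final Iterator<T> it = source.iterator();
        return new Iterator<T>() {
            private boolean fired = false;

            @Override
            public boolean hasNext() {
                boolean more = it.hasNext();
                if (!more && !fired) {
                    fired = true;     // fire at most once per iterator
                    onComplete.run(); // e.g. add the file to the checkpoint
                }
                return more;
            }

            @Override
            public T next() {
                return it.next();
            }

            @Override
            public void remove() {
                it.remove();
            }
        };
    }
}
```

Under those assumptions, the inner source could be built as `Observable.from(new OnCompleteIterable<String>(client.getIterable(location), checkpointAction))`, where `checkpointAction` is a hypothetical Runnable that records `location`. The callback does not run if iteration stops early because of an exception. It also runs when the lines are exhausted, which is not necessarily after a buffering consumer such as EventSender has flushed its last batch.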
