2

Looking at RxJava to build asynchronous support for our APIs. Right now we use jetty + JAX-RS @Path annotations and am not sure what is the correct approach to tie the incoming REST api call to RxJava APIs.

Basically this is in the context of freeing up the request thread until the response from DB is ready.

Looked at Vert.x but that requires java 7 and we are tied right now to java 6.

Looking for suggestions regarding the above. what are the typical approaches one takes to tie up the incoming http request to RxJava APIs.

1
  • Are you looking for a client or server? Commented Feb 7, 2015 at 16:02

2 Answers 2

4

Here's an example that would create a Customer Observable for JAX-RS:

public class ApiService {
    Client client;

    public ApiService() {
        client = ClientBuilder.newClient();
    }

    public Observable<Customer> createCustomerObservable(final int customerId) {
        return Observable.create(new Observable.OnSubscribe<Customer>() {
            @Override
            public void call(final Subscriber<? super Customer> subscriber) {
                client
                        .target("http://domain.com/customers/{id}")
                        .resolveTemplate("id", customerId)
                        .request()
                        .async()
                        .get(new InvocationCallback<Customer>() {
                            @Override
                            public void completed(Customer customer) {
                                // Do something
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onNext(customer);
                                    subscriber.onCompleted();
                                }
                            }

                            @Override
                            public void failed(Throwable throwable) {
                                // Process error
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onError(throwable);
                                }
                            }
                        });
            }
        });
    }
}
Sign up to request clarification or add additional context in comments.

Comments

1

Something like the following should work for Jetty:

public class ApiService {
    HttpClient httpClient;

    public ApiService(HttpClient httpClient,) {
        this.httpClient = httpClient;
    }

    public <RequestType, ResultType> Observable<ResultType> createApiObservable(final RequestType requestContent) {
        return Observable.create(new Observable.OnSubscribe<ResultType>() {
            @Override
            public void call(final Subscriber<? super ResultType> subscriber) {
                // Create the request content for your API. Your logic here...
                ContentProvider contentProvider = serializeRequest(requestContent);

                httpClient
                        .newRequest("http://domain.com/path")
                        .content(contentProvider)
                        .send(new Response.CompleteListener() {
                            @Override
                            void onComplete(Result result) {
                                // Pass along the error if one occurred.
                                if (result.isFailed()) {
                                    subscriber.onError(result.getFailure());
                                    return;
                                }

                                // Convert the response data to the ResultType. Your logic here...
                                ResultType resultContent = parseResponse(result.getResponse());

                                // Send the result to the subscriber.
                                subscriber.onNext(responseBytes);
                                subscriber.onCompleted();
                            }
                        });
            }
        });
    }
}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.