Saturday 27 April 2019

OO Functional Imperative Reactive weaved together

This is first of a two part article to discuss how different paradigms in programming can be weaved together seamlessly via the "First Class Procedure", a term I'm using to best describe the concept. 

The working code in this article demonstrates how you can seamlessly weave together the following to service a request:
  1. Validate a request (on socket event loop thread).
  2. Start a transaction and register the request in the database.  This will be on another thread to avoid halting the socket event loop thread.
  3. Make reactive calls to pull in data from other services.
  4. Run some functional code to work out the standard deviation on service times.
  5. Undertake alternate flows to handle special cases (including handling exceptions).  Then if no exceptions causing rollback, store results in the database.  This again is on a different thread to not tie up the reactive event loop thread.
  6. Send the response after committing the transaction
This allows you to use the programming paradigm best suited to the various problems at hand.  Note the request servicing in the demonstration is arbitrary.  The focus is on showing how the various programming paradigms can be weaved together.

Now to write a complete description of how this works with the first class procedure is beyond a single article.  There are many patterns used together to enable the composition through first class procedures.  Therefore, I'm going to provide an introduction to first class procedures in two parts:
  • This article to demonstrate with working code how flexible and easy composition is with first class procedures
  • Next article to provide an explanation more closely aligned to the theory on how the first class procedure has evolved to its current understanding
We'll start with some simple examples and then get to the more interesting above case of weaving multiple programming paradigms together.

First class procedure

Simple event loop

The following first class procedure services a REST request.  This will be run on the HTTP socket event loop thread.

public void service(ObjectResponse<ServicedThreadResponse> response) {
    response.send(new ServicedThreadResponse(Thread.currentThread().getName(), "Event", System.currentTimeMillis()));
}

Simple thread-per-request

The following first class procedure services a REST request by pulling a value from the database and sending it in the response.  This will be run by a separate thread pool.

public void service(ServicedThreadRequest request, ThreadPerRequestRepository repository, ObjectResponse<ServicedThreadResponse> response) {
    int identifier = request.getIdentifier() % 10;
    ThreadPerRequest entity = repository.findById(identifier).get();
    response.send(new ServicedThreadResponse(Thread.currentThread().getName(), entity.getName(), System.currentTimeMillis()));
}


The distinction of thread to use will be discussed later.  However, for now notice that a Spring Repository is used by only the thread-per-request first class procedure.

First Class Procedures weaved together

Ok, the above is little boring.  We've seen this in web application servers before.  Show us something interesting!

To show something more interesting we are going to weave first class procedures together to achieve the example detailed at the start of this article.

Each step in the request servicing is implemented as a first class procedure.  We'll address each first class procedure in the order specified.

Validate request (on socket event loop)

This is simple validation that the request is correct.  As it is straight forward logic, we use the thread of the socket event loop.  This way we don't have to pay overheads of a thread context switch and threading overheads to reject invalid requests.  The code is as follows:

const HttpException = Java.type("net.officefloor.server.http.HttpException");
const Integer = Java.type("java.lang.Integer")

function validate(identifier, requestIdentifier) {
    if (Number(identifier) <= 0) {
        throw new HttpException(422, "Invalid identifier");
    }
    requestIdentifier.set(Integer.valueOf(identifier))
}
validate.officefloor = [ 
    { httpPathParameter: "identifier" },
    { out: Integer },
    { next : "valid" }
];

Note that the validation is written in JavaScript.  This is so that the client side JavaScript validation rules can be re-used to validate requests to ensure consistency between client and server.

The officefloor attribute added to the function provides meta-data.  This is necessary, as JavaScript does not provide the strongly typed information required of first class procedures.

Imperative to register request in database

After validation, the request identifier is registered in the database.  This also creates a unique number for the request based on an IDENTITY column in the database.

@Next("registered")
public static void registerRequest(@Val int requestIdentifier, WeavedRequestRepository repository, Out<WeavedRequest> weavedRequest) {
    WeavedRequest entity = new WeavedRequest(requestIdentifier);
    repository.save(entity);
    weavedRequest.set(entity);
}

Reactive

The next is some Reactive code to concurrently call the two REST end points detailed at the start of this article (simple event loop and simple thread-per-request).   Because we are using Reactive we can call them concurrently to improve performance.

Note that while waiting on the responses, the flow is effectively idle with threads servicing other functionality.  This is asynchronous handling so that threads are not tied up waiting.   Once both sets of results come back, they notify the respective asynchronous flow to continue processing.

private final static String URL = "http://localhost:7878/{path}";

@Next("useData")
public static void retrieveData(WebClient client,
        AsynchronousFlow eventLoopFlow, @EventLoopResponse Out<ServicedThreadResponse> eventLoopResponse,
        @Val WeavedRequest request, AsynchronousFlow threadPerRequestFlow, @ThreadPerRequestResponse Out<ServicedThreadResponse> threadPerRequestResponse) {

    Flux.range(1, 10)
        .map((index) -> client.get().uri(URL, "event-loop").retrieve().bodyToMono(ServicedThreadResponse.class))
        .flatMap((response) -> response).collectList().subscribe((responses) -> eventLoopFlow.complete(
            () -> eventLoopResponse.set(responses.stream().toArray(ServicedThreadResponse[]::new))));

    Flux.range(1, 10)
        .map((index) -> client.post().uri(URL, "thread-per-request").contentType(MediaType.APPLICATION_JSON)
            .syncBody(new ServicedThreadRequest(request.getId())).retrieve()
            .bodyToMono(ServicedThreadResponse.class))
        .flatMap((response) -> response).collectList().subscribe((responses) -> threadPerRequestFlow.complete(
            () -> threadPerRequestResponse.set(responses.stream().toArray(ServicedThreadResponse[]::new))));
}

By now you may be noticing the Out/@Val combinations.  This is how values can be passed from one first class procedure to another first class procedure.  Note that if type for different values is the same, a qualifier can be used to distinguish them.  The rest of the arguments are provided from dependency injection (in this case Spring).

Functional

Next the reactive responses are provided to Scala functional code to determine the standard deviation of service times.

def mean(timestamps: Iterable[Long]): Double = timestamps.sum.toDouble / timestamps.size

def variance(timestamps: Iterable[Long]): Double = {
    val avg = mean(timestamps)
    timestamps.map(timestamp => math.pow(timestamp.toDouble - avg, 2)).sum / timestamps.size
}

def stdDev(timestamps: Iterable[Long]): Double = math.sqrt(variance(timestamps))

@Next("use")
def standardDeviation(@EventLoopResponse @Val eventLoopResponses: Array[ServicedThreadResponse], @ThreadPerRequestResponse @Val threadPerRequestResponses: Array[ServicedThreadResponse]): Double =
    stdDev((eventLoopResponses ++ threadPerRequestResponses).map(response => response.getTimestamp))

Note that a library could be used to reduce this code.  However, we've done this to demonstrate how functional code can be integrated into first class procedures.

Flow control

The next first class procedure triggers a flow to handle special cases.  Should there be no issues with the special cases, then it stores the standard deviation in the database.

@FlowInterface
public static interface Flows {
    void handleSpecialCases(FlowSuccessful callback);
    void stored();
}

public static void store(@Parameter double standardDeviation, Flows flows, @Val WeavedRequest request, WeavedRequestRepository repository, Out<RequestStandardDeviation> stDevOut) {
    flows.handleSpecialCases(() -> {
        request.setRequestStandardDeviation(new RequestStandardDeviation(standardDeviation, request));
        repository.save(request);
        stDevOut.set(request.getRequestStandardDeviation());
        flows.stored();
    });
}

The handling of the special cases is by the following first class procedure.

public static void handleSpecialCase(@Val WeavedRequest request) throws WeavedRollbackException, WeavedCommitException {
    switch (request.getRequestIdentifier()) {
        case 3:
            throw new WeavedRollbackException(request);
        case 4:
            throw new WeavedCommitException(request);
    }
}

Touch of exception handling

The two exception handling first class procedures are as follows.

public static void handle(@Parameter WeavedRollbackException exception, ObjectResponse<WeavedErrorResponse> response) {
    WeavedRequest request = exception.getWeavedRequest();
    response.send(new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
}

public static void handle(@Parameter WeavedCommitException exception, WeavedRequestRepository repository, ObjectResponse<WeavedErrorResponse> response) {
    WeavedRequest request = exception.getWeavedRequest();
    request.setWeavedError(new WeavedError("Request Identifier (" + request.getRequestIdentifier() + ") is special case", request));
    repository.save(request);
    response.send(new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
}

The second handler works within the transaction, so includes further data stored in the database.

Note that due to first class procedure composition not requiring the caller to catch exceptions, checked exceptions are embraced.  We consider checked exceptions very useful information in flow composition.  However, the distinction is that it should not be the caller's concern but rather the flow's concern.  To me this is a big difference and stops the catch and log exception handling problem.   Exception handling is now a separate concern that can be coded in afterwards.

Successful response

On successful storage of the request details in the database, the following first class procedure sends the response.

public void send(@Val WeavedRequest request, @Val RequestStandardDeviation standardDeviation, @EventLoopResponse @Val ServicedThreadResponse[] eventLoopResponse,
        @ThreadPerRequestResponse @Val ServicedThreadResponse[] threadPerRequestResponse, ObjectResponse<WeavedResponse> response) {
    response.send(new WeavedResponse(request.getRequestIdentifier(), request.getId(), eventLoopResponse, threadPerRequestResponse, standardDeviation.getStandardDeviation()));
}

Kotlin for some OO

Oh, and just for a little bit more polyglot fun, the OO objects used to represent the JSON request/responses are the following.

@HttpObject
data class ServicedThreadRequest(val identifier: Int)

data class ServicedThreadResponse(val threadName: String, val lookupName: String, val timestamp: Long)

data class WeavedErrorResponse(val requestIdentifier: Int, val requestNumber: Int)

data class WeavedResponse(val requestIdentifier: Int
        , val requestNumber: Int
        , val eventLoopResponses: Array
        , val threadPerRequestResponses: Array
        , val standardDeviation: Double)

Proving it works

The following is a test to confirm the flow of first class procedures services the request.

public static final SpringRule spring = new SpringRule();

public static final OfficeFloorRule officeFloor = new OfficeFloorRule();

@ClassRule
public static final RuleChain ordered = RuleChain.outerRule(spring).around(officeFloor);

@Rule
public final HttpClientRule client = new HttpClientRule();

private static final ObjectMapper mapper = new ObjectMapper();
static {
    mapper.registerModule(new KotlinModule());
}

@Test
public void confirmWeavedTogether() throws Exception {
    HttpResponse response = this.client.execute(new HttpPost(this.client.url("/weave/1")));
    assertEquals("Should be successful", 200, response.getStatusLine().getStatusCode());
    WeavedResponse body = mapper.readValue(EntityUtils.toString(response.getEntity()), WeavedResponse.class);
    WeavedRequest entity = spring.getBean(WeavedRequestRepository.class).findById(body.getRequestNumber()).get();
    assertNotNull("Should have standard deviation stored", entity.getRequestStandardDeviation());
}

Weaving together

The following diagram is the configuration to weave the above first class procedures together.


This is the only configuration/code necessary to compose the first class procedures together.  Notice the names represent the first class procedure names and their respective meta-data.

What this means, is check the port on the all the calls and tests.  Yes, everything you see above is running off the one port.  Yep, you don't have to choose between a framework that provides only thread-per-request or single threaded event loops.  This is because of the execution strategy provided by Thread Injection of first class procedures.

Thread Injection

The threading configuration is actually the following:

<teams>
    <team source="net.officefloor.frame.impl.spi.team.ExecutorCachedTeamSource" type="org.springframework.data.repository.CrudRepository" />
</teams>

Here we flag all procedures requiring a Spring Repository to be executed by a thread pool.  Remember I said keep note of use of Spring Repository.  Well the above configuration has any first class procedure requiring a Spring Repository executed by the configured thread pool.  Note that thread pools are named teams, due to the modeling origins of first class procedures coming from Offices.

Therefore, looking at the flow again, the thread execution is as follows:
  1. Validate uses the thread of the socket listener event loop
  2. Register request uses a Spring Repository, so execution is swapped to a thread from the configured thread pool
  3. This thread carries onto trigger the asynchronous reactive calls
  4. The reactive event loop thread then invokes the callbacks.  As the Scala code is quick to execute, the reactive event loop thread carries on to execute the Scala pure function.  Here it is deemed that a thread context switch is too much overhead, and it is more efficient to just invoke the highly optimised Scala pure function.  However, if we want to separate the Scala function to different thread pool, we can configure in a different thread pool (typically via marker dependency on the first class procedure).
  5. The remaining imperative code has a switch back to a thread from the configured thread pool, as depends on Spring repository.  Furthermore, the thread locals between the threads are propagated to each used thread, so the Spring Repository transaction is not lost (i.e. transaction is active for all first class procedures within the transaction bounds).
  6. Response is then sent.
Now all the above is configurable via Thread Injection.  If we have, for example, more than one synchronous data store, we can create a thread pool to interact with each data store to avoid one slow data store tying up all threads of the application.

This also means you can configure different threading for different environments without having to change any code.

Disclaimer

In a real world applications, I would try to avoid so many of the above programming languages together.  I'd try to streamline them to just a couple to avoid too many skill sets involved driving up maintenance costs of your application (plus reduces problems for mixed compiling).  This is only a demonstration of how OO, Functional, Imperative and Reactive code can all be weaved together with first class procedures.  Furthermore, it demonstrates how you can write concrete solutions before abstracting.

Also, as you can see we've had to cover a lot of breadth in each programming paradigm.  If the code is not a good representation of the paradigm, we're very happy to take feedback on improvements from those more acquainted with a particular paradigm.

And if we've missed an important paradigm, please let me know so we can consider including it.  When it comes to coding we appreciate diversity to give developers choice.  We're trying to tear down fences between the paradigms to have one big happy coding family.

Summary

We've demonstrated how the first class procedure can weave together polyglot code written in different paradigms to service a request.  The code outlined above in the article is all the code required for the application.  There is no further weaving code required.

Furthermore, to avoid the problems of it only works on my machine (in this article), the code for the above is available here.  See the readme on how to run it.

For more understanding of what's going on, see the tutorials, my other articles and in particular my next article.

1 comment: