Sunday, 28 April 2019

Function IoC for First Class Procedure

This is the second of two articles introducing a term I'm suggesting: the "first class procedure". The first article provided a working example to show first class procedures in action. This article delves into the detail, and some of the theory, of how the first class procedure has evolved.

The evolution of the "first class procedure" starts with looking at the function. The function takes in parameters to then produce the result:

  type Function = Array[Any] => Any

  // Example of first class function
  def exampleFunction(paramOne: Int, paramTwo: Double): String = s"$paramOne and $paramTwo" // body is illustrative only
  val firstClassFunction: Function = (parameters) => exampleFunction(parameters(0).asInstanceOf[Int], parameters(1).asInstanceOf[Double])

However, before being able to obtain any results, the function also requires a thread to be run. This is what I refer to as the "implicit thread". The function does not define details regarding the thread to use, so defaults to the invoking thread of the function (the implicit thread).

Ideally, we should enhance the function signature to indicate the thread more explicitly so we can allow more control in how the composed functions are executed:

  type Executor = (Function, Array[Any]) => Any

  def invoke(executor: Executor, function: Function, parameters: Array[Any]) =
    executor(function, parameters)

Now the thread to execute the function is made explicit. Note that the executor is the means to invoke the function with either the implicit thread or a separate explicit thread via a thread pool.
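As a rough illustration of the two kinds of executor, here is a minimal sketch in Java (the language of the working code in the first article). The names ExecutorSketch, Fn, Exec, implicitThread and onPool are hypothetical and only mirror the Scala types above. Note the pooled executor here waits for the result, so it only demonstrates the choice of thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorSketch {

    /** Mirrors: type Function = Array[Any] => Any */
    interface Fn { Object apply(Object[] parameters); }

    /** Mirrors: type Executor = (Function, Array[Any]) => Any */
    interface Exec { Object execute(Fn function, Object[] parameters); }

    /** Runs the function on the invoking (implicit) thread. */
    static Exec implicitThread() {
        return (function, parameters) -> function.apply(parameters);
    }

    /** Runs the function on a thread from the pool, waiting for its result. */
    static Exec onPool(ExecutorService pool) {
        return (function, parameters) -> {
            Callable<Object> task = () -> function.apply(parameters);
            try {
                return pool.submit(task).get();
            } catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        };
    }

    /** Same shape as the invoke function above. */
    static Object invoke(Exec executor, Fn function, Object[] parameters) {
        return executor.execute(function, parameters);
    }

    /** Name of the thread that ran the function under the given executor. */
    static String threadNameUsing(Exec executor) {
        return (String) invoke(executor, (parameters) -> Thread.currentThread().getName(), new Object[0]);
    }

    /** True if the pooled executor ran the function on a different thread to the caller. */
    static boolean poolUsesDifferentThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return !threadNameUsing(onPool(pool)).equals(Thread.currentThread().getName());
        } finally {
            pool.shutdown();
        }
    }
}
```

The function itself is unchanged between the two cases. Only the executor handed to invoke differs.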

An argument might be that threading is hard and should be left to compilers/frameworks/etc. However, I believe I still want some control over the execution profile of the application. The following example shows why.

I'm running a multi-user application on a large server with multiple processors that does various blocking I/O and expensive CPU computations to service a request. I really want to optimise the CPU computations by giving them affinity to a core, avoiding cache misses and thread context switching overheads. Plus I want to isolate the blocking I/O to a separate thread pool, so the CPU computations do not block and leave CPUs idle. Given the load of I/O, I might want to isolate one core to do all I/O, leaving the remaining cores free for the CPU computations. Conversely, I might find the I/O is minimal and can time slice with the CPU computations, so I can get an extra core for the CPU computations to gain increased throughput. Ideally, I would like a means to tweak this for the application.

Then requirements change and I now want to use the same application to service a single user that is running the application on a single core (e.g. a cheap portable embedded system). In this circumstance, I'm happy for the CPU computations to block while the I/O is underway. Basically, I just want one implicit thread to run all the functions.

In both the above, it is the same application logic but just different execution profiles based on the environment the application is running within.

So, taking our explicit function signature above, the higher order functions must now provide appropriate threading information to invoke each composed function:

  def higher(executorIO: Executor, executorCpuIntensive: Executor, parameters: Array[Any]) =
    executorIO(functionIO, Array(executorCpuIntensive(functionCpuIntensive, parameters)) ++ parameters)

This now requires the higher order function to determine the thread for each contained function. This can blow out the higher order function's signature. Therefore, let's for now create a function that can determine the executor for a function:

  type ExecutorLocator = Function => Executor

So this now changes the higher order function to be:

  def higher(executorLocator: ExecutorLocator, parameters: Array[Any]) =
    executorLocator(functionIO)(functionIO, Array(executorLocator(functionCpuIntensive)(functionCpuIntensive, parameters)) ++ parameters)

So we've stopped the blow out of the higher order function signature, but it starts to highlight a problem of passing results between the composed functions. Well not so much in passing results, but in which parts of the logic to execute with which thread.

To get the result from functionA to pass to functionB, we have two approaches:
  • As functionA completes, the system returns the result to the higher order function's thread that then passes it onto functionB
  • As functionA completes, it carries onto execute functionB
The difference is subtle, but significant for thread context switching overheads.

Note: in my naive understanding of functional programming, I believe the first approach can be considered turning each function into an Actor, while the second approach is a Continuation (continuation passing style).

Anyway, before misunderstanding too much of the functional programming literature, the problem with the first approach is excessive thread context switching. If the higher order function uses a different executor to its composed functions, it creates two thread context switches for every composed function. The execution would be:
  1. Higher order function executes
  2. Invokes composed function on another thread
  3. Result returned to higher order function that thread context switches back to itself
  4. Invokes next composed function on another thread (possibly the same thread required for step 2)
The thread context switch that happens in step 3 is not really required. Furthermore, the cost of switching the thread would be more than the few operations to pass the result of the first function to the second function.

Note: I refer to thread context switching overheads assuming threads need to be scheduled in to handle each part. However, even if threads were running continuously on separate cores, there are overheads in having to get the messages between the threads.

There is also a second problem of exceptions. Yes, I understand exceptions are not very functional, however I'll come to this later in regards to composition.

So if we take approach 2 of continuations, then the execution would be as follows:
  1. Higher order function executes
  2. Invokes composed functionA on another thread (passing in the next composed functionB)
  3. On result, the composed functionA continues with the next composed functionB
We have eliminated the extra context switch in the first approach by letting the composed functions continue on with each other.

Now this is not overly special. This is just Continuation Passing Style with the ability to execute the continuation with the implicit thread or delegating to a different thread. In doing so, we try to use the implicit thread as much as possible to reduce threading overheads. However when functions have different execution characteristics (such as blocking I/O, expensive CPU computations) we swap threads to enable appropriate execution of the functions to keep the overall application performant.
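The continuation approach can be sketched as follows. This is a simplified Java illustration under my own assumptions, not the actual implementation: the class ContinuationSketch and its run method are hypothetical, functionA is handed to a single-thread pool once, and functionB is modelled as a plain Consumer invoked directly by functionA.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ContinuationSketch {

    /** Returns the thread names that ran functionA then functionB. */
    static List<String> run() {
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // functionB is the continuation, invoked with the result of functionA
            Consumer<Object> functionB = (result) -> {
                threads.add(Thread.currentThread().getName());
                done.countDown();
            };
            // functionA runs on the pool, then carries straight on to functionB
            Runnable functionA = () -> {
                threads.add(Thread.currentThread().getName());
                functionB.accept("result of functionA");
            };
            pool.execute(functionA); // the only hand-off between threads
            done.await(); // waiting here is only so the sketch can report the threads
            return threads;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }
}
```

Both entries in the returned list name the same pool thread: functionB ran on the thread of functionA, without the result first travelling back to the thread of the higher order function.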

Another way of thinking about this threading is to consider the threads running on separate cores. The first approach is very much synchronous communication. The composed function is invoked and the higher order function effectively waits until the result is available. Continuations, on the other hand, are more akin to asynchronous communication. The higher order function triggers the composed function and is then free to continue with other functions. The composed function will itself continue with the next composed function.

But continuations do not come easily.

Continuation passing has implications for the function signature, as we must pass the next continuation into all functions. This now has our functions looking like this:

  type Continuation = (Any) => Unit

  def function(executorLocator: ExecutorLocator, parameters: Array[Any], continuation: Continuation)

Now might I dare say there is more than one outcome to a function - well to me anyway. Yes, we could return a data type defining both success and error. Case statements then handle these various outcomes. However, for each new error type we're having to add new case statement handling. This reminds me of reflection problems of having to interrogate for each error type. Personally, I like to handle these outcomes separately.

Rather than combining the result and error into the one next continuation, we can provide a continuation for each. In my understanding, this is not too different to try/catch blocks (except that we can now execute the catch block with the implicit thread or different thread). In other words, we provide multiple continuations:

  def function(executorLocator: ExecutorLocator, parameters: Array[Any],
      successfulContinuation: Continuation,
      errorOneContinuation: Continuation,
      errorTwoContinuation: Continuation)

But why stop there? We can also have different paths through the function for, say, if statements. If the condition is true follow the first continuation, else follow the second continuation.

  def function(executorLocator: ExecutorLocator, parameters: Array[Any],
      trueContinuation: Continuation,
      falseContinuation: Continuation,
      errorOneContinuation: Continuation,
      errorTwoContinuation: Continuation)

This is starting to blow out the function signature again, especially when composing into higher order functions that have to handle many exceptions. This is also the reason I tend to find many frameworks are moving away from checked exceptions. However, with first class procedures, we very much like checked exceptions (though this is probably not as you typically know them). But we'll come to this soon.

So to avoid the signature blow out, let's do what we did with the choice of executor and wrap the continuation decision into a function. For now, let's assume we can have some key to aid the function to determine the appropriate continuation. This function would look as follows:

  type ContinuationLocator = (Function, Any) => Continuation

This then turns all our functions into the following:

  def function(
      executorLocator: ExecutorLocator,
      parameters: Array[Any],
      continuationLocator: ContinuationLocator)

So now we have this very flexible execution model that minimises the thread context switching.

However, how do we implement the executorLocator and continuationLocator functions?

Well the naming of the functions is deliberate, as they follow the ServiceLocator pattern. Given a key provide back the dependency. However, in this case, it is not an object but rather an executor for thread choice and continuation for invoking the next function.

Yay, we can now go create key/value configuration for every function in the system. Well, maybe not.

This granularity of configuration ends up being a nightmare. I can certainly say my first versions of implementing first class procedures showed this very much to be the case. Plus given we have an assumed key to identify the continuation, how do we know we have every continuation configured for a complete system? In other words, how can we make this compile safe?
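To make the problem concrete, here is a small Java sketch of a key/value locator. It is an assumption-laden simplification: ContinuationLocatorSketch is a hypothetical name, and it uses a plain String key and a Consumer in place of the ContinuationLocator and Continuation types above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

class ContinuationLocatorSketch {

    /** Key/value configuration, in the style of the ServiceLocator pattern. */
    private final Map<String, Consumer<Object>> continuations = new HashMap<>();

    /** Registers the continuation for the key. */
    void configure(String key, Consumer<Object> continuation) {
        continuations.put(key, continuation);
    }

    /** Given a key, provides back the continuation. A missing key is only found on lookup. */
    Consumer<Object> locate(String key) {
        Consumer<Object> continuation = continuations.get(key);
        if (continuation == null) {
            throw new IllegalStateException("No continuation configured for key: " + key);
        }
        return continuation;
    }
}
```

Nothing here stops the application starting with a key left unconfigured. The gap only shows up when that particular continuation is first looked up at runtime.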

To solve this problem, we do what we typically always do in software, add more indirection.

Huh, more dynamic indirection to create a type safe compiled solution? Well, yes.

To explain how indirection has helped, let's start with the Continuation Injection.

Firstly, we are going to give the function state (or possibly better described as meta-data). Arguably this is potentially turning the function into an object, but I'm going to avoid trying to relate things to the literature right now and focus on how the first class procedure has evolved.

So we now have a wrapping object:

  class ManagedFunction(
    val logic: Function,
    val continuations: Map[Any, Continuation])

So we've associated the continuations to the function, but this really is only a dynamic map based on key. Therefore, to be compile safe, we need to give the key some meaning the compiler/framework can understand.

Well to do this, let's go back to the exploded function signature:

  def function(parameters: Array[Any],
      trueContinuation: Continuation,
      falseContinuation: Continuation)

Given the parameters are always ordered, we can use the index of the continuation as the key. This has the ManagedFunction look as follows:

  class ManagedFunction(
    val logic: Function,
    val continuations: Map[Any, Continuation]) {

    def cont(key: Any): Continuation = continuations.get(key) match { case Some(cont) => cont }

    def run(parameters: Array[Any]) = logic(parameters ++ Array(cont(1), cont(2)))
  }

Now, I mentioned first class procedures actually like using checked exceptions. The reason is that the checked exception is stated on the signature. This would look like:

  function(Object[] parameters) throws ErrorOne, ErrorTwo;

Checked exceptions are not ordered, but their types are unique. Therefore, we can use the checked exception's type as the key. This now turns the ManagedFunction into the following:

  class ManagedFunction(
    val logic: Function,
    val continuations: Map[Any, Continuation]) {

    def cont(key: Any): Continuation = continuations.get(key) match { case Some(cont) => cont }

    def run(parameters: Array[Any]) = {
      try {
        logic(parameters ++ Array(cont(1), cont(2)))
      } catch {
        case ex: Throwable => cont(ex.getClass())(ex)
      }
    }
  }

Now, to make compile/framework safe, we provide a function that produces the required list of keys for the logic function:

  def extractContinuations(logic: Function): Array[Any] = {
    var parameterIndex = 0
    extractParameterTypes(logic)
      .filter((parameterType) => classOf[Continuation].isAssignableFrom(parameterType)).map((paramContinuation) => { parameterIndex += 1; parameterIndex }) ++
      extractExceptionTypes(logic)
  }

The compiler/framework will then confirm that configuration mappings are provided for each key. This allows validating that all continuations are configured for the function to operate. Furthermore, doing this across all ManagedFunction instances within the application, we can confirm a complete configured application. We now have compile/framework startup safe validation that all continuations are configured.
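The key extraction and validation can be sketched with plain Java reflection. This is only an illustration under stated assumptions: ContinuationKeysSketch, its Continuation interface, the ErrorOne/ErrorTwo exceptions and the example logic method are all hypothetical, while getParameterTypes and getExceptionTypes are the standard java.lang.reflect.Method calls.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class ContinuationKeysSketch {

    /** Hypothetical continuation type. */
    interface Continuation { void proceed(Object argument); }

    static class ErrorOne extends Exception {}
    static class ErrorTwo extends Exception {}

    /** Example logic with two continuations and two checked exceptions. */
    static void logic(String parameter, Continuation trueContinuation, Continuation falseContinuation)
            throws ErrorOne, ErrorTwo {
    }

    /** Keys are the 1-based indexes of Continuation parameters plus the checked exception types. */
    static List<Object> extractContinuationKeys(Method method) {
        List<Object> keys = new ArrayList<>();
        int continuationIndex = 0;
        for (Class<?> parameterType : method.getParameterTypes()) {
            if (Continuation.class.isAssignableFrom(parameterType)) {
                keys.add(++continuationIndex);
            }
        }
        for (Class<?> exceptionType : method.getExceptionTypes()) {
            keys.add(exceptionType);
        }
        return keys;
    }

    /** Returns the keys with no configured continuation (empty means fully configured). */
    static List<Object> missingKeys(Method method, Set<Object> configuredKeys) {
        List<Object> missing = new ArrayList<>(extractContinuationKeys(method));
        missing.removeAll(configuredKeys);
        return missing;
    }

    /** Looks up the example logic method reflectively. */
    static Method logicMethod() {
        try {
            return ContinuationKeysSketch.class.getDeclaredMethod(
                    "logic", String.class, Continuation.class, Continuation.class);
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

Running missingKeys over every function at startup, and refusing to start if any list is non-empty, gives the startup-safe validation described above.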

However, now we are having problems of passing state between the functions contained within the ManagedFunction. As only one argument can be passed with the continuation, how can a function have more than one parameter?

Ideally, we want to have each ManagedFunction have the following run method:

  abstract class ManagedFunction {
    def run(parameter: Any) // not, parameters: Array[Any]
  }

So, how can we provide the additional parameters for the function?

Before answering this, we need to consider how the first continuation is triggered to start the chain of ManagedFunction executions. As the application is now being realised as a mapping of continuations to ManagedFunctions, we need a means to trigger a continuation from outside a ManagedFunction.

Well, why can't we give objects continuations?

We can create a ManagedObject that contains continuations:

  class ManagedObject(
    @Inject val continuationOne: Continuation,
    @Inject val continuationTwo: Continuation) {
    // ... object methods
  }

This now allows objects to trigger logic. Why is this useful? Well, for example, we can have a HTTP socket listener object that receives a request and services the request by invoking a continuation. Further ManagedFunction instances will then use details of the request, to route it via continuations to appropriate handling ManagedFunction instances to then service the request.

The HTTP example actually points us to a design pattern already solving the issue of multiple parameters for a function. A typical thread-per-request web server has a request, session and application context. Now let's ignore session and application contexts as they are not concurrency safe. It is the request context pattern that helps us.

A request context allows passing objects between controllers and view rendering components. What are the controllers and view renderers? They are snippets of logic that take in a request scope to access/mutate the request scope to capture enough state to provide a response (with possible side effects of saving state in databases, logging details, etc).

These snippets of logic fit well into the ManagedFunction, with request scopes created for each continuation tree invoked from a ManagedObject. ManagedObjects are created in the application and hooked into the network of continuations to ManagedFunctions. When the ManagedObject receives an event (HTTP request, queue message, etc), it does two things:
  1. Starts a new request scope
  2. Triggers the first continuation with the scope that carries through for all further continuations triggered
ManagedFunctions can then grab their required parameters from the scope.

This can be taken further to include dependency injection. Rather than the ManagedFunction being responsible for managing the request scope, the request scope objects are provided through dependency injection. This gives the following dependency context for the ManagedFunction:

  type ServiceLocator = String => Any

  class DependencyContext(val serviceLocator: ServiceLocator) {
    val objects = scala.collection.mutable.Map[String, Any]()

    def getObject(name: String) = {
      objects.get(name) match {
        case Some(obj) => obj
        case None => {
          val obj = serviceLocator(name)
          objects(name) = obj
          obj
        }
      }
    }
  }

A side benefit of providing the DependencyContext is that we can re-use existing Dependency Injection frameworks for managing objects. For example, the ServiceLocator can be a Spring BeanFactory. Furthermore, we can also dependency inject ManagedObject implementations to allow objects to maintain state, but also trigger continuations in the background (e.g. background polling for JWT key changes in providing JWT authentication state).

The ManagedFunction now becomes:

  type ContinuationFactory = DependencyContext => Continuation

  class ManagedFunction(
    val logic: Function,
    val parameterScopeNames: List[String],
    val continuations: Map[Any, ContinuationFactory]) {

    def obj(index: Int, context: DependencyContext): Any = context.getObject(parameterScopeNames(index - 1)) // index is 1-based, the List is 0-based
    def cont(key: Any, context: DependencyContext): Continuation = continuations.get(key) match { case Some(factory) => factory(context) }

    def run(parameterFromContinuation: Any, context: DependencyContext) = {
      try {
        logic(Array(parameterFromContinuation, obj(1, context), obj(2, context), cont(1, context), cont(2, context)))
      } catch {
        case ex: Throwable => cont(ex.getClass(), context)(ex)
      }
    }
  }

To populate the scope names, we can again use reflection on the logic signature. However, rather than having to provide explicit configuration, we can use auto-wire configuration based on parameter type and possible qualifier. This then becomes normal dependency injection for constructors, except that we are injecting into the logic function.

We can now have database connections, HTTP clients, etc provided to the logic, however it does not answer the problem of passing state across the continuation boundaries.

To solve passing state, we just create a state object. This object acts much like a variable. Its value can be set and retrieved. However, this introduces mutability and timing concerns regarding the flow of continuations. It is unclear whether a ManagedFunction is only safely accessing the variable's value, or is unsafely mutating the variable. Therefore, for variables we provide additional support within the ManagedFunction to identify the use of the variable.

For variable state objects, we allow the ManagedFunction to use various interfaces to identify the nature of using the variable. This allows the following interfaces for a variable state:

  trait Out[T] { def set(value: T) }
  trait In[T] { def get(): T }
  trait Var[T] extends Out[T] with In[T]

ManagedFunctions can then use the appropriate interface to identify their intention on the state of the variable.

Note that it is now possible to traverse the graph from ManagedObject continuations to confirm variable state outputs of ManagedFunctions are always upstream of respective inputs. This creates an ability for compile safe state generation. Furthermore, if all objects loaded to scope variables are immutable it allows reasoning for identifying the ManagedFunction producing incorrect state (just look for the ManagedFunctions requiring the Out of the variable).
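A simplified version of that check is sketched below in Java. It assumes a single linear flow rather than a full graph of continuations, and the names VariableFlowSketch, Step and unsatisfiedInputs are hypothetical. Each step simply declares which variables it reads (In) and which it writes (Out).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class VariableFlowSketch {

    /** Hypothetical description of how a ManagedFunction uses variables. */
    static class Step {
        final String name;
        final Set<String> ins;  // variables read via In
        final Set<String> outs; // variables written via Out

        Step(String name, Set<String> ins, Set<String> outs) {
            this.name = name;
            this.ins = ins;
            this.outs = outs;
        }
    }

    /** Returns "step:variable" for every In that has no Out in an earlier step. */
    static List<String> unsatisfiedInputs(List<Step> flow) {
        Set<String> available = new HashSet<>();
        List<String> problems = new ArrayList<>();
        for (Step step : flow) {
            for (String variable : step.ins) {
                if (!available.contains(variable)) {
                    problems.add(step.name + ":" + variable);
                }
            }
            // outputs only become available to the steps that follow
            available.addAll(step.outs);
        }
        return problems;
    }
}
```

An empty result means every input has an upstream output. A real check would walk each branch of the continuation graph, which this sketch does not attempt.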

What this now also provides is multiple inputs and multiple outputs. Composition is no longer derived by output of one function being passed as input to the next function. State is maintained in scope with ManagedFunctions pulling/pushing state as appropriate to the scope. Continuations are now separated from having to be concerned with all the state needed to invoke a ManagedFunction.

Now, the above implementation assumes some order of parameters followed by continuations in invoking the logic. As this information is reflectively retrieved from the logic function, the order is not necessary. We can then have a ManagedFunction look as follows:

  class ManagedFunction(
    val logic: Function,
    val parameterScopeNames: List[String],
    val continuations: Map[Any, ContinuationFactory]) {

    def obj(index: Int, context: DependencyContext): Any = context.getObject(parameterScopeNames(index - 1)) // index is 1-based, the List is 0-based
    def cont(key: Any, context: DependencyContext): Continuation = continuations.get(key) match { case Some(factory) => factory(context) }

    def run(parameterFromContinuation: Any, context: DependencyContext) = {
      var continuationIndex = 0
      var objectIndex = 0
      val arguments = extractParameterTypes(logic).map(_ match {
        case p if p.isAnnotationPresent(classOf[Parameter]) => parameterFromContinuation
        case c if classOf[Continuation].isAssignableFrom(c) => cont({ continuationIndex += 1; continuationIndex }, context)
        case _ => obj({ objectIndex += 1; objectIndex }, context)
      })
      try {
        logic(arguments)
      } catch {
        case ex: Throwable => cont(ex.getClass(), context)(ex)
      }
    }
  }

Notice that the return value from the function (logic) is no longer necessary. Hence the name "first class procedure".

This can then be represented as follows:

  class ManagedFunction(
    val procedure: Array[Any] => Unit,
    val parameterScopeNames: List[String],
    val continuations: Map[Any, ContinuationFactory]) {

    def obj(index: Int, context: DependencyContext): Any = context.getObject(parameterScopeNames(index - 1)) // index is 1-based, the List is 0-based
    def cont(key: Any, context: DependencyContext): Continuation = continuations.get(key) match { case Some(factory) => factory(context) }

    def run(parameterFromContinuation: Any, context: DependencyContext): Unit = {
      var continuationIndex = 0
      var objectIndex = 0
      val arguments = extractParameterTypes(procedure).map(_ match {
        case p if p.isAnnotationPresent(classOf[Parameter]) => parameterFromContinuation
        case c if classOf[Continuation].isAssignableFrom(c) => cont({ continuationIndex += 1; continuationIndex }, context)
        case _ => obj({ objectIndex += 1; objectIndex }, context)
      })
      try {
        procedure(arguments)
      } catch {
        case ex: Throwable => cont(ex.getClass(), context)(ex)
      }
    }
  }

So we've provided composition of logic with state management, but we've not solved the original implicit thread problem that sparked this.

To solve specifying explicit threads, we need to implement the ExecutorLocator. This is achieved by looking at the parameter types of the function. As all state (objects) are now injected from the DependencyContext, we can determine the execution characteristics from the parameters. In other words, if the logic depends on a Database connection, it is likely to be making blocking calls. Therefore, we can use the parameter types to implement the ExecutorLocator:

  class ManagedFunction(
    val procedure: Array[Any] => Unit,
    val parameterScopeNames: List[String],
    val continuations: Map[Any, ContinuationFactory],
    val executorConfiguration: Map[Class[_], Executor]) {

    def obj(index: Int, context: DependencyContext): Any = context.getObject(parameterScopeNames(index - 1)) // index is 1-based, the List is 0-based
    def cont(key: Any, context: DependencyContext): Continuation = continuations.get(key) match { case Some(factory) => factory(context) }
    def executorLocator(): Executor = {
      var executor: Executor = (logic, arguments) => logic(arguments) // default executor is synchronous (implicit thread)
      extractParameterTypes(procedure).map((parameterType) => executorConfiguration.get(parameterType) match {
        case Some(e) => { executor = e; e } // matched so override
        case None => executor
      })
      executor
    }

    def run(parameterFromContinuation: Any, context: DependencyContext): Unit = {
      var continuationIndex = 0
      var objectIndex = 0
      val arguments = extractParameterTypes(procedure).map(_ match {
        case p if p.isAnnotationPresent(classOf[Parameter]) => parameterFromContinuation
        case c if classOf[Continuation].isAssignableFrom(c) => cont({ continuationIndex += 1; continuationIndex }, context)
        case _ => obj({ objectIndex += 1; objectIndex }, context)
      })
      executorLocator()((arguments) => {
        try {
          procedure(arguments)
        } catch {
          case ex: Throwable => cont(ex.getClass(), context)(ex)
        }
      }, arguments)
    }
  }

This enables choice of Executor to be managed within configuration. This separates it from concerns of composition and state management.
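To tie this back to the earlier server versus single core example, the following Java sketch shows the same lookup driven by two different configurations. It is illustrative only: ExecutionProfileSketch, its Repository marker interface and the locate method are hypothetical names, and java.util.concurrent.Executor stands in for the Executor type above.

```java
import java.util.Map;
import java.util.concurrent.Executor;

class ExecutionProfileSketch {

    /** Hypothetical dependency whose presence suggests blocking I/O. */
    interface Repository {}

    /** Executor that runs on the invoking (implicit) thread. */
    static final Executor IMPLICIT_THREAD = Runnable::run;

    /** Chooses the executor from the parameter types, defaulting to the implicit thread. */
    static Executor locate(Map<Class<?>, Executor> profile, Class<?>... parameterTypes) {
        Executor executor = IMPLICIT_THREAD;
        for (Class<?> parameterType : parameterTypes) {
            Executor configured = profile.get(parameterType);
            if (configured != null) {
                executor = configured; // matched so override
            }
        }
        return executor;
    }
}
```

A server profile might map Repository to a dedicated thread pool, so any procedure depending on a Repository runs there. An embedded profile can be an empty map, so every procedure runs on the implicit thread. The procedures themselves do not change between the two.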

And now you are up to speed with the general concepts behind the first class procedure.

Though, do note that the actual implementation uses a lot more memoization, as function signatures are static allowing the reflection to be done at compile/startup time.

Furthermore, the overall effect is that higher order functions do not need to provide all arguments to call the function (procedure). The control is inverted, so configuration and the procedure itself define what is injected into it. Higher order composition need only use the continuations to invoke the first class procedures.

Plus, I find it lifts a constraint in functional programming of having to fit all results through the small keyhole of the function's return type. The return type of the function needs to provide success and error details, and is coupled to having to pass this through the chain of composed functions. First class procedures, via variables, decouple this so any upstream procedure can output the value for any downstream procedure to consume. Furthermore, checked exceptions continue error flows, removing them from function return types (output variable types).

There are also other concepts built on first class procedures, such as:
  • process, thread, function scoped dependency contexts for concurrency/parallel processing
  • higher order compositions (Sections)
  • thread affinity and other thread management (via Executive)
  • providing context for state, such as transactions (Governance)
  • additional ManagedFunctions inserted into flows similar to aspects (Administration)
However, this article is focused on the first class procedure and is already long enough.

So, in conclusion, the first class procedure is applying inversion of control to the function to inject in state, continuations and thread (via Executor). This means the first class procedure no longer requires composition via return values of the function. This makes it significantly easier to weave impure/pure functionality together. Furthermore, it allows execution strategies for the application to be configured in at deployment time.

And to see all this in action, please see the first article.

For more information, see the paper "OfficeFloor: using office patterns to improve software design".

Saturday, 27 April 2019

OO Functional Imperative Reactive weaved together

This is the first of a two-part article discussing how different programming paradigms can be weaved together seamlessly via the "First Class Procedure", a term I'm using to best describe the concept.

The working code in this article demonstrates how you can seamlessly weave together the following to service a request:
  1. Validate a request (on socket event loop thread).
  2. Start a transaction and register the request in the database.  This will be on another thread to avoid halting the socket event loop thread.
  3. Make reactive calls to pull in data from other services.
  4. Run some functional code to work out the standard deviation on service times.
  5. Undertake alternate flows to handle special cases (including handling exceptions).  Then if no exceptions causing rollback, store results in the database.  This again is on a different thread to not tie up the reactive event loop thread.
  6. Send the response after committing the transaction.
This allows you to use the programming paradigm best suited to the various problems at hand.  Note the request servicing in the demonstration is arbitrary.  The focus is on showing how the various programming paradigms can be weaved together.

Now to write a complete description of how this works with the first class procedure is beyond a single article.  There are many patterns used together to enable the composition through first class procedures.  Therefore, I'm going to provide an introduction to first class procedures in two parts:
  • This article to demonstrate with working code how flexible and easy composition is with first class procedures
  • Next article to provide an explanation more closely aligned to the theory on how the first class procedure has evolved to its current understanding
We'll start with some simple examples and then get to the more interesting above case of weaving multiple programming paradigms together.

First class procedure

Simple event loop

The following first class procedure services a REST request.  This will be run on the HTTP socket event loop thread.

public void service(ObjectResponse<ServicedThreadResponse> response) {
    response.send(new ServicedThreadResponse(Thread.currentThread().getName(), "Event", System.currentTimeMillis()));
}

Simple thread-per-request

The following first class procedure services a REST request by pulling a value from the database and sending it in the response.  This will be run by a separate thread pool.

public void service(ServicedThreadRequest request, ThreadPerRequestRepository repository, ObjectResponse<ServicedThreadResponse> response) {
    int identifier = request.getIdentifier() % 10;
    ThreadPerRequest entity = repository.findById(identifier).get();
    response.send(new ServicedThreadResponse(Thread.currentThread().getName(), entity.getName(), System.currentTimeMillis()));
}


The distinction of thread to use will be discussed later.  However, for now notice that a Spring Repository is used by only the thread-per-request first class procedure.

First Class Procedures weaved together

Ok, the above is a little boring.  We've seen this in web application servers before.  Show us something interesting!

To show something more interesting we are going to weave first class procedures together to achieve the example detailed at the start of this article.

Each step in the request servicing is implemented as a first class procedure.  We'll address each first class procedure in the order specified.

Validate request (on socket event loop)

This is simple validation that the request is correct.  As it is straightforward logic, we use the thread of the socket event loop.  This way we don't pay the overhead of a thread context switch just to reject invalid requests.  The code is as follows:

const HttpException = Java.type("net.officefloor.server.http.HttpException");
const Integer = Java.type("java.lang.Integer");

function validate(identifier, requestIdentifier) {
    if (Number(identifier) <= 0) {
        throw new HttpException(422, "Invalid identifier");
    }
    requestIdentifier.set(Integer.valueOf(identifier));
}
validate.officefloor = [ 
    { httpPathParameter: "identifier" },
    { out: Integer },
    { next : "valid" }
];

Note that the validation is written in JavaScript.  This is so that the client side JavaScript validation rules can be re-used to validate requests to ensure consistency between client and server.

The officefloor attribute added to the function provides meta-data.  This is necessary, as JavaScript does not provide the strongly typed information required of first class procedures.

Imperative to register request in database

After validation, the request identifier is registered in the database.  This also creates a unique number for the request based on an IDENTITY column in the database.

@Next("registered")
public static void registerRequest(@Val int requestIdentifier, WeavedRequestRepository repository, Out<WeavedRequest> weavedRequest) {
    WeavedRequest entity = new WeavedRequest(requestIdentifier);
    repository.save(entity);
    weavedRequest.set(entity);
}

Reactive

Next is some Reactive code to concurrently call the two REST endpoints detailed at the start of this article (simple event loop and simple thread-per-request).  Because we are using Reactive we can call them concurrently to improve performance.

Note that while waiting on the responses, the flow is effectively idle with threads servicing other functionality.  This is asynchronous handling so that threads are not tied up waiting.   Once both sets of results come back, they notify the respective asynchronous flow to continue processing.

private final static String URL = "http://localhost:7878/{path}";

@Next("useData")
public static void retrieveData(WebClient client,
        AsynchronousFlow eventLoopFlow, @EventLoopResponse Out<ServicedThreadResponse[]> eventLoopResponse,
        @Val WeavedRequest request, AsynchronousFlow threadPerRequestFlow, @ThreadPerRequestResponse Out<ServicedThreadResponse[]> threadPerRequestResponse) {

    Flux.range(1, 10)
        .map((index) -> client.get().uri(URL, "event-loop").retrieve().bodyToMono(ServicedThreadResponse.class))
        .flatMap((response) -> response).collectList().subscribe((responses) -> eventLoopFlow.complete(
            () -> eventLoopResponse.set(responses.stream().toArray(ServicedThreadResponse[]::new))));

    Flux.range(1, 10)
        .map((index) -> client.post().uri(URL, "thread-per-request").contentType(MediaType.APPLICATION_JSON)
            .syncBody(new ServicedThreadRequest(request.getId())).retrieve()
            .bodyToMono(ServicedThreadResponse.class))
        .flatMap((response) -> response).collectList().subscribe((responses) -> threadPerRequestFlow.complete(
            () -> threadPerRequestResponse.set(responses.stream().toArray(ServicedThreadResponse[]::new))));
}
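Stripped of Reactor and OfficeFloor, the "continue once both results are back" idea can be sketched with the JDK's CompletableFuture.  This is only an illustration under my own assumptions: the ConcurrentCalls class, the fetchAll method and the two suppliers are hypothetical stand-ins for the REST calls, and it is not how the asynchronous flow is implemented.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: two independent calls are started together and a
// continuation runs only once both results are available.
class ConcurrentCalls {

    // Starts both suppliers asynchronously (on the common pool) and combines their results.
    static CompletableFuture<List<String>> fetchAll(Supplier<String> eventLoopCall,
            Supplier<String> threadPerRequestCall) {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(eventLoopCall);
        CompletableFuture<String> second = CompletableFuture.supplyAsync(threadPerRequestCall);
        // thenCombine registers a callback, so no thread is tied up waiting for the results
        return first.thenCombine(second, (a, b) -> List.of(a, b));
    }

    public static void main(String[] args) {
        // join() is used only so this demonstration can print the combined result
        List<String> results = fetchAll(() -> "Event", () -> "Database").join();
        System.out.println(results);
    }
}
```

The combining callback plays the same role as the two complete(...) calls above: nothing continues until both sets of responses are in.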

By now you may be noticing the Out/@Val combinations.  This is how values are passed from one first class procedure to another.  Note that if the type of different values is the same, a qualifier (such as @EventLoopResponse above) can be used to distinguish them.  The rest of the arguments are provided by dependency injection (in this case Spring).
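To make the idea concrete, here is a minimal sketch of flow-scoped variables keyed by qualifier and type.  The FlowVariables class and its set/get methods are hypothetical names for illustration only; I'm not claiming this is how Out and @Val are implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of flow-scoped variables: one procedure writes a value
// (the Out side) and a later procedure reads it (the @Val side).
class FlowVariables {

    private final Map<String, Object> values = new HashMap<>();

    // Values are keyed by optional qualifier plus type, so two values of the
    // same type can coexist when they carry different qualifiers.
    private static String key(String qualifier, Class<?> type) {
        return (qualifier == null ? "" : qualifier) + ":" + type.getName();
    }

    // The "Out" side: store a value for later procedures.
    <T> void set(String qualifier, Class<T> type, T value) {
        values.put(key(qualifier, type), value);
    }

    // The "@Val" side: read a value stored by an earlier procedure (null if never set).
    <T> T get(String qualifier, Class<T> type) {
        return type.cast(values.get(key(qualifier, type)));
    }

    public static void main(String[] args) {
        FlowVariables variables = new FlowVariables();
        variables.set("EventLoopResponse", String[].class, new String[] { "event" });
        variables.set("ThreadPerRequestResponse", String[].class, new String[] { "database" });
        System.out.println(variables.get("EventLoopResponse", String[].class)[0]);
        System.out.println(variables.get("ThreadPerRequestResponse", String[].class)[0]);
    }
}
```

Both values have the same type, yet each is retrieved by its own qualifier, which is the role the qualifier annotations play in the procedures above.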

Functional

Next the reactive responses are provided to Scala functional code to determine the standard deviation of service times.

def mean(timestamps: Iterable[Long]): Double = timestamps.sum.toDouble / timestamps.size

def variance(timestamps: Iterable[Long]): Double = {
    val avg = mean(timestamps)
    timestamps.map(timestamp => math.pow(timestamp.toDouble - avg, 2)).sum / timestamps.size
}

def stdDev(timestamps: Iterable[Long]): Double = math.sqrt(variance(timestamps))

@Next("use")
def standardDeviation(@EventLoopResponse @Val eventLoopResponses: Array[ServicedThreadResponse], @ThreadPerRequestResponse @Val threadPerRequestResponses: Array[ServicedThreadResponse]): Double =
    stdDev((eventLoopResponses ++ threadPerRequestResponses).map(response => response.getTimestamp))

Note that a library could be used to reduce this code.  However, we've done this to demonstrate how functional code can be integrated into first class procedures.
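For readers less familiar with Scala, here is a sketch of the same calculation in Java.  Note it is the population standard deviation (dividing by the number of values rather than by n - 1), matching the Scala above.  The StandardDeviationSketch class and its method names are mine, purely for illustration.

```java
import java.util.stream.LongStream;

// Hypothetical Java rendering of the Scala mean / variance / stdDev functions.
class StandardDeviationSketch {

    static double mean(long[] values) {
        // NaN for an empty input, mirroring a 0 / 0 division
        return LongStream.of(values).average().orElse(Double.NaN);
    }

    // Population variance: average of the squared distances from the mean.
    static double variance(long[] values) {
        double avg = mean(values);
        return LongStream.of(values).mapToDouble(value -> Math.pow(value - avg, 2)).sum() / values.length;
    }

    static double stdDev(long[] values) {
        return Math.sqrt(variance(values));
    }

    public static void main(String[] args) {
        // mean is 5, squared deviations sum to 32, variance is 32 / 8 = 4
        System.out.println(stdDev(new long[] { 2, 4, 4, 4, 5, 5, 7, 9 })); // prints 2.0
    }
}
```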

Flow control

The next first class procedure triggers a flow to handle special cases.  Should there be no issues with the special cases, then it stores the standard deviation in the database.

@FlowInterface
public static interface Flows {
    void handleSpecialCases(FlowSuccessful callback);
    void stored();
}

public static void store(@Parameter double standardDeviation, Flows flows, @Val WeavedRequest request, WeavedRequestRepository repository, Out<RequestStandardDeviation> stDevOut) {
    flows.handleSpecialCases(() -> {
        request.setRequestStandardDeviation(new RequestStandardDeviation(standardDeviation, request));
        repository.save(request);
        stDevOut.set(request.getRequestStandardDeviation());
        flows.stored();
    });
}

The special cases are handled by the following first class procedure.

public static void handleSpecialCase(@Val WeavedRequest request) throws WeavedRollbackException, WeavedCommitException {
    switch (request.getRequestIdentifier()) {
        case 3:
            throw new WeavedRollbackException(request);
        case 4:
            throw new WeavedCommitException(request);
    }
}

Touch of exception handling

The two exception handling first class procedures are as follows.

public static void handle(@Parameter WeavedRollbackException exception, ObjectResponse<WeavedErrorResponse> response) {
    WeavedRequest request = exception.getWeavedRequest();
    response.send(new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
}

public static void handle(@Parameter WeavedCommitException exception, WeavedRequestRepository repository, ObjectResponse<WeavedErrorResponse> response) {
    WeavedRequest request = exception.getWeavedRequest();
    request.setWeavedError(new WeavedError("Request Identifier (" + request.getRequestIdentifier() + ") is special case", request));
    repository.save(request);
    response.send(new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
}

The second handler works within the transaction, so includes further data stored in the database.

Note that because first class procedure composition does not require the caller to catch exceptions, checked exceptions are embraced.  We consider checked exceptions very useful information in flow composition.  The distinction is that handling them should not be the caller's concern but rather the flow's concern.  To me this is a big difference, and it stops the catch-and-log exception handling problem.  Exception handling is now a separate concern that can be coded in afterwards.
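As a rough sketch of exception handling being the flow's concern, the following routes a thrown exception to a handler registered by exception type.  The ExceptionRouting class, its Procedure interface and the handle/execute methods are hypothetical and only illustrate the idea; they are not OfficeFloor's API.

```java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: the flow, not the caller, decides what happens when a
// procedure throws.  Handlers are registered per exception type.
class ExceptionRouting {

    // A procedure that may throw a checked exception.
    interface Procedure {
        String run() throws Exception;
    }

    private final Map<Class<? extends Exception>, Function<Exception, String>> handlers = new LinkedHashMap<>();

    // Registers the handling for a type of exception.
    void handle(Class<? extends Exception> type, Function<Exception, String> handler) {
        handlers.put(type, handler);
    }

    // Runs the procedure; any exception is routed to the first matching handler.
    String execute(Procedure procedure) {
        try {
            return procedure.run();
        } catch (Exception ex) {
            for (Map.Entry<Class<? extends Exception>, Function<Exception, String>> entry : handlers.entrySet()) {
                if (entry.getKey().isInstance(ex)) {
                    return entry.getValue().apply(ex);
                }
            }
            throw new IllegalStateException("No handler configured", ex);
        }
    }

    public static void main(String[] args) {
        ExceptionRouting flow = new ExceptionRouting();
        flow.handle(IOException.class, ex -> "handled: " + ex.getMessage());
        System.out.println(flow.execute(() -> "ok"));
        System.out.println(flow.execute(() -> {
            throw new IOException("special case");
        }));
    }
}
```

The procedures themselves contain no try/catch.  The handling is added to the flow afterwards, without touching the procedure that throws.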

Successful response

On successful storage of the request details in the database, the following first class procedure sends the response.

public void send(@Val WeavedRequest request, @Val RequestStandardDeviation standardDeviation, @EventLoopResponse @Val ServicedThreadResponse[] eventLoopResponse,
        @ThreadPerRequestResponse @Val ServicedThreadResponse[] threadPerRequestResponse, ObjectResponse<WeavedResponse> response) {
    response.send(new WeavedResponse(request.getRequestIdentifier(), request.getId(), eventLoopResponse, threadPerRequestResponse, standardDeviation.getStandardDeviation()));
}

Kotlin for some OO

Oh, and just for a little bit more polyglot fun, the OO objects used to represent the JSON request/responses are the following.

@HttpObject
data class ServicedThreadRequest(val identifier: Int)

data class ServicedThreadResponse(val threadName: String, val lookupName: String, val timestamp: Long)

data class WeavedErrorResponse(val requestIdentifier: Int, val requestNumber: Int)

data class WeavedResponse(val requestIdentifier: Int
        , val requestNumber: Int
        , val eventLoopResponses: Array<ServicedThreadResponse>
        , val threadPerRequestResponses: Array<ServicedThreadResponse>
        , val standardDeviation: Double)

Proving it works

The following is a test to confirm the flow of first class procedures services the request.

public static final SpringRule spring = new SpringRule();

public static final OfficeFloorRule officeFloor = new OfficeFloorRule();

@ClassRule
public static final RuleChain ordered = RuleChain.outerRule(spring).around(officeFloor);

@Rule
public final HttpClientRule client = new HttpClientRule();

private static final ObjectMapper mapper = new ObjectMapper();
static {
    mapper.registerModule(new KotlinModule());
}

@Test
public void confirmWeavedTogether() throws Exception {
    HttpResponse response = this.client.execute(new HttpPost(this.client.url("/weave/1")));
    assertEquals("Should be successful", 200, response.getStatusLine().getStatusCode());
    WeavedResponse body = mapper.readValue(EntityUtils.toString(response.getEntity()), WeavedResponse.class);
    WeavedRequest entity = spring.getBean(WeavedRequestRepository.class).findById(body.getRequestNumber()).get();
    assertNotNull("Should have standard deviation stored", entity.getRequestStandardDeviation());
}

Weaving together

The following diagram is the configuration to weave the above first class procedures together.


This is the only configuration/code necessary to compose the first class procedures together.  Notice the names represent the first class procedure names and their respective meta-data.

What does this mean?  Check the port on all the calls and tests.  Yes, everything you see above is running off the one port.  You don't have to choose between a framework that provides only thread-per-request and one that provides only a single threaded event loop.  This is because of the execution strategy provided by Thread Injection of first class procedures.

Thread Injection

The threading configuration is actually the following:

<teams>
    <team source="net.officefloor.frame.impl.spi.team.ExecutorCachedTeamSource" type="org.springframework.data.repository.CrudRepository" />
</teams>

Here we flag all procedures requiring a Spring Repository to be executed by a thread pool.  Remember I said to take note of where a Spring Repository is used: any first class procedure that depends on one is executed by the configured thread pool.  Note that thread pools are named teams, as the modelling behind first class procedures originates from Offices.

Therefore, looking at the flow again, the thread execution is as follows:
  1. Validate uses the thread of the socket listener event loop.
  2. Register request uses a Spring Repository, so execution is swapped to a thread from the configured thread pool.
  3. This thread carries on to trigger the asynchronous reactive calls.
  4. The reactive event loop thread then invokes the callbacks.  As the Scala code is quick to execute, the reactive event loop thread carries on to execute the Scala pure function.  Here it is deemed that a thread context switch is too much overhead, and it is more efficient to just invoke the highly optimised Scala pure function.  However, if we want to move the Scala function to a different thread pool, we can configure one in (typically via a marker dependency on the first class procedure).
  5. The remaining imperative code switches back to a thread from the configured thread pool, as it depends on a Spring Repository.  Furthermore, the thread locals are propagated to each thread used, so the Spring Repository transaction is not lost (i.e. the transaction is active for all first class procedures within the transaction bounds).
  6. The response is then sent.

Now all the above is configurable via Thread Injection.  If we have, for example, more than one synchronous data store, we can create a thread pool to interact with each data store to avoid one slow data store tying up all threads of the application.

This also means you can configure different threading for different environments without having to change any code.
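The selection rule can be sketched in plain Java: inspect the dependency types a procedure declares and, if any is a Repository, run it on the pool, otherwise stay on the invoking thread.  Everything named here (ThreadInjectionSketch, the Repository marker interface, CustomerRepository, execute) is a hypothetical stand-in for illustration and not the framework's implementation.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of choosing the thread from a procedure's dependencies.
class ThreadInjectionSketch {

    // Stand-in for a marker dependency type such as a Spring Repository.
    interface Repository {
    }

    // Example dependency that is a kind of Repository.
    interface CustomerRepository extends Repository {
    }

    // The "team": a cached thread pool of daemon threads named "team-thread".
    private final ExecutorService team = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "team-thread");
        thread.setDaemon(true);
        return thread;
    });

    // Runs on the team if any dependency is a Repository, otherwise on the invoking thread.
    // Blocking on get() keeps the sketch short; a real framework would not block the caller.
    <T> T execute(Callable<T> procedure, Class<?>... dependencyTypes) throws Exception {
        boolean needsTeam = Arrays.stream(dependencyTypes).anyMatch(Repository.class::isAssignableFrom);
        return needsTeam ? team.submit(procedure).get() : procedure.call();
    }

    public static void main(String[] args) throws Exception {
        ThreadInjectionSketch injection = new ThreadInjectionSketch();
        Callable<String> procedure = () -> Thread.currentThread().getName();
        // No Repository dependency: stays on the invoking thread
        System.out.println(injection.execute(procedure, String.class));
        // Repository dependency: handed to the team
        System.out.println(injection.execute(procedure, CustomerRepository.class));
    }
}
```

The procedure itself never mentions a thread.  Changing which dependency types map to which pool changes the threading without changing the procedure's code.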

Disclaimer

In real world applications, I would try to avoid using so many of the above programming languages together.  I'd try to streamline them to just a couple, to avoid the many skill sets involved driving up the maintenance costs of your application (plus it reduces problems with mixed compiling).  This is only a demonstration of how OO, Functional, Imperative and Reactive code can all be weaved together with first class procedures.  Furthermore, it demonstrates how you can write concrete solutions before abstracting.

Also, as you can see we've had to cover a lot of breadth in each programming paradigm.  If the code is not a good representation of the paradigm, we're very happy to take feedback on improvements from those more acquainted with a particular paradigm.

And if we've missed an important paradigm, please let me know so we can consider including it.  When it comes to coding we appreciate diversity to give developers choice.  We're trying to tear down fences between the paradigms to have one big happy coding family.

Summary

We've demonstrated how the first class procedure can weave together polyglot code written in different paradigms to service a request.  The code outlined above in the article is all the code required for the application.  There is no further weaving code required.

Furthermore, to avoid the "it only works on my machine" problem (in this article), the code for the above is available here.  See the readme on how to run it.

For more understanding of what's going on, see the tutorials, my other articles and in particular my next article.

Monday, 8 April 2019

IT "Floor Plan"

OfficeFloor's graphical configuration is inspired by the construction industry's floor plan.

Now, we did consider auto-wiring flows together based on parameter types between the first class procedures.  This is still possible if there is interest.  However, we went graphical for another reason that has less to do with the technical problems of the system but more in helping build and maintain the correct system.

This inspiration came from having been around house construction with my father and later when building my own house. Building a house is definitely not simply putting a slab down and walls up with a roof, possibly with a door to get in and out.   There is a lot of engineering going into houses to ensure the foundations are correct, the house is not going to fall over and even on the more advanced side of ensuring it fits with the home owner's style of living.

Now, I certainly claim no expertise in the house construction industry, but they have a tool that made understanding and reasoning about the construction easier:

Floor Plan

The floor plan to me was a powerful communication tool within the home construction industry.   Now I had some background in construction, but engineering concerns such as house footings, load bearing walls, etc are beyond me.  However, around the floor plan we could discuss how the house was to be built.

Reflecting on my experience of building a house, I could see this discussion working even for those without a background in construction.  One does not need to be too technical to follow the room layouts and be able to visualise how the house is going to turn out.  Then when asking for something difficult (such as moving a load bearing wall), it is reasonably easy for the construction company to explain using the floor plan why that would incur extra costs.  Plus this same conversation can be had when renovating the house - something in IT we continually do by enhancing systems.

Now in this relationship, I saw myself in the role the business has with information technology.  I knew what I wanted out of the house (IT system) and the floor plan helped the communication with the (IT) construction company.  I saw the home construction industry having the similar problem of non-technical customers wanting something technical, and solving it with the floor plan.

So it had me ask the question:

What do we produce in the information technology space to be the "floor plan" of IT systems?

I believe we've tried many approaches, but I find they get lost in the complex, intangible details of IT systems.  We've tried model driven designs specific to certain domains with some success, however these become difficult to maintain and enhance.  We've used many modelling techniques such as UML and lots of others in the business analysis space, but these are divorced from the code and quickly go stale (especially with the Agile approach of building a room at a time).  We can even incorporate functional programming as a means to demonstrate how code is composed together.  However, I as a customer would not be expected to understand the engineering mathematics of construction, so why should I be subjected to computing maths?

The best I've seen is story boards, but these do not convey the complexity of the internals of systems.  All too often I hear: it's only a button, why does it cost so much?  Now if we could bring out the "floor plan" of the internals of the IT system, we could have a conversation around which aspects of the system need changing.  For example: that's load bearing and impacts these rooms (components) of the house (system), so the change is going to be expensive.

Now when I refer to the internals of the system, I'm also not meaning front-end applications that only use the server to store data in databases.  For many of these applications, most things are quite tangible in screens to the user.  What I'm referring to is more complex server side components servicing the requests that are quite intangible to the user.  In other words, the details necessary to explain why adding this new button is 10 times more costly than the button added last week.

Furthermore, I tend to find most approaches are focused on one-way communication of capturing requirements to convey to technical individuals to build.  There is little going the other way, that is, providing understandable information back to the users on why some changes are harder than others.

This is even evident in Business Process Model and Notation (BPMN) as it becomes an executable specification.  It has been great for modelling business processes for technical people to implement systems.  However, in doing so, I'm finding it starting to incorporate the technical details of UML.  This starts becoming very technical and makes it difficult for non-technical individuals to pick up.

So I reflect on how I convey this information to users, and well the whiteboard with boxes and lines is something I keep drawing.  I tend to find drawing boxes representing things with lines indicating flow provides enough visualisation to have a discussion in 5 minutes about why something is easier or harder to change.

So is there a way to extract the boxes and lines out of the system for this conversation with the user?

Now to get technical for a moment, OfficeFloor allows composition through continuations.  For those not familiar with continuations, you can skip the rest of this paragraph and just know that continuations allow connecting code flow by drawing lines between procedures.  For those wanting a little more detail, the state passed by the continuation in OfficeFloor is decoupled from the continuation (state is Dependency Injected into the procedure from Dependency Contexts).  This allows the first class procedures of OfficeFloor to be composed together separately from state (Dependency Injection) and execution strategies (Thread Injection).  Therefore, OfficeFloor composition is merely configuring the output continuations of a procedure to the appropriate handling procedure.  This can be represented as a graph of nodes (procedures) with connectors (continuations) and directed edges (mapping configuration of continuation to procedure) - hence prime for graphical configuration.
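For those who prefer code to prose, the graph idea can be sketched as follows.  Each procedure is a node that returns the name of the output continuation it triggers, and the wiring maps that output to the next procedure.  The ContinuationGraph class and its procedure/link/run methods are hypothetical, the shared map is only a stand-in for injected state, and the sketch assumes the wiring has no cycles.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: procedures are nodes, each returns the name of the
// output continuation it triggers, and the wiring maps that output to the next node.
class ContinuationGraph {

    private final Map<String, Function<Map<String, Object>, String>> procedures = new HashMap<>();
    private final Map<String, String> wiring = new HashMap<>();

    // Registers a node.  State is supplied separately from the wiring.
    void procedure(String name, Function<Map<String, Object>, String> logic) {
        procedures.put(name, logic);
    }

    // Draws a line from an output continuation of one procedure to another procedure.
    void link(String fromProcedure, String output, String toProcedure) {
        wiring.put(fromProcedure + "." + output, toProcedure);
    }

    // Runs from the start node, following the lines until an output is not wired.
    List<String> run(String start, Map<String, Object> state) {
        List<String> visited = new ArrayList<>();
        String current = start;
        while (current != null) {
            visited.add(current);
            String output = procedures.get(current).apply(state);
            current = wiring.get(current + "." + output);
        }
        return visited;
    }

    public static void main(String[] args) {
        ContinuationGraph graph = new ContinuationGraph();
        graph.procedure("validate", state -> ((Integer) state.get("identifier")) > 0 ? "valid" : "invalid");
        graph.procedure("register", state -> "registered");
        graph.procedure("send", state -> "done");
        graph.procedure("reject", state -> "done");
        graph.link("validate", "valid", "register");
        graph.link("validate", "invalid", "reject");
        graph.link("register", "registered", "send");

        Map<String, Object> state = new HashMap<>();
        state.put("identifier", 1);
        System.out.println(graph.run("validate", state)); // prints [validate, register, send]
    }
}
```

The link calls are the only composition code, and each one corresponds to a line that could be drawn between two boxes.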

So armed with first class procedures and means to draw lines between them to control program flow, we have the opportunity to represent the application flow composition as boxes and lines.

Therefore, when asked why OfficeFloor uses graphical configuration: it just made sense to use graphical configuration for ease of communicating the system's "floor plan".  This makes it easy for users to understand the system.  It also makes it very easy for new developers to pick up the system.  This is especially useful when having to work between many micro-services.

Therefore, OfficeFloor uses its Continuation Injection graphical configuration as the "floor plan" of how the IT system is constructed, to make communication between user and construction company easier.