Sunday, 28 April 2019

Function IoC for First Class Procedure

This is the second of two articles introducing a term I'm suggesting: the "first class procedure". The first article provided a working example of first class procedures in action. This article delves into the detail and some of the theory behind how the first class procedure evolved.

The evolution of the "first class procedure" starts with looking at the function. The function takes in parameters to then produce the result:

  type Function = Array[Any] => Any

  // Example of first class function
  def exampleFunction(paramOne: Int, paramTwo: Double): String = s"$paramOne and $paramTwo" // body is illustrative only
  val firstClassFunction: Function = (parameters) => exampleFunction(parameters(0).asInstanceOf[Int], parameters(1).asInstanceOf[Double])

However, before being able to obtain any results, the function also requires a thread to be run. This is what I refer to as the "implicit thread". The function does not define details regarding the thread to use, so defaults to the invoking thread of the function (the implicit thread).

Ideally, we should enhance the function signature to indicate the thread more explicitly so we can allow more control in how the composed functions are executed:

  type Executor = (Function, Array[Any]) => Any

  def invoke(executor: Executor, function: Function, parameters: Array[Any]) =
    executor(function, parameters)

Now the thread to execute the function is made explicit. Note that the executor is the means to invoke the function with either the implicit thread or a separate explicit thread via a thread pool.
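
As a minimal sketch of what such executors could look like, here is one that stays on the implicit thread and one that hands off to a thread pool. The names implicitThreadExecutor, pooledExecutor and threadName are my own illustrative assumptions, and the pooled version simply blocks the caller until the result arrives:

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

type Function = Array[Any] => Any
type Executor = (Function, Array[Any]) => Any

// Runs the function on the invoking (implicit) thread.
val implicitThreadExecutor: Executor =
  (function, parameters) => function(parameters)

// Hypothetical helper: runs the function on a thread from the given pool
// and blocks the caller until the result is available.
def pooledExecutor(pool: ExecutorService): Executor =
  (function, parameters) =>
    pool.submit(new Callable[Any] {
      override def call(): Any = function(parameters)
    }).get()

def invoke(executor: Executor, function: Function, parameters: Array[Any]): Any =
  executor(function, parameters)

// Illustrative function that reports the name of whichever thread runs it.
val threadName: Function = _ => Thread.currentThread().getName
```

Invoking threadName through each executor shows the difference: the first reports the caller's thread, the second reports a pool thread.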

An argument might be that threading is hard and should be left to compilers/frameworks/etc. However, I believe I still want some control over the execution profile of the application. The following example shows why.

I'm running a multi-user application on a large server with multiple processors. To service a request, it performs various blocking I/O and expensive CPU computations. I really want to optimise the CPU computations by giving them affinity to a core, avoiding cache misses and thread context switching overheads. Plus I want to isolate the blocking I/O to a separate thread pool so the CPU computations do not block, causing idle CPUs. Given the load of I/O, I might want to dedicate one core to all I/O, leaving the remaining cores free for the CPU computations. Conversely, I might find the I/O is minimal and can time slice with the CPU computations, so I can get an extra core for the CPU computations to increase throughput. Ideally, I would like a means to tweak this for the application.

Then requirements change and I now want to use the same application to service a single user that is running the application on a single core (e.g. a cheap portable embedded system). In this circumstance, I'm happy for the CPU computations to block while the I/O is underway. Basically, I just want one implicit thread to run all the functions.

In both the above, it is the same application logic but just different execution profiles based on the environment the application is running within.

So, taking our explicit function signature above, we now need the higher order functions to provide appropriate threading information to invoke each composed function:

  def higher(executorIO: Executor, executorCpuIntensive: Executor, parameters: Array[Any]) =
    executorIO(functionIO, Array(executorCpuIntensive(functionCpuIntensive, parameters)) ++ parameters)

This now requires the higher order function to determine the thread for each contained function. This can blow out the higher order function's signature. Therefore, let's for now create a function that can determine the executor for a function:

  type ExecutorLocator = Function => Executor

So this now changes the higher order function to be:

  def higher(executorLocator: ExecutorLocator, parameters: Array[Any]) =
    executorLocator(functionIO)(functionIO, Array(executorLocator(functionCpuIntensive)(functionCpuIntensive, parameters)) ++ parameters)
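
To make the locator concrete, here is a rough sketch under some assumptions of my own: the two executors are stand-ins that only tag their result (so the choice is visible without real threads), and the locator decides by checking a hypothetical set of functions registered as blocking.

```scala
type Function = Array[Any] => Any
type Executor = (Function, Array[Any]) => Any
type ExecutorLocator = Function => Executor

// Stand-in executors that tag the result rather than switching threads.
val ioExecutor: Executor = (function, parameters) => "io:" + function(parameters)
val cpuExecutor: Executor = (function, parameters) => "cpu:" + function(parameters)

// Illustrative composed functions.
val functionIO: Function = parameters => "read(" + parameters.mkString(",") + ")"
val functionCpuIntensive: Function = parameters => "compute(" + parameters.mkString(",") + ")"

// Hypothetical locator: functions registered as blocking get the I/O executor.
val blockingFunctions: Set[Function] = Set(functionIO)
val executorLocator: ExecutorLocator =
  function => if (blockingFunctions.contains(function)) ioExecutor else cpuExecutor

def higher(executorLocator: ExecutorLocator, parameters: Array[Any]): Any =
  executorLocator(functionIO)(functionIO,
    Array(executorLocator(functionCpuIntensive)(functionCpuIntensive, parameters)) ++ parameters)
```

Calling higher(executorLocator, Array[Any](1)) yields "io:read(cpu:compute(1),1)", showing each composed function ran through the executor chosen for it.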

So we've stopped the blow out of the higher order function signature, but it starts to highlight a problem of passing results between the composed functions. Well not so much in passing results, but in which parts of the logic to execute with which thread.

To get the result from functionA to pass to functionB, we have two approaches:
  • As functionA completes, the system returns the result to the higher order function's thread that then passes it onto functionB
  • As functionA completes, it carries on to execute functionB
The difference is very subtle, but significant for thread context switching overheads.

Note: in my naive understanding of functional programming, I believe the first approach can be considered turning each function into an Actor, while the second approach is a Continuation (continuation passing style).

Anyway, before misunderstanding too much of the functional programming literature, the problem with the first approach is excessive thread context switching. If the higher order function runs on a different executor to its composed functions, it creates two thread context switches for every composed function. The execution would be:
  1. Higher order function executes
  2. Invokes composed function on another thread
  3. Result returned to higher order function that thread context switches back to itself
  4. Invokes next composed function on another thread (possibly the same thread required for step 2)
The thread context switch that happens in step 3 is not really required. Furthermore, the cost of switching the thread would be more than the few operations to pass the result of the first function to the second function.

Note: I refer to thread context switching overheads assuming threads need to be scheduled in to handle each part. However, even if threads were running continuously on separate cores, there are overheads in getting the messages between the threads.

There is also a second problem of exceptions. Yes, I understand exceptions are not very functional, however I'll come to this later in regards to composition.

So if we take approach 2 of continuations, then the execution would be as follows:
  1. Higher order function executes
  2. Invokes composed functionA on another thread (passing in the next composed functionB)
  3. On result, the composed functionA continues with the next composed functionB
We have eliminated the extra context switch in the first approach by letting the composed functions continue on with each other.
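
The continuation steps above can be sketched roughly as follows. This is only an illustration with names of my own choosing: functionA runs on a pool thread and calls functionB directly, so both report the same thread. The wait inside higher exists only so the sketch can return what happened.

```scala
import java.util.concurrent.{CompletableFuture, Executors, TimeUnit}

type Continuation = Any => Unit

// functionA passes its result (and its thread name) straight to the continuation,
// staying on its own thread.
def functionA(parameter: Int, next: Continuation): Unit =
  next((parameter + 1, Thread.currentThread().getName))

// Triggers functionA on another thread, with functionB supplied as its continuation.
// Returns (result, thread that ran functionA, thread that ran functionB).
def higher(parameter: Int): (Int, String, String) = {
  val outcome = new CompletableFuture[(Int, String, String)]()
  val functionB: Continuation = {
    case (result: Int, threadOfA: String) =>
      outcome.complete((result * 2, threadOfA, Thread.currentThread().getName))
  }
  val pool = Executors.newSingleThreadExecutor()
  try {
    pool.execute(() => functionA(parameter, functionB))
    outcome.get(5, TimeUnit.SECONDS) // wait only so the sketch can report the outcome
  } finally pool.shutdown()
}
```

The two thread names returned by higher are equal to each other and differ from the calling thread, which is the single context switch we were after.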

Now this is not overly special. This is just Continuation Passing Style with the ability to execute the continuation with the implicit thread or delegating to a different thread. In doing so, we try to use the implicit thread as much as possible to reduce threading overheads. However when functions have different execution characteristics (such as blocking I/O, expensive CPU computations) we swap threads to enable appropriate execution of the functions to keep the overall application performant.

Another way of thinking about this threading is to consider the threads running on separate cores. The first approach is very much synchronous communication: the composed function is invoked and the higher order function effectively waits until the result is available. Continuations, on the other hand, are more akin to asynchronous communication: the higher order function triggers the composed function and is then free to continue with other functions. The composed function will itself continue with the next composed function.

But continuations do not come easily.

Continuation passing has implications for the function signature, as we must pass the next continuation into all functions. This now has our functions looking like this:

  type Continuation = (Any) => Unit

  def function(executorLocator: ExecutorLocator, parameters: Array[Any], continuation: Continuation)

Now might I dare say there is more than one outcome to a function - well to me anyway. Yes, we could return a data type defining both success and error. Case statements then handle these various outcomes. However, for each new error type we're having to add new case statement handling. This reminds me of reflection problems of having to interrogate for each error type. Personally, I like to handle these outcomes separately.

Rather than combining the result and error into the one next continuation, we can provide a continuation for each. In my understanding, this is not too different to try/catch blocks (except that we can now execute the catch block with the implicit thread or different thread). In other words, we provide multiple continuations:

  def function(executorLocator: ExecutorLocator, parameters: Array[Any],
      successfulContinuation: Continuation,
      errorOneContinuation: Continuation,
      errorTwoContinuation: Continuation)
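
As a small sketch of what this could look like in use, here is a hypothetical parsePositive function (my own example, not from the framework) that follows exactly one of three continuations, much as a try/catch would pick one block:

```scala
import scala.util.Try

type Continuation = Any => Unit

// Hypothetical function: parses text, continuing down exactly one of three paths.
def parsePositive(text: String,
    successfulContinuation: Continuation,
    notANumberContinuation: Continuation,
    notPositiveContinuation: Continuation): Unit =
  Try(text.toInt).toOption match {
    case None => notANumberContinuation(text)
    case Some(value) if value <= 0 => notPositiveContinuation(value)
    case Some(value) => successfulContinuation(value)
  }

// Records which path was taken so the outcome can be inspected.
def outcomeOf(text: String): String = {
  var outcome = ""
  parsePositive(text,
    value => { outcome = s"success: $value" },
    bad => { outcome = s"not a number: $bad" },
    value => { outcome = s"not positive: $value" })
  outcome
}
```

Each outcome is handled separately, with no case statement interrogating a combined result type.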

But why stop there? We can also have different paths through the function for, say, if statements. If the condition is true, follow the first continuation, else follow the second continuation.

  def function(executorLocator: ExecutorLocator, parameters: Array[Any],
      trueContinuation: Continuation,
      falseContinuation: Continuation,
      errorOneContinuation: Continuation,
      errorTwoContinuation: Continuation)

This is starting to blow out the function signature again, especially when composing into higher order functions that have to handle many exceptions. This is also the reason I tend to find many frameworks are moving away from checked exceptions. However, with first class procedures, we very much like checked exceptions (though this is probably not as you typically know them). But we'll come to this soon.

So to avoid the signature blow out, let's do what we did with the choice of executor and wrap the continuation decision into a function. For now, let's assume we can have some key to aid the function to determine the appropriate continuation. This function would look as follows:

  type ContinuationLocator = (Function, Any) => Continuation

This then turns all our functions into the following:

  def function(
      executorLocator: ExecutorLocator,
      parameters: Array[Any],
      continuationLocator: ContinuationLocator)

So now we have this very flexible execution model that minimises the thread context switching.

However, how do we implement the executorLocator and continuationLocator functions?

Well the naming of the functions is deliberate, as they follow the ServiceLocator pattern. Given a key provide back the dependency. However, in this case, it is not an object but rather an executor for thread choice and continuation for invoking the next function.

Yay, we can now go create key/value configuration for every function in the system. Well, maybe not.

This granularity of configuration ends up being a nightmare. I can certainly say my first versions of implementing first class procedures showed this very much to be the case. Plus given we have an assumed key to identify the continuation, how do we know we have every continuation configured for a complete system? In other words, how can we make this compile safe?
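
To illustrate the problem, here is a rough sketch of a key/value backed ContinuationLocator. The helper locatorFrom and the parity function are assumptions of mine for the example. Note how a missing mapping is only discovered when that key is first requested at runtime:

```scala
type Function = Array[Any] => Any
type Continuation = Any => Unit
type ContinuationLocator = (Function, Any) => Continuation

// Key/value configuration in the ServiceLocator style: (function, key) -> continuation.
// A missing entry throws NoSuchElementException only when it is looked up.
def locatorFrom(configuration: Map[(Function, Any), Continuation]): ContinuationLocator =
  (function, key) => configuration((function, key))

// Hypothetical function whose outcome ("even" or "odd") is the continuation key.
val parity: Function = parameters =>
  if (parameters(0).asInstanceOf[Int] % 2 == 0) "even" else "odd"

// Looks up the continuation for the outcome and passes the original value on.
def runParity(parameters: Array[Any], continuationLocator: ContinuationLocator): Unit =
  continuationLocator(parity, parity(parameters))(parameters(0))
```

If only the "even" key is configured, the system works until the first odd number arrives, which is exactly the kind of gap we want caught before the application runs.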

To solve this problem, we do what we typically do in software: add more indirection.

Huh, more dynamic indirection to create a type safe compiled solution? Well, yes.

To explain how indirection has helped, let's start with the Continuation Injection.

Firstly, we are going to give the function state (or possibly better described as meta-data). Arguably this is potentially turning the function into an object, but I'm going to avoid trying to relate things to the literature right now and focus on how the first class procedure has evolved.

So we now have a wrapping object:

  class ManagedFunction(
    val logic: Function,
    val continuations: Map[Any, Continuation])

So we've associated the continuations to the function, but this really is only a dynamic map based on key. Therefore, to be compile safe, we need to give the key some meaning the compiler/framework can understand.

Well to do this, let's go back to the exploded function signature:

  def function(parameters: Array[Any],
      trueContinuation: Continuation,
      falseContinuation: Continuation)

Given the parameters are always ordered, we can use the index of the continuation as the key. This has the ManagedFunction look as follows:

  class ManagedFunction(
    val logic: Function,
    val continuations: Map[Any, Continuation]) {

    def cont(key: Any): Continuation = continuations.get(key) match { case Some(cont) => cont }

    def run(parameters: Array[Any]) = logic(parameters ++ Array(cont(1), cont(2)))
  }

Now, I mentioned first class procedures actually like using checked exceptions. The reason is that the checked exception is stated on the signature. This would look like:

  function(Object[] parameters) throws ErrorOne, ErrorTwo;

Checked exceptions are not ordered, but their types are unique. Therefore, we can use the checked exception's type as the key. This now turns the ManagedFunction into the following:

  class ManagedFunction(
    val logic: Function,
    val continuations: Map[Any, Continuation]) {

    def cont(key: Any): Continuation = continuations.get(key) match { case Some(cont) => cont }

    def run(parameters: Array[Any]) = {
      try {
        logic(parameters ++ Array(cont(1), cont(2)))
      } catch {
        case ex: Throwable => cont(ex.getClass())(ex)
      }
    }
  }

Now, to make this compile/framework safe, we provide a function that produces the required list of keys for the logic function:

  def extractContinuations(logic: Function): Array[Any] = {
    var parameterIndex = 0
    extractParameterTypes(logic)
      .filter((parameterType) => classOf[Continuation].isAssignableFrom(parameterType)).map((paramContinuation) => { parameterIndex += 1; parameterIndex }) ++
      extractExceptionTypes(logic)
  }

The compiler/framework will then confirm that configuration mappings are provided for each key. This allows validating that all continuations are configured for the function to operate. Furthermore, doing this across all ManagedFunction instances within the application, we can confirm a complete configured application. We now have compile/framework startup safe validation that all continuations are configured.
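
A minimal sketch of that startup check might look like the following. The Wiring case class and validate function are hypothetical names of mine, with the required keys assumed to have already been extracted from each function's signature:

```scala
type Continuation = Any => Unit

// Hypothetical description of one function's wiring: the keys its signature
// requires (continuation indexes and exception types) and the configured mappings.
case class Wiring(name: String, requiredKeys: Set[Any], continuations: Map[Any, Continuation])

// Returns one message per missing mapping; an empty result means fully configured.
def validate(wirings: Seq[Wiring]): Seq[String] =
  for {
    wiring <- wirings
    key <- (wiring.requiredKeys -- wiring.continuations.keySet).toSeq
  } yield s"${wiring.name}: no continuation configured for $key"
```

Running validate over every ManagedFunction at startup gives the "complete configured application" guarantee without waiting for a request to hit the missing path.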

However, now we are having problems of passing state between the functions contained within the ManagedFunction. As only one argument can be passed with the continuation, how can a function have more than one parameter?

Ideally, we want to have each ManagedFunction have the following run method:

  abstract class ManagedFunction {
    def run(parameter: Any) // not, parameters: Array[Any]
  }

So, how can we provide the additional parameters for the function?

Before answering this, we need to consider how the first continuation is triggered to start the chain of ManagedFunction executions. As the application is now being realised as a mapping of continuations to ManagedFunctions, we need a means to trigger a continuation from outside a ManagedFunction.

Well, why can't we give objects continuations?

We can create a ManagedObject that contains continuations:

  class ManagedObject(
    @Inject val continuationOne: Continuation,
    @Inject val continuationTwo: Continuation) {
    // ... object methods
  }

This now allows objects to trigger logic. Why is this useful? Well, for example, we can have an HTTP socket listener object that receives a request and services the request by invoking a continuation. Further ManagedFunction instances will then use details of the request to route it via continuations to the appropriate handling ManagedFunction instances, which then service the request.

The HTTP example actually points us to a design pattern already solving the issue of multiple parameters for a function. A typical thread-per-request web server has a request, session and application context. Now let's ignore session and application contexts as they are not concurrency safe. It is the request context pattern that helps us.

A request context allows passing objects between controllers and view rendering components. What are the controllers and view renderers? They are snippets of logic that take in a request scope and access/mutate it to capture enough state to provide a response (with possible side effects of saving state in databases, logging details, etc).

These snippets of logic fit well into the ManagedFunction, with request scopes created for each continuation tree invoked from a ManagedObject. ManagedObjects are created in the application and hooked into the network of continuations to ManagedFunctions. When the ManagedObject receives an event (HTTP request, queue message, etc), it does two things:
  1. Starts a new request scope
  2. Triggers the first continuation with the scope, which carries through for all further continuations triggered
ManagedFunctions can then grab their required parameters from the scope.
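
The steps above can be sketched as follows. This is a simplified illustration under my own assumptions: the scope is modelled as a mutable map, and EventListener, identify and respond are hypothetical stand-ins for a ManagedObject and two ManagedFunctions.

```scala
import scala.collection.mutable

// A request scope is modelled here as a mutable map (an assumption for the sketch).
type Scope = mutable.Map[String, Any]
type ScopedContinuation = Scope => Unit

// Hypothetical ManagedObject: each event gets a fresh scope and triggers the first continuation.
class EventListener(first: ScopedContinuation) {
  def onEvent(payload: String): Scope = {
    val scope: Scope = mutable.Map[String, Any]("request" -> payload) // start a new request scope
    first(scope) // trigger the first continuation, carrying the scope along
    scope
  }
}

// ManagedFunction stand-ins that pull what they need from the scope.
val respond: ScopedContinuation = scope => {
  scope("response") = s"Hello ${scope("user")}"
}
val identify: ScopedContinuation = scope => {
  scope("user") = scope("request").toString.toUpperCase
  respond(scope)
}
```

Because every event starts its own scope, two requests handled by the same listener never see each other's state.
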
This can be taken further to include dependency injection. Rather than the ManagedFunction being responsible for managing the request scope, the request scope objects are provided through dependency injection. This is the following dependency context for the ManagedFunction:

  type ServiceLocator = String => Any

  class DependencyContext(val serviceLocator: ServiceLocator) {
    val objects = scala.collection.mutable.Map[String, Any]()

    def getObject(name: String) = {
      objects.get(name) match {
        case Some(obj) => obj
        case None => {
          val obj = serviceLocator(name)
          objects(name) = obj
          obj
        }
      }
    }
  }
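
As a quick usage sketch (with a hypothetical countingLocator of my own to make the behaviour visible), the context asks the service locator for each named object only once and reuses it afterwards, while a new context starts fresh. The class is repeated in compact form so the example stands alone:

```scala
type ServiceLocator = String => Any

class DependencyContext(val serviceLocator: ServiceLocator) {
  private val objects = scala.collection.mutable.Map[String, Any]()
  // Looks the object up once and reuses it for the rest of the context.
  def getObject(name: String): Any = objects.getOrElseUpdate(name, serviceLocator(name))
}

// Hypothetical locator that counts how often it is asked to create an object.
var created = 0
val countingLocator: ServiceLocator = name => { created += 1; s"$name#$created" }
```

Asking one context twice for "connection" returns "connection#1" both times. A second context gets "connection#2".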

A side benefit of providing the Dependency Context is that we can re-use existing Dependency Injection frameworks for managing objects. For example, the ServiceLocator can be a Spring BeanFactory. Furthermore, we can also dependency inject ManagedObject implementations to allow objects to maintain state, but also trigger continuations in the background (e.g. background polling for JWT key changes in providing JWT authentication state).

The ManagedFunction now becomes:

  type ContinuationFactory = DependencyContext => Continuation

  class ManagedFunction(
    val logic: Function,
    val parameterScopeNames: List[String],
    val continuations: Map[Any, ContinuationFactory]) {

    def obj(index: Int, context: DependencyContext): Any = context.getObject(parameterScopeNames(index - 1)) // index is 1-based, as for continuation keys
    def cont(key: Any, context: DependencyContext): Continuation = continuations.get(key) match { case Some(factory) => factory(context) }

    def run(parameterFromContinuation: Any, context: DependencyContext) = {
      try {
        logic(Array(parameterFromContinuation, obj(1, context), obj(2, context), cont(1, context), cont(2, context)))
      } catch {
        case ex: Throwable => cont(ex.getClass(), context)(ex)
      }
    }
  }

To populate the scope names, we can again use reflection on the logic signature. However, rather than having to provide explicit configuration, we can use auto-wire configuration based on parameter type and possible qualifier. This then becomes normal dependency injection for constructors, except that we are injecting into the logic function.

We can now have database connections, HTTP clients, etc provided to the logic, however it does not answer the problem of passing state across the continuation boundaries.

To solve passing state, we just create a state object. This object acts much like a variable. Its value can be set and retrieved. However, this introduces mutability and timing concerns regarding the flow of continuations. It is unclear whether a ManagedFunction is only safely accessing the variable's value, or is unsafely mutating the variable. Therefore, for variables we provide additional support within the ManagedFunction to identify the use of the variable.

For variable state objects, we allow the ManagedFunction to use various interfaces to identify the nature of using the variable. This allows the following interfaces for a variable state:

  trait Out[T] { def set(value: T) }
  trait In[T] { def get(): T }
  trait Var[T] extends Out[T] with In[T]

ManagedFunctions can then use the appropriate interface to identify their intention on the state of the variable.
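
Here is a small sketch of how this might read in practice. ScopeVar, loadUser and greet are hypothetical names of mine. The point is that each signature states whether the logic reads or writes the variable:

```scala
trait Out[T] { def set(value: T): Unit }
trait In[T] { def get(): T }
trait Var[T] extends Out[T] with In[T]

// Hypothetical scope variable holding a single value.
class ScopeVar[T] extends Var[T] {
  private var value: Option[T] = None
  def set(newValue: T): Unit = { value = Some(newValue) }
  def get(): T = value.getOrElse(throw new IllegalStateException("variable read before it was set"))
}

// The signatures state the intent: loadUser only writes, greet reads one variable and writes another.
def loadUser(userName: Out[String]): Unit = userName.set("alice")
def greet(userName: In[String], greeting: Out[String]): Unit =
  greeting.set(s"Hello ${userName.get()}")
```

The same ScopeVar instance is handed to loadUser as an Out and to greet as an In, so neither can use it in a way its signature does not declare.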

Note that it is now possible to traverse the graph from ManagedObject continuations to confirm variable state outputs of ManagedFunctions are always upstream of respective inputs. This creates an ability for compile safe state generation. Furthermore, if all objects loaded to scope variables are immutable it allows reasoning for identifying the ManagedFunction producing incorrect state (just look for the ManagedFunctions requiring the Out of the variable).
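
A simplified sketch of such a check is below. It assumes a single linear flow of steps rather than a full graph of continuations (a real check would need to traverse every path), and Step and unsatisfiedInputs are illustrative names only:

```scala
// Hypothetical description of one ManagedFunction in a flow: the variables it
// reads (In) and the variables it writes (Out).
case class Step(name: String, ins: Set[String], outs: Set[String])

// Returns (step, variable) pairs where a variable is read before any upstream step wrote it.
def unsatisfiedInputs(flow: List[Step]): List[(String, String)] = {
  val (_, problems) = flow.foldLeft((Set.empty[String], List.empty[(String, String)])) {
    case ((written, found), step) =>
      val missing = (step.ins -- written).toList.sorted.map(variable => (step.name, variable))
      (written ++ step.outs, found ++ missing)
  }
  problems
}
```

An empty result means every In has an Out upstream of it. Swapping two steps so a reader comes before its writer is reported immediately.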

What this now also provides is multiple inputs and multiple outputs. Composition is no longer derived by output of one function being passed as input to the next function. State is maintained in scope with ManagedFunctions pulling/pushing state as appropriate to the scope. Continuations are now separated from having to be concerned with all the state needed to invoke a ManagedFunction.

Now, the above implementation assumes some order of parameters followed by continuations in invoking the logic. As this information is reflectively retrieved from the logic function, the order is not necessary. We can then have a ManagedFunction look as follows:

  class ManagedFunction(
    val logic: Function,
    val parameterScopeNames: List[String],
    val continuations: Map[Any, ContinuationFactory]) {

    def obj(index: Int, context: DependencyContext): Any = context.getObject(parameterScopeNames(index - 1)) // index is 1-based, as for continuation keys
    def cont(key: Any, context: DependencyContext): Continuation = continuations.get(key) match { case Some(factory) => factory(context) }

    def run(parameterFromContinuation: Any, context: DependencyContext) = {
      var continuationIndex = 0
      var objectIndex = 0
      val arguments = extractParameterTypes(logic).map(_ match {
        case p if p.isAnnotationPresent(classOf[Parameter]) => parameterFromContinuation
        case c if classOf[Continuation].isAssignableFrom(c) => cont({ continuationIndex += 1; continuationIndex }, context)
        case _ => obj({ objectIndex += 1; objectIndex }, context)
      })
      try {
        logic(arguments)
      } catch {
        case ex: Throwable => cont(ex.getClass(), context)(ex)
      }
    }
  }

Notice that the return value from the function (logic) is no longer necessary. Hence why we consider these "first class procedures".

This can then be represented as follows:

  class ManagedFunction(
    val procedure: Array[Any] => Unit,
    val parameterScopeNames: List[String],
    val continuations: Map[Any, ContinuationFactory]) {

    def obj(index: Int, context: DependencyContext): Any = context.getObject(parameterScopeNames(index - 1)) // index is 1-based, as for continuation keys
    def cont(key: Any, context: DependencyContext): Continuation = continuations.get(key) match { case Some(factory) => factory(context) }

    def run(parameterFromContinuation: Any, context: DependencyContext): Unit = {
      var continuationIndex = 0
      var objectIndex = 0
      val arguments = extractParameterTypes(procedure).map(_ match {
        case p if p.isAnnotationPresent(classOf[Parameter]) => parameterFromContinuation
        case c if classOf[Continuation].isAssignableFrom(c) => cont({ continuationIndex += 1; continuationIndex }, context)
        case _ => obj({ objectIndex += 1; objectIndex }, context)
      })
      try {
        procedure(arguments)
      } catch {
        case ex: Throwable => cont(ex.getClass(), context)(ex)
      }
    }
  }

So we've provided composition of logic with state management, but we've not solved the original implicit thread problem that sparked this.

To solve specifying explicit threads, we need to implement the ExecutorLocator. This is achieved by looking at the parameter types of the function. As all state (objects) is now injected from the DependencyContext, we can determine the execution characteristics from the parameters. In other words, if the logic depends on a Database connection, it is likely to be making blocking calls. Therefore, we can use the parameter types to implement the ExecutorLocator:

  class ManagedFunction(
    val procedure: Array[Any] => Unit,
    val parameterScopeNames: List[String],
    val continuations: Map[Any, ContinuationFactory],
    val executorConfiguration: Map[Class[_], Executor]) {

    def obj(index: Int, context: DependencyContext): Any = context.getObject(parameterScopeNames(index - 1)) // index is 1-based, as for continuation keys
    def cont(key: Any, context: DependencyContext): Continuation = continuations.get(key) match { case Some(factory) => factory(context) }
    def executorLocator(): Executor = {
      var executor: Executor = (logic, arguments) => logic(arguments) // default executor is synchronous (implicit thread)
      extractParameterTypes(procedure).foreach((parameterType) =>
        executorConfiguration.get(parameterType).foreach((e) => executor = e)) // matched so override
      executor
    }

    def run(parameterFromContinuation: Any, context: DependencyContext): Unit = {
      var continuationIndex = 0
      var objectIndex = 0
      val arguments = extractParameterTypes(procedure).map(_ match {
        case p if p.isAnnotationPresent(classOf[Parameter]) => parameterFromContinuation
        case c if classOf[Continuation].isAssignableFrom(c) => cont({ continuationIndex += 1; continuationIndex }, context)
        case _ => obj({ objectIndex += 1; objectIndex }, context)
      })
      executorLocator()((arguments) => {
        try {
          procedure(arguments)
        } catch {
          case ex: Throwable => cont(ex.getClass(), context)(ex)
        }
      }, arguments)
    }
  }

This enables choice of Executor to be managed within configuration. This separates it from concerns of composition and state management.
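
To tie this back to the earlier server versus single core example, here is a rough sketch of two execution profiles for the same logic. Everything named here (DatabaseConnection, Calculator, the two profiles and locateExecutor) is an assumption for illustration, and the I/O executor is a stand-in that does not really switch threads:

```scala
type Procedure = Array[Any] => Unit
type Executor = (Procedure, Array[Any]) => Unit

// Stand-in dependency types (assumptions for the sketch).
class DatabaseConnection
class Calculator

// Default executor uses the invoking (implicit) thread.
val implicitThread: Executor = (procedure, arguments) => procedure(arguments)
// Stand-in for a pool-backed executor; a real one would hand off to a thread pool.
val blockingIoExecutor: Executor = (procedure, arguments) => procedure(arguments)

// Multi-user server: anything depending on a database connection goes to the I/O executor.
val multiUserProfile = Map[Class[_], Executor](classOf[DatabaseConnection] -> blockingIoExecutor)
// Single core device: nothing configured, so everything stays on the implicit thread.
val singleCoreProfile = Map.empty[Class[_], Executor]

// Hypothetical selection: the last parameter type with a configured executor wins.
def locateExecutor(parameterTypes: Seq[Class[_]],
    executorConfiguration: Map[Class[_], Executor]): Executor =
  parameterTypes.flatMap(parameterType => executorConfiguration.get(parameterType))
    .lastOption.getOrElse(implicitThread)
```

The procedure's signature stays the same in both deployments. Only the configuration map changes.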

And now you are up to speed with the general concepts behind the first class procedure.

Though, do note that the actual implementation uses a lot more memoization, as function signatures are static allowing the reflection to be done at compile/startup time.
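
As a rough idea of what such memoization could look like (this is my own sketch using a TrieMap cache, not the actual implementation), the reflective lookup runs once per method and the cached result is reused for every later invocation:

```scala
import java.lang.reflect.Method
import scala.collection.concurrent.TrieMap

// Counts how often the reflective work actually runs (for illustration only).
var reflectionRuns = 0

// Hypothetical stand-in for the reflective extraction of parameter types.
def reflectParameterTypes(method: Method): Seq[Class[_]] = {
  reflectionRuns += 1
  method.getParameterTypes.toSeq
}

// Signatures are static, so the result can be cached per method.
val parameterTypeCache = TrieMap[Method, Seq[Class[_]]]()

def parameterTypes(method: Method): Seq[Class[_]] =
  parameterTypeCache.getOrElseUpdate(method, reflectParameterTypes(method))
```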

Furthermore, the overall effect is that higher order functions do not need to provide all arguments to call the function (procedure). The control is inverted so that configuration and the procedure itself define what is injected into it. Higher order composition need only use the continuations to invoke the first class procedures.

Plus, I find it lifts a constraint in functional programming of having to fit all results through the small keyhole of the function's return type. The return type of the function needs to provide success and error details, and is coupled to having to pass this through the chain of composed functions. First class procedures, via variables, decouple this so any upstream procedure can output the value for any downstream procedure to consume. Furthermore, checked exceptions continue error flows, removing them from function return types (output variable types).

There are also other concepts built on first class procedures, such as:
  • process, thread, function scoped dependency contexts for concurrency/parallel processing
  • higher order compositions (Sections)
  • thread affinity and other thread management (via Executive)
  • providing context for state, such as transactions (Governance)
  • additional ManagedFunctions inserted into flows similar to aspects (Administration)
However, this article is focused on the first class procedure and is already long enough.

So, in conclusion, the first class procedure applies inversion of control to the function to inject state, continuations and thread (via Executor). This means the first class procedure no longer requires composition via return values of the function. This makes it significantly easier to weave impure/pure functionality together. Furthermore, it allows execution strategies for the application to be configured at deployment time.

And to see all this in action, please see the first article.

For more information, see the paper "OfficeFloor: using office patterns to improve software design".
