The problem with building message driven systems, is the message provides the context. The nature of message driven architectures is that the handler of the message is disconnected from the producer of the message. The only information connecting them is the message. Hence, the message needs to provide all the necessary context and data for the handler to process the message.
This becomes complicated when downstream message handlers require more context (such as transactions). How do you pass a transaction via a message? Yes, we can use XA transactions to suspend and resume transactions but this creates a lot of overhead to the performance and scale these Reactive frameworks are chasing. Furthermore, Spring Data even considers transactions not a good fit for reactive.
So how can we create a Reactive framework that enables context for such things as transactions?
Dependency Contexts
Well the first problem is co-ordination. Firing off lots of events serviced by different functions in different threads, creates a lot of multi-threading co-ordination. How does one know when to commit the transaction? Or possibly decide to rollback the transaction? Or what if an asynchronous I/O takes too long causing the held transaction to time out. We would be required to monitor all messages and co-ordinate their results to then make a decision on commit / rollback (along with handling further non-application events, such as transaction timeout events). This puts a lot of coding decisions on the developer outside the normal application logic. Gone are the days the developer just threw an exception and the transaction is rolled back.So let's bring back order to the event handling chaos. Load the events to a queue and process events in the order they are added to the queue. As requests on the system should be independent, we create a separate queue for each request to keep requests isolated from each other.
The resulting event loop would look as follows:
public void startRequest(Function function) {
this.functions.add(function);
}
BlockingQueue<Function> functions = new LinkedBlockingQueue<>();
public void eventLoop() throws InterruptedException {
for (;;) {
// Obtain next function to execute
Function function = this.functions.take();
// Determine if new request (no context)
FunctionContext context = (function instanceof ContextualFunction)
? ((ContextualFunction) function).context
: new FunctionContext();
// Allow for multi-threading in executing
synchronized (context) {
function.doFunction(context);
}
// Register the next function to execute
Function nextFunction = context.functions.pollFirst();
if (nextFunction != null) {
this.functions.add(nextFunction);
}
}
}
public interface Function {
void doFunction(FunctionContext context);
}
public class FunctionContext {
private Deque<Function> functions = new LinkedList<>();
public void triggerFunction(Function function) {
this.functions.add(new ContextualFunction(function, this));
}
}
public class ContextualFunction implements Function {
private final Function delegate;
private final FunctionContext context;
public ContextualFunction(Function delegate, FunctionContext context) {
this.delegate = delegate;
this.context = context;
}
@Override
public void doFunction(FunctionContext context) {
this.delegate.doFunction(context);
}
}
Ok, a lot of code but it is now doing two things:
- ordering all functions to be executed sequentially
- providing a context that travels with the execution chain of functions
But how do we make use of the FunctionContext to begin and manage the transaction?
We incorporate the ManagedFunction of Invervsion of Control and use a ServiceLocator to enable storing the dependencies within the context:
public interface ServiceLocator {
// Uses context for transitive dependencies
Object locateService(String serviceName, FunctionContext context);
}
ServiceLocator serviceLocator = ...; // can, for example, wrap Spring BeanFactory
public class FunctionContext {
// ... triggerFunction logic omitted for brevity
private Map<String, Object> contextDependencies = new HashMap<>();
public Object getDependency(String dependencyName) {
// Pull dependency from context cache
Object dependency = this.contextDependencies.get(dependencyName);
if (dependency == null) {
// Not cached, so create new and cache in context
dependency = serviceLocator.locateService(dependencyName, this);
this.contextDependencies.put(dependencyName, dependency);
}
// Return dependency for Function to use
return dependency;
}
}
The Functions are now able to use the getDependency("name") method to retrieve the same objects for a request. As the dependency objects are cached within the context, the various Functions involved in servicing the request are able to retrieve the same object.
Therefore, a transaction can be managed across Functions. The first Function retrieves the Connection and starts the transaction. Further Functions execute, pulling in the same Connection (with transaction established). The final Function in the chain then commits the transaction.
Should there be a failure, the injected Exception handlers of the ManagedFunction can rollback the transaction. Within thread-per-request architectures, exceptions are thrown by the developer's code typically rolling back the transaction. By having the ManagedFunction injected handlers also rollback the transaction, this reproduces the ease of thread-per-request transaction management for exceptions.
Furthermore, the exception handlers would clear the FunctionContext queue of Functions. As the transaction has been rolled back, there is no point further executing the remaining Functions. In typical thread-per-request handling, the remaining part of the try block would be skipped. By clearing the FunctionContext queue this mimics skipping the remaining logic and goes straight to the catch block. In the case of ManagedFunction exception handler, this would be triggering a new function to handle the failure.
But we've just reduced Reactive framework to a single sequence of functions, losing the concurrency it can provide!
Well beyond making it easier to code now that there is some order in function execution, we can introduce concurrency by spawning another sequence of functions. As the FunctionContext ties the sequence of functions together, we just create a new FunctionContext to enable concurrency. The following code shows this:
In actual fact, we have just created Threads running on an event loop. The sequence of functions are executed in order just like imperative statements are executed in order by a thread. So we now have Threads without the overheads of a thread-per-request. The dependencies are bound to the context and subsequently the Thread of execution - making them effectively ThreadLocal. As ThreadLocals are thread safe, we now have safe multi-threading functional code.
As dependencies are effectively ThreadLocal to the sequence of Functions, they can be mutated for the next Function to pick up the change. Yes, immutability better removes developer errors, however this should not be reason to restrict the developer from doing it. This is especially the case if you want to mutate objects via an ORM (e.g. JPA or Spring Repositories) to do updates in the database.
Therefore, a transaction can be managed across Functions. The first Function retrieves the Connection and starts the transaction. Further Functions execute, pulling in the same Connection (with transaction established). The final Function in the chain then commits the transaction.
Should there be a failure, the injected Exception handlers of the ManagedFunction can rollback the transaction. Within thread-per-request architectures, exceptions are thrown by the developer's code typically rolling back the transaction. By having the ManagedFunction injected handlers also rollback the transaction, this reproduces the ease of thread-per-request transaction management for exceptions.
Furthermore, the exception handlers would clear the FunctionContext queue of Functions. As the transaction has been rolled back, there is no point further executing the remaining Functions. In typical thread-per-request handling, the remaining part of the try block would be skipped. By clearing the FunctionContext queue this mimics skipping the remaining logic and goes straight to the catch block. In the case of ManagedFunction exception handler, this would be triggering a new function to handle the failure.
But we've just reduced Reactive framework to a single sequence of functions, losing the concurrency it can provide!
Well beyond making it easier to code now that there is some order in function execution, we can introduce concurrency by spawning another sequence of functions. As the FunctionContext ties the sequence of functions together, we just create a new FunctionContext to enable concurrency. The following code shows this:
public class FunctionContext {
private Deque<Function> functions = new LinkedList<>();
public void triggerFunction(Function function) {
this.functions.add(new ContextualFunction(function, this));
}
public void spawnNewSequence(Function function) {
this.functions.add(function); // event loop will create new FunctionContext
}
// ... getDependency(...) removed for brevity
}
In actual fact, we have just created Threads running on an event loop. The sequence of functions are executed in order just like imperative statements are executed in order by a thread. So we now have Threads without the overheads of a thread-per-request. The dependencies are bound to the context and subsequently the Thread of execution - making them effectively ThreadLocal. As ThreadLocals are thread safe, we now have safe multi-threading functional code.
As dependencies are effectively ThreadLocal to the sequence of Functions, they can be mutated for the next Function to pick up the change. Yes, immutability better removes developer errors, however this should not be reason to restrict the developer from doing it. This is especially the case if you want to mutate objects via an ORM (e.g. JPA or Spring Repositories) to do updates in the database.
OfficeFloor (http://officefloor.net) implements Dependency Contexts in it's threading models. OfficeFloor, actually, makes this easier to develop by introducing the concept of Governance. Governance does the above transaction management by configuration declaration (much like a @Transaction annotation on a method). However, as OfficeFloor uses graphical configuration, this is done graphically. Rather than annotating the code directly, OfficeFloor graphically marks functions for transaction governance for better code re-use and flexibility in configuring applications. An example of this configuration is as follows:
Please try out OfficeFloor (tutorials) and we value all feedback.
Summary
So when Reactive frameworks are tooting their horn, they are actually doing this while restricting you further as a developer. Because the reactive frameworks can't handle the context problem, they push this problem onto you the developer in avoiding context.Subsequently, reactive frameworks rob developers (Peter) of context to pay for the message passing problem in their frameworks (Paul).
By incorporating dependency contexts into event loops, you can incorporate context that we have grown to love in thread-per-request for managing things such as transactions.
No comments:
Post a Comment