
 


This section introduces OSGi™ Promises and Asynchronous Services. The fractal example demonstrates these concepts along with the Service Fabric's asynchronous load balancer service. The fractal example also provides the opportunity to demonstrate System Versioning, and the Service Fabric's update and rollback capabilities.

Source Code

The source code reviewed in the next section is available at https://github.com/paremus/async-examples

Introducing OSGi™ Promises and Asynchronous Services 

OSGi™ Enterprise R6 includes the Promise interface, which can be implemented on any version of Java.

OSGi™ Promises:

When properly organised and sequenced, Promises can be used to control program flow without explicit branching, making code more readable and business logic easier to identify (see https://www.infoq.com/interviews/tim-ward-osgi-promises).

As will now be demonstrated, OSGi™ Promises are simple to use and offer the same powerful, composable functional behaviours that you might expect from other reactive asynchronous functional frameworks, e.g. RxJava or Akka. However, unlike these alternatives, OSGi™ Promises are part of an increasingly rich ecosystem of OSGi Specifications and Implementations.

While the Java Future interface went some way toward addressing the need for asynchronous results in Java, futures lack the ability to register callbacks that notify the program when an asynchronous result is available. Java 8 improved matters with the CompletableFuture type, but the API is complex and difficult to use for many simple use cases.
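The difference between blocking on a Future and registering a callback can be seen in a small sketch that uses only the Java standard library. The class name, method names and the stand-in calculation are illustrative assumptions and are not part of the fractal example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only: contrasts Future.get() with a CompletableFuture callback.
class FutureVersusCallback {

    // Stand-in for a slow calculation (sums the integers 1..100).
    static Long calculateDifficultSum() {
        long total = 0;
        for (int i = 1; i <= 100; i++) {
            total += i;
        }
        return total;
    }

    // A plain Future offers no callback: the caller has to block in get().
    static long blockingStyle(ExecutorService executor) {
        Callable<Long> task = FutureVersusCallback::calculateDifficultSum;
        Future<Long> future = executor.submit(task);
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // A CompletableFuture lets the caller register a callback that runs
    // once the value is available; nothing blocks until join() is called.
    static CompletableFuture<String> callbackStyle(ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(FutureVersusCallback::calculateDifficultSum, executor)
                .thenApply(total -> "total=" + total);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println(blockingStyle(executor));
            System.out.println(callbackStyle(executor).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

The OSGi™ Promise API covers the same ground as the callback style here, with a smaller surface area.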

Obtaining a Promise

Creating a Promise is very simple. The Promise API includes a default implementation that can be created using the Deferred type. This can be used to create, and later resolve (i.e. set the value of) or fail (i.e. set an exception for) a Promise.

final Deferred<Long> deferred = new Deferred<Long>();
 
new Thread() {
    public void run() {
        try {
            Long total = service.calculateDifficultSum();
            deferred.resolve(total);
        } catch (Exception e) {
            deferred.fail(e);
        }
    }
}.start();
 
Promise<Long> promise = deferred.getPromise();

Whilst the above code is simple, it has a few disadvantages:

  • The code creating the promise is responsible for its own thread management, and there is no reuse of the thread.
  • Error management is manual, hence the ugly try/catch Exception block.
  • The code is quite verbose, hiding the fact that what we really care about is the call to calculateDifficultSum().
  • Every client has to reproduce this code pattern if they want to use the service asynchronously.

Ideally, what we really want is a way to directly return a promise from the service without the client having to worry about how this happens. This abstraction is provided by the Async Service.

The Async Service

The Async Service is another specification in OSGi Enterprise R6: see http://www.china.osgiusers.org/wiki/uploads/Main/JavaOne2014-BuildingSystemsWithAsynchronousMicroservices.pdf. It provides a mechanism for transforming any OSGi service call into an asynchronous call represented by a Promise. This is accomplished through the use of an Asynchronous Mediator, which behaves a bit like a `mock` that you might use in testing. The client code creates a mediator object, records a method call using the mediator, and then asks the Async Service to start the execution, obtaining a Promise in return. This is very similar to the create/use/verify pattern that test mocks use.

Using Asynchronous Services, the updated calculateDifficultSum call now has the following form:

Async async = getAsync();
 
Calculator mediated = async.mediate(calculator, Calculator.class);
 
Promise<Long> promise = async.call(mediated.calculateDifficultSum());

 

This is significantly more compact than using the Deferred object, avoids thread lifecycle management, and prevents mistakes in error handling. The Async Service also has the significant advantage that it transparently delegates to services that implement the AsyncDelegate interface. In this case the service is given the opportunity to optimise the asynchronous call. For example if the service is remote, and the distribution provider supports it, then the invocation request can be sent without blocking for the return value, returning a Promise directly instead.
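To make the record-then-execute idea behind the mediator more concrete, the following is a minimal sketch built on a JDK dynamic proxy. It is not the OSGi Async Service implementation: the MediatorSketch class, its internals, and the use of CompletableFuture in place of Promise are all assumptions made so that the example runs without an OSGi framework.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified mediator. NOT the OSGi Async Service code.
class MediatorSketch {

    // Stand-in for the Calculator service used in the text.
    interface Calculator {
        Long calculateDifficultSum();
    }

    // One recorded invocation: the real target, the method and its arguments.
    private static final class RecordedCall {
        final Object target;
        final Method method;
        final Object[] args;

        RecordedCall(Object target, Method method, Object[] args) {
            this.target = target;
            this.method = method;
            this.args = args;
        }
    }

    // The most recent call recorded on the current thread.
    private final ThreadLocal<RecordedCall> lastCall = new ThreadLocal<>();

    // Returns a proxy that records calls instead of executing them.
    // It always returns null, so this sketch only supports object return types.
    <T> T mediate(T target, Class<T> iface) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] { iface },
                (self, method, args) -> {
                    lastCall.set(new RecordedCall(target, method, args));
                    return null;
                });
        return iface.cast(proxy);
    }

    // Runs the recorded call on another thread. The argument is only the
    // placeholder returned by the mediator; it fixes the result type R.
    @SuppressWarnings("unchecked")
    <R> CompletableFuture<R> call(R placeholder) {
        RecordedCall recorded = lastCall.get();
        lastCall.remove();
        if (recorded == null) {
            throw new IllegalStateException("No mediated call was recorded");
        }
        return CompletableFuture.supplyAsync(() -> {
            try {
                return (R) recorded.method.invoke(recorded.target, recorded.args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        });
    }

    public static void main(String[] args) {
        MediatorSketch async = new MediatorSketch();
        Calculator calculator = () -> 5050L;

        Calculator mediated = async.mediate(calculator, Calculator.class);
        CompletableFuture<Long> promise = async.call(mediated.calculateDifficultSum());

        System.out.println(promise.join());
    }
}
```

Calling the mediated object does no work itself. It only notes which method was requested, and the real service is invoked later on a different thread.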

These techniques are now demonstrated in the Fractal example.

 

The Fractal Viewer

The fractal viewer is a simple REST application. This is expressed in the fractal system by the following system.part named viewer.

 

<system.part name="viewer">

	<description>
		The Fractal viewer and local calculation services, running on a single node
	</description>

 
	<system.part.element name="fractal-http" category="osgi.bundle"/>
    <config pid="org.apache.felix.http">
       <property name="org.osgi.service.http.port" value="8192"/>
    </config>

    <!--  Provide colour mappings  -->
    <system.part.element name="fractal-colours" category="osgi.bundle"/>
    <!--  Provide a basic mandelbrot equation  -->
    <system.part.element name="fractal-equation" category="osgi.bundle"/>
    ...
    ...

 </system.part>

 

The viewer system.part has an AngularJS front-end, and a pair of JAX-RS services. One service is used to populate the front-end form, the other is used to render fractal data on the server, and return the data to the client. The fractal viewer allows different fractal equations to be plugged in using the OSGi™ Whiteboard Pattern. Equation services are then offered to the client for rendering requests. A similar pattern is used to provide different colour schemes for the rendering. 

Fractal Render implementation

In a synchronous implementation the viewer generates sections of the image and delivers them as Server Sent Events. This improves the responsiveness of the user interface. However, the viewer still generates and transmits each section of the fractal image in sequence.

This synchronous code looks as follows:

Synchronous Output
		protected void render(OutputStream out, int width, int height,
				double deltaX, double deltaY, int xOffset, int yOffset) throws IOException {

			int[][] values = eqn.execute(width, height, minX + xOffset * deltaX, deltaX,
					maxY - yOffset * deltaY, deltaY, maxIterations, colourMap.getSpectrum().length);

			writeOutChunk(out, maxIterations, xOffset, yOffset, values, colourMap);
		}

As shown, this code makes no attempt to reduce the overall waiting time, but it could do so if executions ran in parallel. However, managing parallel executions can be difficult and error prone.

This becomes much simpler when using OSGi™ promises and asynchronous primitives.

 

Updating the renderer

The renderer code above can be made asynchronous very easily:

protected void render(OutputStream out, int width, int height,
	double deltaX, double deltaY, int xOffset, int yOffset) throws IOException {

	Promise<int[][]> values = async.call(
		eqn.execute(width, height, minX + xOffset * deltaX, deltaX, 
		maxY - yOffset * deltaY, deltaY, maxIterations, 
		colourMap.getSpectrum().length));
 
	values.then(complete -> {
			writeOutChunk(out, maxIterations, xOffset, yOffset, complete.getValue(), colourMap);
			return null;
		});
}

This isn't quite enough, however.

Now that the rendering is asynchronous there's nothing to stop our code from sending the final event before any rendering has actually finished! To help here we make use of two things.

Firstly, the then() method returns a Promise. This Promise is a "chained" promise, whose value depends upon the result of its parent, and on the result of the Success callback. The chained Promise only resolves when the Success callback has completed, and the Promise that the callback returns has resolved (or immediately after if the callback returns null).

Secondly, we can make use of the Promises utility class (via Promises.all) to create a "latch" Promise. The latch Promise resolves when all of the Promises used to create it have resolved.

The resulting code now looks like this:

		@Override
		protected void render(final OutputStream out, int width, int height,
				double deltaX, double deltaY, final int xOffset, final int yOffset) throws IOException {

			Promise<int[][]> values = async.call(eqn.execute(width, height, 
					minX + xOffset * deltaX, deltaX, maxY - yOffset * deltaY, deltaY, 
					maxIterations, colourMap.getSpectrum().length));

			// Store the chained promise, not the original. We want to wait until the data
			// has been completely written.

			pendingWork.add(values.then(complete -> {
					writeOutChunk(out, maxIterations, xOffset, yOffset, complete.getValue(), colourMap);
					return null;
				}));
		}

		@Override
		protected void awaitCompletion() {
			// Use the latch function in the Promises class to wait until all the data is written.
			try {
				Promises.all(pendingWork).getValue();
			} catch (InvocationTargetException e) {
				throw new WebApplicationException(e.getTargetException());
			} catch (InterruptedException e) {
				throw new WebApplicationException(e);
			}
		}
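The chain-then-latch pattern can also be tried outside an OSGi framework. The sketch below uses CompletableFuture from the Java standard library as a stand-in for Promise, with thenAccept playing the role of then() and CompletableFuture.allOf playing the role of Promises.all. The LatchSketch class and the computeChunk method are hypothetical placeholders for the renderer and eqn.execute.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative only: CompletableFuture stands in for the OSGi Promise type.
class LatchSketch {

    // Hypothetical stand-in for eqn.execute(...): computes one chunk of values.
    static int[] computeChunk(int index) {
        return new int[] { index, index * index };
    }

    // Computes every chunk asynchronously and returns the indices that were "written".
    static List<Integer> renderAll(int chunks) {
        List<Integer> written = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> pendingWork = new ArrayList<>();

        for (int i = 0; i < chunks; i++) {
            final int index = i;
            // Store the chained future, not the original: it completes only
            // after the write step has run.
            pendingWork.add(CompletableFuture
                    .supplyAsync(() -> computeChunk(index))
                    .thenAccept(values -> written.add(values[0])));
        }

        // Latch: completes once every chained future has completed.
        CompletableFuture.allOf(pendingWork.toArray(new CompletableFuture[0])).join();
        return written;
    }

    public static void main(String[] args) {
        List<Integer> written = renderAll(4);
        System.out.println("chunks written: " + written.size());
    }
}
```

The chunks may be written in any order, but the latch guarantees that all of them have been written before renderAll returns.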

 

The synchronous and asynchronous implementations of the renderer are included in the viewer system.part. To see the performance difference between these implementations when running the Fractal example below, try checking the `Run Asynchronously` checkbox and then clicking `Display Fractal` or zooming in.

The Fabric's Replication Handler & Asynchronous Load Balancer 

The performance of the application, particularly under load, will be limited by local CPU resource and the threads available to the Async Service. So in addition to the use of Promises and Asynchronous services we need to:

  1. Move the computation load to remote workers.
  2. Scale out those computational workers.
  3. Load balance across that population.

Of these, (1) and (2) are achieved in the same way as with the distributed Hello examples in the introductory tutorial. Specifically:

  • The worker system.part advertises its rendering API as a remote service (`property name="service.exported.interfaces" value="com.paremus.demo.fractal.api.Equation"`).
  • The fixed replication handler is included in the worker system.part to allow manual scaling of the number of worker instances.

As shown below, the fixed replication handler has a default of zero instances.

Remote Workers
    <system.part name="workers" >
        <description>
            Optional remote worker services
        </description>

        <system.part.element category="osgi.bundle" name="com.paremus.demo.fractal.equation" >
            <!-- Make the equation available remotely -->
            <config pid="com.paremus.demo.fractal.equation.MandelbrotEquation">
                <property name="service.exported.interfaces" value="com.paremus.demo.fractal.api.Equation" />
            </config>
        </system.part.element>

        <!-- Start with zero extra, and add them in by changing this value -->
        <replication.handler type="fixed">
            <property name="size" value="0" type="integer" />
        </replication.handler>

        <!-- Don't deploy us on the master node, or on a node that already has a worker  -->
        <contract features="(!(|(master=true)(fibre.system.fractal-demo&gt;0)))" />

    </system.part>

 

Having scaled out multiple workers, we need some mechanism to allow the viewer to load-balance across them. Rather than complicating the implementation of the viewer with load-balancer logic, we can instead compose the Service Fabric's runtime load-balancer into the fractal system.

As shown in the following extract, the load-balancer is configured for each exported interface. 

Load balancer configuration
        <config pid="com.paremus.fabric.balancer" factory="true">
            <property name="interfaces" value="com.paremus.demo.fractal.api.Equation" />
            <property name="target.filter" value="(equation.type=mandelbrot)" />
            <property name="equation.type" value="mandelbrot (load balanced)" />

            <!-- Add defaults which cover the standard range of mandelbrot values -->
            <property name="minX" value="-2.0" />
            <property name="maxX" value="1.0" />
            <property name="minY" value="-1.3" />
            <property name="maxY" value="1.3" />
            <property name="iterations" value="100" />
        </config> 
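The key design point is that the balancer offers the same Equation interface as the workers it fronts, so the viewer needs no balancing logic of its own. The sketch below illustrates that idea only. The source does not state which selection strategy the Service Fabric balancer uses, so round-robin is an assumption, and the cut-down Equation interface and the RoundRobinSketch class are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: round-robin is an assumed strategy, not the documented
// behaviour of the Service Fabric load balancer.
class RoundRobinSketch {

    // Hypothetical, cut-down stand-in for com.paremus.demo.fractal.api.Equation.
    interface Equation {
        String execute(int chunk);
    }

    // Offers the same interface as the workers and forwards each call
    // to the next worker in turn.
    static final class Balancer implements Equation {
        private final List<Equation> workers;
        private final AtomicInteger next = new AtomicInteger();

        Balancer(List<Equation> workers) {
            this.workers = new ArrayList<>(workers);
        }

        @Override
        public String execute(int chunk) {
            int index = Math.floorMod(next.getAndIncrement(), workers.size());
            return workers.get(index).execute(chunk);
        }
    }

    // Sends the given number of chunks through a balancer over two workers.
    static List<String> dispatch(int chunks) {
        Equation workerA = chunk -> "A:" + chunk;
        Equation workerB = chunk -> "B:" + chunk;
        Equation balanced = new Balancer(Arrays.asList(workerA, workerB));

        List<String> results = new ArrayList<>();
        for (int i = 0; i < chunks; i++) {
            results.add(balanced.execute(i));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(4)); // prints [A:0, B:1, A:2, B:3]
    }
}
```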

 

The Fractal System

The completed fractal system example, including runtime load-balancing and worker scale-out behaviours, is shown below.

<?xml version="1.0" encoding="UTF-8"?><system xmlns="http://schema.paremus.com/sf/1.2" boundary="fabric" name="com.paremus.example.fractal-system" version="1.0.0" repopath="https://nexus.paremus.com/content/repositories/releases/com/paremus/example/fractal-index/1.0.0/fractal-index-1.0.0.xml">
    <description>
       Deploy the Fractal viewer and optional additional calculation workers
    </description>
    
    <admin group="demo"/>
    
    <!-- 
   		The Fractal viewer, some basic colour maps, and a load balancer.
    -->
    <system.part name="viewer">
    
        <description>
            The Fractal viewer and a load balancer for remote equations
        </description>
    
        <system.part.element name="fractal-http" category="osgi.bundle"/>
        <config pid="org.apache.felix.http">
            <property name="org.osgi.service.http.port" value="8192"/>
        </config>
           
        <!-- Provide colour mappings -->
        <system.part.element name="fractal-colours" category="osgi.bundle"/>
        
        <!-- Provide a basic mandelbrot equation -->
        <system.part.element name="fractal-equation" category="osgi.bundle"/>
        
        <!-- Provide service load balancer -->
        <system.part.element name="com.paremus.fabric.balancer" category="osgi.bundle"/>
        
        <!-- Configure a load balancer for mandelbrot equation instances -->
        <config factory="true" pid="com.paremus.fabric.balancer">
            <property name="interfaces" value="com.paremus.demo.fractal.api.Equation"/>
            <property name="target.filter" value="(equation.type=mandelbrot)"/>
            <property name="equation.type" value="mandelbrot (load balanced)"/>
            <!-- Add defaults which cover the standard range of fractal values -->
            <property name="minX" value="-2.0"/>
            <property name="maxX" value="1.0"/>
            <property name="minY" value="-1.3"/>
            <property name="maxY" value="1.3"/>
            <property name="iterations" value="100"/>
        </config>
        
        <!-- Configure a load balancer for julia equation instances -->
        <config factory="true" pid="com.paremus.fabric.balancer">
            <property name="interfaces" value="com.paremus.demo.fractal.api.Equation"/>
            <property name="target.filter" value="(equation.type=julia)"/>
            <property name="equation.type" value="julia (load balanced)"/>
            <!-- Add defaults which cover the standard range of fractal values -->
            <property name="minX" value="-1.65"/>
            <property name="maxX" value="1.65"/>
            <property name="minY" value="-1.43"/>
            <property name="maxY" value="1.43"/>
            <property name="iterations" value="200"/>
        </config>
        
    </system.part>
    
    <!-- Workers providing equations -->
    <system.part name="workers">
        <description>
            Remote worker services
        </description>
        
        <system.part.element name="fractal-equation" category="osgi.bundle">
            <!-- Make the equations available remotely -->
            <config pid="com.paremus.demo.fractal.equation.MandelbrotEquation">
                <property name="service.exported.interfaces" value="com.paremus.demo.fractal.api.Equation"/>
            </config>
            <config pid="com.paremus.demo.fractal.equation.JuliaEquation">
                <property name="service.exported.interfaces" value="com.paremus.demo.fractal.api.Equation"/>
            </config>
        </system.part.element>
        
        <!-- Start with one worker, and add them in by changing this value -->
        <replication.handler type="fixed">
            <property name="size" type="integer" value="1"/>
        </replication.handler>
        
    </system.part>
    
</system>

 

Running the Fractal Example

 

Let's run the examples!

The fractal system will run in the `Fabric in a Virtual Machine`: see Quick Start. To create your own Service Fabric, see Creating a Fabric.

Having created a Fabric, import the following two fractal systems:

These systems are identical with the exception that the calculation engine in version 1.0.1 deliberately introduces algorithmic noise. 

As shown, once imported the revisions of the system (i.e. 1.0.0, 1.0.1) are displayed under the same shared system name (i.e. com.paremus.example.fractal-system).

Deploying version 1.0.0 of the fractal system results in the viewer and a single instance of a worker system.part being hosted by different myFabric fibres.

As shown, version 1.0.0 is now highlighted in blue on the main system page, indicating that this is the version of com.paremus.example.fractal-system that is currently deployed.

The resultant fractal URL is registered in Entire's App view. 

Clicking on the fractal URL will open the fractal example in a new browser.

By default the fractal example is synchronous. This can be seen in the sequenced responses from the remote worker. To see the re-rendering process, either:

  • Select a different equation.
  • Use a different colour scheme. 
  • Or use a different version of the fractal system.

To make the example asynchronous, select the `Run Asynchronously` option and re-run. The throughput of the fractal system is now significantly higher: the worker sends results back to the viewer as soon as each calculation completes, and the viewer displays them immediately.

System Update & Roll Back

Now return to the fractal system view, and deploy version 1.0.1 of the example. The first point to note is that version 1.0.0 is now marked as un-deployed.

Looking at the fractal example, we see that the worker has now changed to the version that includes an algorithmic noise component. Go to the fractal URL and select Display Fractal to see the effect. 

To back out this change, simply re-deploy version 1.0.0.

Scale Out

The fractal workers may be scaled horizontally across more fibres by changing the scaling value assigned to the replication handler associated with the workers, i.e. `com.paremus.example.fractal-system/workers.replication`. This property is located below the runtime topology view.

As shown, in response the workers are scaled across additional fibres.

Ensure that the load balanced version of the equation is used, and that `Run Asynchronously` is selected. The rendering throughput of the fractal application will then be seen to increase by the same factor as the workers were horizontally scaled.

Try adding a few additional fibres (at least one!) to your Service Fabric, and then reconfiguring the fractal system to deploy additional worker instances. You should see that adding progressively more workers makes rendering faster, and that removing them makes it slower again.
