1. Introduction

Quasar is a library for the JVM that, through a dedicated Kotlin module, brings some asynchronous concepts to Kotlin in an easier-to-manage way. This includes lightweight threads, channels, actors, and more.

2. Setting Up the Build

To use the most recent version of Quasar, we need to run on JDK 11 or newer. Older versions support JDK 7, for situations where we can’t yet upgrade to Java 11.

Quasar is split across four dependencies, and which of them we need depends on precisely what functionality we’re using. When combining these, it’s essential that we use the same version for each of them.

To work correctly, Quasar needs to perform some bytecode instrumentation. This can be done either at runtime using a Java agent or at compile time. The Java agent is the preferred approach since it has no special build requirements and can work with any setup. The downside is that the agent has to be passed to every JVM that runs our code, alongside any other Java agents we already use.

2.1. Running from the Command Line

When running an application using Quasar, we specify the Java agent using the -javaagent flag to the JVM. This takes the full path to the quasar-core.jar file as a parameter:

$ java -javaagent:quasar-core.jar -cp quasar-core.jar:quasar-kotlin.jar:application.jar fully.qualified.main.Class

2.2. Running Our Application from Maven

We can also use Maven to add the Java agent when launching our application. This takes a few steps.

First, we set up the Dependency Plugin to generate a property pointing to the quasar-core.jar file:

<plugin>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <id>getClasspathFilenames</id>
            <goals>
               <goal>properties</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Then, we use the Exec plugin to actually launch our application:

<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
        <workingDirectory>target/classes</workingDirectory>
        <executable>java</executable>
        <arguments>
            <argument>-javaagent:${co.paralleluniverse:quasar-core:jar}</argument>
            <argument>-classpath</argument>
            <classpath/>
            <argument>com.baeldung.quasar.QuasarHelloWorldKt</argument>
        </arguments>
    </configuration>
</plugin>

We then need to run Maven with the correct call to make use of this:

mvn compile dependency:properties exec:exec

This ensures that the latest code is compiled and that the property pointing to our Java agent is available before we execute the application.

2.3. Running Unit Tests

It’d be great to get the same benefit in our unit tests that we get from the Quasar agent.

We can set up Surefire to make use of this same property when running the tests:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.22.1</version>
    <configuration>
        <argLine>-javaagent:${co.paralleluniverse:quasar-core:jar}</argLine>
    </configuration>
</plugin>

We can do the same for Failsafe if we’re using that for our integration tests as well.

3. Fibers

The core functionality of Quasar is that of fibers. These are similar in concept to threads but serve a subtly different purpose. Fibers are significantly lighter weight than threads – taking dramatically less memory and CPU time than standard threads require.

Fibers are not meant to be a direct replacement for threads. They are a better choice in some situations and worse in others.

Specifically, they are designed for scenarios where the executing code will spend a lot of time blocking on other fibers, threads, or processes – for example, waiting for a result from a database.

Fibers are similar to, but not the same as, green threads. Green threads are designed to work the same as OS threads but do not map directly onto OS threads. This means that green threads are best used in situations where they are always processing, as opposed to fibers, which are designed for situations that are normally blocking.

When necessary, it’s possible to use fibers and threads together to achieve the result needed.

3.1. Launching Fibers

We launch fibers in a very similar way to how we’d launch threads. We create an instance of the Fiber<V> class that wraps our code to execute – in the form of a SuspendableRunnable – and then call the start method:

class MyRunnable : SuspendableRunnable {
    override fun run() {
        println("Inside Fiber")
    }
}
Fiber<Void>(MyRunnable()).start()

Kotlin allows us to replace the SuspendableRunnable instance with a lambda if we wish:

val fiber = Fiber<Void> {
    println("Inside Fiber Lambda")
}
fiber.start()

And there is even a special helper DSL that does all of the above in an even simpler form:

fiber @Suspendable {
    println("Inside Fiber DSL")
}

This creates the fiber, creates the SuspendableRunnable wrapping the provided block, and starts it running.

The DSL is the preferred form when we’re launching the fiber in place. The lambda form is more useful when we need to pass the lambda around as a variable.

3.2. Returning Values from Fibers

The use of a SuspendableRunnable with fibers is the direct equivalent of Runnable with threads. We can also use a SuspendableCallable<V> with fibers, which equates to Callable with threads.

We can do this in the same way as above, with an explicit type, a lambda or using the DSL:

class MyCallable : SuspendableCallable<String> {
    override fun run(): String {
        println("Inside Fiber")
        return "Hello"
    }
}
Fiber<String>(MyCallable()).start()

fiber @Suspendable {
    println("Inside Fiber DSL")
    "Hello"
}

The use of a SuspendableCallable instead of a SuspendableRunnable means that our fiber now has a generic return type – in the above, we’ve got a Fiber<String> instead of a Fiber<Unit>.

Once we’ve got a Fiber<V> in our hands, we can extract the value from it – which is the value returned by the SuspendableCallable – by using the get() method on the fiber:

val pi = fiber @Suspendable {
    computePi()
}.get()

The get() method works the same as on a java.util.concurrent.Future – in fact, Fiber<V> implements that interface. This means that it will block until the value is present.

3.3. Waiting on Fibers

On other occasions, we might want to wait for a fiber to finish executing. This typically runs against the reason for using asynchronous code in the first place, but there are occasions where we need to do so.

In the same way as Java threads, we have a join() method that we can call on a Fiber<V> that will block until it has finished executing:

val fiber = Fiber<Void>(MyRunnable()).start()
fiber.join()

We can also provide a timeout, so that if the fiber takes longer to finish than expected, then we don’t block indefinitely:

fiber @Suspendable {
    Strand.sleep(5, TimeUnit.SECONDS) // suspends the fiber rather than blocking its thread
}.join(2, TimeUnit.SECONDS)

If the fiber does take too long, the join() method will throw a TimeoutException to indicate this has happened. We can also provide these timeouts to the get() method we saw earlier in the same way.
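
Since get() follows the java.util.concurrent.Future contract, the timeout behaviour can be tried out without Quasar at all. The following is a minimal sketch that runs on a plain JDK executor rather than on a fiber; the helper name getOrNullOnTimeout is made up for this illustration and is not part of Quasar:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: runs `work` on an ordinary JDK thread and waits at most
// `timeoutMillis` for its result. Returns the result, or null if the wait timed out.
fun <T : Any> getOrNullOnTimeout(timeoutMillis: Long, work: () -> T): T? {
    val executor = Executors.newSingleThreadExecutor()
    try {
        val future = executor.submit(Callable { work() })
        return try {
            // Same contract as a timed get(): throws TimeoutException when time runs out
            future.get(timeoutMillis, TimeUnit.MILLISECONDS)
        } catch (e: TimeoutException) {
            future.cancel(true) // interrupt the worker so it doesn't linger
            null
        }
    } finally {
        executor.shutdownNow()
    }
}
```

Work that completes in time has its value returned, while work that overruns the timeout produces null here. Blocking inside the lambda is acceptable in this sketch only because it runs on an ordinary thread and not inside a fiber.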

3.4. Scheduling Fibers

Fibers are all run on a scheduler: an instance of FiberScheduler or one of its subclasses. If we don’t specify one, a default is used, which is directly available as DefaultFiberScheduler.instance.

There are several system properties that we can use to configure our scheduler:

  • co.paralleluniverse.fibers.DefaultFiberPool.parallelism – The number of threads to use.
  • co.paralleluniverse.fibers.DefaultFiberPool.exceptionHandler – The exception handler to use if a fiber throws an exception.
  • co.paralleluniverse.fibers.DefaultFiberPool.monitor – The means to monitor the fibers.
  • co.paralleluniverse.fibers.DefaultFiberPool.detailedFiberInfo – Whether the monitor gets detailed information or not.

By default, this will be a FiberForkJoinScheduler which runs one thread per CPU core available and provides brief monitoring information via JMX.

This is a good choice for most cases, but on occasion we might want a different one. The other standard choice is FiberExecutorScheduler, which runs the fibers on a Java Executor that we provide, typically backed by a thread pool. We could also provide our own scheduler if needed – for example, to run all fibers on a specific thread in an AWT or Swing scenario.
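
To make the relationship between the parallelism property and the one-thread-per-core default concrete, here is a small plain-JDK sketch. It is not Quasar's own code: the function name fiberPoolParallelism and the fallback logic are assumptions that only model the behaviour described above.

```kotlin
// Hypothetical model of how the effective parallelism could be resolved:
// use the system property when it holds a valid integer,
// otherwise fall back to one thread per available CPU core.
fun fiberPoolParallelism(): Int =
    System.getProperty("co.paralleluniverse.fibers.DefaultFiberPool.parallelism")
        ?.toIntOrNull()
        ?: Runtime.getRuntime().availableProcessors()
```

In practice we would set the property on the JVM command line with -D before any fiber is started, rather than reading it ourselves.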

3.5. Suspendable Methods

Quasar works in terms of a concept known as Suspendable Methods. These are specially tagged methods that are allowed to be suspended, and thus are allowed to run inside fibers.

Typically, these are any methods that declare that they throw a SuspendExecution. However, because this is not always possible, we have some other special cases that we can use:

  • Any method that we annotate with the @Suspendable annotation
  • Anything that ends up as a Java 8 lambda method – these cannot declare exceptions and so are treated specially
  • Any call made by reflection, since these are computed at runtime and not compile time

Additionally, we can’t use a constructor or class initializer as a suspendable method.

We also can’t use synchronized together with suspendable methods. This means that we can’t mark the method itself as synchronized, we can’t call synchronized methods from inside it, and we can’t use synchronized blocks inside the method.

In the same way, suspendable methods should not directly block the thread of execution in other ways – for example, by using Thread.sleep(). Doing so will lead to performance problems and potentially to system instability.

Doing any of these will generate an error from the Quasar Java agent. In the default case, we’ll see output on the console indicating what happened:

WARNING: fiber …:fiber-10000004[task: …(…), target: …, scheduler: …] is blocking a thread (Thread[ForkJoinPool-default-fiber-pool-worker-3,5,main]).
	at java.base@…/java.lang.Thread.sleep(Native Method)
	at java.base@…/java.lang.Thread.sleep(Thread.java:339)
	at java.base@…/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
	at app//com.baeldung.quasar.SimpleFiberTest$fiberTimeout$1.invoke(SimpleFiberTest.kt:43)
	at app//com.baeldung.quasar.SimpleFiberTest$fiberTimeout$1.invoke(SimpleFiberTest.kt:12)
	at app//co.paralleluniverse.kotlin.KotlinKt$fiber$sc$1.invoke(Kotlin.kt:32)
	at app//co.paralleluniverse.kotlin.KotlinKt$fiber$sc$1.run(Kotlin.kt:65535)
	at app//co.paralleluniverse.fibers.Fiber.run(Fiber.java:1099)

4. Strands

Strands are a concept in Quasar that combines both fibers and threads. They allow us to interchange threads and fibers as needed without other parts of our application caring.

We create a Strand by wrapping the thread or fiber instance in a Strand class, using Strand.of():

val thread: Thread = ...
val strandThread = Strand.of(thread)

val fiber: Fiber = ...
val strandFiber = Strand.of(fiber)

Alternatively, we can get a Strand instance for the currently executing thread or fiber using Strand.currentStrand():

val myFiber = fiber @Suspendable {
    // Strand.of(myFiber) == Strand.currentStrand()
}

Once done, we can interact with both using the same API, allowing us to interrogate the strand, wait until it’s finished executing and so on:

strand.id // Returns the ID of the Fiber or Thread
strand.name // Returns the Name of the Fiber or Thread
strand.priority // Returns the Priority of the Fiber or Thread

strand.isAlive // Returns if the Fiber or Thread is currently alive
strand.isFiber // Returns if the Strand is a Fiber

strand.join() // Block until the Fiber or Thread is completed
strand.get() // Returns the result of the Fiber or Thread

5. Wrapping Callbacks

One of the major uses for fibers is to wrap asynchronous code that uses callbacks to return the status to the caller.

Quasar provides a class called FiberAsync<T, E> which we can use for exactly this case. We can extend it to provide a fiber-based API instead of a callback-based one for the same code.

This is done by writing a class that implements our callback interface, extends the FiberAsync class and delegates the callback methods to the FiberAsync class to handle:

interface PiCallback {
    fun success(result: BigDecimal)
    fun failure(error: Exception)
}

class PiAsync : PiCallback, FiberAsync<BigDecimal, Exception>() {
    override fun success(result: BigDecimal) {
        asyncCompleted(result)
    }

    override fun failure(error: Exception) {
        asyncFailed(error)
    }

    override fun requestAsync() {
        computePi(this)
    }
}

We now have a class that we can use to compute our result, where we can treat this as if it were a simple call and not a callback-based API:

val result = PiAsync().run()

This will either return the success value – the value that we passed to asyncCompleted() – or else throw the failure exception – the one that we passed to asyncFailed().

When we use this, Quasar will launch a new fiber that is directly tied to the current one and will suspend the current fiber until the result is available. This means that we must use it from within a fiber and not within a thread. It also means that the instance of FiberAsync must be both created and run from within the same fiber for it to work.

Additionally, they are not reusable – we can’t restart them once they’ve completed.
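
The general shape of this adapter, turning a callback into a direct call that either returns or throws, can also be sketched with nothing but the JDK. The version below uses a CompletableFuture and blocks an ordinary thread, so it is not a substitute for FiberAsync, which suspends a fiber instead. The awaitPi function and its compute parameter are hypothetical names for this illustration:

```kotlin
import java.math.BigDecimal
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException

// Same callback shape as in the article
interface PiCallback {
    fun success(result: BigDecimal)
    fun failure(error: Exception)
}

// Hypothetical adapter: `compute` stands in for any callback-based API.
// Returns the success value, or rethrows the exception passed to failure().
fun awaitPi(compute: (PiCallback) -> Unit): BigDecimal {
    val future = CompletableFuture<BigDecimal>()
    compute(object : PiCallback {
        override fun success(result: BigDecimal) {
            future.complete(result)
        }

        override fun failure(error: Exception) {
            future.completeExceptionally(error)
        }
    })
    return try {
        future.get() // blocks the calling thread until a callback fires
    } catch (e: ExecutionException) {
        throw e.cause ?: e // unwrap so the caller sees the original failure
    }
}
```

The two callback methods play the same roles as asyncCompleted() and asyncFailed() do in the FiberAsync version: one delivers the value, the other delivers the exception to whoever is waiting.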

6. Channels

Quasar introduces the concept of channels to allow message passing between different strands. These are very similar to Channels in the Go programming language.

6.1. Creating Channels

We can create channels using the static method Channels.newChannel:

Channels.newChannel(bufferSize, overflowPolicy, singleProducerOptimized, singleConsumerOptimized)

So, an example that blocks when the buffer is full and targets a single producer and consumer would be:

Channels.newChannel<String>(1024, Channels.OverflowPolicy.BLOCK, true, true)

There are also some special methods for creating channels of certain primitive types – newIntChannel, newLongChannel, newFloatChannel and newDoubleChannel. We can use these if we are sending messages of these specific types and get a more efficient flow between fibers. Note that we can never use these primitive channels from multiple consumers – that is part of the efficiency that Quasar gives with them.

6.2. Using Channels

The resulting Channel object implements two different interfaces – SendPort and ReceivePort.

We can use the ReceivePort interface from the strands that are consuming messages:

fiber @Suspendable {
    while (true) {
        val message = channel.receive()
        println("Received: $message")
    }
}

We can then use the SendPort interface of the same channel to produce messages that will be consumed by the above:

channel.send("Hello")
channel.send("World")

For obvious reasons, we can’t use both of these from the same strand, but we can share the same channel instance between different strands to allow message sharing between the two. In this case, the strand can be either a fiber or a thread.

6.3. Closing Channels

In the above, we had an infinite loop reading from the channel. This is obviously not ideal.

What we should prefer is to keep looping while the channel is actively producing messages, and stop when the channel is finished. We can do this using the close() method to mark the channel as closed, and the isClosed property to see if the channel is closed:

fiber @Suspendable {
    while (!channel.isClosed) {
        // receive() returns null once the channel is closed and has no messages left
        val message = channel.receive() ?: break
        println("Received: $message")
    }
    println("Stopped receiving messages")
}

channel.send("Hello")
channel.send("World")

channel.close()

6.4. Blocking Channels

Channels are, by their very nature, blocking concepts. The ReceivePort will block until a message is available to process, and we can configure the SendPort to block until the message can be buffered.

This leverages a crucial concept of fibers – that they are suspendable. When any of these blocking actions occur, Quasar will use very lightweight mechanisms to suspend the fiber until it can continue its work, instead of repeatedly polling the channel. This allows the system resources to be used elsewhere – for processing other fibers, for example.

6.5. Waiting on Multiple Channels

We have seen that Quasar can block on a single channel until an action can be performed. Quasar also offers the ability to wait across multiple channels.

We do this using the Selector.select() method. This concept might be familiar from both Go and Java NIO.

The select() method takes a collection of SelectAction instances and will block until one of these actions is performed:

fiber @Suspendable {
    while (!channel1.isClosed && !channel2.isClosed) {
        val received = Selector.select(
          Selector.receive(channel1),
          Selector.receive(channel2)
        )

        // select() returns the SelectAction that completed; message() holds what was received
        println("Received: ${received.message()}")
    }
}

In the above, we can then have multiple channels written to, and our fiber will read immediately on any of them that have a message available. The selector will only consume the first message that is available so that no messages will get dropped.

We can also use this for sending to multiple channels:

fiber @Suspendable {
    for (i in 0..10) {
        Selector.select(
          Selector.send(channel1, "Channel 1: $i"),
          Selector.send(channel2, "Channel 2: $i")
        )
    }
}

As with receive, this will block until the first action can be performed and then will perform that action. This has the interesting side effect that the message is sent to exactly one channel: the first one that has buffer space available for it. This allows us to distribute messages across multiple channels based exactly on backpressure from the receiving ends of those channels.

6.6. Ticker Channels

A special kind of channel that we can create is the ticker channel. These are similar in concept to stock exchange tickers – it’s not important that the consumer sees every message, as newer ones replace older ones.

These are useful when we have a constant flow of status updates – for example, stock exchange prices or percentage completed.

We create these as normal channels, but we use the OverflowPolicy.DISPLACE setting. In this case, if the buffer is full when producing a new message, then the oldest message is silently dropped to make room for it.

We can only consume these channels from a single strand. However, we can create a TickerChannelConsumer to read from this channel across multiple strands:

val channel = Channels.newChannel<String>(3, Channels.OverflowPolicy.DISPLACE)

for (i in 0..10) {
    val tickerConsumer = Channels.newTickerConsumerFor(channel)
    fiber @Suspendable {
        while (!tickerConsumer.isClosed) {
            val message = tickerConsumer.receive()
            println("Received on $i: $message")
        }
        println("Stopped receiving messages on $i")
    }
}

for (i in 0..50) {
    channel.send("Message $i")
}

channel.close()

Every instance of the TickerChannelConsumer will potentially receive all the messages sent to the wrapped channel, apart from any that the overflow policy has already dropped.

We’ll always receive messages in the correct order, and we can consume each TickerChannelConsumer at the rate that we need to work at – one fiber running slowly will not affect any others.

We will also know when the wrapped channel is closed so that we can stop reading from our TickerChannelConsumer. This allows the producer to not care about the way that the consumers are reading the messages, nor the type of channel that’s being used.
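
The effect of the DISPLACE policy on the buffer is easy to model in plain Kotlin. The class below is only an illustration of "drop the oldest message when full"; DisplacingBuffer is a made-up name, and it has none of the concurrency or suspension behaviour of a real Quasar channel:

```kotlin
// Hypothetical, single-threaded model of a bounded buffer with a "displace" policy.
class DisplacingBuffer<T>(private val capacity: Int) {
    private val items = ArrayDeque<T>()

    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    // Adds a message, silently dropping the oldest one when the buffer is full.
    fun send(message: T) {
        if (items.size == capacity) {
            items.removeFirst()
        }
        items.addLast(message)
    }

    // Returns the currently buffered messages, oldest first.
    fun snapshot(): List<T> = items.toList()
}
```

With a capacity of 3, sending "Message 0" through "Message 5" without reading anything leaves only the last three messages in the buffer, which is why a slow consumer of a ticker channel sees recent values rather than a backlog.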

6.7. Functional Transformations to Channels

We’re all used to functional transformations in Java, using streams. We can apply these same standard transformations on channels – both as send and receive variations.

The transformations that we can apply include:
  • filter – Filter out messages that don’t fit a given lambda
  • map – Convert messages as they flow through the channel
  • flatMap – The same as map, but converting one message into multiple messages
  • reduce – Apply a reduction function to a channel

For example, we can convert a ReceivePort<String> into one that reverses all the strings flowing through it using the following:

val transformOnReceive = Channels.map(channel, Function<String, String> { msg: String? -> msg?.reversed() })

This will not affect the messages on the original channel, and they can still be consumed elsewhere without seeing the effect of this transformation.

Alternatively, we can convert a SendPort<String> into one that forces all the strings to uppercase as we write them to the channel as follows:

val transformOnSend = Channels.mapSend(channel, Function<String, String> { msg: String? -> msg?.toUpperCase() })

This will affect messages as they are written, and in this case, the wrapped channel will only ever see the transformed message. However, we could still write directly to the channel that is being wrapped to bypass this transformation if needed.
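
The lambdas passed to map and mapSend are ordinary functions, so their effect on a single message can be checked in isolation, without any channel. A small sketch, with names chosen for this illustration and with uppercase() (Kotlin 1.5 or newer) used in place of the older toUpperCase():

```kotlin
// Hypothetical stand-ins for the two transformation lambdas shown above.
// Both accept null, mirroring the nullable parameter in the channel examples.
val reverseOnReceive: (String?) -> String? = { msg -> msg?.reversed() }
val upperCaseOnSend: (String?) -> String? = { msg -> msg?.uppercase() }
```

Applying reverseOnReceive to "Hello" gives "olleH", and applying upperCaseOnSend to "world" gives "WORLD". A null message passes through both unchanged because of the safe-call operator.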

7. Data Flow

Quasar Core gives us a couple of tools to support reactive programming. These are not as powerful as something like RxJava, but more than enough for the majority of cases.

We have access to two concepts – Val and Var. Val represents a constant value, and Var represents a varying one.

Both types are constructed with either no value or a SuspendableCallable, which will be used in a fiber to compute the value:

val a = Var<Int>()
val b = Val<Int>()

val c = Var<Int> { a.get() + b.get() }
val d = Var<Int> { a.get() * b.get() }

// (a*b) - (a+b)
val initialResult = Val<Int> { d.get() - c.get() }
val currentResult = Var<Int> { d.get() - c.get() }

Initially, initialResult and currentResult will have no values, and attempting to get a value out of them will block the current strand. As soon as we give a and b values, we can read values from both initialResult and currentResult.

In addition to this, if we further change a then currentResult will update to reflect this but initialResult won’t:

a.set(2)
b.set(4)

Assert.assertEquals(2, initialResult.get())
Assert.assertEquals(2, currentResult.get())

a.set(3)

Assert.assertEquals(2, initialResult.get()) // Unchanged
Assert.assertEquals(5, currentResult.get()) // New Value

If we try to change b, then we’ll get an exception thrown instead, because a Val can only have a single value assigned to it.
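
The expected values in the assertions follow directly from the arithmetic of the graph. Written as an ordinary function, with no data flow involved and with a function name invented for this sketch, the computation is:

```kotlin
// The same arithmetic as the data-flow graph, as a plain function.
fun productMinusSum(a: Int, b: Int): Int {
    val c = a + b // the "c" node: sum
    val d = a * b // the "d" node: product
    return d - c  // (a*b) - (a+b)
}
```

For a = 2 and b = 4 this is 8 - 6 = 2, and after a changes to 3 it is 12 - 7 = 5, matching the values that currentResult reports.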

8. Conclusion

This article has given an introduction to the Quasar library that we can use for asynchronous programming. What we’ve seen here is only the basics of what we can achieve with Quasar. Why not try it out on the next project?

Examples of some of the concepts we’ve covered here can be found over on GitHub.
