1. Introduction

In a multi-threaded environment, we sometimes need to schedule tasks by custom criteria rather than by creation time alone.

Let's see how we can achieve this in Java – using a PriorityBlockingQueue.

2. Overview

Let's say we have jobs that we want to execute in order of priority:

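public class Job implements Runnable {
    private String jobName;
    private JobPriority jobPriority;

    public Job(String jobName, JobPriority jobPriority) {
        this.jobName = jobName;
        this.jobPriority = jobPriority;
    }

    @Override
    public void run() {
        System.out.println("Job:" + jobName +
          " Priority:" + jobPriority);
        try {
            Thread.sleep(1000); // to simulate actual execution time
        } catch (InterruptedException e) {
            // run() can't throw checked exceptions, so restore the interrupt flag
            Thread.currentThread().interrupt();
        }
    }

    // standard setters and getters
}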

For demonstration purposes, we're printing the job name and priority in the run() method.

We also call sleep() to simulate a longer-running job; while one job is executing, more jobs accumulate in the priority queue.

Finally, JobPriority is a simple enum:

public enum JobPriority {
    HIGH,
    MEDIUM,
    LOW
}

3. Custom Comparator

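We need a comparator that defines our custom criteria. Since Java 8, this is a one-liner:

Comparator.comparing(Job::getJobPriority);

Enum constants compare by declaration order, so HIGH sorts before MEDIUM, and MEDIUM before LOW. Since the head of a PriorityBlockingQueue is its least element, HIGH priority jobs come out first.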
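A minimal sketch of how this comparator orders jobs, using a plain list sort rather than a queue. The Job class here is a reduced stand-in for the one above, and PriorityComparatorSketch and namesByPriority() are hypothetical names introduced only for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class PriorityComparatorSketch {

    // Same declaration order as the article's enum: HIGH first, LOW last.
    enum JobPriority { HIGH, MEDIUM, LOW }

    // Reduced stand-in for the article's Job: only what the comparator needs.
    static class Job {
        private final String jobName;
        private final JobPriority jobPriority;

        Job(String jobName, JobPriority jobPriority) {
            this.jobName = jobName;
            this.jobPriority = jobPriority;
        }

        String getJobName() { return jobName; }
        JobPriority getJobPriority() { return jobPriority; }
    }

    // Hypothetical helper: returns the job names ordered by priority.
    static List<String> namesByPriority(List<Job> jobs) {
        List<Job> sorted = new ArrayList<>(jobs);
        sorted.sort(Comparator.comparing(Job::getJobPriority));
        List<String> names = new ArrayList<>();
        for (Job job : sorted) {
            names.add(job.getJobName());
        }
        return names;
    }

    public static void main(String[] args) {
        List<Job> jobs = Arrays.asList(
            new Job("Job1", JobPriority.LOW),
            new Job("Job2", JobPriority.MEDIUM),
            new Job("Job3", JobPriority.HIGH));
        System.out.println(namesByPriority(jobs)); // prints [Job3, Job2, Job1]
    }
}
```

If the enum were declared as LOW, MEDIUM, HIGH instead, the same comparator would put LOW first, and we'd need Comparator.comparing(Job::getJobPriority).reversed() to keep HIGH at the front.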

4. Priority Job Scheduler

With all the setup done, let's now implement a simple job scheduler. It uses a single-thread executor to take jobs from the PriorityBlockingQueue and hand them to a separate thread pool for execution:

public class PriorityJobScheduler {

    private ExecutorService priorityJobPoolExecutor;
    private ExecutorService priorityJobScheduler 
      = Executors.newSingleThreadExecutor();
    private PriorityBlockingQueue<Job> priorityQueue;

    public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
        priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
        priorityQueue = new PriorityBlockingQueue<Job>(
          queueSize, 
          Comparator.comparing(Job::getJobPriority));
        priorityJobScheduler.execute(() -> {
            while (true) {
                try {
                    priorityJobPoolExecutor.execute(priorityQueue.take());
                } catch (InterruptedException e) {
                    // restore the interrupt flag and stop polling the queue
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    public void scheduleJob(Job job) {
        priorityQueue.add(job);
    }
}

The key here is to create a PriorityBlockingQueue of Job instances with a custom comparator. The next job to execute is picked from the queue using the take() method, which retrieves and removes the head of the queue, blocking until a job is available.

The client code now simply needs to call scheduleJob(), which adds the job to the queue. The priorityQueue.add() call orders the new job relative to the jobs already in the queue, using the comparator we passed to the constructor.

Note that the actual jobs are executed using a separate ExecutorService with a dedicated thread pool.
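To see the queue's ordering in isolation, without any executors, here is a minimal sketch. It queues bare JobPriority values instead of Job instances to keep the example short; TakeOrderSketch and queueThenTake() are hypothetical names used only for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class TakeOrderSketch {

    enum JobPriority { HIGH, MEDIUM, LOW }

    // Hypothetical helper: queues the priorities in the given order,
    // then returns them in the order take() hands them back.
    static List<JobPriority> queueThenTake(List<JobPriority> submitted) {
        PriorityBlockingQueue<JobPriority> queue =
            new PriorityBlockingQueue<>(10, Comparator.<JobPriority>naturalOrder());
        for (JobPriority priority : submitted) {
            queue.add(priority); // never blocks: the queue is unbounded
        }
        List<JobPriority> taken = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                taken.add(queue.take()); // removes the current head
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop draining, keep the flag
        }
        return taken;
    }

    public static void main(String[] args) {
        List<JobPriority> submitted = Arrays.asList(
            JobPriority.LOW, JobPriority.HIGH, JobPriority.MEDIUM, JobPriority.HIGH);
        System.out.println(queueThenTake(submitted)); // prints [HIGH, HIGH, MEDIUM, LOW]
    }
}
```

Here all elements are queued before the first take(), so the result is fully sorted. In the scheduler, take() runs concurrently with add(), so each call returns the head of whatever is queued at that moment.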

5. Demo

Finally, here's a quick demonstration of the scheduler:

private static int POOL_SIZE = 1;
private static int QUEUE_SIZE = 10;

@Test
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
    Job job1 = new Job("Job1", JobPriority.LOW);
    Job job2 = new Job("Job2", JobPriority.MEDIUM);
    Job job3 = new Job("Job3", JobPriority.HIGH);
    Job job4 = new Job("Job4", JobPriority.MEDIUM);
    Job job5 = new Job("Job5", JobPriority.LOW);
    Job job6 = new Job("Job6", JobPriority.HIGH);
    
    PriorityJobScheduler pjs = new PriorityJobScheduler(
      POOL_SIZE, QUEUE_SIZE);
    
    pjs.scheduleJob(job1);
    pjs.scheduleJob(job2);
    pjs.scheduleJob(job3);
    pjs.scheduleJob(job4);
    pjs.scheduleJob(job5);
    pjs.scheduleJob(job6);

    // clean up
}

To demonstrate that the jobs are executed in priority order, we've kept POOL_SIZE at 1, so only one job runs at a time. QUEUE_SIZE is only the initial capacity of the PriorityBlockingQueue, which is unbounded and grows as needed. We provide jobs with varying priorities to the scheduler.

Here is a sample output we got for one of the runs:

Job:Job3 Priority:HIGH
Job:Job6 Priority:HIGH
Job:Job4 Priority:MEDIUM
Job:Job2 Priority:MEDIUM
Job:Job1 Priority:LOW
Job:Job5 Priority:LOW

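The output can vary across runs, because the scheduler thread starts taking jobs while the test is still adding them, and because PriorityBlockingQueue makes no guarantee about the order of elements with equal priority (note Job4 before Job2 above). However, take() never returns a lower-priority job while the queue contains a higher-priority one.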
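The test above leaves its cleanup as a comment. One possible way to fill it in is sketched below, assuming the scheduler exposes (or wraps) its two executors. SchedulerCleanupSketch and close() are hypothetical names and not part of the PriorityJobScheduler shown earlier:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class SchedulerCleanupSketch {

    // Hypothetical helper, not part of the article's PriorityJobScheduler.
    // Returns true if both executors terminated within the timeout.
    static boolean close(ExecutorService dispatcher, ExecutorService workers,
                         long timeoutSeconds) {
        try {
            // Interrupts the dispatcher thread, so a blocked take() throws
            // InterruptedException and the polling loop can exit.
            dispatcher.shutdownNow();
            boolean dispatcherDone =
                dispatcher.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            // Only now stop accepting jobs; jobs already handed over still run.
            workers.shutdown();
            boolean workersDone =
                workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            return dispatcherDone && workersDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(1);
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();

        // Same polling loop shape as the scheduler, with job names as elements.
        dispatcher.execute(() -> {
            while (true) {
                try {
                    String name = queue.take();
                    workers.execute(() -> System.out.println("ran " + name));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        System.out.println("closed cleanly: " + close(dispatcher, workers, 5));
    }
}
```

The dispatcher is stopped first so that it can't submit to a pool that has already been shut down. Note that in this sketch the timeout applies to each executor separately, and any jobs still waiting in the priority queue at that point are never executed.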

6. Conclusion

In this quick tutorial, we saw how PriorityBlockingQueue can be used to execute jobs in a custom priority order.

As usual, source files can be found over on GitHub.
