Chapter Ten
Concurrency and Multithreading


Exam Objectives

Create both platform and virtual threads. Use both Runnable and Callable objects, manage the thread lifecycle, and use different Executor services and concurrent API to run tasks.
Develop thread-safe code, using locking mechanisms and concurrent API.
Process Java collections concurrently and utilize parallel streams.

Chapter Content


Introducing Threads

Threads allow multiple paths of execution to occur concurrently within a single program. Each thread represents a separate path of execution, allowing different parts of the code to run simultaneously. You can think of threads like lanes on a highway. Just as multiple lanes allow many cars to drive down the road simultaneously, multiple threads allow different segments of code to execute concurrently within the same application. However, just as cars in different lanes need to coordinate when merging or exiting, threads must coordinate carefully when accessing shared resources to avoid conflicts.

In Java 21, there are two types of threads:

  1. Platform threads: These are the traditional threads that are mapped directly to operating system threads.
  2. Virtual threads: These are lightweight threads managed by the Java Virtual Machine (JVM).

Let’s start by looking at platform threads, which have been around since the early days of Java.

To create a new platform thread, you can extend the Thread class or implement the Runnable interface. When extending Thread, you override the run() method to define the code that will execute in the new thread:

public class MyThread extends Thread {
    public void run() {
        System.out.println("New platform thread is running");
    } 
}

To launch the new thread, create an instance of the class and call its start() method:

MyThread myThread = new MyThread();
myThread.start();

The start() method initiates a new thread that executes the code defined in run(). Alternatively, you can create a new thread by implementing Runnable:

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

And passing an instance to the Thread constructor:

public class MyRunnable implements Runnable {
    public void run() {
        System.out.println("New platform thread is running");
    }
} 

MyRunnable myRunnable = new MyRunnable();
Thread myThread = new Thread(myRunnable);
myThread.start();

Alternatively, in Java 21 you can use the Thread.ofPlatform() method, which returns a Thread.Builder.OfPlatform instance, to create platform threads. Here’s a simple example:

// Create a Runnable task
Runnable task = () -> {
   System.out.println("Platform thread is running");
};

// Create a platform thread using Thread.Builder.OfPlatform
Thread platformThread = Thread.ofPlatform().start(task);

// Wait for the thread to finish
try {
   platformThread.join();
} catch (InterruptedException e) {
   e.printStackTrace();
}

In this example, Thread.ofPlatform() returns a builder for platform threads, and start(task) creates the thread and starts it immediately. The join() call makes the main thread wait until the platform thread finishes.

You can customize the platform thread by setting its name, priority, and other properties using the builder pattern. Here’s an example:

// Create a Runnable task
Runnable task = () -> {
   System.out.println("Custom platform thread is running");
};

// Create and customize a platform thread
Thread platformThread = Thread.ofPlatform()
                             .name("CustomThread", 0)
                             .priority(Thread.MAX_PRIORITY)
                             .unstarted(task);

// Start the thread
platformThread.start();

// Wait for the thread to finish
try {
   platformThread.join();
} catch (InterruptedException e) {
   e.printStackTrace();
}

In this example, name("CustomThread", 0) gives the thread a name composed of the prefix and a counter starting at 0 (so this thread is named CustomThread0), priority(Thread.MAX_PRIORITY) sets the highest priority, and unstarted(task) returns the thread without starting it, which is why we call start() explicitly afterward.

On the other hand, you can use the Thread.ofVirtual() method, which returns a Thread.Builder.OfVirtual instance, to create a virtual thread:

Thread vThread = Thread.ofVirtual().start(() -> {
    System.out.println("Hello from a virtual thread!");
});
vThread.join();

For platform threads, Java distinguishes between daemon and non-daemon threads. Daemon threads are those that do not prevent the JVM from exiting when the program finishes. They run in the background and are typically used for tasks like garbage collection, background cleanup, etc. The JVM will continue running as long as there is at least one active non-daemon thread. Daemon threads are terminated when all non-daemon threads complete. To make a thread a daemon, call its setDaemon(true) method before starting it:

public class DaemonThreadExample {
    public static void main(String[] args) {
        Thread daemonThread = new Thread(() -> {
            while (true) {
                System.out.println("Daemon thread is running");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        daemonThread.setDaemon(true);
        daemonThread.start();

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Main thread exiting");
    }
}

In this example, we create a daemon thread using a lambda expression. The daemon thread runs in an infinite loop, printing a message every second. We set the thread to be a daemon by calling setDaemon(true) before starting it.

The main thread sleeps for 5 seconds and then continues its execution. When the main thread (which is a non-daemon thread) terminates, the JVM will automatically terminate the daemon thread.

You can also use Thread.ofPlatform() to set the daemon status:

Thread platformThread = Thread.ofPlatform()
                              .name("CustomThread", 0)
                              .daemon(false)
                              .unstarted(task);

The no-argument daemon() method is a convenience that invokes daemon(boolean) with a value of true.

On the other hand, virtual threads are always daemon threads, so they do not prevent the JVM from exiting when the program finishes.

A thread progresses through several states during its life cycle:

                        ┌─────────┐
                        │   NEW   │
                        └────┬────┘
                             │ start()
                             ▼
                      ┌──────────────┐  run() completes  ┌──────────────┐
                      │   RUNNABLE   │──────────────────▶│  TERMINATED  │
                      └──────────────┘                   └──────────────┘

   RUNNABLE ──(waiting to enter a synchronized block/method)──▶ BLOCKED
   BLOCKED  ──(monitor lock acquired)─────────────────────────▶ RUNNABLE

   RUNNABLE ──(wait(), join(), LockSupport.park())────────────▶ WAITING
   WAITING  ──(notify(), notifyAll(), interrupt())────────────▶ RUNNABLE

   RUNNABLE ──(sleep(), wait(long), join(long),
               LockSupport.parkNanos(long),
               LockSupport.parkUntil(long))───────────────────▶ TIMED_WAITING
   TIMED_WAITING ──(timeout elapses, notify(), interrupt())───▶ RUNNABLE
This lifecycle applies to both platform and virtual threads, although the internal management of these states differs between the two types.
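You can observe these states at runtime with the Thread.getState() method. The following sketch shows a thread moving through NEW, TIMED_WAITING (while it sleeps), and TERMINATED:

```java
public class ThreadStateExample {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(500); // puts the thread into TIMED_WAITING
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        System.out.println(thread.getState()); // NEW: created but not started

        thread.start();
        Thread.sleep(100); // give the thread time to reach its sleep() call
        System.out.println(thread.getState()); // TIMED_WAITING: sleeping

        thread.join(); // wait for run() to complete
        System.out.println(thread.getState()); // TERMINATED: run() finished
    }
}
```

The output is NEW, TIMED_WAITING, TERMINATED. The short sleep in main is only there so the second getState() call happens while the other thread is sleeping.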

The static Thread.sleep(long millis) method causes the current thread to suspend execution for the specified number of milliseconds:

try {
    Thread.sleep(1000); 
} catch (InterruptedException e) {
    // Handle interruption
}

To prematurely wake a sleeping or waiting thread, you can call its interrupt() method. If the target thread is blocked in a call such as sleep() or wait(), that call throws an InterruptedException in the target thread, which must be handled:

public class InterruptExample {
    public static void main(String[] args) {
        Thread thread = Thread.ofVirtual().start(() -> {
            try {
                System.out.println("Thread is going to sleep");
                Thread.sleep(5000);
                System.out.println("Thread woke up");
            } catch (InterruptedException e) {
                System.out.println("Thread was interrupted");
            }
        });

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        thread.interrupt();
    }
}

In this example, we create a virtual thread that goes to sleep for 5 seconds. The main thread sleeps for 2 seconds and then calls interrupt() on the other thread.

When interrupt() is called, it sets the interrupted status of the target thread. If the target thread is sleeping or waiting, it will immediately throw an InterruptedException. The thread can then handle the interruption appropriately.

In the example, the output will be:

Thread is going to sleep
Thread was interrupted

The thread’s sleep is prematurely interrupted after 2 seconds, and it catches the InterruptedException and prints a message.

Virtual Threads

Characteristics of Virtual Threads

The key characteristic of virtual threads is their lightweight nature. Unlike platform threads, which are mapped directly to OS threads, virtual threads are managed by the JVM, which maps a large number of virtual threads to a small number of OS threads.

When a virtual thread needs to run, the Java runtime attaches it to a regular thread (called a carrier thread). This is like giving the virtual thread a temporary vehicle to move around in.

The operating system then schedules this platform thread as it normally would. While attached to this carrier, the virtual thread can execute its code.

If the virtual thread needs to perform an operation that might take a while, like reading from a file or waiting for a network response, it can detach from its carrier. This is called unmounting. When this happens, the carrier becomes available, and the JVM can use it to run a different virtual thread.

However, there are some situations where a virtual thread can’t detach from its carrier. This is called being pinned to the carrier. Two common scenarios where this happens are:

  1. When the virtual thread is using a synchronized block or method. These are used to control access to shared resources, and they need to stay on the same carrier to work properly.

  2. When the virtual thread is running code that interacts directly with the operating system or other low-level functions (native methods or foreign functions). These also need to stay on the same carrier because they’re tightly connected to the underlying system.

In these pinned situations, the virtual thread has to stay attached to its carrier until it’s done with these special operations. This means the carrier can’t be used for other virtual threads during this time, which can potentially reduce some of the efficiency benefits of virtual threads.
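Because synchronized pins a virtual thread to its carrier, a common mitigation when a lock guards a long or blocking operation is to use java.util.concurrent.locks.ReentrantLock instead; a virtual thread blocked on a ReentrantLock can unmount from its carrier. Here is a minimal sketch of that pattern:

```java
import java.util.concurrent.locks.ReentrantLock;

public class AvoidPinningExample {
    // A ReentrantLock lets a waiting virtual thread release its carrier,
    // unlike a synchronized block, which pins it
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            lock.lock(); // a virtual thread blocked here can unmount
            try {
                System.out.println(Thread.currentThread() + " holds the lock");
            } finally {
                lock.unlock(); // always release in a finally block
            }
        };

        Thread t1 = Thread.ofVirtual().start(task);
        Thread t2 = Thread.ofVirtual().start(task);
        t1.join();
        t2.join();
    }
}
```

The locking discipline is the same as with synchronized; only the mechanism changes, so the carrier stays available to run other virtual threads while one waits.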

For this reason, virtual threads are particularly effective for I/O-bound tasks. The JVM can suspend the virtual thread and free up the platform thread to do other work. This is efficient because I/O operations often involve waiting for external resources, during which time the CPU isn’t actively engaged.

However, virtual threads are not intended for long-running CPU-intensive operations. These operations actively use the CPU for extended periods. When a virtual thread is performing a CPU-intensive task, it’s continuously using the platform thread it’s running on. In this case, there’s no opportunity for the runtime to suspend the virtual thread and reassign the platform thread elsewhere, because the CPU is constantly busy with the intensive computation.

Additionally, virtual threads have a fixed priority that cannot be changed. This design choice simplifies the scheduling of virtual threads, as they are intended to be used for parallelism in a more straightforward manner compared to platform threads.

Virtual threads do not have a thread name by default. The getName method returns the empty string if a thread name is not set:

// Create a virtual thread without setting its name
Thread virtualThread = Thread.ofVirtual().start(() -> {
    System.out.println("Running in a virtual thread");
});

// Check and print the name of the virtual thread
String threadName = virtualThread.getName();
if (threadName.isEmpty()) {
    System.out.println("The virtual thread has no name.");
} else {
    System.out.println("The virtual thread name is: " + threadName);
}

In this example, the virtualThread is created without setting its name, so calling getName on it returns an empty string. The output will be:

Running in a virtual thread
The virtual thread has no name.

You can set the name of a virtual thread by using the name method of the Thread.Builder.OfVirtual interface:

// Create a virtual thread with a specific name
Thread virtualThread = Thread.ofVirtual()
    .name("MyVirtualThread")
    .start(() -> {
        System.out.println("Running in a virtual thread");
    });

// Check and print the name of the virtual thread
String threadName = virtualThread.getName();
if (threadName.isEmpty()) {
    System.out.println("The virtual thread has no name.");
} else {
    System.out.println("The virtual thread name is: " + threadName);
}

In this example, the virtualThread is created with the name "MyVirtualThread" using the name method. The output will be:

Running in a virtual thread
The virtual thread name is: MyVirtualThread

Another important feature of virtual threads is their ability to simplify concurrent programming. With virtual threads, you can write straightforward, sequential-looking code that actually runs concurrently. This can make your code easier to read and maintain, as you don’t need to explicitly manage thread pools or use complex asynchronous programming models.
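As a sketch of this sequential-looking style, the following example uses Executors.newVirtualThreadPerTaskExecutor() from the concurrent API to run two blocking calls concurrently while the code still reads top to bottom; fetchUser and fetchOrder are hypothetical stand-ins for real I/O calls:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SequentialStyleExample {
    public static void main(String[] args) throws Exception {
        // Each submitted task runs in its own virtual thread
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<String> user  = executor.submit(() -> fetchUser());
            Future<String> order = executor.submit(() -> fetchOrder());
            // Both "remote calls" are in flight at the same time,
            // yet the code reads like plain sequential logic
            System.out.println(user.get() + ", " + order.get());
        }
    }

    // Hypothetical blocking calls standing in for real network I/O
    static String fetchUser() throws InterruptedException {
        Thread.sleep(100);
        return "user-42";
    }

    static String fetchOrder() throws InterruptedException {
        Thread.sleep(100);
        return "order-7";
    }
}
```

There is no callback or reactive machinery here; the blocking get() calls are cheap because only virtual threads block on them.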

Creating Virtual Threads

Java 21 introduces several ways to create and use virtual threads. Let’s explore these methods in detail.

The primary way to create a virtual thread is by using the Thread.ofVirtual() method. This method returns a Thread.Builder that can be used to configure and start a virtual thread. Here’s an example:

Thread vThread = Thread.ofVirtual().start(() -> {
    System.out.println("Virtual thread is running");
});

try {
    vThread.join();
} catch (InterruptedException e) {
    e.printStackTrace();
}

In this example, we create and start a virtual thread in one line. The start() method takes a Runnable and immediately starts the thread.

If you want to create a virtual thread without starting it immediately, you can use the unstarted() method instead:

Thread vThread = Thread.ofVirtual().unstarted(() -> {
    System.out.println("Virtual thread is running");
});

vThread.start();
try {
    vThread.join();
} catch (InterruptedException e) {
    e.printStackTrace();
}

Another convenient method for creating and starting a virtual thread is Thread.startVirtualThread(Runnable task). This static method creates a virtual thread, starts it, and returns the Thread object:

Thread vThread = Thread.startVirtualThread(() -> {
    System.out.println("Virtual thread created with startVirtualThread");
});

try {
    vThread.join();
} catch (InterruptedException e) {
    e.printStackTrace();
}

For cases where you need to create multiple virtual threads with the same configuration, you can use a java.util.concurrent.ThreadFactory. The Thread.ofVirtual().factory() method returns a ThreadFactory that creates virtual threads:

ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory();

for (int i = 0; i < 10; i++) {
    Thread vThread = virtualThreadFactory.newThread(() -> {
        System.out.println("Virtual thread " + Thread.currentThread().threadId() + " is running");
    });
    vThread.start();
}

This example creates a ThreadFactory for virtual threads and uses it to create and start 10 virtual threads.

You can also customize the ThreadFactory to set names for the virtual threads it creates:

ThreadFactory namedVirtualThreadFactory = Thread.ofVirtual().name("worker-", 0).factory();

for (int i = 0; i < 5; i++) {
    Thread vThread = namedVirtualThreadFactory.newThread(() -> {
        System.out.println(Thread.currentThread().getName() + " is running");
    });
    vThread.start();
}

This will create virtual threads with names like worker-0, worker-1, and so on.

It’s worth noting that while virtual threads are very lightweight, they’re not free. Creating millions of virtual threads that do nothing but immediately terminate is still a non-trivial operation. In practice, you should create virtual threads as needed for actual concurrent tasks, rather than creating a huge number upfront.

Virtual Threads vs Platform Threads

Here’s a quick comparison of virtual threads and platform threads:

Characteristic           Virtual Threads                       Platform Threads
Management               Managed by JVM                        Managed by OS
Resource usage           Very lightweight                      Heavier, limited by OS
Scalability              Can create millions                   Typically limited to thousands
Blocking behavior        Automatically yields carrier thread   Blocks OS thread
Use case                 Ideal for I/O-bound tasks             Better for CPU-bound tasks
Stack size               Grows and shrinks as needed           Fixed size
Thread-local variables   Should be used cautiously             Can be used freely

Virtual threads excel in scenarios with many concurrent, mostly-idle tasks. For example, in a web server handling many simultaneous connections, each connection could be handled by its own virtual thread. This allows for simple, synchronous-style code that scales extremely well.

Here’s an example that demonstrates the scalability difference:

long start = System.currentTimeMillis();

List<FutureTask<Integer>> tasks = new ArrayList<>();
ThreadFactory threadFactory = Thread.ofVirtual().factory();

for (int i = 0; i < 100_000; i++) {
    int taskId = i;
    FutureTask<Integer> task = new FutureTask<>(() -> {
        Thread.sleep(Duration.ofSeconds(1));
        return taskId;
    });
    tasks.add(task);
    threadFactory.newThread(task).start();
}

try {
    for (FutureTask<Integer> task : tasks) {
        task.get();
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

long end = System.currentTimeMillis();
System.out.println("Time taken: " + (end - start) + "ms");

This code snippet will execute 100,000 virtual threads, each sleeping for one second and then returning their task ID. When I executed the program, the total execution time was less than 2 seconds:

Time taken: 1713ms

But by changing this line:

ThreadFactory threadFactory = Thread.ofVirtual().factory();

To the following to use platform threads:

ThreadFactory threadFactory = Thread.ofPlatform().factory();

The program ran out of resources. This was the output:

[0.307s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[0.307s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Thread-2021"
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:1526)
	at App.main(App.java:42)

However, it’s worth mentioning that while virtual threads bring many benefits, they don’t change the fundamental principles of concurrent programming. You still need to properly synchronize access to shared mutable state, regardless of whether you’re using virtual or platform threads.
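For example, incrementing a plain int field from many threads loses updates, whether the threads are virtual or not. A minimal sketch using AtomicInteger from the concurrent API to make the shared counter thread-safe:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedStateExample {
    public static void main(String[] args) throws InterruptedException {
        // AtomicInteger makes increments atomic; a plain int counter++
        // here would lose updates under contention
        AtomicInteger counter = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 1_000; i++) {
            threads.add(Thread.ofVirtual().start(counter::incrementAndGet));
        }
        for (Thread t : threads) {
            t.join(); // wait for every increment to complete
        }
        System.out.println(counter.get()); // always 1000
    }
}
```

The same program with an ordinary int field would print a different, smaller number on most runs, because unsynchronized read-modify-write operations interleave.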

Let’s review some common threading problems next.

Threading Problems

When working with threads, it’s important to be aware of potential problems that can arise due to the complex nature of concurrent programming. These issues can lead to unexpected behavior, reduced performance, or even complete program failure.

In the context of multi-threaded programming, problems start to occur when threads get stuck in a state where they cannot proceed, preventing the program from moving forward. Let’s talk about some of the most common problems.

Deadlock

A deadlock occurs when two or more threads are unable to proceed because each thread is waiting for a resource that another thread holds, resulting in a circular dependency. It’s a situation where threads are permanently blocked, waiting for each other to release the resources they need.

Imagine two friends, Anne and Joe, who are each trying to cross a narrow bridge from opposite ends. The bridge is so narrow that only one person can cross at a time. Anne starts walking from one end, and Joe starts walking from the other end. When they meet in the middle, neither can continue forward, and neither can go back because there’s no space to turn around. They’re stuck in a situation where neither can proceed, and neither can retreat. This deadlock situation halts their progress, similar to how a deadlock in Java halts the execution of threads waiting on each other to release resources.

In the context of multi-threaded programming, resources are typically locks or other synchronization mechanisms used to control access to shared data. Deadlocks occur when the following four conditions are simultaneously met:

  1. Mutual Exclusion: At least one resource must be held in a non-sharable mode, meaning only one thread can use the resource at a time.

  2. Hold and Wait: A thread must be holding at least one resource while waiting to acquire additional resources held by other threads.

  3. No Preemption: Resources cannot be forcibly taken away from a thread; they must be released voluntarily by the thread holding them.

  4. Circular Wait: There must be a circular chain of two or more threads, each waiting for a resource held by the next thread in the chain.

Consider this class that illustrates the deadlock analogy:

public class DeadlockExample {
    private static final Object narrowBridgePart1 = new Object();
    private static final Object narrowBridgePart2 = new Object();

    public static void main(String[] args) {
        Runnable anneTask = () -> {
            synchronized (narrowBridgePart1) {
                System.out.println("Anne: Holding part 1 of the bridge...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Anne: Waiting for part 2 of the bridge...");
                synchronized (narrowBridgePart2) {
                    System.out.println("Anne: Holding part 1 and part 2 of the bridge...");
                }
            }
        };

        Runnable joeTask = () -> {
            synchronized (narrowBridgePart2) {
                System.out.println("Joe: Holding part 2 of the bridge...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Joe: Waiting for part 1 of the bridge...");
                synchronized (narrowBridgePart1) {
                    System.out.println("Joe: Holding part 1 and part 2 of the bridge...");
                }
            }
        };

        // Create and start Anne's thread
        Thread.ofPlatform().start(anneTask);

        // Create and start Joe's thread
        Thread.ofPlatform().start(joeTask);
    }
}

In this example, we have two threads, Anne and Joe, and two locks, narrowBridgePart1 and narrowBridgePart2. The program encounters a deadlock when the following sequence of events occurs:

  1. Anne acquires narrowBridgePart1 and enters the first synchronized block.

  2. Joe acquires narrowBridgePart2 and enters the first synchronized block.

  3. Anne attempts to acquire narrowBridgePart2 in the second synchronized block but is blocked because narrowBridgePart2 is held by Joe.

  4. Joe attempts to acquire narrowBridgePart1 in the second synchronized block but is blocked because narrowBridgePart1 is held by Anne.

At this point, both threads are waiting for each other to release the parts of the bridge they hold, resulting in a deadlock. Anne and Joe are stuck in the middle of the bridge, unable to proceed or retreat, just like the threads in a deadlock situation in Java. The program will hang indefinitely, with no thread being able to proceed.

To avoid deadlocks, it’s important to follow best practices such as:

  1. Always acquire multiple locks in the same order in every thread.

  2. Avoid holding one lock while waiting for another whenever possible, and keep synchronized blocks short.

  3. Use timed lock attempts (for example, ReentrantLock’s tryLock with a timeout) so a thread can back off and retry instead of waiting forever.
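For instance, the deadlock above disappears if Anne and Joe agree on a single lock order. Here is a sketch where both tasks acquire narrowBridgePart1 before narrowBridgePart2, which breaks the circular-wait condition:

```java
public class DeadlockFreeExample {
    private static final Object narrowBridgePart1 = new Object();
    private static final Object narrowBridgePart2 = new Object();

    public static void main(String[] args) throws InterruptedException {
        // Both tasks acquire the locks in the same order: part 1, then part 2.
        // No thread can hold part 2 while waiting for part 1, so no cycle forms.
        Runnable crossBridge = () -> {
            synchronized (narrowBridgePart1) {
                synchronized (narrowBridgePart2) {
                    System.out.println(
                        Thread.currentThread().getName() + " crossed the bridge");
                }
            }
        };

        Thread anne = Thread.ofPlatform().name("Anne").start(crossBridge);
        Thread joe = Thread.ofPlatform().name("Joe").start(crossBridge);
        anne.join();
        joe.join();
    }
}
```

Whichever thread reaches narrowBridgePart1 first crosses first, and the other simply waits its turn; the program always terminates.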

Starvation

Starvation occurs when a thread is perpetually denied access to a shared resource, preventing it from making progress. In other words, a thread is starved of the resources it needs to complete its task. Starvation can happen when other threads continuously acquire the shared resource, causing the starved thread to wait indefinitely.

Think of a scenario where a group of people is waiting in line to buy tickets for a popular concert. If someone cuts in line repeatedly or if the ticket seller keeps serving only a certain group of people, some individuals may never get a chance to buy tickets. They are essentially starved of the opportunity to make their purchase.

In multi-threaded programs, starvation often arises when threads are assigned different priorities. Java assigns priorities to threads ranging from 1 (lowest) to 10 (highest), with 5 being the default priority. When threads with higher priorities are continuously given preference over threads with lower priorities, the lower-priority threads may suffer from starvation.

Let’s review this class that illustrates the concert ticket analogy:

public class ConcertTicketStarvationExample {
    private static final Object ticketSeller = new Object();

    public static void main(String[] args) {
        Runnable impatientFanTask = () -> {
            while (true) {
                synchronized (ticketSeller) {
                    System.out.println("Impatient Fan: Bought a ticket");
                    // Simulate buying a ticket
                }
            }
        };

        Runnable patientFanTask = () -> {
            while (true) {
                synchronized (ticketSeller) {
                    System.out.println("Patient Fan: Bought a ticket");
                    // Simulate buying a ticket
                }
            }
        };

        // Create and start the impatient fan thread with MAX_PRIORITY
        Thread impatientFan = Thread.ofPlatform()
                                .priority(Thread.MAX_PRIORITY)
                                .start(impatientFanTask);

        // Create and start the patient fan thread with MIN_PRIORITY
        Thread patientFan = Thread.ofPlatform()
                                .priority(Thread.MIN_PRIORITY)
                                .start(patientFanTask);
    }
}

In this example, we have two threads, impatientFan and patientFan, competing for the same lock object, ticketSeller. The threads are assigned different priorities: impatientFan has the maximum priority (10), while patientFan has the minimum priority (1).

When the program runs, impatientFan, having a higher priority, is likely to acquire the lock more frequently than patientFan. As a result, patientFan may starve, waiting for its turn to access the shared resource. The output of the program might show that impatientFan acquires the lock repeatedly, while patientFan doesn’t get a chance to execute as often as impatientFan.

It’s important to note that thread priorities are not guaranteed to be strictly followed by the Java Virtual Machine (JVM). The JVM’s thread scheduler uses priorities as a hint for making scheduling decisions but may not always adhere to them. Nevertheless, assigning proper priorities to threads can help reduce the risk of starvation.

To mitigate starvation, consider the following approaches:

  1. Use fair locks, such as a ReentrantLock created with its fairness flag set to true, so waiting threads acquire the lock roughly in arrival order.

  2. Avoid large, permanent differences in thread priorities.

  3. Keep critical sections short so that locks are released frequently and every thread gets a chance to run.
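One way to sketch a fix is to replace the synchronized block from the ticket example with a fair ReentrantLock. With the fairness flag set to true, waiting threads acquire the lock roughly in FIFO order, so the patient fan is not starved (the loops are bounded here so the program terminates):

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {
    // Passing true requests fairness: the longest-waiting thread
    // acquires the lock next, preventing starvation
    private static final ReentrantLock ticketSeller = new ReentrantLock(true);

    public static void main(String[] args) throws InterruptedException {
        Runnable fanTask = () -> {
            for (int i = 0; i < 3; i++) {
                ticketSeller.lock();
                try {
                    System.out.println(
                        Thread.currentThread().getName() + ": Bought a ticket");
                } finally {
                    ticketSeller.unlock();
                }
            }
        };

        Thread impatientFan = Thread.ofPlatform().name("Impatient Fan").start(fanTask);
        Thread patientFan = Thread.ofPlatform().name("Patient Fan").start(fanTask);
        impatientFan.join();
        patientFan.join();
    }
}
```

Fair locks trade some throughput for predictability, so use them when starvation is an actual risk rather than by default.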

Livelock

Livelock occurs when two or more threads are actively responding to each other’s actions but are unable to make progress. Unlike deadlock, where threads are stuck in a waiting state, threads in a livelock are constantly changing their state in response to the actions of other threads. However, despite the continuous activity, no real progress is made towards completing the intended task.

Imagine a scenario where a husband and wife are sitting at a table with only one spoon to share for their meal. Both are extremely polite and insist that the other should eat first. The husband, holding the spoon, offers it to the wife, but she refuses and insists that he eats first. This back-and-forth continues indefinitely, with neither of them ever eating because they keep offering the spoon to each other.

In the context of multi-threaded programming, livelock often occurs when threads are repeatedly yielding to each other without making any meaningful progress. Livelock can also happen when threads keep retrying an operation that persistently fails due to the actions of other threads.

Consider this program:

public class LivelockExample {

    static class Spoon {
        private Diner owner;

        public Spoon(Diner d) {
            owner = d;
        }

        public Diner getOwner() {
            return owner;
        }

        public synchronized void setOwner(Diner d) {
            owner = d;
        }

        public synchronized void use() {
            System.out.println(owner.name + " is eating.");
        }
    }

    static class Diner {
        private String name;
        private boolean isHungry;

        public Diner(String n) {
            name = n;
            isHungry = true;
        }

        public String getName() {
            return name;
        }

        public boolean isHungry() {
            return isHungry;
        }

        public void eatWith(Spoon spoon, Diner spouse) {
            while (isHungry) {
                if (spoon.getOwner() != this) {
                    try {
                        Thread.sleep(1); // wait for the spoon to be free
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }

                if (spouse.isHungry()) {
                    System.out.println(name + ": " + spouse.getName() + " you eat first.");
                    spoon.setOwner(spouse);
                    continue;
                }

                spoon.use();
                isHungry = false;
                System.out.println(name + ": I am done eating.");
                spoon.setOwner(spouse);
            }
        }
    }

    public static void main(String[] args) {
        Diner husband = new Diner("Husband");
        Diner wife = new Diner("Wife");

        Spoon spoon = new Spoon(husband);

        Thread husbandThread = 
            Thread.ofPlatform().start(() -> husband.eatWith(spoon, wife));
        Thread wifeThread = 
            Thread.ofPlatform().start(() -> wife.eatWith(spoon, husband));
    }
}

This program is somewhat involved, so let's walk through it step by step.

First, we have a Spoon class:

static class Spoon {
    private Diner owner;

    public Spoon(Diner d) {
        owner = d;
    }

    public Diner getOwner() {
        return owner;
    }

    public synchronized void setOwner(Diner d) {
        owner = d;
    }

    public synchronized void use() {
        System.out.println(owner.name + " is eating.");
    }
}

Think of the spoon as a shared resource. This class keeps track of who currently has the spoon. It has a few methods:

  1. getOwner(): Returns the diner who currently holds the spoon.

  2. setOwner(Diner d): Transfers the spoon to another diner. It's synchronized so the handoff happens atomically.

  3. use(): Prints a message showing that the current owner is eating. It's also synchronized.

Next, we have the Diner class:

static class Diner {
    private String name;
    private boolean isHungry;

    public Diner(String n) {
        name = n;
        isHungry = true;
    }

    public String getName() {
        return name;
    }

    public boolean isHungry() {
        return isHungry;
    }

    public void eatWith(Spoon spoon, Diner spouse) {
        while (isHungry) {
            if (spoon.getOwner() != this) {
                try {
                    Thread.sleep(1); // wait for the spoon to be free
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }

            if (spouse.isHungry()) {
                System.out.println(name + ": " + spouse.getName() + " you eat first.");
                spoon.setOwner(spouse);
                continue;
            }

            spoon.use();
            isHungry = false;
            System.out.println(name + ": I am done eating.");
            spoon.setOwner(spouse);
        }
    }
}

It represents each person who wants to eat. Each diner has a name and a flag to indicate if they are hungry. The key part of this class is the eatWith method, which is where the livelock happens. This method does the following:

  1. It checks if the diner owns the spoon.

  2. If they don’t, they wait a little and check again.

  3. If they do own the spoon, they check if their spouse is hungry.

  4. If the spouse is hungry, they offer the spoon to the spouse and wait.

  5. If the spouse is not hungry, they use the spoon to eat and then stop being hungry.

In the main method, we create two Diner objects: husband and wife. We also create one Spoon object and give it to the husband initially. Then we start two threads, one for each diner. Each thread runs the eatWith method for their respective diner, trying to use the spoon:

public static void main(String[] args) {
    Diner husband = new Diner("Husband");
    Diner wife = new Diner("Wife");

    Spoon spoon = new Spoon(husband);

    Thread husbandThread = 
        Thread.ofPlatform().start(() -> husband.eatWith(spoon, wife));
    Thread wifeThread = 
        Thread.ofPlatform().start(() -> wife.eatWith(spoon, husband));
}

When the program runs, both the husband and wife are trying to eat using the spoon. Here’s what happens step by step:

  1. The husband starts with the spoon.

  2. The husband checks if the wife is hungry (she is), so he offers the spoon to her.

  3. The wife now has the spoon. She checks if the husband is hungry (he is), so she offers the spoon back to him.

  4. This process repeats endlessly, with both the husband and wife constantly offering the spoon to each other without either of them ever eating.

This continuous back-and-forth without making any progress is a livelock. Both threads (the husband and wife) are active and continuously changing their state, but they are not able to proceed with eating because they keep deferring to each other.

To resolve livelocks, consider the following approaches:

  1. Introduce randomness: Add a small random delay before retrying or yielding, so the threads stop mirroring each other's actions.

  2. Establish priorities: Give one thread precedence so that at least one of them makes progress instead of both deferring.

  3. Limit retries: After a number of failed attempts, back off or fail instead of retrying forever.

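The first of these approaches can be sketched as follows. This is a minimal illustration (the class and method names are mine, not from the diner program) of two threads acquiring the same pair of locks with a randomized back-off, so that they eventually stop mirroring each other:

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LivelockFix {
    private static final Lock lockA = new ReentrantLock();
    private static final Lock lockB = new ReentrantLock();
    private static final AtomicInteger done = new AtomicInteger();

    static void work(String name, Lock first, Lock second) {
        while (true) {
            if (first.tryLock()) {
                try {
                    if (second.tryLock()) {
                        try {
                            System.out.println(name + " acquired both locks");
                            done.incrementAndGet();
                            return;
                        } finally {
                            second.unlock();
                        }
                    }
                } finally {
                    first.unlock();
                }
            }
            // A random back-off breaks the symmetric retry pattern
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = Thread.ofPlatform().start(() -> work("T1", lockA, lockB));
        Thread t2 = Thread.ofPlatform().start(() -> work("T2", lockB, lockA));
        t1.join();
        t2.join();
        System.out.println("Completed: " + done.get());
    }
}
```

Because each thread waits a different random interval before retrying, the symmetric give-and-take that causes livelock is broken and both threads eventually finish.
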
Race Conditions

Race conditions occur when multiple threads access shared data concurrently, and the final outcome depends on the relative timing of their executions. In other words, the behavior of the program becomes unpredictable and inconsistent because the threads race each other to perform operations on the shared data. Race conditions can lead to incorrect results, data corruption, and unexpected program behavior.

Imagine a scenario where two people, Anne and Joe, have a joint bank account. They both independently decide to withdraw money from an ATM at the same time. Suppose the account initially has a balance of $100. Anne tries to withdraw $50, while Joe tries to withdraw $70. If the ATM processes their requests concurrently without proper synchronization, the outcomes become unpredictable. The final balance could be $50, $30, or even negative $20, depending on the order in which the withdrawals are processed.

In multi-threaded programs, race conditions typically arise when multiple threads access shared variables or resources without appropriate synchronization mechanisms. The threads may read and write the shared data simultaneously, leading to inconsistent or unexpected results.

The following class simulates the race condition described in the analogy:

public class BankAccount {
    private int balance;

    public BankAccount(int initialBalance) {
        this.balance = initialBalance;
    }

    public void withdraw(String name, int amount) {
        if (balance >= amount) {
            System.out.println(name + " is going to withdraw " + amount);
            try {
                // Simulate the time taken to process withdrawal
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            balance -= amount;
            System.out.println(name + " completed the withdrawal of " + amount);
        } else {
            System.out.println(name + " tried to withdraw " + amount + " but insufficient balance.");
        }
        System.out.println("Current balance: " + balance);
    }

    public static void main(String[] args) {
        BankAccount account = new BankAccount(100);

        Runnable anneWithdrawal = () -> {
            account.withdraw("Anne", 50);
        };

        Runnable joeWithdrawal = () -> {
            account.withdraw("Joe", 70);
        };

        Thread anneThread = Thread.ofPlatform().unstarted(anneWithdrawal);
        Thread joeThread = Thread.ofPlatform().unstarted(joeWithdrawal);

        anneThread.start();
        joeThread.start();

        try {
            anneThread.join();
            joeThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Final balance: " + account.balance);
    }
}

The class uses threads to represent Anne and Joe withdrawing money from a shared bank account:

  1. The BankAccount class has a balance that both Anne and Joe will try to withdraw from.

  2. The withdraw method checks if there is enough balance, simulates the processing time with Thread.sleep(100), and then deducts the amount from the balance.

  3. The main method creates a BankAccount instance with an initial balance of $100.

  4. It defines two Runnable tasks for Anne and Joe, each trying to withdraw money.

  5. Two threads are created and started to simulate concurrent withdrawals.

  6. The join method ensures the main thread waits for both withdrawal operations to complete before printing the final balance.

Running this code multiple times can produce negative final balances, illustrating the race condition caused by unsynchronized access to the shared balance variable.

To prevent race conditions, it's essential to use synchronization mechanisms that ensure exclusive access to shared resources. Some common techniques include:

  1. Using synchronized blocks or methods to enforce mutual exclusion.

  2. Using atomic classes from java.util.concurrent.atomic for single-variable operations.

  3. Using explicit locks, such as ReentrantLock, for finer-grained control.

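As a minimal sketch of the first technique, making withdraw() synchronized in the earlier BankAccount example ensures the balance check and the deduction execute as one atomic step (the class is renamed here to keep it self-contained):

```java
public class SafeBankAccount {
    private int balance;

    public SafeBankAccount(int initialBalance) {
        this.balance = initialBalance;
    }

    // synchronized makes the check-then-act sequence atomic:
    // no other thread can run this method while one thread holds the lock
    public synchronized void withdraw(String name, int amount) {
        if (balance >= amount) {
            balance -= amount;
            System.out.println(name + " withdrew " + amount);
        } else {
            System.out.println(name + " tried to withdraw " + amount
                + " but insufficient balance.");
        }
    }

    public synchronized int getBalance() {
        return balance;
    }

    public static void main(String[] args) throws InterruptedException {
        SafeBankAccount account = new SafeBankAccount(100);
        Thread anne = Thread.ofPlatform().start(() -> account.withdraw("Anne", 50));
        Thread joe = Thread.ofPlatform().start(() -> account.withdraw("Joe", 70));
        anne.join();
        joe.join();
        System.out.println("Final balance: " + account.getBalance());
    }
}
```

Now the final balance is always 50 or 30, depending on which withdrawal acquires the lock first; it can never go negative.
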
It’s important to note that while synchronization is necessary to prevent race conditions, excessive synchronization can lead to performance overhead and potential liveness issues like deadlocks. Therefore, it’s important to strike a balance and synchronize only when necessary, using granular locks and minimizing the scope of synchronized regions.

In general, identifying and resolving threading problems requires careful analysis and understanding of the program’s behavior. By being aware of issues like deadlocks, starvation, livelocks, and race conditions, you can design and implement thread-safe code in concurrent programs.

In the next section, we’ll explore techniques for synchronizing access to shared resources and coordinating thread execution to prevent these common problems.

Writing Thread-Safe Code

When developing multi-threaded applications, it’s important to ensure that the code is thread-safe.

Thread-safety is the property of a program or a piece of code that guarantees its correct execution in a multi-threaded environment. Thread-safe code ensures that the shared data remains consistent and the program produces the expected output, regardless of the interleaving or timing of thread execution.

To achieve thread-safety, we need to address two main concerns:

  1. Data Visibility: Ensuring that changes made by one thread are visible to other threads.

  2. Data Consistency: Maintaining the integrity and correctness of shared data when multiple threads access and modify it concurrently.

Java provides several mechanisms to tackle these concerns and facilitate thread-safe programming:

┌───────────────────────────────────────────────────────────┐
│             Thread-Safety Mechanisms                      │
│                                                           │
│  ┌─────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │  volatile   │    │   Atomic     │    │ synchronized │  │
│  │             │    │   Classes    │    │              │  │
│  │ Visibility  │    │ Atomicity of │    │  Exclusivity │  │
│  │ guarantee   │    │  operations  │    │ of execution │  │
│  └─────────────┘    └──────────────┘    └──────────────┘  │
│                                                           │
│  ┌─────────────┐    ┌───────────────┐    ┌─────────────┐  │
│  │    Lock     │    │   Cyclic      │    │  Concurrent │  │
│  │  Interface  │    │   Barrier     │    │ Collections │  │
│  │             │    │               │    │             │  │
│  │ Fine-grained│    │Synchronization│    │ Thread-safe │  │
│  │   control   │    │     point     │    │ data struct │  │
│  └─────────────┘    └───────────────┘    └─────────────┘  │
│                                                           │
└───────────────────────────────────────────────────────────┘

Let’s explore them in more detail.

Accessing Data with volatile

The volatile keyword in Java is used to indicate that a variable may be modified by multiple threads concurrently. When a variable is declared as volatile, it guarantees that any write to that variable will be immediately visible to other threads, and any subsequent read will always see the most up-to-date value.

Here’s an example:

public class VolatileExample {
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread thread1 = Thread.ofVirtual().unstarted(() -> {
            while (!flag) {
                // Wait for the flag to become true
            }
            System.out.println("Thread 1 finished");
        });

        Thread thread2 = Thread.ofVirtual().unstarted(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag = true;
            System.out.println("Thread 2 set the flag");
        });

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

In this example, we have a volatile variable named flag. thread1 continuously checks the value of flag and waits for it to become true. thread2 sleeps for a second and then sets flag to true.

By declaring flag as volatile, we ensure that when thread2 modifies its value, the change is immediately visible to thread1. This guarantees that thread1 will see the updated value and exit the waiting loop.

However, it’s important to note that volatile only ensures visibility and does not provide atomicity or mutual exclusion. If multiple threads perform compound operations (such as read-modify-write) on a volatile variable concurrently, it can still lead to race conditions. In such cases, additional synchronization mechanisms are required.
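A quick sketch of this limitation (the class name is mine): two threads each perform 1000 increments on a volatile counter. Because count++ is a compound read-modify-write operation, two threads can read the same value and overwrite each other's increment, so the final value is often less than 2000:

```java
public class VolatileNotAtomic {
    // volatile guarantees visibility of writes, but count++ is still
    // three steps (read, add, write) and those steps can interleave
    private static volatile int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = Thread.ofPlatform().start(() -> {
            for (int i = 0; i < 1000; i++) count++;
        });
        Thread t2 = Thread.ofPlatform().start(() -> {
            for (int i = 0; i < 1000; i++) count++;
        });
        t1.join();
        t2.join();
        // Frequently less than 2000 because of lost updates; never more
        System.out.println("Count: " + count);
    }
}
```
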

Protecting Data with Atomic Classes

Java provides a set of atomic classes in the java.util.concurrent.atomic package that offer thread-safe operations on single variables. These classes ensure that the operations performed on the variables are atomic, meaning they are executed as a single, indivisible unit of work.

Some commonly used atomic classes are:

  1. AtomicInteger: An int value that may be updated atomically.

  2. AtomicLong: A long value that may be updated atomically.

  3. AtomicBoolean: A boolean value that may be updated atomically.

  4. AtomicReference<V>: An object reference that may be updated atomically.

Here’s an example using AtomicInteger:

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = Thread.ofPlatform().start(() -> {
            for (int i = 0; i < 1000; i++) {
                count.incrementAndGet();
            }
        });

        Thread thread2 = Thread.ofPlatform().start(() -> {
            for (int i = 0; i < 1000; i++) {
                count.incrementAndGet();
            }
        });

        thread1.join();
        thread2.join();

        System.out.println("Final count: " + count.get());
    }
}

In this example, we have two threads that increment the count variable 1000 times each. The count variable is an instance of AtomicInteger, which provides thread-safe operations for incrementing and retrieving its value.

By using AtomicInteger, we ensure that the increment operation is performed atomically, avoiding race conditions. The incrementAndGet() method atomically increments the value and returns the updated value. The get() method retrieves the current value of the AtomicInteger.

Atomic classes provide various methods for performing thread-safe operations on variables. Some common methods include:

  1. get(): Returns the current value.

  2. set(type newValue): Sets the value to newValue.

  3. getAndSet(type newValue): Sets the value to newValue and returns the previous value.

  4. incrementAndGet() / getAndIncrement(): Atomically increment the value by one, returning the updated value or the previous value, respectively.

  5. decrementAndGet() / getAndDecrement(): Atomically decrement the value by one, returning the updated value or the previous value, respectively.

Here’s an example that demonstrates the usage of these methods:

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMethodsExample {
    private static AtomicInteger value = new AtomicInteger(0);

    public static void main(String[] args) {
        // get(): Returns the current value
        int currentValue = value.get();
        System.out.println("Current value: " + currentValue);

        // set(type newValue): Sets the value to newValue
        value.set(10);
        System.out.println("Value after set(10): " + value.get());

        // getAndSet(type newValue): Sets the value to newValue and returns the previous value
        int previousValue = value.getAndSet(20);
        System.out.println("Previous value: " + previousValue);
        System.out.println("Value after getAndSet(20): " + value.get());

        // incrementAndGet(): Atomically increments the value by one and returns the updated value
        int incrementedValue = value.incrementAndGet();
        System.out.println("Value after incrementAndGet(): " + incrementedValue);

        // getAndIncrement(): Atomically increments the value by one and returns the previous value
        previousValue = value.getAndIncrement();
        System.out.println("Previous value: " + previousValue);
        System.out.println("Value after getAndIncrement(): " + value.get());

        // decrementAndGet(): Atomically decrements the value by one and returns the updated value
        int decrementedValue = value.decrementAndGet();
        System.out.println("Value after decrementAndGet(): " + decrementedValue);

        // getAndDecrement(): Atomically decrements the value by one and returns the previous value
        previousValue = value.getAndDecrement();
        System.out.println("Previous value: " + previousValue);
        System.out.println("Value after getAndDecrement(): " + value.get());
    }
}

This is the output of the program:

Current value: 0
Value after set(10): 10
Previous value: 10
Value after getAndSet(20): 20
Value after incrementAndGet(): 21
Previous value: 21
Value after getAndIncrement(): 22
Value after decrementAndGet(): 21
Previous value: 21
Value after getAndDecrement(): 20

This example demonstrates the usage of the various methods provided by the AtomicInteger class:

  1. get(): Retrieves the current value of the AtomicInteger using get() and prints it.

  2. set(type newValue): Sets the value of the AtomicInteger to 10 using set(10) and then prints the updated value.

  3. getAndSet(type newValue): Sets the value of the AtomicInteger to 20 using getAndSet(20). This method returns the previous value, which we store in the previousValue variable and print. We also print the updated value after the operation.

  4. incrementAndGet(): Atomically increments the value of the AtomicInteger by one using incrementAndGet(). This method returns the updated value after the increment, which we store in the incrementedValue variable and print.

  5. getAndIncrement(): Atomically increments the value of the AtomicInteger by one using getAndIncrement(). This method returns the previous value before the increment, which we store in the previousValue variable and print. We also print the updated value after the operation.

  6. decrementAndGet(): Atomically decrements the value of the AtomicInteger by one using decrementAndGet(). This method returns the updated value after the decrement, which we store in the decrementedValue variable and print.

  7. getAndDecrement(): Atomically decrements the value of the AtomicInteger by one using getAndDecrement(). This method returns the previous value before the decrement, which we store in the previousValue variable and print. We also print the updated value after the operation.

You can use similar methods for other atomic classes like AtomicLong, AtomicBoolean, etc., depending on the type of variable you need to work with.
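Another commonly used method, available on these classes as well, is compareAndSet(expectedValue, newValue), which updates the value only if it currently equals the expected value and returns whether the update happened. Here is a short sketch (the class name is mine) using AtomicBoolean and AtomicReference:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicOtherTypesExample {
    public static void main(String[] args) {
        // AtomicBoolean: only the first compareAndSet succeeds, a common
        // pattern for running one-time initialization exactly once
        AtomicBoolean initialized = new AtomicBoolean(false);
        System.out.println("First init: " + initialized.compareAndSet(false, true));
        System.out.println("Second init: " + initialized.compareAndSet(false, true));

        // AtomicReference: atomic updates of an object reference
        AtomicReference<String> status = new AtomicReference<>("PENDING");
        status.compareAndSet("PENDING", "RUNNING");
        System.out.println("Status: " + status.get());
    }
}
```
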

Synchronized Blocks

In Java, the synchronized keyword is used to achieve mutual exclusion and synchronize access to shared resources. When a block of code is marked as synchronized, only one thread can execute that block at a time, while other threads attempting to enter the synchronized block will be blocked until the lock is released.

The general syntax for using a synchronized block is as follows:

synchronized (lockObject) {
    // Code block that requires synchronization
}

Here, lockObject is an object that serves as the lock. The thread that enters the synchronized block must acquire the lock on lockObject before executing the code inside the block. Once the thread exits the synchronized block, it automatically releases the lock, allowing other threads to acquire it and enter the block.

Here’s an example that demonstrates the usage of a synchronized block:

public class SynchronizedExample {
    private static int count = 0;
    private static final Object lock = new Object();

    public static void increment() {
        synchronized (lock) {
            count++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = Thread.ofPlatform().start(() -> {
            for (int i = 0; i < 1000; i++) {
                increment();
            }
        });

        Thread thread2 = Thread.ofPlatform().start(() -> {
            for (int i = 0; i < 1000; i++) {
                increment();
            }
        });

        thread1.join();
        thread2.join();

        System.out.println("Final count: " + count);
    }
}

In this example, we have a shared variable count that needs to be incremented by multiple threads. To ensure thread-safety, we use a synchronized block inside the increment() method. The lock object serves as the lock for synchronization.

When a thread enters the increment() method, it acquires the lock on lock before entering the synchronized block. Once inside the block, the thread increments the count variable. After exiting the block, the lock is automatically released, allowing other threads to acquire it and enter the block.

By synchronizing access to the count variable using a synchronized block, we ensure that only one thread can increment the variable at a time, preventing race conditions and maintaining data consistency.

Synchronizing on Methods

In addition to using synchronized blocks, Java allows you to synchronize entire methods using the synchronized keyword. The lock associated with the method depends on whether the method is an instance method or a static method.

For instance methods, the lock is associated with the object on which the method is invoked. Each instance of the class has its own lock, so multiple threads can simultaneously execute synchronized instance methods on different instances of the class.

On the other hand, for static methods, the lock is associated with the class itself, rather than any specific instance. Since there is only one class object per JVM, only one thread can execute a synchronized static method in the class at a time, regardless of the number of instances of that class.
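The two lock identities can be verified with Thread.holdsLock() in a small sketch (the class name is mine):

```java
public class LockIdentityExample {
    // Instance method: the implicit lock is the object itself (this)
    public synchronized void instanceMethod() {
        System.out.println("Holds lock on this: " + Thread.holdsLock(this));
    }

    // Static method: the implicit lock is the Class object
    public static synchronized void staticMethod() {
        System.out.println("Holds lock on class: "
            + Thread.holdsLock(LockIdentityExample.class));
    }

    public static void main(String[] args) {
        new LockIdentityExample().instanceMethod();
        staticMethod();
    }
}
```
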

Here’s an example of synchronizing a method:

public class SynchronizedMethodExample {
    private static int count = 0;

    public static synchronized void increment() {
        count++;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = Thread.ofPlatform().start(() -> {
            for (int i = 0; i < 1000; i++) {
                increment();
            }
        });

        Thread thread2 = Thread.ofPlatform().start(() -> {
            for (int i = 0; i < 1000; i++) {
                increment();
            }
        });

        thread1.join();
        thread2.join();

        System.out.println("Final count: " + count);
    }
}

In this example, the increment() method is declared as synchronized. When a thread invokes the increment() method, it automatically acquires the lock associated with the SynchronizedMethodExample class itself, since the method is static.

Only one thread can execute the increment() method at a time, while other threads attempting to invoke the method will be blocked until the lock is released. This ensures that the count variable is incremented atomically and avoids race conditions.

It’s important to note that synchronizing static methods can potentially lead to reduced concurrency since there is only one lock associated with the entire class. If multiple threads need to access different shared resources within the class, synchronizing at the method level may be too coarse-grained, and using synchronized blocks or more fine-grained locking mechanisms might be more appropriate.

Synchronizing methods provides a cleaner and more concise way of achieving thread-safety compared to using synchronized blocks. The trade-off is that synchronizing an entire method can reduce concurrency if the method contains code that doesn't require synchronization.

In general, it’s recommended to synchronize only the critical sections of code that access shared resources, using synchronized blocks or methods judiciously to strike a balance between thread-safety and performance.

This is particularly important with virtual threads. Remember, there’s a limitation when it comes to using synchronized blocks or methods with virtual threads.

When a virtual thread performs a blocking operation (like I/O) inside a synchronized block or method, it causes the virtual thread scheduler to block an operating system thread. This situation is called pinning. Normally, outside of a synchronized context, the virtual thread would not block an OS thread during such operations.

Pinning can negatively impact server throughput if the blocking operation takes a long time and occurs frequently. However, using synchronized for short-lived or infrequent operations shouldn’t cause problems.

If you detect frequent and long-lived pinning, the recommendation is to replace synchronized blocks with a lock (such as ReentrantLock) in those specific areas.
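
As a sketch of that replacement (the class name is mine), guarding a blocking operation with a ReentrantLock instead of synchronized lets a virtual thread unmount from its carrier thread while it waits:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AvoidPinningExample {
    private static final Lock lock = new ReentrantLock();

    static void blockingOperation() {
        lock.lock(); // unlike synchronized, this does not pin a virtual thread
        try {
            Thread.sleep(100); // stand-in for a blocking I/O call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = Thread.ofVirtual().start(AvoidPinningExample::blockingOperation);
        Thread t2 = Thread.ofVirtual().start(AvoidPinningExample::blockingOperation);
        t1.join();
        t2.join();
        System.out.println("Both virtual threads finished");
    }
}
```
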

The Lock Interface

Java provides the Lock interface in the java.util.concurrent.locks package as an alternative to the synchronized keyword. The Lock interface offers more flexibility and control over lock acquisition and release compared to the implicit locking mechanism of synchronized.

The main methods provided by the Lock interface are:

  1. void lock(): Acquires the lock, blocking until the lock is available.

  2. void unlock(): Releases the lock. Always call unlock() in a finally block to ensure proper lock release.

  3. boolean tryLock(): Attempts to acquire the lock without blocking. Returns true if the lock is acquired, false otherwise.

  4. boolean tryLock(long time, TimeUnit unit): Attempts to acquire the lock while blocking for a specified amount of time. Returns true if the lock is acquired within the specified time, false otherwise.

  5. Condition newCondition(): Creates a new Condition instance associated with the lock for coordinating thread execution based on conditions.

The java.util.concurrent.locks package provides a few implementations of the Lock interface, including:

  1. ReentrantLock: A reentrant mutual exclusion lock with the same basic behavior as synchronized, plus extended features.

  2. ReentrantReadWriteLock.ReadLock: The read lock of a ReentrantReadWriteLock, which may be held by multiple reader threads simultaneously.

  3. ReentrantReadWriteLock.WriteLock: The write lock of a ReentrantReadWriteLock, which is exclusive.

To use a Lock, follow these steps:

  1. Create an instance of the desired Lock implementation.

  2. Acquire the lock using lock(), tryLock(), or tryLock(long time, TimeUnit unit).

  3. Perform the critical section operations while holding the lock.

  4. Release the lock using unlock() in a finally block.

Here’s an example of using a ReentrantLock:

Lock lock = new ReentrantLock();
try {
    lock.lock();
    // Critical section
} finally {
    lock.unlock();
}

The Lock interface provides additional features compared to synchronized, such as:

Non-blocking lock acquisition with tryLock(), which lets a thread perform alternative work instead of waiting:

Lock lock = new ReentrantLock();
if (lock.tryLock()) {
    try {
        // Critical section
    } finally {
        lock.unlock();
    }
} else {
    // Lock not acquired, perform alternative actions
}

Timed lock acquisition with tryLock(long time, TimeUnit unit), which gives up if the lock cannot be acquired within the specified timeout:

Lock lock = new ReentrantLock();
try {
    if (lock.tryLock(1, TimeUnit.SECONDS)) {
        try {
            // Critical section
        } finally {
            lock.unlock();
        }
    } else {
        // Lock not acquired within the specified time
    }
} catch (InterruptedException e) {
    // Handle interruption
}

Fairness, which grants the lock to the longest-waiting thread when the lock is constructed with the fairness flag set to true:

Lock lock = new ReentrantLock(true); // Creating a fair lock
try {
    lock.lock();
    // Critical section
} finally {
    lock.unlock();
}

The ReentrantLock class is the most common implementation of the Lock interface. It provides explicit lock acquisition and release, exception handling for incorrect lock usage, and lock reentrancy.

Here’s an example comparing synchronized and ReentrantLock:

// Using synchronized
synchronized (lock) {
    // Critical section
}

// Using ReentrantLock
Lock lock = new ReentrantLock();
try {
    lock.lock();
    // Critical section
} finally {
    lock.unlock();
}

The ReentrantReadWriteLock.ReadLock and ReentrantReadWriteLock.WriteLock classes provide a way to handle concurrent read and write access to a shared resource. Here’s a simplified example:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
    private int sharedResource = 0;

    public void write(int value) {
        writeLock.lock();
        try {
            sharedResource = value;
            System.out.println("Written: " + value);
        } finally {
            writeLock.unlock();
        }
    }

    public void read() {
        readLock.lock();
        try {
            System.out.println("Read: " + sharedResource);
        } finally {
            readLock.unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteLockExample example = new ReadWriteLockExample();

        Thread writer = Thread.ofPlatform().start(() -> {
            example.write(42);
        });

        Thread reader = Thread.ofPlatform().start(() -> {
            example.read();
        });

        try {
            writer.join();
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

As you can see, the Lock interface provides more advanced features and control compared to the synchronized keyword, allowing for fine-grained locking, non-blocking lock attempts, and fairness control. However, it also requires explicit management of lock acquisition and release, which can be error-prone if not handled properly.

The CyclicBarrier Class

In concurrent programming, there are scenarios where multiple threads need to work together and synchronize their progress at certain points. The java.util.concurrent.CyclicBarrier class provides a synchronization aid that allows a set of threads to wait for each other to reach a common barrier point before proceeding further.

The CyclicBarrier class is designed to facilitate coordination between a fixed number of threads. It is particularly useful when you have a group of threads that need to perform tasks in parallel and then wait for each other to finish before moving on to the next stage.

Here’s how the CyclicBarrier works:

  1. When creating a CyclicBarrier, you specify the number of threads that need to reach the barrier before they can all proceed.

  2. Each thread performs its task and then calls the await() method on the CyclicBarrier to indicate that it has reached the barrier.

  3. The thread calling await() is blocked until all the specified number of threads have reached the barrier.

  4. Once all threads have reached the barrier, the barrier is released, and all threads can proceed.

  5. If desired, you can specify a barrier action, which is a Runnable task that is executed by one of the threads after all threads have reached the barrier but before they are released.

Here’s a simple example that demonstrates the usage of CyclicBarrier:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    private static final int NUM_THREADS = 3;

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS, () -> {
            System.out.println("All threads reached the barrier");
        });

        for (int i = 0; i < NUM_THREADS; i++) {
            final int threadId = i;
            Thread.ofPlatform().start(() -> {
                try {
                    System.out.println("Thread " + threadId + " is performing task");
                    Thread.sleep(1000); // Simulating task execution
                    System.out.println("Thread " + threadId + " reached the barrier");
                    barrier.await();
                    System.out.println("Thread " + threadId + " continued after the barrier");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

In this example, we create a CyclicBarrier with a count of NUM_THREADS (3 in this case). We also specify a barrier action that will be executed once all threads have reached the barrier.

We then start three threads, each performing a task (simulated by sleeping for a short duration). After completing its task, each thread calls await() on the barrier to indicate that it has reached the synchronization point.

The output of the program will be similar to the following:

Thread 0 is performing task
Thread 1 is performing task
Thread 2 is performing task
Thread 1 reached the barrier
Thread 0 reached the barrier
Thread 2 reached the barrier
All threads reached the barrier
Thread 2 continued after the barrier
Thread 1 continued after the barrier
Thread 0 continued after the barrier

As you can see, all threads perform their tasks concurrently. Once all threads have reached the barrier, the barrier action is executed, and then all threads proceed further.

The CyclicBarrier is called cyclic because it can be reused after all threads have passed the barrier. You can call await() again on the same barrier object, and it will wait for the specified number of threads to reach the barrier again.
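
To see this cyclic behavior in action, here is a short sketch (the class name, thread names, and phase count are illustrative) in which two threads pass the same barrier instance twice, once per phase:

```java
import java.util.concurrent.CyclicBarrier;

public class BarrierReuseExample {
    public static void main(String[] args) {
        // The same barrier instance is reused for every phase
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> System.out.println("Phase complete"));

        Runnable worker = () -> {
            try {
                for (int phase = 1; phase <= 2; phase++) {
                    System.out.println(Thread.currentThread().getName()
                            + " finished phase " + phase);
                    barrier.await(); // the barrier resets automatically after release
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        Thread.ofPlatform().name("worker-1").start(worker);
        Thread.ofPlatform().name("worker-2").start(worker);
    }
}
```

The barrier action prints "Phase complete" twice, once each time both threads reach the barrier, without any need to create a second CyclicBarrier.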

It’s important to note that if any waiting thread leaves the barrier prematurely, because it is interrupted, times out, or fails with an exception, the barrier is broken and all other threads waiting on it receive a BrokenBarrierException. In such cases, you need to handle the exception appropriately and decide whether to continue or terminate the execution.

The Concurrency API

Java provides a powerful and flexible Concurrency API in the java.util.concurrent package, which offers a wide range of classes and interfaces for managing concurrent operations. This API simplifies the development of concurrent applications by providing high-level abstractions and utilities for managing threads, coordinating tasks, and synchronizing access to shared resources.

The Concurrency API was introduced in Java 5 and has been continuously enhanced in subsequent versions. It includes several key components, such as:

  1. Executors: The Executor and ExecutorService interfaces provide a way to manage the execution of tasks in a thread pool, allowing you to focus on defining the tasks rather than managing the threads directly.

  2. Concurrent Collections: The java.util.concurrent package offers thread-safe collections, such as ConcurrentHashMap, CopyOnWriteArrayList, and BlockingQueue, which provide better performance and scalability compared to using synchronized collections.

  3. Synchronizers: Classes like CountDownLatch, CyclicBarrier, Semaphore, and Phaser help coordinate the actions of multiple threads, allowing them to wait for each other or control access to shared resources.

  4. Locks: The Lock interface and its implementations, such as ReentrantLock, provide more advanced locking mechanisms than the synchronized keyword.

  5. Atomic Variables: The java.util.concurrent.atomic package provides atomic variables, such as AtomicInteger and AtomicReference, which offer thread-safe operations on single variables without the need for explicit synchronization.

These components work together to provide a comprehensive framework for building concurrent and parallel applications in Java.

In previous sections, we have covered atomic variables, locks, and CyclicBarrier. In this section, we are going to focus on executors.

The ExecutorService Interface

The ExecutorService interface is a central part of the Concurrency API and extends the Executor interface. It provides methods for submitting tasks for execution and managing the lifecycle of the underlying thread pool.

A thread pool is a collection of pre-created and reusable threads that are ready to perform tasks. It acts as a pool of worker threads that can be used to execute tasks concurrently.

Imagine you have a big task that needs to be done, like painting a house. You could do it all by yourself, but it would take a long time. Instead, you decide to hire a group of workers to help paint the house. These workers are like a thread pool. When you have a task that needs to be executed, such as painting a room, you assign it to one of the workers in the pool. The worker takes the task, performs it, and when finished, returns to the pool, ready to take on another task.

The advantage of using a thread pool is that you don’t have to create a new worker (thread) every time you have a task to execute. Creating a new thread for each task can be expensive in terms of time and resources. Instead, you have a pre-created pool of workers (threads) that are ready to take on tasks as they come in. The thread pool manages the lifecycle of the threads, meaning it creates the threads when the pool is initialized and destroys them when the pool is shut down. It also handles the allocation of tasks to the available threads in the pool.

Here’s an example of creating an ExecutorService using the Executors factory class:

ExecutorService executorService = Executors.newFixedThreadPool(5);

In this case, we create a fixed thread pool with 5 threads using the Executors.newFixedThreadPool() method.

The primary methods of the ExecutorService interface include execute(), submit(), invokeAll(), and invokeAny() for submitting tasks; these are covered in detail in the Submitting Tasks section later in this chapter.

The ExecutorService interface also provides methods for managing the lifecycle of the thread pool:

  1. shutdown(): Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted.

  2. shutdownNow(): Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

  3. isShutdown(): Returns true if the executor has been shut down.

  4. isTerminated(): Returns true if all tasks have completed following a shutdown. Note that isTerminated() can only return true after shutdown() or shutdownNow() has been called.

  5. awaitTermination(long timeout, TimeUnit unit): Blocks until all tasks have completed execution after a shutdown request, the timeout occurs, or the current thread is interrupted, whichever happens first.

It’s important to properly shut down an ExecutorService when it’s no longer needed to allow graceful termination of the threads and to release any resources held by the thread pool.

Here’s an example showing the use of these methods:

ExecutorService executorService = Executors.newFixedThreadPool(5);

// Submit tasks for execution
executorService.execute(() -> {
    System.out.println("Task 1 executed by " + Thread.currentThread().getName());
});
executorService.execute(() -> {
    System.out.println("Task 2 executed by " + Thread.currentThread().getName());
});

// Initiate orderly shutdown
executorService.shutdown();

// Check if the ExecutorService has been shut down
boolean isShutdown = executorService.isShutdown();
System.out.println("ExecutorService is shut down: " + isShutdown);

// Wait for all tasks to complete and check if the ExecutorService has terminated
try {
    boolean isTerminated = executorService.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println("ExecutorService is terminated: " + isTerminated);
} catch (InterruptedException e) {
    throw new RuntimeException(e);
}

In this example, we create an ExecutorService, submit tasks for execution using the execute() method, initiate an orderly shutdown using shutdown(), and check the status of the ExecutorService using isShutdown() and isTerminated() methods.

However, it’s worth noting that since Java 19, ExecutorService extends the AutoCloseable interface. This allows us to use a try-with-resources block, which automatically calls the close() method at the end of the try block. So the above example can be rewritten as follows:

try (ExecutorService executorService = Executors.newFixedThreadPool(5)) {
    // Submit tasks for execution
    executorService.execute(() -> {
        System.out.println("Task 1 executed by " + Thread.currentThread().getName());
    });
    executorService.execute(() -> {
        System.out.println("Task 2 executed by " + Thread.currentThread().getName());
    });
} // ExecutorService.close() is called automatically here, which calls shutdown()

This change simplifies the use of ExecutorService instances and helps prevent resource leaks by ensuring that the executor is properly shut down, even if an exception occurs.

Submitting Tasks

The ExecutorService interface provides several methods for submitting tasks for execution:

  1. execute(Runnable command): Executes the given Runnable task asynchronously and does not return a result.

  2. submit(Runnable task): Submits a Runnable task for execution and returns a Future whose get() method returns null upon successful completion.

  3. submit(Callable<T> task): Submits a Callable task for execution and returns a Future representing the pending result of the task.

In addition to submitting individual tasks, the ExecutorService interface also provides methods for submitting multiple tasks at once:

  1. invokeAll(Collection<? extends Callable<T>> tasks): Executes all the given tasks and returns a list of Future objects, one per task, once all of them have completed.

  2. invokeAny(Collection<? extends Callable<T>> tasks): Executes the given tasks and returns the result of one that completed successfully, cancelling any tasks that have not yet completed.

These methods allow you to submit multiple tasks concurrently and retrieve their results using the Future interface.

Consider the following example:

try (ExecutorService executorService = Executors.newFixedThreadPool(5)) {
    // Submit a Runnable task using execute()
    executorService.execute(() -> {
        System.out.println("Task executed by " + Thread.currentThread().getName());
    });

    // Submit a Callable task using submit()
    Future<String> future = executorService.submit(() -> {
        // Perform some computation
        return "Result of the task";
    });

    // Submit multiple Callable tasks using invokeAll()
    List<Callable<Integer>> tasks = Arrays.asList(
            () -> 1,
            () -> 2,
            () -> 3
    );
    List<Future<Integer>> futures = executorService.invokeAll(tasks);

    // Submit multiple Callable tasks using invokeAny()
    Integer result = executorService.invokeAny(tasks);
} catch (Exception e) {
    e.printStackTrace();
}

This example demonstrates submitting tasks using execute() for a Runnable task, submit() for a Callable task, invokeAll() for submitting multiple Callable tasks and retrieving their results as a list of Future objects, and invokeAny() for submitting multiple Callable tasks and retrieving the result of one task that completed successfully (any remaining tasks are cancelled).

When submitting tasks using the submit() or invokeAll()/invokeAny() methods, you receive Future objects representing the pending results of the tasks. The Future interface provides methods to check the status of a task and retrieve its result:

  1. isDone(): Returns true if the task has completed, whether normally, exceptionally, or through cancellation.

  2. isCancelled(): Returns true if the task was cancelled before it completed normally.

  3. cancel(boolean mayInterruptIfRunning): Attempts to cancel execution of the task; if the task is already running, the mayInterruptIfRunning flag determines whether its thread may be interrupted.

  4. get(): Waits if necessary for the task to complete and then returns its result.

  5. get(long timeout, TimeUnit unit): Waits at most the given timeout for the task to complete and returns its result, throwing a TimeoutException if the timeout elapses first.

These methods allow you to synchronize the main thread with the completion of the submitted tasks and retrieve their results when needed.

Consider the following example:

try (ExecutorService executorService = Executors.newSingleThreadExecutor()) {
    // Submit a Callable task using submit()
    Future<String> future = executorService.submit(() -> {
        // Simulate a long-running task
        Thread.sleep(2000);
        return "Result of the task";
    });

    // Check if the task is done
    boolean isDone = future.isDone();
    System.out.println("Task is done: " + isDone);

    // Cancel the task
    boolean isCancelled = future.cancel(true);
    System.out.println("Task is cancelled: " + isCancelled);

    // Retrieve the result of the task
    String result = null;
    try {
        result = future.get(1, TimeUnit.SECONDS);
    } catch (CancellationException | InterruptedException
            | ExecutionException | TimeoutException e) {
        e.printStackTrace();
    }
    System.out.println("Result: " + result);
}

In this example, we submit a Callable task using submit(), check if the task is done using isDone(), attempt to cancel the task using cancel(), and retrieve the task’s result using get() with a timeout. Because the task is still sleeping when cancel(true) is called, the cancellation succeeds and get() throws a CancellationException (an unchecked exception, so it must be caught explicitly), leaving result as null. Had the task not been cancelled, get() would either return the result within the specified timeout or throw a TimeoutException.

The Callable Interface

As you have seen from the previous examples, the Callable interface is similar to the Runnable interface but with a few key differences. While Runnable represents a task that produces no result, Callable represents a task that returns a result and may throw a checked exception.

Here’s the declaration of the Callable interface:

public interface Callable<V> {
    V call() throws Exception;
}

The Callable interface has a single method, call(), which returns a value of type V and may throw an exception. This is in contrast to the Runnable interface, which has a void run() method that does not return a value or throw checked exceptions.

The main differences between Callable and Runnable are:

  1. Return Value: Callable tasks can return a result, whereas Runnable tasks cannot. The call() method of Callable returns a value of the specified type V, while the run() method of Runnable is void and does not return a value.

  2. Exception Handling: Callable tasks can throw checked exceptions, whereas Runnable tasks cannot. The call() method of Callable declares that it may throw an Exception, while the run() method of Runnable does not declare any checked exceptions.

Here’s an example that demonstrates the usage of Callable:

try (ExecutorService executorService = Executors.newSingleThreadExecutor()) {
    // Create a Callable task
    Callable<Integer> task = () -> {
        // Perform some computation
        int result = 0;
        for (int i = 1; i <= 10; i++) {
            result += i;
        }
        return result;
    };

    // Submit the Callable task to the ExecutorService
    Future<Integer> future = executorService.submit(task);

    // Retrieve the result of the task
    try {
        Integer result = future.get();
        System.out.println("Result: " + result); // Prints 55
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

In this example, we create a Callable task that performs a simple computation and returns the result. We submit the task to the ExecutorService using the submit() method, which returns a Future object representing the pending result. We then use the get() method of Future to retrieve the result of the task. If the task throws an exception, the exception is wrapped in an ExecutionException, but an InterruptedException can also be thrown if the current thread was interrupted while waiting.

The choice between using Callable and Runnable depends on whether you need to return a result from the task and handle checked exceptions. If your task does not need to return a value and does not throw checked exceptions, you can use Runnable. However, if your task needs to return a result or throws checked exceptions, you should use Callable.

Scheduling Tasks

In addition to executing tasks immediately, the Concurrency API provides the ability to schedule tasks for execution at a later time or to execute tasks repeatedly with a fixed delay or at a fixed rate. This functionality is provided by the ScheduledExecutorService interface, which extends the ExecutorService interface.

The ScheduledExecutorService interface provides the following methods for scheduling tasks:

  1. schedule(Runnable command, long delay, TimeUnit unit): Schedules a Runnable task to be executed after the specified delay, expressed in the given TimeUnit.

  2. schedule(Callable<V> callable, long delay, TimeUnit unit): Schedules a Callable task to be executed after the specified delay, expressed in the given TimeUnit, and returns a ScheduledFuture representing the pending result.

  3. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): Schedules a Runnable task to be executed periodically, with a fixed time interval between the start of one execution and the start of the next. The initialDelay parameter specifies the delay before the first execution, and the period parameter specifies the fixed interval between the start times of successive executions. If an execution takes longer than the period, subsequent executions start late but never run concurrently.

  4. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): Schedules a Runnable task to be executed repeatedly, with a fixed delay between the end of one execution and the start of the next. The initialDelay parameter specifies the delay before the first execution, and the delay parameter specifies the fixed delay between executions.

Here’s an example that demonstrates the usage of ScheduledExecutorService methods:

try (ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor()) {
    // Schedule a task to run after a delay of 2 seconds
    scheduledExecutorService.schedule(() -> {
        System.out.println("Task executed after 2 seconds delay");
    }, 2, TimeUnit.SECONDS);

    // Schedule a task to run repeatedly at a fixed rate of 1 second
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        System.out.println("Task executed at fixed rate");
    }, 0, 1, TimeUnit.SECONDS);

    // Schedule a task to run repeatedly with a fixed delay of 500 milliseconds
    scheduledExecutorService.scheduleWithFixedDelay(() -> {
        System.out.println("Task executed with fixed delay");
    }, 0, 500, TimeUnit.MILLISECONDS);

    // Keep the main thread alive for 5 seconds
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

In this example, we create a ScheduledExecutorService using the Executors.newSingleThreadScheduledExecutor() method. We then demonstrate the usage of the schedule(), scheduleAtFixedRate(), and scheduleWithFixedDelay() methods.

The schedule() method is used to schedule a task to run after a delay of 2 seconds. The scheduleAtFixedRate() method is used to schedule a task to run repeatedly at a fixed rate of 1 second, meaning that the next execution will start exactly 1 second after the previous execution starts, regardless of how long the task takes to complete. The scheduleWithFixedDelay() method is used to schedule a task to run repeatedly with a fixed delay of 500 milliseconds between the end of one execution and the start of the next.

It’s important to note that the ScheduledExecutorService does not automatically terminate after the scheduled tasks are executed. You need to shut it down explicitly with shutdown() once you no longer need it, or let a try-with-resources block do it for you through close(), as in the example above.

Remember that the ScheduledExecutorService uses a limited number of threads to execute the scheduled tasks, so it’s essential to choose the appropriate execution method based on your requirements and ensure the scheduled tasks do not overwhelm the available resources.

Executors Factory Methods

Throughout this section, we have used various factory methods provided by the Executors class to create instances of ExecutorService and ScheduledExecutorService. The Executors class is a utility class that offers several static factory methods for creating different types of thread pools and executor services.

Here’s an overview of the commonly used factory methods provided by the Executors class:

  1. ExecutorService newSingleThreadExecutor(): Creates an ExecutorService that uses a single worker thread to execute tasks. Tasks are guaranteed to be executed sequentially, and no more than one task will be active at any given time.

  2. ScheduledExecutorService newSingleThreadScheduledExecutor(): Creates a single-threaded ScheduledExecutorService that can schedule tasks to run after a given delay or to execute periodically.

  3. ExecutorService newCachedThreadPool(): Creates a thread pool that creates new threads as needed but will reuse previously constructed threads when they are available. Idle threads are kept in the pool for 60 seconds before being terminated and removed from the pool.

  4. ExecutorService newFixedThreadPool(int nThreads): Creates a thread pool with a fixed number of threads. The nThreads parameter specifies the number of threads in the pool. If additional tasks are submitted when all threads are active, they will wait in a queue until a thread becomes available.

  5. ScheduledExecutorService newScheduledThreadPool(int corePoolSize): Creates a thread pool that can schedule tasks to run after a given delay or to execute periodically. The corePoolSize parameter specifies the number of threads to keep in the pool, even if they are idle.

Here are code examples demonstrating the usage of each factory method:

// newSingleThreadExecutor()
try (ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor()) {
    singleThreadExecutor.submit(() -> {
        System.out.println("Task executed by single thread");
    });
}

// newSingleThreadScheduledExecutor()
try (ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor()) {
    singleThreadScheduledExecutor.schedule(() -> {
        System.out.println("Task scheduled by single thread scheduled executor");
    }, 2, TimeUnit.SECONDS);
}

// newCachedThreadPool()
try (ExecutorService cachedThreadPool = Executors.newCachedThreadPool()) {
    for (int i = 0; i < 5; i++) {
        cachedThreadPool.submit(() -> {
            System.out.println("Task executed by cached thread pool");
        });
    }
}

// newFixedThreadPool(int nThreads)
try (ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3)) {
    for (int i = 0; i < 10; i++) {
        fixedThreadPool.submit(() -> {
            System.out.println("Task executed by fixed thread pool");
        });
    }
}

// newScheduledThreadPool(int corePoolSize)
try (ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2)) {
    scheduledThreadPool.scheduleAtFixedRate(() -> {
        System.out.println("Task scheduled by scheduled thread pool");
    }, 0, 1, TimeUnit.SECONDS);

    // Keep the main thread alive for 3 seconds
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

In these examples, we create different types of executor services using the respective factory methods provided by the Executors class.

The newSingleThreadExecutor() method creates an ExecutorService with a single worker thread, ensuring that tasks are executed sequentially. The newSingleThreadScheduledExecutor() method creates a single-threaded ScheduledExecutorService for scheduling tasks with delays or periodic execution.

The newCachedThreadPool() method creates a thread pool that creates new threads as needed and reuses idle threads. The newFixedThreadPool(int nThreads) method creates a thread pool with a fixed number of threads specified by the nThreads parameter.

The newScheduledThreadPool(int corePoolSize) method creates a ScheduledExecutorService with a fixed number of threads specified by the corePoolSize parameter. It allows scheduling tasks with delays or periodic execution.

These factory methods provide convenient ways to create different types of executor services based on specific requirements. They encapsulate the complexities of thread creation, management, and termination, allowing developers to focus on defining and submitting tasks.

It’s important to choose the appropriate factory method based on your application’s needs. Consider factors such as the number of tasks, concurrency requirements, scheduling needs, and resource constraints when selecting a suitable executor service.

In any case, remember to properly shut down the executor services using the shutdown() method when they are no longer needed to ensure graceful termination and resource cleanup.

Virtual Thread-Aware Executor

With the introduction of virtual threads, Java introduced a new factory method in the Executors class to create ExecutorService instances that work seamlessly with virtual threads: newVirtualThreadPerTaskExecutor().

This method is equivalent to invoking newThreadPerTaskExecutor(ThreadFactory) with a thread factory that creates virtual threads. newThreadPerTaskExecutor(ThreadFactory) creates an Executor that starts a new Thread for each task. The number of threads created by the Executor is unbounded.

The key to understanding this is recognizing that, while virtual threads behave like platform threads, they represent a different concept. Platform threads, being scarce resources, need careful management, often through thread pools. The question “How many threads should we have in the pool?” is a common consideration with platform threads.

On the other hand, there can be millions of virtual threads. Instead of representing a shared, pooled resource, each virtual thread should represent a single task in your application. The number of virtual threads should therefore equal the number of concurrent tasks in your application.

So, instead of using a shared thread pool executor like this:

try (var sharedThreadPoolExecutor = Executors.newFixedThreadPool(4)) {
    Future<TaskA> f1 = sharedThreadPoolExecutor.submit(task1);
    Future<TaskB> f2 = sharedThreadPoolExecutor.submit(task2);
    // ... use futures
}

You can use a virtual thread executor:

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<TaskA> f1 = executor.submit(task1);
    Future<TaskB> f2 = executor.submit(task2);
    // ... use futures
}

The Executors.newVirtualThreadPerTaskExecutor() method returns an ExecutorService that doesn’t use a thread pool. Instead, it creates a new virtual thread for each submitted task. This executor is lightweight, allowing you to create a new one as easily as you would any simple object.

Since the ExecutorService extends the AutoCloseable interface, at the end of the try-with-resources block, the close() method waits for all tasks submitted to the ExecutorService (all virtual threads spawned by the ExecutorService) to terminate.

This pattern is particularly useful when you need to concurrently perform multiple outgoing calls to different services. Here’s an example:

void handle(Request request, Response response) {
    var url1 = ...
    var url2 = ...
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        var future1 = executor.submit(() -> fetchURL(url1));
        var future2 = executor.submit(() -> fetchURL(url2));
        response.send(future1.get() + future2.get());
    } catch (ExecutionException | InterruptedException e) {
        response.fail(e);
    }
}

String fetchURL(URL url) throws IOException {
    try (var in = url.openStream()) {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
}

This example creates a new virtual thread for each URL fetch operation, even though these are small, short-lived tasks. This is precisely the kind of scenario for virtual threads.

It’s important to understand that converting n platform threads to n virtual threads won’t yield significant benefits. The real power comes from converting tasks to virtual threads. As a rule of thumb, if your application never reaches 10,000 virtual threads or more, it’s unlikely to fully benefit from virtual threads. This could mean either your application’s load is too light to need improved throughput, or you haven’t represented enough tasks as virtual threads.

Concurrent Collections

When working with Java collections like ArrayList, HashMap, etc., in a multi-threaded environment, you may have encountered a ConcurrentModificationException. This exception is thrown when one thread is iterating over a collection while another thread tries to modify it structurally, for example, by adding or removing elements.
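
In fact, the exception is easy to reproduce even on a single thread, because the fail-fast iterators of the standard collections detect any structural modification made outside the iterator itself:

```java
import java.util.ArrayList;
import java.util.List;

public class FailFastExample {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>(List.of("a", "b", "c"));
        for (String s : list) {
            if (s.equals("a")) {
                list.remove(s); // structural modification during iteration
            }
        }
        // The loop above throws java.util.ConcurrentModificationException
    }
}
```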

The solution is to use thread-safe, concurrent collections instead. Java provides several concurrent collection classes that allow multiple threads to access and modify them safely, without the risk of ConcurrentModificationException.

Some key concurrent collection classes include:

  1. ConcurrentHashMap: A thread-safe alternative to HashMap that permits fully concurrent reads and a high level of concurrency for updates, without locking the entire map.

  2. CopyOnWriteArrayList: A thread-safe variant of ArrayList in which every mutation creates a fresh copy of the underlying array; its iterators never throw ConcurrentModificationException.

  3. ConcurrentLinkedQueue: An unbounded, non-blocking, thread-safe queue based on linked nodes.

  4. BlockingQueue implementations such as ArrayBlockingQueue and LinkedBlockingQueue: Queues that block the producer when the queue is full and block the consumer when the queue is empty.

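To make this concrete, here is a sketch of a classic producer–consumer setup using an ArrayBlockingQueue with a capacity of 10 (the class and variable names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        Thread producer = Thread.ofPlatform().start(() -> {
            try {
                for (int i = 1; i <= 20; i++) {
                    queue.put(i); // blocks when the queue is full
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = Thread.ofPlatform().start(() -> {
            try {
                for (int i = 1; i <= 20; i++) {
                    Integer item = queue.take(); // blocks when the queue is empty
                    System.out.println("Consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
```
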
In this example, the producer thread tries to put 20 items into the queue, but the queue has a maximum capacity of 10. When the limit is reached, the producer will block until the consumer has taken some items out. The consumer thread continually takes items from the queue and processes them. If the queue becomes empty, the consumer will block until the producer puts more items in.

When running the example above, the exact output may vary due to the concurrent execution of threads.

In addition to these purpose-built concurrent classes, in the java.util.Collections class, Java also provides methods to obtain synchronized versions of regular collections. These synchronization wrappers add a layer of thread-safety around an existing non-concurrent collection.

Some examples of these methods are:

  1. Collections.synchronizedList(List<T> list)

  2. Collections.synchronizedSet(Set<T> set)

  3. Collections.synchronizedMap(Map<K,V> map)

  4. Collections.synchronizedCollection(Collection<T> c)

For example, to create a synchronized version of an ArrayList:

List<String> list = new ArrayList<>();
List<String> syncList = Collections.synchronizedList(list);

Now syncList is a thread-safe collection that can be safely accessed and modified by multiple threads. However, the synchronization is done at the method level, meaning each method of the collection is synchronized on the wrapper object. This can limit concurrency compared to the purpose-built concurrent collections, which often use more sophisticated techniques such as compare-and-swap (CAS) operations and non-blocking algorithms.
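
One practical consequence of this method-level synchronization is that compound actions, such as iterating over the wrapper, still require manual locking on the wrapper object itself, a requirement documented on Collections.synchronizedList():

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedWrapperExample {
    public static void main(String[] args) {
        List<String> syncList = Collections.synchronizedList(new ArrayList<>());
        syncList.add("a");
        syncList.add("b");

        // Individual calls such as add() and get() are synchronized, but
        // iteration spans many calls, so the caller must hold the lock
        synchronized (syncList) {
            for (String s : syncList) {
                System.out.println(s);
            }
        }
    }
}
```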

In general, it’s preferable to use the concurrent collection classes directly, as they are designed from the ground up for high concurrency. The synchronization wrappers are useful when you need to add thread-safety to an existing collection or when using a less common collection type that doesn’t have a direct concurrent equivalent.

Parallel Streams

In the world of Java streams, there’s a feature that can greatly enhance performance when working with large datasets: parallel streams.

A parallel stream is a stream that splits its elements into multiple chunks, processing each chunk with a different thread in parallel. This can significantly speed up operations on large datasets by leveraging the power of multi-core processors.

However, there’s an important concern to keep in mind when using parallel streams: the order of elements. Unlike regular sequential streams, the order of elements in a parallel stream is not guaranteed unless specifically enforced. This means that operations like forEach, which rely on encounter order, may produce unexpected results.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.parallelStream().forEach(System.out::println);

When running the above example, the output will show an unpredictable order. For example:

7
6
8
9
10
1
3
5
4
2
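
If encounter order matters, you can use forEachOrdered() instead, which performs the action in the stream’s encounter order at the cost of some parallelism:

```java
import java.util.Arrays;
import java.util.List;

public class ForEachOrderedExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Elements may still be processed by multiple threads, but the
        // action is invoked in encounter order: 1, 2, 3, ..., 10
        numbers.parallelStream().forEachOrdered(System.out::println);
    }
}
```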

Another key consideration when using parallel streams is to avoid stateful lambda expressions. A stateful lambda is one that modifies shared state across invocations. In a parallel stream, multiple threads may be executing the same lambda concurrently, which can lead to race conditions and unpredictable behavior if the lambda is stateful:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int[] state = {0}; // Shared state

numbers.parallelStream().forEach(n -> {
    // Simulate some processing time
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    state[0] += n; // Stateful lambda, unsafe
});

System.out.println(state[0]); // Unpredictable result due to race conditions

In this example, we use an array to hold the shared state, which allows us to modify it inside the lambda expression. The Thread.sleep(100) call introduces a small delay, increasing the likelihood of race conditions. Sometimes, the output will be 15. Other times, it will be 13 or something else.

To avoid these issues, it’s important to use stateless lambda expressions when working with parallel streams.
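
Instead of accumulating into shared state yourself, let the stream perform the accumulation as a reduction, which the runtime combines safely across threads:

```java
import java.util.Arrays;
import java.util.List;

public class StatelessSumExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        int sum = numbers.parallelStream()
                         .mapToInt(Integer::intValue)
                         .sum(); // thread-safe reduction, no shared mutable state
        System.out.println(sum); // Always prints 15
    }
}
```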

Creating Parallel Streams

There are a few ways to create a parallel stream in Java:

  1. Using the parallelStream() method on a collection:

     List<String> list = Arrays.asList("a", "b", "c");
     Stream<String> parallelStream = list.parallelStream();
    
  2. Using the parallel() method on an existing stream:

     List<String> list = Arrays.asList("a", "b", "c");
     Stream<String> stream = list.stream();
     Stream<String> parallelStream = stream.parallel();
    
  3. Using StreamSupport.stream() with a specified parallelism flag:

     List<String> list = Arrays.asList("a", "b", "c");
     boolean isParallel = true;
     Stream<String> parallelStream = StreamSupport.stream(list.spliterator(), isParallel);
    

Parallel Decomposition

Parallel decomposition is the process of breaking a task into smaller, independent subtasks that can be processed concurrently, and then combining the results to produce the final output. This is a fundamental concept in parallel computing, and it’s key to understanding how parallel streams work under the hood.

When you invoke a terminal operation on a parallel stream, the Java runtime performs a parallel decomposition of the stream behind the scenes. This involves several steps:

  1. Splitting the Stream into Substreams: The original stream is divided into multiple smaller substreams. The division is typically recursive, and does not necessarily match the number of processor cores directly. Each substream represents a portion of the original stream that can be processed independently, allowing for optimal utilization of computing resources.

  2. Processing Each Substream Independently: Each substream is processed by a separate thread from ForkJoinPool, Java’s built-in thread pool for parallel execution. ForkJoinPool uses a work-stealing algorithm to balance the load, and dynamically allocates tasks among threads. This allows multiple substreams to be processed concurrently, leveraging the power of multi-core processors. Each thread applies the stream operations to its assigned substream independently of the others.

  3. Combining the Results: Once all the substreams have been processed, their individual results need to be combined to produce the final result. The combining process also leverages ForkJoinPool’s capabilities to parallelize this step, especially for associative operations. The specific way in which the results are combined depends on the terminal operation. For example, with a reduce operation, the results of each substream’s reduction are combined using the provided accumulator function. For a collect operation, the results are combined using the provided combiner function.

Consider the following example:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = numbers.parallelStream().reduce(0, Integer::sum);
System.out.println(sum); // Output: 55

In this example, the reduce operation is performed in parallel. The stream is split into substreams, each substream is summed independently, and then the results are combined to produce the final sum.

Here’s a visual representation of this process:

          [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
                         |
                     Split into
                     substreams
           ______________|______________
          |      |       |      |      |
        [1, 2] [3, 4] [5, 6] [7, 8] [9, 10]
          |      |       |      |      |
       Process each substream independently
          |      |       |      |      |
         [3]    [7]    [11]   [15]   [19]
            \     \      |     /     /
             \     \     |    /     /
              Combine the subresults
                         |
                       [55]

The power of parallel decomposition lies in its ability to break down a large task into smaller, more manageable pieces that can be processed concurrently. This can lead to significant performance improvements, especially for computationally intensive tasks operating on large datasets.

However, it’s important to note that not all operations can be parallelized effectively. For parallel decomposition to work, the subtasks must be independent - that is, the processing of one subtask should not depend on the results of another. This is why stateful lambda expressions can cause problems in parallel streams, as they introduce dependencies between subtasks.

Additionally, the cost of splitting the stream and combining the results should be taken into account. For small streams or simple operations, the overhead of parallel decomposition may outweigh the benefits of concurrent processing. The Java runtime attempts to make intelligent decisions about when to parallelize a stream based on factors like the stream size and the complexity of the operations, but it’s still important to understand the implications of using parallel streams in your particular use case.
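To get a feel for how much parallelism is actually available on a given machine, you can query the common ForkJoinPool that parallel streams use. This is a small sketch; the printed numbers depend on your hardware:

```java
import java.util.concurrent.ForkJoinPool;

public class PoolInfo {
    public static void main(String[] args) {
        // Parallelism of the common pool used by parallel streams,
        // typically availableProcessors() - 1
        System.out.println(ForkJoinPool.commonPool().getParallelism());

        // Number of logical processors the JVM sees
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
}
```

If the common pool reports a parallelism of 1, a parallel stream will gain you little or nothing over a sequential one.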

Methods of Stream that Perform Order-Based Tasks

There are certain operations that rely on the encounter order of elements. These operations are known as order-based tasks, and they can behave differently when used with parallel streams compared to sequential streams. Let’s take a closer look at some of these methods and their implications.

forEach and forEachOrdered

It’s important to understand the difference between the forEach and forEachOrdered terminal operations.

The forEach operation, as we’ve seen earlier, is used to perform an action on each element of a stream. When used with a parallel stream, forEach does not guarantee the order in which the elements will be processed. Each substream is processed independently by a different thread, and the order in which the threads are scheduled is non-deterministic.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().forEach(System.out::println);
// Possible output: 3, 1, 4, 2, 5

In this example, the numbers may be printed in any order, depending on how the parallel stream is split and how the threads are scheduled.

This non-deterministic ordering can be beneficial in certain scenarios. For example, if you’re performing an operation where the order doesn’t matter, such as adding elements to a thread-safe collection or updating counters in a thread-safe manner, forEach can significantly boost performance by allowing operations to be performed in parallel without the overhead of maintaining order.
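The following sketch illustrates that pattern: counting matching elements with a thread-safe LongAdder, where forEach can run fully in parallel because no ordering is required:

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

public class ForEachCounter {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // LongAdder is designed for high-contention concurrent updates
        LongAdder evens = new LongAdder();

        numbers.parallelStream()
               .filter(n -> n % 2 == 0)
               .forEach(n -> evens.increment()); // order irrelevant, thread-safe

        System.out.println(evens.sum()); // Output: 5
    }
}
```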

On the other hand, forEachOrdered guarantees that the action will be performed on the elements in the encounter order, even when used with a parallel stream. This means that the elements will be processed in the same order as they would be in a sequential stream, even though the processing is happening in parallel.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().forEachOrdered(System.out::println);
// Output: 1, 2, 3, 4, 5

In this case, the numbers are always printed in their original order, regardless of how the parallel stream is split and processed.

However, this ordering guarantee comes at a cost. To maintain the encounter order, forEachOrdered introduces a degree of synchronization and communication between the threads processing the substreams. This can reduce the performance benefits of parallelism, especially for large streams or complex operations.

So, when should you use forEach, and when should you use forEachOrdered? The answer depends on your specific use case.

Use forEach when:

  • The order in which elements are processed doesn’t matter.
  • Any side effects of the action are thread-safe, such as incrementing a LongAdder or adding to a concurrent collection.
  • You want the maximum performance benefit from parallelism, without ordering overhead.

Use forEachOrdered when:

  • The action must see the elements in their encounter order, for example when printing results in their original sequence.
  • You can accept the reduced parallelism that maintaining that order entails.

It’s worth noting that in many cases, if you need deterministic ordering, it may be more efficient to use a sequential stream instead of a parallel stream with forEachOrdered. The sequential stream will maintain the encounter order naturally, without the overhead of parallel decomposition and synchronization.

findFirst()

The findFirst() method returns an Optional describing the first element of the stream, or an empty Optional if the stream is empty. In a sequential stream, this is straightforward: it simply returns the first element encountered in the stream.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Optional<Integer> first = numbers.stream().findFirst();
System.out.println(first.get()); // Output: 1

However, contrary to what you might expect, findFirst() remains deterministic on a parallel stream: for an ordered source such as a List, it always returns the first element in encounter order. The runtime coordinates the worker threads behind the scenes to honor that guarantee, which has a cost in parallel. If any element will do, use findAny() instead, which is free to return whichever element a thread produces first.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Optional<Integer> first = numbers.parallelStream().findFirst();
System.out.println(first.get()); // Output: 1 (always, because the source is ordered)
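The contrast between the two methods is easiest to see side by side. A short sketch:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class FindDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // findFirst: deterministic, honors encounter order even in parallel
        Optional<Integer> first = numbers.parallelStream().findFirst();
        System.out.println(first.get()); // Output: 1

        // findAny: returns whichever element a thread reaches first
        Optional<Integer> any = numbers.parallelStream().findAny();
        System.out.println(any.get()); // Non-deterministic: any of 1 through 5
    }
}
```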

limit()

The limit() method returns a stream consisting of the first n elements of the original stream. In a sequential stream, this is again straightforward - it simply returns the first n elements in the encounter order.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.stream().limit(3).forEach(System.out::println);
// Output: 1, 2, 3

When used with a parallel stream over an ordered source, limit() still selects the first n elements in encounter order; preserving that guarantee is precisely what makes limit() potentially expensive in parallel. A subsequent forEach, however, may process those elements in any order, since forEach itself makes no ordering promise.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().limit(3).forEach(System.out::println);
// Always the elements 1, 2, 3 - but possibly printed as: 1, 3, 2

skip()

The skip() method is the complement of limit(). It returns a stream consisting of the remaining elements of the original stream after discarding the first n elements. In a sequential stream, this skips the first n elements in the encounter order.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.stream().skip(3).forEach(System.out::println);
// Output: 4, 5

In a parallel stream over an ordered source, skip() likewise discards exactly the first n elements in encounter order, again at the cost of coordination between threads. The remaining elements may then be processed in any order by forEach.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().skip(3).forEach(System.out::println);
// Always the elements 4 and 5 - but possibly printed as: 5, 4
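When you don't actually need the encounter-order guarantee of limit() or skip(), you can waive it explicitly with unordered(), a standard method on BaseStream. This frees the runtime to take any n elements without cross-thread coordination, which can be considerably cheaper in parallel:

```java
import java.util.stream.IntStream;

public class UnorderedLimit {
    public static void main(String[] args) {
        // unordered() waives encounter order, so limit(10) may take
        // any 10 elements instead of specifically the first 10
        long count = IntStream.rangeClosed(1, 1_000_000)
                              .parallel()
                              .unordered()
                              .limit(10)
                              .count();
        System.out.println(count); // Output: 10
    }
}
```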

These order-based methods keep their encounter-order guarantees even in parallel, but they do so by introducing coordination between threads that can erode the benefits of parallelism, and surrounding operations like forEach may still process the selected elements in an unpredictable order. If your operation relies heavily on encounter order, it's generally simpler, and often faster, to use a sequential stream.

However, there are situations where giving up these guarantees is acceptable, or even desirable. For example, if you only need some element matching a certain predicate and don't care which one, findAny() on a parallel stream can provide a performance boost.

As with all aspects of parallel programming, the key is to understand the behavior and implications of the methods you’re using, and to carefully consider whether the potential performance benefits outweigh the risks of non-deterministic results.

Reducing Parallel Streams

Reduction operations, such as reduce(), collect(), and sum(), are powerful tools for combining the elements of a stream into a single result. When used with parallel streams, these operations can provide significant performance benefits by allowing the reduction to be performed concurrently on multiple substreams. However, there are certain pitfalls to be aware of, particularly when it comes to the choice of accumulator function.

The accumulator function combines elements during a reduction operation. For example, in the reduce() method, the accumulator function takes two parameters: the partial result of the reduction so far, and the next element to be incorporated.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.parallelStream().reduce(0, Integer::sum);
System.out.println(sum); // Output: 15

In this example, the accumulator function is Integer::sum, which simply adds two integers together.

For a reduction in a parallel stream to produce correct results, the accumulator function must be associative and stateless. An associative function is one in which the order of application doesn’t matter. That is, (a op b) op c is equal to a op (b op c), where op is the accumulator function.
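Subtraction is a simple example of a non-associative operation, and it shows why associativity matters. The sequential result below is well-defined, but the parallel one depends on how the stream happens to be split:

```java
import java.util.Arrays;
import java.util.List;

public class NonAssociativeReduce {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Subtraction is not associative: (a - b) - c != a - (b - c)
        int sequential = numbers.stream().reduce(0, (a, b) -> a - b);
        System.out.println(sequential); // Output: -15

        // In parallel, substream results are combined with the same
        // non-associative function, so the result varies from run to run
        int parallel = numbers.parallelStream().reduce(0, (a, b) -> a - b);
        System.out.println(parallel); // Non-deterministic
    }
}
```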

However, certain accumulator functions can lead to issues in parallel streams. For example, using a mutable accumulator:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
ArrayList<Integer> list = numbers.parallelStream().reduce(
        new ArrayList<>(),
        (l, i) -> { l.add(i); return l; },
        (l1, l2) -> { l1.addAll(l2); return l1; });
System.out.println(list); // Non-deterministic: elements may be duplicated, missing, or out of order, and the call may even throw

The output is non-deterministic because we’re using a mutable ArrayList as the identity value. The three-argument reduce() hands that single instance to every thread, so the lambda expressions modify the same ArrayList concurrently, leading to race conditions and corrupted results.

To avoid these issues and ensure deterministic, correct results from parallel reductions, follow these best practices:

  1. Use associative and stateless accumulator functions. If your accumulator function is not associative, consider using a sequential stream instead.

  2. Avoid using mutable accumulators. If you need to collect results into a mutable container, use the collect() method with a concurrent collector, such as toConcurrentMap(), instead of reduce(). Concurrent collectors are designed to handle parallel modifications safely.

  3. Be cautious with floating-point arithmetic. Due to the limitations of floating-point representation, floating-point addition and multiplication are not strictly associative. If absolute precision is required, consider using a sequential stream or a different numerical representation.

  4. Test your reductions thoroughly, with different stream sizes and different levels of parallelism, to ensure that they produce consistent, correct results.
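Following these practices, the safe way to gather parallel-stream elements into a list is collect() with a predefined collector rather than reduce() with a shared mutable container. With collect(), each thread accumulates into its own container created by the supplier, and the partial results are merged in encounter order:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SafeListCollect {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Each worker thread gets its own ArrayList from the supplier;
        // the partial lists are then merged in encounter order
        List<Integer> result = numbers.parallelStream()
                                      .collect(Collectors.toList());
        System.out.println(result); // Output: [1, 2, 3, 4, 5]
    }
}
```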

Combining Results in Parallel Streams

The collect() method is a terminal operation that allows you to accumulate elements of a stream into a collection or other data structure. When used with parallel streams, collect() can provide significant performance benefits by allowing the accumulation to be performed concurrently on multiple substreams. However, to ensure correct and efficient operation, there are certain considerations to keep in mind.

Remember, the collect() method takes a Collector, which specifies how the elements of the stream should be accumulated. A Collector is defined by four components:

  1. A supplier function that creates a new result container.

  2. An accumulator function that adds an element to the result container.

  3. A combiner function that merges two result containers into one.

  4. A finisher function that performs an optional final transformation on the result container.

The Java Collectors class provides a wide variety of predefined collectors, such as toList(), toSet(), toMap(), groupingBy(), and more.

When using collect() with a parallel stream, there are several key considerations to ensure correct and efficient operation:

  1. The collector should be concurrent. This means that the accumulator and combiner functions must be thread-safe and should not depend on the order in which elements are processed. The Collectors class provides several concurrent collectors, such as toConcurrentMap(), groupingByConcurrent(), etc:

     List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
     ConcurrentMap<String, Integer> map = strings.parallelStream()
             .collect(Collectors.toConcurrentMap(s -> s, s -> 1, Integer::sum));
     System.out.println(map); // Output: {a=1, b=1, c=1, d=1, e=1}
    
  2. If the collector is not concurrent, consider using a concurrent result container. For example, you can collect into a ConcurrentHashMap or a CopyOnWriteArrayList:

     List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
     ConcurrentHashMap<String, Integer> map = strings.parallelStream()
             .collect(ConcurrentHashMap::new, 
                      (m, s) -> m.put(s, 1), 
                      ConcurrentHashMap::putAll);
     System.out.println(map); // Output: {a=1, b=1, c=1, d=1, e=1}
    
  3. Be aware of ordering guarantees and their cost. An ordered collector like Collectors.toList() does preserve the encounter order of elements even in a parallel stream, but maintaining that order means the partial results must be merged in sequence, which adds overhead. Unordered collectors such as Collectors.toSet() make no ordering promise at all. If what you actually need is sorted output, one option is to collect into a concurrent container and then sort:

     List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
     List<String> list = strings.parallelStream()
             .collect(Collectors.toCollection(CopyOnWriteArrayList::new))
             .stream()
             .sorted()
             .collect(Collectors.toList());
     System.out.println(list); // Output: [a, b, c, d, e]
    
  4. Consider the characteristics of the collector. The Collector interface defines three characteristics (java.util.stream.Collector.Characteristics):

    • CONCURRENT: Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.
    • UNORDERED: Indicates that the collection operation does not commit to preserving the encounter order of input elements.
    • IDENTITY_FINISH: Indicates that the finisher function is the identity function and can be left out.

These characteristics provide hints to the stream framework about how the collector can be optimized. For example, if a collector is UNORDERED, the stream framework can freely rearrange the elements, which can enable certain optimizations:

List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
Set<String> set = strings.parallelStream()
        .collect(Collectors.toUnmodifiableSet()); // UNORDERED collector
System.out.println(set); // Output: [a, b, c, d, e] (possibly in a different order)

By understanding these considerations and choosing the appropriate collector for your use case, you can effectively harness the power of collect() with parallel streams to achieve significant performance improvements in your stream-based operations.

In addition to the predefined collectors provided by the Collectors class, you can also create your own custom collectors using the Collector.of() method. This allows you to define your own supplier, accumulator, combiner, and finisher functions to collect elements into a custom data structure or perform a custom accumulation operation.

Here’s an example using a sequential stream for string concatenation, which ensures deterministic output and better performance:

List<String> strings = Arrays.asList("a", "b", "c", "d", "e");
String concatenated = strings.stream() // Using a sequential stream
    .collect(Collector.of(
        StringBuilder::new,                // Supplier
        StringBuilder::append,             // Accumulator
        (sb1, sb2) -> {
            sb1.append(sb2);
            return sb1;
        },                                 // Combiner
        StringBuilder::toString            // Finisher
    ));

System.out.println(concatenated); // Output: abcde

In this example, we use a sequential stream to concatenate a list of strings into a single string. A custom collector is created with Collector.of(), using StringBuilder as the mutable container. StringBuilder::append serves as the accumulator, appending each string in encounter order. The combiner, which merges two StringBuilder instances, is required by Collector.of() but is only invoked for parallel streams; in a sequential stream it never runs. Finally, StringBuilder::toString is the finisher that produces the concatenated result.

Note that string concatenation is inherently sequential work. Because this collector is ordered, running it on a parallel stream would still produce abcde, but likely more slowly: each substream builds its own StringBuilder, and the combiner then copies their contents together, adding overhead with no compensating benefit.
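For string concatenation specifically, the predefined Collectors.joining() is usually a better choice than a hand-rolled collector. It handles the StringBuilder plumbing internally and, being an ordered collector, produces the same result even from a parallel stream:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoiningDemo {
    public static void main(String[] args) {
        List<String> strings = Arrays.asList("a", "b", "c", "d", "e");

        // joining() is ordered: even a parallel stream concatenates
        // the strings in encounter order
        String joined = strings.parallelStream()
                               .collect(Collectors.joining());
        System.out.println(joined); // Output: abcde
    }
}
```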

A better fit for a parallel stream is a task whose accumulation is associative and cheap to combine. Consider this example that adds up integers, where parallel processing can provide a real benefit:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Integer sum = numbers.parallelStream() // Using a parallel stream
    .collect(Collector.of(
        () -> new int[1],               // Supplier
        (a, t) -> a[0] += t,            // Accumulator
        (a1, a2) -> {
            a1[0] += a2[0];
            return a1;
        },                              // Combiner
        a -> a[0]                       // Finisher
    ));

System.out.println(sum); // Output: 15

In this example, a custom collector is defined using Collector.of(), with an integer array as the container to hold the sum. The accumulator function adds each integer to the array’s single element, and the combiner function merges two arrays by summing their elements. The finisher function extracts the summed value from the array.

However, creating custom collectors that are both correct and efficient for parallel streams can be challenging. It requires a deep understanding of concurrency, thread safety, and the characteristics of the stream and collector. If possible, it’s generally recommended to use the predefined collectors or compose them to achieve your desired operation.

Key Points

Practice Questions

1. Which of the following lines of code correctly creates and starts a new virtual thread?

public class Main {
    public static void main(String[] args) {
        Runnable task = () -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Task is running");
            }
        };
        // Insert code here to create and start a new virtual thread
    }
}

A) Thread thread = Thread.ofVirtual(); thread.start(task);
B) Thread thread = Thread.ofVirtual().unstarted(task).run();
C) Thread thread = Thread.ofVirtual().start(task);
D) Thread thread = Thread.ofVirtual(); task.run();
E) Thread thread = Thread.start(task);

2. Which of the options correctly uses a synchronized block to ensure that only one thread at a time can execute a critical section that increments a shared counter?

public class Main {
    private static int counter = 0;
    public static void main(String[] args) {
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                // Insert synchronized block here
            }
        };
        Thread thread1 = Thread.ofPlatform().start(task);
        Thread thread2 = Thread.ofPlatform().start(task);
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Final counter value: " + counter);
    }
}

A) synchronized (this) { counter++; }
B) synchronized (Main.class) { counter++; }
C) synchronized (task) { counter++; }
D) synchronized (counter) { counter++; }
E) synchronized (System.out) { counter++; }

3. Which of the following statements about atomic classes is correct? (Choose all that apply)

A) AtomicInteger is part of the java.util.concurrent.atomic package, but it does not provide atomic operations for increment and decrement.
B) AtomicReference can only be used with reference types, not primitive types.
C) AtomicLong supports atomic operations on long values, including getAndIncrement() and compareAndSet() methods.
D) AtomicBoolean can be used to perform atomic arithmetic operations on boolean values.

4. Which of the following code snippets correctly uses the Lock interface to ensure thread-safe access to a shared resource?

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private int count = 0;
    private Lock lock = new ReentrantLock();

    public void increment() {
        // Insert code here
    }

    public int getCount() {
        return count;
    }
}

A)

lock.lock();
try {
    count++;
} finally {
    lock.unlock();
}

B)

lock.lock();
count++;
lock.unlock();

C)

try {
    lock.lock(() -> {
        count++;
    });
} finally {
    lock.unlock();
}

D)

synchronized(lock) {
    count++;
}

5. Which of the following code snippets correctly demonstrates the usage of an ExecutorService with a try-with-resources block?

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorExample {
    public static void main(String[] args) {
        // Insert code here to create and use an ExecutorService with try-with-resources
    }
}

A)

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> System.out.println("Task executed"));
}

B)

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> System.out.println("Task executed"));
} finally {
    executor.shutdown();
}

C)

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
try {
    executor.submit(() -> System.out.println("Task executed"));
} finally {
    executor.close();
}

D)

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> System.out.println("Task executed"));
    executor.awaitTermination(1, TimeUnit.SECONDS);
}

6. Which of the following code snippets correctly demonstrates how to get a result from a Callable task using an ExecutorService with try-with-resources?

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CallableExample {
    public static void main(String[] args) {
        Callable<Integer> task = () -> {
            return 123;
        };
        // Insert code here
    }
}

A)

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<Integer> future = executor.submit(task);
    System.out.println(future.get());
}

B)

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<Integer> future = executor.submit(task);
    Integer result = future.get(1, TimeUnit.SECONDS);
    System.out.println(result);
}

C)

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<Integer> future = executor.submit(task);
    executor.shutdown();
    System.out.println(future.get());
}

D)

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<Integer> future = executor.submit(task);
    try {
        Integer result = future.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        // Handle exceptions
    }
}

7. Which of the following statements about Java’s concurrent collections is correct?

A) ConcurrentHashMap allows concurrent read and write operations, and retrieval operations do not block even when updates are being made.
B) CopyOnWriteArrayList is optimized for scenarios with a high number of write operations compared to read operations.
C) ConcurrentSkipListSet does not keep its elements sorted.
D) BlockingQueue implementations like LinkedBlockingQueue allow elements to be added and removed concurrently without any internal locking mechanisms.

8. Which of the following statements about parallel streams is correct?

A) Parallel streams always improve the performance of a program by utilizing multiple threads.
B) Parallel streams can lead to incorrect results if the operations performed are not thread-safe.
C) The order of elements in a parallel stream is always preserved compared to the original stream.
D) Using parallel streams guarantees that the operations on elements will execute in a fixed order.

9. Which of the following code snippets correctly demonstrates how to reduce a parallel stream to compute the sum of its elements?

import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Insert code here
    }
}

A)

int sum = numbers.parallelStream().reduce(1, Integer::sum);
System.out.println(sum);

B)

int sum = numbers.parallelStream().reduce(0, Integer::sum);
System.out.println(sum);

C)

int sum = numbers.stream().reduce(0, Integer::sum);
System.out.println(sum);

D)

int sum = numbers.parallelStream().collect(reduce(0, Integer::sum));
System.out.println(sum);



Do you have a problem or something to say?

Report an issue with the book

Contact me