How to do MultiThreading in Java?

What is process and thread?

A process is a program in execution, whereas a thread is a unit of execution within a process. A thread (sometimes called a lightweight process) is the smallest part of a process that can run concurrently with other threads, and threads of the same process share its memory.

How to create a thread?

1. Extend Thread class

NOTE: Java allows a class to extend only one other class, so a class that extends Thread cannot extend anything else.

class Runner extends Thread {
 public void run() {
  for (int i = 0; i < 10; i++) {
   System.out.println("Hello " + i);
   try {
    Thread.sleep(100);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
 }
}

public class App {
public static void main(String[] args) {
  Runner runner1 = new Runner();
  Runner runner2 = new Runner();
  runner1.start();
  runner2.start();
 }
}

2. Implement Runnable interface

class Runner implements Runnable {
 public void run() {
  for (int i = 0; i < 10; i++) {
   System.out.println("Hello " + i);
   try {
    Thread.sleep(100);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
 }
} 
public class App {
 public static void main(String[] args) {
  Thread t1 = new Thread(new Runner());
  Thread t2 = new Thread(new Runner());

  t1.start();
  t2.start();
 }
}

3. Create Thread instance with an anonymous Runnable

public class App {
public static void main(String[] args) {
  Thread t1 = new Thread(new Runnable() {

   public void run() {
    for (int i = 0; i < 10; i++) {
     System.out.println("Hello " + i);
     try {
      Thread.sleep(100);
     } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
     }
    }
   }
  });

  Thread t2 = new Thread(new Runnable() {
   public void run() {
    for (int i = 0; i < 10; i++) {
     System.out.println("Hello " + i);
     try {
      Thread.sleep(100);
     } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
     }
    }
   }
  });

  t1.start();
  t2.start();
 }
}

NOTE: Do not call the run method directly, as the code will then execute on the calling thread (here, the main thread). Call start instead so that it runs on its own thread.
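
The difference between run and start can be observed by recording which thread executes the task. The following is a minimal sketch; the class name RunVsStartDemo, the helper executingThreadName and the thread name "worker" are made up for illustration.

```java
// Hypothetical demo class; all names here are illustrative only.
class RunVsStartDemo {

    // Executes the task via run() or start() and returns the name of the
    // thread that actually ran it.
    static String executingThreadName(boolean useStart) {
        final String[] seen = new String[1];
        Thread worker = new Thread(new Runnable() {
            public void run() {
                seen[0] = Thread.currentThread().getName();
            }
        }, "worker");

        if (useStart) {
            worker.start();      // new thread: run() executes on "worker"
            try {
                worker.join();   // wait so the result is visible to the caller
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            worker.run();        // plain method call on the calling thread
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("run():   executed on " + executingThreadName(false));
        System.out.println("start(): executed on " + executingThreadName(true));
    }
}
```

When launched from main, the first line reports the main thread and the second reports "worker".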

What is Volatile?

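The volatile keyword tells the JVM that a variable may be accessed by multiple threads, so its value must not be cached per thread: every read sees the most recent write, as if reads and writes always went to main memory. This solves the problem of one thread working with a stale copy (for example, in a CPU cache) after another thread has changed the variable. Note that volatile only guarantees visibility; it does not make compound operations such as count++ atomic.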
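
A common use of volatile is a stop flag that one thread sets and another thread polls. The sketch below assumes a hypothetical class VolatileFlagDemo with a helper stopWorker; without volatile on the flag, the worker would not be guaranteed to ever see the update.

```java
// Hypothetical demo class; names are illustrative only.
class VolatileFlagDemo {

    // volatile makes a write by one thread visible to later reads by others.
    private static volatile boolean running = true;

    // Starts a worker that spins until the flag is cleared, clears the flag
    // from the calling thread, and reports whether the worker stopped in time.
    static boolean stopWorker(long timeoutMillis) {
        running = true;
        Thread worker = new Thread(new Runnable() {
            public void run() {
                long iterations = 0;
                while (running) {    // re-reads the flag on every pass
                    iterations++;
                }
                System.out.println("Worker stopped after " + iterations + " iterations");
            }
        });
        worker.start();
        try {
            Thread.sleep(50);        // let the worker spin for a moment
            running = false;         // visible to the worker because of volatile
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + stopWorker(2000));
    }
}
```

A flag like this works because it is only ever set or read as a whole. A shared counter would still need synchronized or an atomic class.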

Synchronized method vs code blocks(with intrinsic locks)?

  • A method marked synchronized can be executed by at most one thread at a time per object. While one thread is executing a synchronized method of an object, all other threads that invoke any synchronized method of the same object are blocked until the first thread leaves the method. The lock used is the intrinsic lock of the instance the method is called on (this); for a static synchronized method it is the lock of the Class object.
package demo;
public class App {
private int count;

 public static void main(String[] args) throws InterruptedException {
  App app1 = new App();
  app1.doSth();
 }

 public synchronized void increment() {
  count++;
 }

 public void doSth() throws InterruptedException {

  Thread t1 = new Thread(new Runnable() {
public void run() {
    for(int i=0; i<1000; i++) {
     increment();
    }
   }

  });

  Thread t2 = new Thread(new Runnable() {

public void run() {
    for(int i=0; i<1000; i++) {
     increment();
    }
   }

  });

  t1.start();
  t2.start();

  t1.join();
  t2.join();

  System.out.println("Count= " + count);
 }
}
  • We can also use synchronized blocks on separate lock objects within a class, so that different threads can execute different parts of the code at the same time. While thread 1 is busy doing work A under one lock, thread 2 can proceed with work B under another lock on the same object.
import java.util.ArrayList;
import java.util.Random;

public class App {
 private static ArrayList<Integer> list1 = new ArrayList<Integer>();
 private static ArrayList<Integer> list2 = new ArrayList<Integer>();

 private Object lock1 = new Object();
 private Object lock2 = new Object();

 private Random random = new Random();

 private void task1() {
  synchronized(lock1) {
   try {
    Thread.sleep(1);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
   list1.add(random.nextInt(100));
  }
 }

 private void task2() {
  synchronized(lock2) {
   try {
    Thread.sleep(1);
   } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
   list2.add(random.nextInt(100));
  }
 }

 private void process() {
  for(int i=0; i<1000; i++) {
   task1();
   task2();
  }
 }

 public static void main(String[] args) {
  App app1 = new App();
  long start = System.currentTimeMillis();

  Thread t1 = new Thread(new Runnable() {
public void run() {
    app1.process();
   }
  });

  Thread t2 = new Thread(new Runnable() {
public void run() {
    app1.process();
   }
  });

  t1.start();
  t2.start();

  try {
   t1.join();
   t2.join();
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }

  long end = System.currentTimeMillis();

  System.out.println("Time taken=" + (end - start));
  System.out.println("Size of List1=" + list1.size() + " and Size of List2=" + list2.size());
 }
}

ThreadPool

A thread pool consists of worker threads that are reused across tasks, which reduces the overhead of creating a new thread for every task. One common type is the fixed thread pool, which runs at most a specified number of threads. Capping the number of worker threads keeps the application from being overloaded when many tasks arrive at once: extra tasks simply wait in a queue until a worker is free.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Task implements Runnable {
private int id;
public Task(int id) {
        this.id = id;
    }
public void run() {
        System.out.println("Task " + id + " has started.");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ignored) {
        }
        System.out.println("Task " + id + " has completed.");
    }
}
public class App {

 public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
  for(int i=0;i<10;i++) {
   executor.submit(new Task(i));
  }
  executor.shutdown();
  System.out.println("All tasks submitted");

  long start = System.currentTimeMillis();
  try {
   executor.awaitTermination(1, TimeUnit.DAYS);
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }

  long end = System.currentTimeMillis();
  System.out.println("All tasks completed");
  System.out.println("Time taken to complete is " + (end-start));
 }
}

newFixedThreadPool vs newCachedThreadPool

  • newFixedThreadPool reuses a fixed number of threads and at any point, at most n Threads will be actively processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
  • newCachedThreadPool creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
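
One way to see the difference between the two pools is to count how many distinct threads end up running the same batch of tasks. This is a rough sketch; the class PoolComparisonDemo and its helper methods are invented for illustration, and the exact number of threads a cached pool creates depends on timing.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only.
class PoolComparisonDemo {

    // Submits `tasks` short sleeping tasks and returns how many distinct
    // threads executed them.
    static int distinctThreadsUsed(ExecutorService executor, int tasks) {
        final Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            executor.submit(new Runnable() {
                public void run() {
                    names.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(100);   // keep the worker busy for a while
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    static int fixedPoolThreads() {
        return distinctThreadsUsed(Executors.newFixedThreadPool(2), 6);
    }

    static int cachedPoolThreads() {
        return distinctThreadsUsed(Executors.newCachedThreadPool(), 6);
    }

    public static void main(String[] args) {
        System.out.println("Fixed pool used " + fixedPoolThreads() + " thread(s)");
        System.out.println("Cached pool used " + cachedPoolThreads() + " thread(s)");
    }
}
```

The fixed pool never uses more than 2 threads here, while the cached pool is free to start a new thread for each task that arrives while the others are busy.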

Countdown latch

A CountDownLatch is a thread-safe counter, so no additional synchronization is needed to use it. Every call to CountDownLatch.countDown decrements the count by one, and CountDownLatch.await blocks the calling thread until the count reaches zero.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Task implements Runnable {
    private CountDownLatch latch;
private int id;

    public Task(CountDownLatch latch, int id) {
        this.latch = latch;
        this.id = id;
    }
public void run() {
        try {
         System.out.println("Task number " + id + " has started");
            Thread.sleep(3000);
        } catch (InterruptedException ignored) {
        }
        latch.countDown();
    }
}
public class App {

 public static void main(String[] args) {
  // One count per submitted task, so await() returns only after all 10 have finished.
  CountDownLatch latch = new CountDownLatch(10);

  ExecutorService executor = Executors.newFixedThreadPool(2);
  for(int i=0;i<10;i++) {
   executor.submit(new Task(latch, i));
  }
  executor.shutdown();
  System.out.println("All tasks submitted");

  try {
   latch.await();
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }

  System.out.println("All tasks completed");
 }
}

Wait and Notify

Assume you have 2 threads running, and thread 1 needs thread 2 to finish some work before it can resume. Thread 1 can call wait inside a synchronized block, which releases the lock and suspends the thread. Thread 2 can then enter a synchronized block on the same object and call notify when it is done. Control does not return to thread 1 immediately: thread 1 can only resume after thread 2 has left its synchronized block and released the lock.

import java.util.Scanner;

public class Task {
 public void produce() throws InterruptedException {
        synchronized (this) {
            System.out.println("Producer thread running ....");
            wait();//this.wait() is fine.
            System.out.println("Resumed.");
        }
    }
public void consume() throws InterruptedException {
        Scanner scanner = new Scanner(System.in);
        Thread.sleep(2000);
        synchronized (this) {
            System.out.println("Waiting for return key.");
            scanner.nextLine();
            System.out.println("Return key pressed.");
            notify();
            Thread.sleep(5000);
            System.out.println("Consumption done.");
        }
    }
}
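
In practice, wait is usually called in a loop that re-checks a condition, because a thread may wake up without having been notified and because notify may run before the waiter starts waiting. The following is a minimal sketch of that pattern; the class WaitNotifyDemo and the fields ready and message are hypothetical names, not part of the example above.

```java
// Hypothetical demo class; names are illustrative only.
class WaitNotifyDemo {
    private final Object lock = new Object();
    private boolean ready = false;   // the condition the waiter is waiting for
    private String message;

    // Blocks until another thread calls publish(), then returns the message.
    String awaitMessage() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {         // loop guards against spurious wakeups
                lock.wait();         // releases the lock while waiting
            }
            return message;
        }
    }

    void publish(String text) {
        synchronized (lock) {
            message = text;
            ready = true;
            lock.notify();           // waiter resumes only after this block exits
        }
    }

    // Runs one waiter and one notifier; returns what the waiter received.
    static String runOnce() {
        final WaitNotifyDemo demo = new WaitNotifyDemo();
        final String[] received = new String[1];
        Thread waiter = new Thread(new Runnable() {
            public void run() {
                try {
                    received[0] = demo.awaitMessage();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();
        demo.publish("hello");
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("Waiter received: " + runOnce());
    }
}
```

Because the waiter checks the ready flag rather than relying on the notification alone, the result is the same whether publish runs before or after the waiter reaches wait.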

Reentrant lock

Similar to synchronized methods or blocks, once thread 1 acquires a ReentrantLock, no other thread can acquire it until thread 1 releases it. The lock is reentrant: the thread that holds it may acquire it again without blocking. The call to unlock should always be placed in a finally block; otherwise an exception could skip the unlock and leave other threads waiting indefinitely for the lock.

import java.util.Scanner;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Task {
private int count = 0;
    private Lock lock = new ReentrantLock();

    public void increment() {
        for(int i=0;i<10000;i++) {
            count++;
        }
    }

    public void firstThread() throws InterruptedException {
        lock.lock();
        try {
            increment();
        }
        finally {
            lock.unlock();
        }
    }

    public void secondThread() throws InterruptedException {
        Thread.sleep(2000);
        lock.lock();
        try {
            increment();
        }

        finally {
            lock.unlock();
        }
    }
public void finishThread() {
        System.out.println("The value of count is " + count);
    }
}
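
A lock-protected counter like the one above could be driven from two threads roughly as follows. This is a self-contained sketch rather than the original driver; the class LockCounterDemo and the method runTwoThreads are assumed names.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; names are illustrative only.
class LockCounterDemo {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    // Increments the counter `times` times while holding the lock.
    void incrementMany(int times) {
        lock.lock();
        try {
            for (int i = 0; i < times; i++) {
                count++;
            }
        } finally {
            lock.unlock();   // always released, even if the body throws
        }
    }

    // Runs two threads that each add 10000 and returns the final count.
    static int runTwoThreads() {
        final LockCounterDemo demo = new LockCounterDemo();
        Runnable work = new Runnable() {
            public void run() {
                demo.incrementMany(10000);
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return demo.count;
    }

    public static void main(String[] args) {
        System.out.println("The value of count is " + runTwoThreads());
    }
}
```

With the lock in place the result is always 20000; without it, increments from the two threads could be lost.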

Wait and Signal

This technique is similar to wait and notify, but it works with an explicit Lock and a Condition object created from it: await releases the lock and suspends the calling thread, and signal wakes up one waiting thread. As with notify, the awakened thread does not continue immediately. It must first reacquire the lock, which is only possible after the signalling thread has called unlock.

import java.util.Scanner;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Task {
private int count = 0;
    private Lock lock = new ReentrantLock();
private Condition cond = lock.newCondition();

    public void increment() {
        for(int i=0;i<10000;i++) {
            count++;
        }
    }

    public void firstThread() throws InterruptedException {
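        lock.lock();
        try {
            System.out.println("Waiting...");
            // await() must be called while holding the lock; it releases the lock
            // while waiting and reacquires it before returning.
            cond.await();
            System.out.println("Awakened!");
            increment();
        }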
        finally {
            lock.unlock();
        }
    }

    public void secondThread() throws InterruptedException {
        Thread.sleep(2000);        
        lock.lock();
        try {
            System.out.println("Waiting for return key..");
            new Scanner(System.in).nextLine();
            System.out.println("Has return key");
            // signal() wakes the waiting thread, which resumes once unlock() is called.
            cond.signal();
            increment();
        }
        finally {
            lock.unlock();
        }
    }
public void finishThread() {
        System.out.println("The value of count is " + count);
    }
}

Deadlock

Imagine we have an Account class with getBalance, deposit, withdraw and transfer methods. We have 2 accounts and, to simulate real usage, we transfer random amounts between them using 2 threads: the first thread transfers from account 1 to account 2, and the second thread does the same in the opposite direction. The first thread locks account 1 and then account 2, completes the transfer, and releases both locks. The second thread does the same, except that it locks account 2 before account 1. This can cause a deadlock: if thread 1 locks account 1 at the same moment that thread 2 locks account 2, thread 1 then waits for the lock on account 2 while thread 2 waits for the lock on account 1, and neither lock is ever released.

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Task {
private Account acc1 = new Account();
    private Account acc2 = new Account();
private Lock lock1 = new ReentrantLock();
    private Lock lock2 = new ReentrantLock();
public void firstThread() throws InterruptedException {
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            lock1.lock();
            lock2.lock();
            try {
                Account.transfer(acc1, acc2, random.nextInt(100));
            } finally {
                lock1.unlock();
                lock2.unlock();
            }
        }
}
public void secondThread() throws InterruptedException {
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            lock2.lock();
            lock1.lock();
            try {
                Account.transfer(acc2, acc1, random.nextInt(100));
            } finally {
                lock2.unlock();
                lock1.unlock();
            }
        }
    }
public void finish() {
        // Parentheses make this a numeric sum instead of string concatenation.
        System.out.println("Total balance is " + (acc1.getBalance() + acc2.getBalance()));
    }
}

There are 2 ways to avoid this deadlock. The first is to always acquire the locks in the same order. The second is to use tryLock to acquire the 2 locks. This method acquires a lock only if it is available and otherwise returns false immediately instead of blocking (an overload accepts a timeout and waits up to that long). We then check whether both locks were acquired; if not, we release whichever lock we hold so that other threads can use it, and try again a little later.

NOTE: When creating a ReentrantLock, you can pass a fairness boolean. If it is set to true, the lock favors the longest-waiting thread.

package demo;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Task {
private Account acc1 = new Account();
    private Account acc2 = new Account();
private Lock lock1 = new ReentrantLock();
    private Lock lock2 = new ReentrantLock();

    private void acquireLocks(Lock lock1, Lock lock2) throws InterruptedException {
        while(true) {
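            // tryLock() returns immediately: true if the lock was acquired, false otherwise.
            boolean gotFirstLock = lock1.tryLock();
            boolean gotSecondLock = lock2.tryLock();
            if (gotFirstLock && gotSecondLock) {
                return; // both locks are held; the caller must unlock them
            }
            // Got at most one lock: release it so the other thread can make progress.
            if (gotFirstLock) {
                lock1.unlock();
            }
            if (gotSecondLock) {
                lock2.unlock();
            }
            // Back off briefly before retrying.
            Thread.sleep(1);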
        }
    }
public void firstThread() throws InterruptedException {
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            acquireLocks(lock1,lock2);
            try {
                Account.transfer(acc1, acc2, random.nextInt(100));
            } finally {
                lock1.unlock();
                lock2.unlock();
            }
        }
}
public void secondThread() throws InterruptedException {
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            acquireLocks(lock2,lock1);
            try {
                Account.transfer(acc2, acc1, random.nextInt(100));
            } finally {
                lock1.unlock();
                lock2.unlock();
            }
        }
    }
public void finish() {
        System.out.println("First account balance is " + acc1.getBalance());
        System.out.println("Second account balance is " + acc2.getBalance());
        System.out.println("Total balance is " + (acc1.getBalance() + acc2.getBalance()));
    }
}
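
The first approach, always acquiring the locks in the same order, can be sketched as follows. Since the Account class is not shown in this article, the sketch uses its own minimal stand-in with an id and a per-account lock; those fields, the starting balance of 10000 and the class name LockOrderingDemo are assumptions made for illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; names are illustrative only.
class LockOrderingDemo {

    // Minimal stand-in for an account (not the article's Account class).
    static class Account {
        final int id;
        final Lock lock = new ReentrantLock();
        private int balance = 10000;

        Account(int id) {
            this.id = id;
        }
    }

    static void transfer(Account from, Account to, int amount) {
        // Always lock the account with the smaller id first, whatever the
        // direction of the transfer, so two threads can never hold one lock
        // each while waiting for the other.
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                from.balance -= amount;
                to.balance += amount;
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }

    // Runs opposing transfers on two threads and returns the combined balance.
    static int totalAfterTransfers() {
        final Account a = new Account(1);
        final Account b = new Account(2);
        Thread t1 = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1000; i++) transfer(a, b, i % 100);
            }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1000; i++) transfer(b, a, i % 100);
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return a.balance + b.balance;
    }

    public static void main(String[] args) {
        System.out.println("Total balance is " + totalAfterTransfers());
    }
}
```

Both threads complete, and the combined balance stays at 20000 because money only moves between the two accounts.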

ReentrantLock vs Synchronized

Some differences between reentrant lock and synchronized blocks are as follows:

  • ReentrantLock has an optional fairness setting, which favors granting the lock to the longest-waiting thread.
  • The tryLock method of ReentrantLock acquires the lock only if it is available, either immediately or within a given timeout, which reduces thread blocking.
  • The lockInterruptibly method of ReentrantLock allows a thread to be interrupted while it is waiting for the lock.
  • ReentrantLock exposes information about the threads waiting for the lock (for example, via hasQueuedThreads and getQueueLength).
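
The timed form of tryLock can be illustrated by letting one thread hold a lock while another thread tries to acquire it with a deadline. This is a sketch under assumed names (TryLockTimeoutDemo, acquiredWhileHeld); the two latches are only there to make the ordering of the threads predictable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; names are illustrative only.
class TryLockTimeoutDemo {

    // Lets a helper thread hold the lock, then tries to acquire it from the
    // calling thread for at most timeoutMillis. Returns whether that succeeded.
    static boolean acquiredWhileHeld(long timeoutMillis) {
        final ReentrantLock lock = new ReentrantLock(true); // true = fair lock
        final CountDownLatch held = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        Thread holder = new Thread(new Runnable() {
            public void run() {
                lock.lock();
                try {
                    held.countDown();   // tell the caller the lock is taken
                    release.await();    // keep holding until told to let go
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            }
        });
        holder.start();

        boolean acquired = false;
        try {
            held.await();
            // Waits up to the timeout instead of blocking indefinitely.
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();        // let the holder finish in every case
        }
        try {
            holder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired;
    }

    public static void main(String[] args) {
        System.out.println("Acquired while held: " + acquiredWhileHeld(200));
    }
}
```

Here the attempt gives up after the timeout and reports false, whereas a synchronized block in the same situation would simply keep waiting.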

Semaphore

A semaphore is usually used to restrict the number of threads that access a resource at the same time. When we create a Semaphore, we specify the number of permits available. These permits are analogous to the number of keys available for the resource. When a thread calls Semaphore.acquire, it tries to take a permit and, if successful, reduces the number of permits available. When no permits are left, other threads have to wait until a permit is released via Semaphore.release.
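
As a rough illustration, the sketch below runs several tasks on a cached thread pool but lets only a limited number of them into the guarded section at once, and records the highest concurrency observed. The class SemaphoreDemo, the method maxConcurrent and the chosen numbers are assumptions for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
class SemaphoreDemo {

    // Runs `tasks` tasks that each need a permit and returns the largest
    // number of tasks that were inside the guarded section at the same time.
    static int maxConcurrent(int permits, int tasks) {
        final Semaphore semaphore = new Semaphore(permits);
        final AtomicInteger active = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();

        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < tasks; i++) {
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        semaphore.acquire();       // blocks while no permit is free
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        int now = active.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(50);          // simulate using the resource
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                        semaphore.release();       // hand the permit back
                    }
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrent tasks: " + maxConcurrent(3, 10));
    }
}
```

As with unlock on a lock, release is placed in a finally block so that a permit is never lost if the task fails.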

Future and Callable

So far we have talked about Runnable, which executes a task without returning a value. To return a value from a thread, we use the Callable and Future interfaces: submitting a Callable to an executor returns a Future, and Future.get() blocks until the result is available and then returns it.

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class App {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(new Callable<Integer>() {
   public Integer call() throws Exception {
    Random random = new Random();
    int duration = random.nextInt(1000);
    Thread.sleep(duration);
    return duration;
   }
  });
executor.shutdown();
  try {
   System.out.println("Duration is " + future.get());
  } catch (ExecutionException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
}
}

NOTE: The Future and the Callable must be declared with the same type parameter (Integer in this example).

Interrupt Thread

From the main thread, we can interrupt another thread by calling interrupt() on its Thread object. Inside the thread, we can detect the interrupt with an if (Thread.currentThread().isInterrupted()) check.

import java.util.Random;
public class App {
public static void main(String[] args) throws InterruptedException {
System.out.println("Started");
Thread t1 = new Thread(new Runnable() {
@Override
   public void run() {
Random ran = new Random();
    for (int i = 0; i < 1E8; i++) {
     // Poll the interrupt flag so the loop can stop early.
     if (Thread.currentThread().isInterrupted()) {
      System.out.println("Interrupted");
      break;
     }
     Math.sin(ran.nextDouble());
    }
}
});
t1.start();
Thread.sleep(100);
t1.interrupt();
t1.join();
System.out.println("Finished");
}
}
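
Polling isInterrupted covers threads that are busy computing. A thread that is blocked in a method such as Thread.sleep reacts differently: the method throws InterruptedException. The sketch below shows that case; the class SleepInterruptDemo and the method interruptedWhileSleeping are hypothetical names.

```java
// Hypothetical demo class; names are illustrative only.
class SleepInterruptDemo {

    // Interrupts a thread that is (or is about to start) sleeping and reports
    // whether it received an InterruptedException.
    static boolean interruptedWhileSleeping() {
        final boolean[] caught = new boolean[1];
        Thread sleeper = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(10000);   // would normally block for 10 seconds
                } catch (InterruptedException e) {
                    caught[0] = true;      // sleep ended early because of interrupt()
                }
            }
        });
        sleeper.start();
        sleeper.interrupt();
        try {
            sleeper.join();                // returns quickly, not after 10 seconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return caught[0];
    }

    public static void main(String[] args) {
        System.out.println("Interrupted while sleeping: " + interruptedWhileSleeping());
    }
}
```

If the interrupt arrives before the thread reaches sleep, the pending interrupt makes sleep throw as soon as it is called, so the outcome is the same.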

About the author

Founder of tattweicheah.com. Loves music, sport and most importantly software development.
