Thread Communication simulation
Let's say we have:
- A buffer of limited size.
- A couple of producers.
- A couple of consumers.
What happens when producers try to fill the buffer while consumers are simultaneously trying to read the buffer. We want to understand deadlocks
, livelocks
and what should be done for a normal operation.
/** main.scala **/
import scala.collection.mutable
import scala.util.Random
object Main extends App {
class Consumer(id: Int, buffer: mutable.Queue[Int]) extends Thread {
/**
* Since Thread already has access to Runnable
* We can override run.
*/
override def run(): Unit = {
val random = new Random()
while(true) {
buffer.synchronized {
while (buffer.isEmpty) {
println(s"[consumer:$id] buffer empty | waiting.")
buffer.wait()
}
val x = buffer.dequeue()
println(s"[consumer:$id] picked $x from the buffer")
buffer.notify()
}
Thread.sleep(random.nextInt(250))
}
}
}
class Producer(id: Int, buffer: mutable.Queue[Int], capacity: Int) extends Thread {
override def run(): Unit = {
val random = new Random()
var i = 0
while(true) {
buffer.synchronized {
while (buffer.size == capacity) {
println(s"[producer:$id]: buffer is full | waiting.")
buffer.wait()
}
println(s"[producer:$id] producing $i")
buffer.enqueue(i)
buffer.notify()
i += 1
}
Thread.sleep(random.nextInt(500))
}
}
}
def multiProdCons(nConsumers:Int, nProducers: Int, capacity: Int): Unit = {
val buffer: mutable.Queue[Int] = new mutable.Queue[Int]
(1 to nConsumers).foreach(i => new Consumer(i, buffer).start())
(1 to nProducers).foreach(i => new Producer(i, buffer, capacity).start())
}
multiProdCons(3, 3, 4)
}
The detail is in the wait over while(buffer.isEmpty)
in the implementation of Consumer
so if the current thread
is active, and buffer
is not empty, only then is a consumer
allowed to read off the `buffer.
Deadlock Example
case class Friend(name: String) {
def bow(other: Friend) = {
this.synchronized {
println(s"$this: bowing down to $other")
other.rise(this)
println(s"$this: Rise since $other has risen.")
}
}
def rise(other: Friend) = {
this.synchronized {
println(s"$this: rising before $other.")
}
}
var side = "right"
def switchSide(): Unit = {
if (side == "right") side = "left"
else side = "right"
}
def pass(other: Friend): Unit = {
while (this.side == other.side) {
println(s"$this: feel free to pass")
switchSide()
Thread.sleep(1000)
}
}
}
val bob = Friend("Bob")
val alice = Friend("Alice")
new Thread(() => bob.bow(alice)).start()
new Thread(() => alice.bow(bob)).start()
When we run the above, we get the output as:
$
Friend(Bob): bowing to Friend(Alice)
Friend(Alice): bowing to Friend(Bob)
But nothing else happens, because both call the other.rise(...)
method which calls their synchronized expression, causing both to wait for the other to println(...)
first. The first instance of friend that runs locks the other and vice-versa. This is a Deadlock, where (two) threads cannot proceed because they are waiting on each other to release their locks.
Livelock example
/** main.scala **/
val jerry = Friend("Jerry")
val tom = Friend("Tom")
new Thread(() => jerry.pass(tom)).start()
new Thread(() => tom.pass(jerry)).start()
The output of this looks like an infinite loop condition, this is a live-lock. A situation where none of the threads are block but are not able to proceed because they yield to the other threads.
$
Friend(Jerry): feel free to pass
Friend(Tom): feel free to pass
Friend(Tom): feel free to pass
Friend(Jerry): feel free to pass
Friend(Tom): feel free to pass
...
fin.