Intent
I am going to time-box writing everything I know about Python multi-threading synchronisation. Essentially, multi-threaded programs need a way to prevent race conditions, especially when threads share resources. All sorts of weird stuff can happen if we don’t synchronise our threads.
A Synchro-What?
A synchronisation primitive is a super fancy description for:
“A thing you can use to prevent race conditions when multi-threading.”
Ahhh… I see…
Python probably has a bunch of these; however, I will only write about the ones I know about:
- Locks (and RLocks)
- Barriers
- Semaphores
- Events
- Conditions
Here we go!
Problem Definition
Let’s say we have a random object that has three methods that print “first”, “second” and “third” respectively:
    public class Foo {
        public void first() { print("first"); }
        public void second() { print("second"); }
        public void third() { print("third"); }
    }
If the same object was passed to three threads, and each thread called one of the methods in a random order, how could we ensure that first() always executed first, followed by second() and third() respectively?
Please refer to LeetCode problem #1114, Print in Order, to get an idea of what I am talking about.
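To make the problem concrete in Python, here is a minimal sketch of the unsynchronised situation. The names UnsyncedFoo and run_in_random_order are made up for this illustration, and the methods record their name in a list instead of printing so the order is easy to inspect:

```python
import random
import threading


class UnsyncedFoo:
    """Python stand-in for the class above, with no synchronisation at all."""

    def __init__(self):
        self.calls = []  # records the order in which the methods actually ran

    def first(self):
        self.calls.append("first")

    def second(self):
        self.calls.append("second")

    def third(self):
        self.calls.append("third")


def run_in_random_order(foo):
    """Start one thread per method, in shuffled order, and wait for all three."""
    methods = [foo.first, foo.second, foo.third]
    random.shuffle(methods)
    threads = [threading.Thread(target=method) for method in methods]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return foo.calls


print(run_in_random_order(UnsyncedFoo()))  # the order varies from run to run
```

Nothing here forces "first" to come out first; that is the gap the primitives fill.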
Use Synchronisation Primitives (Duh?!)
We can tackle this problem using every primitive I mentioned in the above section.
1. Use a Lock
Locks work by essentially that… they lock things.
To use a lock:
- Create a lock object.
- Lock the lock object with its acquire() method.
- Release the lock with release().
If any competing thread tries to acquire this lock object, it is automatically blocked until the lock is released. Let’s see this in action:
    from threading import Lock

    class Foo:
        def __init__(self):
            # create two locks
            self.locks = [Lock(), Lock()]
            # set both locks to the "locked" state
            for lock in self.locks:
                lock.acquire()

        def first(self, printFirst: 'Callable[[], None]') -> None:
            # no need to lock this bad-boy... we want this to be first to run
            printFirst()
            # release the first lock so second() can now be unblocked
            self.locks[0].release()

        def second(self, printSecond: 'Callable[[], None]') -> None:
            with self.locks[0]:
                printSecond()
                # release the second lock so third() can now be unblocked
                self.locks[1].release()

        def third(self, printThird: 'Callable[[], None]') -> None:
            with self.locks[1]:
                printThird()
            # the with block releases the lock on exit; an extra release()
            # here would raise RuntimeError (release of an unlocked lock)
Essentially, each method stays blocked until the required preceding method releases the lock.
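To see the blocking behaviour on its own, here is a minimal sketch with one lock and one worker thread. The names lock, order and worker are just for this illustration:

```python
import threading

lock = threading.Lock()
order = []  # records who got to run, and when


def worker():
    with lock:  # blocks here until the main thread releases the lock
        order.append("worker")


lock.acquire()                      # the main thread holds the lock
thread = threading.Thread(target=worker)
thread.start()
order.append("main")                # the worker cannot have appended yet
lock.release()                      # now the worker may proceed
thread.join()

print(order)  # ['main', 'worker']
```

The worker can only append after the release, so the order is the same on every run.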
One drawback of Locks is that they are so dumb they don’t care which thread “owns” the lock; a Lock will block whoever tries to acquire() a locked lock. EVEN THE THREAD THAT LOCKED IT!
To prevent this, use an RLock() instead, as it can be acquired multiple times by the same thread. I won’t go into this, just trust me (or better yet, the docs).
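As a quick sketch of the difference, the snippet below asks for each lock a second time from the same thread. It passes blocking=False so the plain Lock reports failure instead of hanging forever; the variable names are only for this illustration:

```python
import threading

plain = threading.Lock()
plain.acquire()
# A second blocking acquire() from this thread would deadlock, so ask politely.
got_plain = plain.acquire(blocking=False)
plain.release()

reentrant = threading.RLock()
reentrant.acquire()
# The same thread already owns the RLock, so this succeeds straight away.
got_reentrant = reentrant.acquire(blocking=False)
reentrant.release()
reentrant.release()  # an RLock needs one release() per acquire()

print(got_plain, got_reentrant)  # False True
```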
2. Use a Barrier
Barriers are essentially counters… they block a thread until a certain number of threads have called wait() on that object.
Tackling our original problem again, we require that each barrier has 2 waits called on it before unblocking the waiting threads:
    from threading import Barrier

    class Foo:
        def __init__(self):
            self.barrier1 = Barrier(2)
            self.barrier2 = Barrier(2)

        def first(self, printFirst: 'Callable[[], None]') -> None:
            printFirst()
            self.barrier1.wait()

        def second(self, printSecond: 'Callable[[], None]') -> None:
            self.barrier1.wait()
            printSecond()
            self.barrier2.wait()

        def third(self, printThird: 'Callable[[], None]') -> None:
            self.barrier2.wait()
            printThird()
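To show the "nobody passes until everyone has arrived" behaviour in isolation, here is a small sketch with a two-party barrier. The worker function and the 0.2 second timeout are arbitrary choices for this illustration:

```python
import threading

barrier = threading.Barrier(2)  # two threads must call wait() before either passes


def worker():
    barrier.wait()  # first party to arrive; blocks until the second arrives


thread = threading.Thread(target=worker)
thread.start()

thread.join(timeout=0.2)           # give the worker time to reach the barrier
blocked_before = thread.is_alive() # still stuck: only one of two parties arrived

barrier.wait()                     # the main thread is the second party
thread.join()
blocked_after = thread.is_alive()

print(blocked_before, blocked_after)  # True False
```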
3. Use a Semaphore
A semaphore is just another form of counter; similar to a barrier, except a bit smarter. The semaphore object is created with a counter, which represents how many times acquire() can be called on it before it blocks. Each acquire() counts the total down, and once the counter is at “0” the next acquire() blocks. Each release() counts the total back up.
For example, this semaphore allows three acquire() calls; a fourth blocks until something calls release().
my_semaphore = threading.Semaphore(3)
These are great for rate-limiting or connection-limiting applications.
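A rough sketch of the connection-limiting idea: ten workers compete for three slots, and a small bookkeeping dictionary tracks how many were inside at once. MAX_CONNECTIONS, use_connection and the sleep standing in for real work are all assumptions made for this example:

```python
import threading
import time

MAX_CONNECTIONS = 3  # hypothetical limit for this sketch
slots = threading.Semaphore(MAX_CONNECTIONS)
stats = {"active": 0, "peak": 0}  # bookkeeping only, guarded by stats_lock
stats_lock = threading.Lock()


def use_connection():
    with slots:  # blocks while three workers are already inside
        with stats_lock:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.05)  # stand-in for real work
        with stats_lock:
            stats["active"] -= 1


workers = [threading.Thread(target=use_connection) for _ in range(10)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

print(stats["peak"])  # never more than 3
```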
In our problem, we can be tricky and create it with a counter of 0, so it blocks straight away! This treats it essentially like a lock that starts out locked.
    from threading import Semaphore

    class Foo:
        def __init__(self):
            # create two semaphores that start at 0, so acquiring blocks immediately
            self.semaphores = [Semaphore(0), Semaphore(0)]

        def first(self, printFirst: 'Callable[[], None]') -> None:
            printFirst()
            # bump the first counter to 1 so second() can get through
            self.semaphores[0].release()

        def second(self, printSecond: 'Callable[[], None]') -> None:
            with self.semaphores[0]:
                printSecond()
                # bump the second counter to 1 so third() can get through
                self.semaphores[1].release()

        def third(self, printThird: 'Callable[[], None]') -> None:
            with self.semaphores[1]:
                printThird()
4. Use an Event
Yep, an event. This means that when an event occurs, any thread waiting for that event may now proceed! An event is said to have “occurred” once the set() method has been called on it.
    from threading import Event

    class Foo:
        def __init__(self):
            # create two events
            self.events = [Event(), Event()]

        def first(self, printFirst: 'Callable[[], None]') -> None:
            printFirst()
            # signal that first() is done
            self.events[0].set()

        def second(self, printSecond: 'Callable[[], None]') -> None:
            # wait for the first event to be set
            self.events[0].wait()
            printSecond()
            # signal that second() is done
            self.events[1].set()

        def third(self, printThird: 'Callable[[], None]') -> None:
            # wait for the second event to be set
            self.events[1].wait()
            printThird()
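For a feel of how an Event behaves on its own, here is a tiny single-threaded sketch. The 0.1 second timeout is an arbitrary value chosen so the first wait() gives up instead of blocking forever:

```python
import threading

ready = threading.Event()

# Nobody has called set() yet, so this wait gives up and returns False.
before_set = ready.wait(timeout=0.1)

ready.set()
# Once set, every wait() returns True immediately, for any number of waiters.
after_set = ready.wait()

ready.clear()  # an event stays set until it is explicitly cleared
print(before_set, after_set, ready.is_set())  # False True False
```

Unlike a lock, a set event lets every waiting thread through, not just one.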
5. Use a Condition
Love locks? Love events? Wish you could marry those two things together? threading has got you, fam! Welcome to conditions! A condition combines the goodness of locks with the power of events.
Create a Condition object which can be acquired by all threads. When created, the Condition object has an underlying RLock attached by default.
Create a couple of conditions that we require to be True (in our example, has the previous print been called yet?) and have the threads wait for their corresponding condition:
    from threading import Condition

    class Foo:
        def __init__(self):
            # create a Condition object
            self.the_condition = Condition()
            # create an int to track where the print is at
            self.order = 0
            # create two predicates that return True once the print order changes
            self.first_done = lambda: self.order == 1
            self.second_done = lambda: self.order == 2

        def first(self, printFirst: 'Callable[[], None]') -> None:
            with self.the_condition:
                printFirst()
                self.order = 1
                # notify the two waiting threads to check their condition
                self.the_condition.notify(2)

        def second(self, printSecond: 'Callable[[], None]') -> None:
            with self.the_condition:
                self.the_condition.wait_for(self.first_done)
                printSecond()
                self.order = 2
                # notify the one other waiting thread to check its condition
                self.the_condition.notify()

        def third(self, printThird: 'Callable[[], None]') -> None:
            with self.the_condition:
                self.the_condition.wait_for(self.second_done)
                printThird()
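Stripped down to one waiter and one notifier, the wait_for() pattern can be sketched like this. The state dictionary and the waiter function are hypothetical names for this illustration:

```python
import threading

cond = threading.Condition()
state = {"ready": False}  # the shared fact that the waiter cares about
seen = []


def waiter():
    with cond:
        # wait_for() checks the predicate first, releases the lock while it
        # sleeps, and re-checks every time it is notified.
        cond.wait_for(lambda: state["ready"])
        seen.append("waiter saw ready")


thread = threading.Thread(target=waiter)
thread.start()

with cond:
    state["ready"] = True  # change the shared state while holding the lock
    cond.notify_all()      # wake any waiters so they re-check their predicate

thread.join()
print(seen)  # ['waiter saw ready']
```

Because wait_for() tests the predicate before sleeping, this works even if the main thread sets the flag before the waiter gets going.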
And There We Have It!
Synchronisation Primitives; scary name, not so scary concept when you step it out. Please reach out to me if you have any questions!