Synchronisation Primitives in Python

Todd Turner
March 24th, 2020 · 2 min read

Intent

I am going to time-box writing everything I know about Python multi-threading synchronisation. Essentially, when multi-threading, programs need a way to prevent race conditions, especially when shared resources are referenced by several threads. All sorts of weird stuff can happen if we don’t synchronise our threads.
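To make the "shared resource" idea concrete, here is a minimal sketch (not from any particular codebase) of several threads bumping one counter. The names SafeCounter and count_with_threads are made up for illustration. The `+=` is a read-modify-write, so without the lock two threads could read the same old value and one update could be lost.

```python
import threading


class SafeCounter:
    """Hypothetical counter whose increments are guarded by a Lock."""

    def __init__(self) -> None:
        self.value = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        # "+=" reads, adds and writes back; the lock keeps those steps together.
        with self._lock:
            self.value += 1


def count_with_threads(n_threads: int, increments_each: int) -> int:
    """Run n_threads workers that each increment the same counter."""
    counter = SafeCounter()

    def work() -> None:
        for _ in range(increments_each):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


total = count_with_threads(n_threads=4, increments_each=10_000)
print(total)  # 40000, because every increment happened under the lock
```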

A Synchro-What?

A synchronisation primitive is a super fancy description for:

“A thing you can use to prevent race conditions when multi-threading.”

Ahhh… I see…


Python probably has a bunch of these; however, I will only write about the ones I know:

  1. Locks (and RLocks)
  2. Barriers
  3. Semaphores
  4. Events
  5. Conditions

Here we go!


Problem Definition

Let’s say we have a random object that has three methods that print “first”, “second” and “third” respectively:

public class Foo {
    public void first() { print("first"); }
    public void second() { print("second"); }
    public void third() { print("third"); }
}

If the same object was passed to three threads, and each thread executed one of the methods in a random order, how could we ensure that first() always executed first, followed by second() and third() respectively?

Please reference LeetCode problem #1114 - Print In Order to get an idea of what I am talking about.
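To try the problem locally, something like the following harness could be used. It is only a sketch: UnsyncedFoo and run_in_random_order are names I made up, and the callbacks simply append to a list instead of printing. With no synchronisation at all, every word shows up, but the order is whatever the scheduler decides.

```python
import random
import threading
from typing import Callable, List


class UnsyncedFoo:
    """Hypothetical Foo with no synchronisation: each method just calls its callback."""

    def first(self, print_first: Callable[[], None]) -> None:
        print_first()

    def second(self, print_second: Callable[[], None]) -> None:
        print_second()

    def third(self, print_third: Callable[[], None]) -> None:
        print_third()


def run_in_random_order(foo) -> List[str]:
    """Start one thread per method, in shuffled order, and collect what they emit."""
    output: List[str] = []
    jobs = [
        (foo.first, lambda: output.append("first")),
        (foo.second, lambda: output.append("second")),
        (foo.third, lambda: output.append("third")),
    ]
    random.shuffle(jobs)  # simulate threads being started in an arbitrary order
    threads = [threading.Thread(target=method, args=(callback,)) for method, callback in jobs]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return output


result = run_in_random_order(UnsyncedFoo())
print(result)  # all three words appear, but the order is not guaranteed
```

Any Foo with the same three methods can be passed to run_in_random_order; a correctly synchronised one should always yield first, second, third.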

Use Synchronisation Primitives (Duh?!)

We can tackle this problem using every primitive I mentioned in the above section.

1. Use a Lock

Locks do essentially that… they lock things.

To use a lock:

  1. Create a lock object.
  2. Lock the lock object with its acquire() method.
  3. Release the lock with release().
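The three steps above can be sketched as follows. The variable names are just for illustration; locked() is only used here to peek at the lock's state.

```python
from threading import Lock

lock = Lock()                     # 1. create a lock object (it starts unlocked)

lock.acquire()                    # 2. lock it
held_after_acquire = lock.locked()

lock.release()                    # 3. release it
held_after_release = lock.locked()

print(held_after_acquire, held_after_release)  # True False
```

In everyday code, `with lock:` does the acquire and release for you, even if the body raises.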

If any competing thread tries to acquire this lock object, it is automatically blocked until the lock is released. Let’s see this in action:

from threading import Lock
from typing import Callable

class Foo:
    def __init__(self):
        # create two locks
        self.locks = [Lock(), Lock()]
        # set both locks to the "locked" state
        for lock in self.locks:
            lock.acquire()

    def first(self, printFirst: 'Callable[[], None]') -> None:
        # no need to lock this bad-boy... we want this to be first to run
        printFirst()
        # release the first lock so second() can now be unblocked
        self.locks[0].release()

    def second(self, printSecond: 'Callable[[], None]') -> None:
        # blocks here until first() has released locks[0]
        with self.locks[0]:
            printSecond()
            # release the second lock so third() can now be unblocked
            self.locks[1].release()

    def third(self, printThird: 'Callable[[], None]') -> None:
        # blocks here until second() has released locks[1]
        with self.locks[1]:
            printThird()
        # the with block already releases locks[1] on exit; an extra
        # release() here would raise RuntimeError on the unlocked lock

Essentially, each method stays blocked until the preceding method releases its lock.

One drawback of Locks is that they are so dumb they don’t care which thread “owns” the lock; they will block whoever tries to acquire() a locked lock. EVEN THE THREAD THAT LOCKED IT!

To prevent this, use an RLock() instead, as it can be acquired multiple times by the same thread. I won’t go into this, just trust me (or better yet, the docs).
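If you would rather not just trust me, here is a small sketch of the difference. It uses acquire(blocking=False) so that the failed attempt returns False instead of hanging forever; the variable names are mine.

```python
from threading import Lock, RLock

plain = Lock()
plain.acquire()
# Same thread, second attempt: a plain Lock refuses (a blocking call would deadlock).
plain_second_try = plain.acquire(blocking=False)
plain.release()

reentrant = RLock()
reentrant.acquire()
# Same thread, second attempt: an RLock lets its owner back in.
reentrant_second_try = reentrant.acquire(blocking=False)
# An RLock must be released once for every successful acquire.
reentrant.release()
reentrant.release()

print(plain_second_try, reentrant_second_try)  # False True
```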

2. Use a Barrier

Barriers are essentially counters… they block each thread that calls wait() until a certain number of threads have called wait() on that object.

Tackling our original problem again, we require each barrier to have wait() called on it twice before the waiting threads are unblocked:

from threading import Barrier
from typing import Callable

class Foo:
    def __init__(self):
        # each barrier opens once two threads are waiting on it
        self.barrier1 = Barrier(2)
        self.barrier2 = Barrier(2)

    def first(self, printFirst: 'Callable[[], None]') -> None:
        printFirst()
        self.barrier1.wait()

    def second(self, printSecond: 'Callable[[], None]') -> None:
        # wait here until first() has printed and reached barrier1
        self.barrier1.wait()
        printSecond()
        self.barrier2.wait()

    def third(self, printThird: 'Callable[[], None]') -> None:
        # wait here until second() has printed and reached barrier2
        self.barrier2.wait()
        printThird()
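To see the counting behaviour on its own, here is a minimal sketch with one Barrier(3) and three workers. The log list and the worker function are made up for this example. Nobody gets past wait() until all three have arrived, so every "arrived" entry lands before any "passed" entry.

```python
import threading

barrier = threading.Barrier(3)  # opens once three threads are waiting
log = []


def worker() -> None:
    log.append("arrived")
    barrier.wait()              # blocks until all three workers get here
    log.append("passed")


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(log)  # three "arrived" entries, then three "passed" entries
```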

3. Use a Semaphore

A Semaphore is just another form of counter; similar to a Barrier, except a bit smarter. The semaphore object is created with a counter, which represents how many times acquire() can be called on it before it blocks. Each acquire() counts down by one; once the counter is at “0”, the next acquire() blocks. Each release() counts it back up by one.

For example, this semaphore lets acquire() be called 3 times; the next acquire() blocks until something calls release():

my_semaphore = threading.Semaphore(3)

These are great for rate-limiting or connection-limiting applications.
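As a rough sketch of connection limiting: ten workers share a Semaphore(3), so at most three of them are "connected" at any moment. The function peak_concurrency and the fake time.sleep() workload are assumptions for illustration only.

```python
import threading
import time


def peak_concurrency(n_workers: int, max_concurrent: int) -> int:
    """Run n_workers threads and report the most that were active at once."""
    limiter = threading.Semaphore(max_concurrent)
    stats_lock = threading.Lock()  # protects the two counters below
    active = 0
    peak = 0

    def use_connection() -> None:
        nonlocal active, peak
        with limiter:              # blocks while max_concurrent workers are inside
            with stats_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)       # pretend to do some work on the connection
            with stats_lock:
                active -= 1

    threads = [threading.Thread(target=use_connection) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


peak = peak_concurrency(n_workers=10, max_concurrent=3)
print(peak)  # never more than 3
```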

In our problem, we can be tricky and create it with a counter of 0, so it blocks on the very first acquire! This treats it essentially like a lock that starts out locked.

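from threading import Semaphore
from typing import Callable

class Foo:
    def __init__(self):
        # create two semaphores that start at 0, so the first acquire blocks
        self.semaphores = [Semaphore(0), Semaphore(0)]

    def first(self, printFirst: 'Callable[[], None]') -> None:
        printFirst()
        # bump the first semaphore to 1 so second() can get through
        self.semaphores[0].release()

    def second(self, printSecond: 'Callable[[], None]') -> None:
        # blocks here until first() has released semaphores[0]
        with self.semaphores[0]:
            printSecond()
            # bump the second semaphore to 1 so third() can get through
            self.semaphores[1].release()

    def third(self, printThird: 'Callable[[], None]') -> None:
        # blocks here until second() has released semaphores[1]
        with self.semaphores[1]:
            printThird()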

4. Use an Event

Yep, an event. When an event occurs, any thread waiting for that event may proceed! An event is said to have “occurred” once the set() method has been called on it.

from threading import Event
from typing import Callable

class Foo:
    def __init__(self):
        # create two events
        self.events = [Event(), Event()]

    def first(self, printFirst: 'Callable[[], None]') -> None:
        printFirst()
        # signal that first() is done
        self.events[0].set()

    def second(self, printSecond: 'Callable[[], None]') -> None:
        # wait for the first event to be set
        self.events[0].wait()
        printSecond()
        # signal that second() is done
        self.events[1].set()

    def third(self, printThird: 'Callable[[], None]') -> None:
        # wait for the second event to be set
        self.events[1].wait()
        printThird()
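One thing worth seeing in isolation: a single set() releases every waiting thread, and the event stays set afterwards, so late arrivals do not block either. A minimal sketch, with made-up names (ready, woken, waiter):

```python
import threading

ready = threading.Event()
woken = []


def waiter(name: str) -> None:
    ready.wait()                # blocks until ready.set() is called
    woken.append(name)


threads = [threading.Thread(target=waiter, args=(f"waiter-{i}",)) for i in range(3)]
for t in threads:
    t.start()

woken_before_set = list(woken)  # still empty: nobody can pass wait() yet
ready.set()                     # one call wakes all three waiters
for t in threads:
    t.join()

late_wait = ready.wait(timeout=0)  # returns True at once, the event is still set
print(woken_before_set, sorted(woken), late_wait)
```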

5. Use a Condition

Love locks? Love events? Wish you could marry those two things together? threading’s got you, fam! Welcome to Conditions! They combine the goodness of locks with the power of events.

Create a Condition object, which can be acquired by all threads. When created, the Condition object has an underlying RLock attached.

Create a couple of conditions that we require to be True (in our example, has the Print been called yet?) and have the threads wait for their corresponding condition:

from threading import Condition
from typing import Callable

class Foo:
    def __init__(self):
        # create a Condition object
        self.the_condition = Condition()
        # create an int to track where the print is at
        self.order = 0
        # create two callables that return True once the print order changes
        self.first_done = lambda: self.order == 1
        self.second_done = lambda: self.order == 2

    def first(self, printFirst: 'Callable[[], None]') -> None:
        with self.the_condition:
            printFirst()
            self.order = 1
            self.the_condition.notify(2)  # notify the two waiting threads to check their condition

    def second(self, printSecond: 'Callable[[], None]') -> None:
        with self.the_condition:
            # releases the underlying lock while waiting, re-acquires it before returning
            self.the_condition.wait_for(self.first_done)
            printSecond()
            self.order = 2
            self.the_condition.notify()  # notify the one other waiting thread to check its condition

    def third(self, printThird: 'Callable[[], None]') -> None:
        with self.the_condition:
            self.the_condition.wait_for(self.second_done)
            printThird()
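The same wait_for() and notify() pairing shows up in other places too. As a hedged sketch, here is a tiny hand-rolled Mailbox (a name I made up; in real code queue.Queue already does this job) where a consumer waits on a Condition until a producer has put something in.

```python
import threading


class Mailbox:
    """Hypothetical one-way mailbox built on a Condition."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._items = []

    def put(self, item) -> None:
        with self._cond:
            self._items.append(item)
            self._cond.notify()  # wake one waiting consumer to re-check

    def get(self):
        with self._cond:
            # Sleeps (releasing the lock) until the predicate returns True.
            self._cond.wait_for(lambda: len(self._items) > 0)
            return self._items.pop(0)


mailbox = Mailbox()
received = []


def consume() -> None:
    for _ in range(3):
        received.append(mailbox.get())


consumer = threading.Thread(target=consume)
consumer.start()
for n in (1, 2, 3):
    mailbox.put(n)
consumer.join()

print(received)  # [1, 2, 3]
```

Because wait_for() checks the predicate before it sleeps, it does not matter whether the consumer or the producer gets going first.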

And There We Have It!

Synchronisation Primitives; scary name, not so scary concept when you step it out. Please reach out to me if you have any questions!

