Multiprocessing enables the computer to utilize multiple cores of a CPU to run tasks/processes in parallel. This parallelization leads to significant speedup in tasks that involve a lot of computation. This article will cover multiprocessing in Python; it’ll start by illustrating multiprocessing in Python with some basic sleep methods and then finish up with a real-world image processing example. In one of our recent articles, we discussed using multithreading in Python to speed up programs; I recommend reading that before continuing.
Multiprocessing in Python
Like the threading module, the multiprocessing module comes with the Python standard library. You can create a process either by constructing a Process object with a callable (such as a function) as its target, or by inheriting from the Process class and overriding its run() method. Let’s create the dummy function we will use to illustrate the basics of multiprocessing in Python.
import time

def useless_function(sec=1):
    print(f'Sleeping for {sec} second(s)')
    time.sleep(sec)
    print('Done sleeping')

start = time.perf_counter()
useless_function()
useless_function()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
Sleeping for 1 second(s)
Done sleeping
Sleeping for 1 second(s)
Done sleeping
Finished in 2.02 second(s)
Using the Process() Constructor
Running the function twice sequentially took roughly two seconds as expected. Let’s create two processes, run them in parallel and see how that pans out.
import multiprocessing

start = time.perf_counter()
process1 = multiprocessing.Process(target=useless_function)
process2 = multiprocessing.Process(target=useless_function)
process1.start()
process2.start()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
Finished in 0.02 second(s)
Sleeping for 1 second(s)
Sleeping for 1 second(s)
Something seems wrong with the output. Granted, we forgot to wait for the processes to finish, but according to the output, the processes only got going after the main program had already printed its timing. The output appears in this order because it takes a while to create the processes and get them running. Threads are much cheaper to start, so this delay is far less noticeable with them. As with threads, the join() method is used to wait for the processes to finish execution.
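start = time.perf_counter()
# A Process object can only be started once, so create fresh ones
process1 = multiprocessing.Process(target=useless_function)
process2 = multiprocessing.Process(target=useless_function)
process1.start()
process2.start()
# Block until both processes have finished
process1.join()
process2.join()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')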
-----------------------------Output-----------------------------
Sleeping for 1 second(s)
Sleeping for 1 second(s)
Done sleeping
Done sleeping
Finished in 1.04 second(s)
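One caveat when moving these snippets into a standalone script: on platforms where child processes are started with the spawn method (for example Windows, and macOS with recent Python versions), each child re-imports the main module, so the code that creates processes should sit under an `if __name__ == '__main__':` guard. The sketch below restates the two-process example in that form; `nap` and `run_twice` are illustrative names, not part of the code above.

```python
import multiprocessing
import time


def nap(sec=1):
    """Stand-in for the dummy function: sleep, then report."""
    print(f'Sleeping for {sec} second(s)')
    time.sleep(sec)
    print('Done sleeping')


def run_twice(sec=1):
    """Start two processes, wait for both, and return the elapsed time."""
    start = time.perf_counter()
    processes = [
        multiprocessing.Process(target=nap, args=(sec,)) for _ in range(2)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return time.perf_counter() - start


if __name__ == '__main__':
    # Only the parent process executes this block; children that
    # re-import the module skip it and do not spawn more processes.
    elapsed = run_twice(1)
    print(f'Finished in {round(elapsed, 2)} second(s)')
```

Keeping the target function at module level also matters here, because the child process has to be able to find it by name after the re-import.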
Right now we are not getting that big of a speedup, but that’s mainly because our function doesn’t take too much time to execute, and we are only running it twice. What if we want to run it ten times? Run sequentially, it would take a little over ten seconds, because each call has to finish before the next one starts. However, if we run the calls in parallel in multiple processes, it should be significantly faster. Instead of manually creating the ten processes, let’s create and start them in a loop.
Unlike with threads, arguments passed to a process generally need to be serializable using pickle, because they may have to be sent to a separate Python process. Simply put, serialization means converting Python objects into a binary format that can be reconstructed in another Python process.
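As a rough illustration of what "serializable using pickle" means, the sketch below round-trips some plain arguments through pickle and checks which kinds of objects can be pickled at all. The helper `can_pickle` and the function `square` are hypothetical names introduced only for this example.

```python
import pickle


def square(x):
    """A module-level function; pickle stores it by its qualified name."""
    return x * x


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Plain data survives a round trip through the binary format unchanged.
args = [2, 'hello', {'delay': 0.5}]
payload = pickle.dumps(args)           # bytes that could be sent to a child
restored = pickle.loads(payload)       # rebuilt copy of the original object
print(type(payload).__name__, restored == args)

# Named functions can usually be pickled; anonymous ones cannot,
# because there is no importable name to look them up by.
print('square picklable:', can_pickle(square))
print('lambda picklable:', can_pickle(lambda x: x * x))
```

This is why lambdas and functions defined inside other functions tend to cause errors when they are handed to a process that has to receive them via pickle.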
start = time.perf_counter()
processes = []
for _ in range(10):
    p = multiprocessing.Process(target=useless_function, args=[2])
    p.start()
    processes.append(p)
Now we can’t run join() within the same loop because it would wait for the process to finish before looping and creating the next one. So it would be the same as running them sequentially.
for p in processes:
    p.join()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Finished in 2.15 second(s)
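Even when running the function ten times, it finishes in about two seconds. This may seem a bit strange, seeing that my processor only has 4 cores. The explanation is that a sleeping process does no work: time.sleep() simply blocks, so the operating system’s scheduler can run the other processes in the meantime, and all ten end up sleeping at the same time. With CPU-bound work, the speedup would be limited by the number of cores.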
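The sleep-based example finishes quickly regardless of core count because sleeping needs almost no CPU time. A minimal sketch of the contrasting case, where each task keeps a core busy, might look like the following. The function `count_primes`, the helper `timed`, and the workload size are assumptions chosen for illustration, and the timings will vary by machine.

```python
import concurrent.futures
import os
import time


def count_primes(limit):
    """Count primes below limit by trial division (deliberately slow)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def timed(label, fn):
    """Call fn(), print how long it took, and return its result."""
    start = time.perf_counter()
    result = fn()
    print(f'{label}: {round(time.perf_counter() - start, 2)} second(s)')
    return result


if __name__ == '__main__':
    limits = [100_000] * 8  # eight equal chunks of CPU-bound work
    print(f'Logical cores: {os.cpu_count()}')

    # One core does all eight chunks, one after the other.
    sequential = timed('Sequential', lambda: [count_primes(n) for n in limits])

    # The pool spreads the chunks over (by default) one worker per core.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        parallel = timed(
            'Parallel', lambda: list(executor.map(count_primes, limits))
        )

    print('Same results:', sequential == parallel)
```

Here the parallel run can only be faster by roughly the number of available cores, whereas the ten sleeping processes above overlap almost completely.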
Creating a Custom Process Class
To create your own custom process class, you can inherit from the Process class and override its run() method.
import time
from multiprocessing import Process

def countdown(name, delay, count):
    while count:
        time.sleep(delay)
        print(f'{name, time.ctime(time.time()), count}')
        count -= 1

class newProcess(Process):
    def __init__(self, name, count):
        super().__init__()  # initialise the underlying Process first
        self.name = name
        self.count = count

    def run(self):
        # run() is executed in the child process once start() is called
        print("Starting: " + self.name + "\n")
        countdown(self.name, 1, self.count)
        print("Exiting: " + self.name + "\n")

t = newProcess("newProcess 1", 5)
t.start()
t.join()
print("Done")
-----------------------------Output-----------------------------
Starting: newProcess 1
('newProcess 1', 'Fri Apr 30 07:24:56 2021', 5)
('newProcess 1', 'Fri Apr 30 07:24:57 2021', 4)
('newProcess 1', 'Fri Apr 30 07:24:58 2021', 3)
('newProcess 1', 'Fri Apr 30 07:24:59 2021', 2)
('newProcess 1', 'Fri Apr 30 07:25:00 2021', 1)
Exiting: newProcess 1
Done
Using ProcessPoolExecutor
In addition to using the multiprocessing module directly, there’s another way of running processes. Python 3.2 introduced the concurrent.futures module, which provides ProcessPoolExecutor. It is a higher-level way of running multiple processes: it manages a pool of worker processes for us, and it also allows us to switch over to using multiple threads instead of processes with minimal changes. If we want to schedule function calls one at a time, we can use the submit() method. It schedules the target function for execution and returns a Future object. This Future object encapsulates the function’s execution and allows us to check whether it is still running or already done, and to fetch the return value using result().
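A small sketch of both points, under stated assumptions: `slow_square` and `run_with` are hypothetical names, and the helper takes the executor class as a parameter purely to show that the thread-based and process-based executors share the same submit()/Future interface.

```python
import concurrent.futures
import time


def slow_square(x, delay=0.2):
    """Hypothetical task: pause briefly, then return x squared."""
    time.sleep(delay)
    return x * x


def run_with(executor_cls, x):
    """Submit one task to the given executor class and inspect its Future."""
    with executor_cls(max_workers=1) as executor:
        future = executor.submit(slow_square, x)
        done_before = future.done()  # normally False: the task is still sleeping
        value = future.result()      # blocks until the task has finished
        done_after = future.done()   # True once a result is available
    return done_before, value, done_after


if __name__ == '__main__':
    # Swapping threads for processes only changes the class that is passed in.
    for cls in (concurrent.futures.ThreadPoolExecutor,
                concurrent.futures.ProcessPoolExecutor):
        print(cls.__name__, run_with(cls, 3))
```

Besides done() and result(), a Future also offers running() and exception(), which can be useful when a task may fail.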
Let’s redefine the dummy function so it has a return value, and use it to illustrate ProcessPoolExecutor.
import concurrent.futures

start = time.perf_counter()

def useless_function(sec=1):
    print(f'Sleeping for {sec} second(s)')
    time.sleep(sec)
    print('Done sleeping')
    return sec

with concurrent.futures.ProcessPoolExecutor() as executor:
    process1 = executor.submit(useless_function, 1)
    process2 = executor.submit(useless_function, 1)
    print(f'Return Value: {process1.result()}')
    print(f'Return Value: {process2.result()}')
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
Sleeping for 1 second(s)
Sleeping for 1 second(s)
Done sleeping
Done sleeping
Return Value: 1
Return Value: 1
Finished in 1.06 second(s)
If we want to run this ten times, we will have to create two loops, one for creating the processes and another for fetching their results. A better way of doing this is to use the as_completed() function from concurrent.futures. It returns an iterator that we can loop over to get the results of the processes as they complete, i.e., in the order of their completion.
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    # Submit every task first, then collect results as they finish
    pool = [executor.submit(useless_function, i) for i in secs]
    for i in concurrent.futures.as_completed(pool):
        print(f'Return Value: {i.result()}')
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
Sleeping for 5 second(s)
Sleeping for 4 second(s)
Done sleeping
Sleeping for 3 second(s)
Return Value: 4
Done sleeping
Sleeping for 2 second(s)
Return Value: 5
Done sleeping
Done sleeping
Sleeping for 1 second(s)
Return Value: 2
Return Value: 3
Done sleeping
Return Value: 1
Finished in 6.07 second(s)
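Because as_completed() yields futures in completion order, it is easy to lose track of which input produced which result. One common way around this, sketched here with the hypothetical names `nap` and `results_by_input`, is to keep a dictionary that maps each Future back to its input.

```python
import concurrent.futures
import time


def nap(sec):
    """Sleep for sec seconds and return a short message."""
    time.sleep(sec)
    return f'slept {sec}s'


def results_by_input(executor, fn, inputs):
    """Run fn on every input and yield (input, result) pairs as tasks finish."""
    future_to_input = {executor.submit(fn, item): item for item in inputs}
    for future in concurrent.futures.as_completed(future_to_input):
        yield future_to_input[future], future.result()


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for sec, message in results_by_input(executor, nap, [0.3, 0.2, 0.1]):
            print(f'{sec} -> {message}')
```

The helper takes an already-created executor, so the same function works with either a process pool or a thread pool.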
To avoid writing the submission loop altogether, we can use the executor’s map() method. It is similar to the built-in map() function: it runs the function for every item of the iterable we pass in, but uses the pool’s processes rather than running the calls sequentially. And instead of returning Future objects, it returns an iterator over the results. These results are in the order of the inputs, not in the order the calls complete. Another thing to note is that if our function raises an exception, it isn’t raised when map() is called; it is raised when the corresponding value is retrieved from the results iterator.
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    # map() yields results in the order of secs, not in completion order
    pool = executor.map(useless_function, secs)
    for res in pool:
        print(f'Return Value: {res}')
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
Sleeping for 5 second(s)
Sleeping for 4 second(s)
Done sleeping
Sleeping for 3 second(s)
Done sleeping
Sleeping for 2 second(s)
Return Value: 5
Return Value: 4
Done sleeping
Done sleeping
Sleeping for 1 second(s)
Return Value: 3
Return Value: 2
Done sleeping
Return Value: 1
Finished in 6.06 second(s)
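The exception behaviour of map() mentioned earlier can be sketched as follows. The names `reciprocal` and `collect` are hypothetical, and the failing input is contrived so that exactly one task raises.

```python
import concurrent.futures


def reciprocal(x):
    """Hypothetical task that fails for one particular input."""
    return 1 / x  # raises ZeroDivisionError when x == 0


def collect(executor, values):
    """Gather results from executor.map() until a task's exception surfaces."""
    results, error = [], None
    result_iter = executor.map(reciprocal, values)  # nothing is raised here
    try:
        for value in result_iter:  # the exception appears during iteration
            results.append(value)
    except ZeroDivisionError as exc:
        error = exc
    return results, error


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results, error = collect(executor, [1, 2, 0, 4])
    print(results, repr(error))
```

Note that iteration stops at the failing item, so results for later inputs are not retrieved. If every result is needed, one option is to use submit() and handle each Future's exception individually.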
Parallelized Image Augmentation
To demonstrate the use of multiprocessing in a somewhat realistic setting, we will continue with the images example used in the multithreading article and perform some image augmentation on the images we downloaded from Pexels. Although image augmentation is a computation-intensive task, it is by no means the perfect use case for multiprocessing, because it also involves a fair amount of I/O.
Running the Image Augmentation Function Sequentially
from PIL import Image, ImageFilter

file_names = ['305821.jpg', '509922.jpg', '325812.jpg', '1252814.jpg',
              '1420709.jpg', '963486.jpg', '1557183.jpg', '3023211.jpg',
              '1031641.jpg', '439227.jpg', '696644.jpg', '911254.jpg',
              '1001990.jpg', '3518623.jpg', '916044.jpg']

start = time.perf_counter()
size = (1200, 1200)

def augment_image(img_name):
    img = Image.open(img_name)
    img = img.filter(ImageFilter.GaussianBlur(15))
    img.thumbnail(size)
    img.save(f'augmented-{img_name}')
    print(f'{img_name} was augmented...')

for f in file_names:
    augment_image(f)
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} seconds')
-----------------------------Output-----------------------------
305821.jpg was augmented...
509922.jpg was augmented...
325812.jpg was augmented...
1252814.jpg was augmented...
1420709.jpg was augmented...
963486.jpg was augmented...
1557183.jpg was augmented...
3023211.jpg was augmented...
1031641.jpg was augmented...
439227.jpg was augmented...
696644.jpg was augmented...
911254.jpg was augmented...
1001990.jpg was augmented...
3518623.jpg was augmented...
916044.jpg was augmented...
Finished in 20.66153374500027 seconds
Running the Function in Parallel using Multiprocessing
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(augment_image, file_names)
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} seconds')
-----------------------------Output-----------------------------
509922.jpg was augmented...
305821.jpg was augmented...
325812.jpg was augmented...
1420709.jpg was augmented...
1252814.jpg was augmented...
963486.jpg was augmented...
1557183.jpg was augmented...
3023211.jpg was augmented...
1031641.jpg was augmented...
696644.jpg was augmented...
911254.jpg was augmented...
1001990.jpg was augmented...
439227.jpg was augmented...
3518623.jpg was augmented...
916044.jpg was augmented...
Finished in 8.63 seconds
Using multiprocessing enables the program to finish in well under half the time of sequential execution (8.63 seconds versus about 20.66 seconds).
To learn more about the Python multiprocessing module, refer to the official documentation and the source code.