Lets suppose we are performing the multiplication:
P = A * B
. We're going to calculate the result matrix row-by-row. The following function returns a row of a result matrix P. The arguments are a row of matrix A and the matrix B.
def calc_row_of_product_matrix(a_row, b):
return map(lambda col: sum(starmap(mul,izip(a_row,col))), izip(*b))
We're going to use the multiprocessing module of Python to create a process pool containing as many processes as the number of CPUs we have and calculate each result row in a different process. Here are the two lines that does this trick:
def __mul__(self, b):
pool = multiprocessing.Pool(multiprocessing.cpu_count())
return pool.map(eval_func_tuple, izip(repeat(calc_row_of_product_matrix), self, repeat(b)))
Some explanation why is this code a bit nasty. Functions passed to the pool has to be picklable and because only the functions defined at the top level of a module are picklable, we needed to get the the row calculation function calc_row_of_product_matrix out of the class. Furthermore, to be able to pass two arguments to calc_row_of_product_matrix we also needed a helper function, which takes a tuple of a function and args, evaluates and returns the result:
def eval_func_tuple(f_args):
return f_args[0](*f_args[1:])
Note that passing around the whole matrix B to all processes calculating result rows, or more precisely passing itertools.repeat(b) to pool.map() visibly increases the memory consumption of the multiprocess version. This was not a real issue for me, as the bottleneck was CPU; anyway, this issue could be addressed by using the shared memory module of multiprocessing. For now we'll leave that as an exercise to the reader.
Here are the running times on my Intel Core2 Duo 3.16GHz box. For sufficiently large matrix (above 500*500) the running time of the multiprocess version is nearly half of the single process version.
100*100 matrix, single process: 0.0835670982343
100*100 matrix, multiprocess: 0.351096555199
200*200 matrix, single process: 0.79961114284
200*200 matrix, multiprocess: 0.700980680681
500*500 matrix, single process: 14.4003259234
500*500 matrix, multiprocess: 7.99582457187
1000*1000 matrix, single process: 118.078526896
1000*1000 matrix, multiprocess: 66.8809939919
Here you can see the single process version running with CPU usage of 50%:
Here you can see the multiprocessing version running with CPU usage of 100%:
Here's the full code:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import multiprocessing | |
from itertools import starmap, izip, repeat, imap | |
from operator import mul | |
def calc_row_of_product_matrix(a_row, b, izip=izip): | |
'''Calculate a row of the product matrix P = A * B | |
Arguments: | |
a_row is af A | |
b is the B matrix | |
returns the corresponding row of P matrix''' | |
return map(lambda col: sum(starmap(mul,izip(a_row,col))), izip(*b)) | |
def eval_func_tuple(f_args): | |
'''Takes a tuple of a function and args, evaluates and returns result''' | |
return f_args[0](*f_args[1:]) | |
class multimatrix(list): | |
def __mul__(self, b, izip=izip, repeat=repeat): | |
'''Concurrent matrix multiplication with multiprocessing.Pool. ''' | |
pool = multiprocessing.Pool(multiprocessing.cpu_count()) | |
return pool.map(eval_func_tuple, izip(repeat(calc_row_of_product_matrix), self, repeat(b))) | |
class itermatrix(list): | |
@staticmethod | |
def sumprod(row, col, sum=sum, starmap=starmap, mul=mul): | |
'''Returns sumproduct of two vectors.''' | |
return sum(starmap(mul,izip(row,col))) | |
def __mul__(self, b, imap=imap, izip=izip): | |
'''Matrix multiplication returning iterable of iterables''' | |
return imap(lambda row: imap(lambda col: itermatrix.sumprod(row, col), izip(*b)), self) | |
def iterate_results(result): | |
'''Iterate over iterable of iterables, | |
and returns elements in list of lists. | |
Usage: if you want to run the whole calculation at once: | |
p = iterate_results(itermatrix([[1, 3], [-5, 6], [2, 4]]) * itermatrix([[1, 4], [8, 7]]))''' | |
return[[col for col in row] for row in result] | |
def random_v(K=1000,min=-1000,max=1000): | |
'''Generates a random vector of dimension N; | |
Returns a list of integers. | |
The values are integers in the range [min,max].''' | |
return [random.randint(min,max) for k in range(K)] | |
def random_m(N=1000, K=1000): | |
'''Generates random matrix. Returns list of list of integers.''' | |
return [random_v(K) for n in range(N)] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from matrix import multimatrix, itermatrix, random_m, iterate_results | |
import matrix | |
import timeit | |
if __name__ == '__main__': | |
a = [[1, 3], [-5, 6], [2, 4]] | |
b = [[1, 4], [8, 7]] | |
adotb = [[25, 25], [43, 22], [34, 36]] | |
A = multimatrix(a) | |
B = multimatrix(b) | |
prod = A * B | |
assert(adotb == prod) | |
print prod, "multi test OK" | |
A = itermatrix(a) | |
B = itermatrix(b) | |
iterprod = A * B | |
listprod = iterate_results(iterprod) | |
assert(adotb == listprod) | |
print listprod, "iter test OK" | |
t = timeit.Timer("p = itermatrix(a) * itermatrix(a)", "from matrix import itermatrix, random_m; a = random_m(500, 500)") | |
print "iterable product time:", t.timeit(number=1) | |
t = timeit.Timer("p = itermatrix(a) * itermatrix(a); iterate_results(p)", "from matrix import itermatrix, random_m, iterate_results; a = random_m(500, 500)") | |
print "list product time:", t.timeit(number=1) | |
t = timeit.Timer("p = multimatrix(a) * multimatrix(a)", "from matrix import multimatrix, random_m, iterate_results; a = random_m(500, 500)") | |
print "multi product time:", t.timeit(number=1) |