
Improving Runtime Of Python Numpy Code

I have code which reassigns bins in a large numpy array. Basically, the elements of the large array have been sampled at different frequencies, and the final goal is to rebin the entire array.

Solution 1:

Keep the code simple, then optimize

If you have an idea of what algorithm you want to code, write a simple reference implementation first. From there you can go two ways with Python: you can try to vectorize the code, or you can compile it to get good performance.
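As a toy illustration of that workflow (a hypothetical example, not part of the question): write the naive loop first, check it, then compile the very same function unchanged.

import numpy as np
import numba as nb

# Reference implementation: a plain loop that is easy to verify.
def sum_of_squares_ref(a):
    total = 0.0
    for x in a:
        total += x * x
    return total

# The same code, compiled: no vectorization rewrite needed.
sum_of_squares_nb = nb.njit(sum_of_squares_ref)

a = np.random.rand(10 ** 6)
assert np.isclose(sum_of_squares_ref(a), sum_of_squares_nb(a))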

Even if np.einsum or np.add.at were implemented in Numba, it would be very hard for any compiler to generate efficient binary code from your example.

The only thing I have rewritten is a more efficient approach to digitize for scalar values.

Edit

In the Numba source code there is a more efficient implementation of digitize for scalar values.

Code

import numpy as np
import numba as nb

# From Numba source
# Copyright (c) 2012, Anaconda, Inc.
# All rights reserved.
@nb.njit(fastmath=True)
def digitize(x, bins, right=False):
    # bins are monotonically-increasing
    n = len(bins)
    lo = 0
    hi = n

    if right:
        if np.isnan(x):
            # Find the first nan (i.e. the last from the end of bins,
            # since there shouldn't be many of them in practice)
            for i in range(n, 0, -1):
                if not np.isnan(bins[i - 1]):
                    return i
            return 0
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] < x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid
    else:
        if np.isnan(x):
            # NaNs end up in the last bin
            return n
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] <= x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid

    return lo

# Alternative: a simple linear-scan digitize for scalar values
@nb.njit(fastmath=True)
def digitize(value, bins):
    if value < bins[0]:
        return 0
    if value >= bins[bins.shape[0] - 1]:
        return bins.shape[0]

    for l in range(1, bins.shape[0]):
        if value >= bins[l - 1] and value < bins[l]:
            return l

@nb.njit(fastmath=True, parallel=True)
def inner_loop(boost_factor, freq_bins, es):
    res = np.zeros((boost_factor.shape[0], freq_bins.shape[0]), dtype=np.float64)
    for i in nb.prange(boost_factor.shape[0]):
        for j in range(boost_factor.shape[1]):
            for k in range(freq_bins.shape[0]):
                # bin the boosted frequency and accumulate into that bin
                ind = nb.int64(digitize(boost_factor[i, j] * freq_bins[k], freq_bins))
                res[i, ind] += boost_factor[i, j] * es[j, k] * freq_bins[ind]
    return res

@nb.njit(fastmath=True)
def calc_nb(division, freq_division, cd, boost_factor, freq_bins, es):
    final_emit = np.empty((division, division, freq_division), np.float64)
    for i in range(division):
        final_emit[i, :, :] = inner_loop(boost_factor[i], freq_bins, es)
    return final_emit

Performance

(Quad-core i7)

original_code: 118.5 s
calc_nb: 4.14 s
# with digitize implementation from Numba source
calc_nb: 2.66 s
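For context, a benchmark harness along these lines reproduces the comparison (a hypothetical sketch using the problem sizes from the question; the first call is excluded so Numba's compilation time isn't measured):

import time
import numpy as np

division, freq_division, cd = 90, 50, 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1, 10, size=(cd, freq_division))

# warm-up call: triggers JIT compilation
calc_nb(division, freq_division, cd, boost_factor, freq_bins, es)

t0 = time.time()
final_emit = calc_nb(division, freq_division, cd, boost_factor, freq_bins, es)
print("calc_nb: %.2fs" % (time.time() - t0))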

Solution 2:

This seems to be trivially parallelizable:

  • You've got an outer loop that you run 90 times.
  • Each time, you're not mutating any shared arrays except final_emit
    • … and that, only to store into a unique row.
  • It looks like most of the work inside the loop is numpy array-wide operations, which will release the GIL.

So (using the futures backport of concurrent.futures, since you seem to be on 2.7):

import numpy as np
import time
import futures

division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    for i, row in enumerate(x.map(dostuff, xrange(division))):
        final_emit[i] = row

If this works, there are two tweaks to try, either of which might be more efficient. We don't really care which order the results come back in, but map queues them up in order, which can waste a bit of space and time. I don't think it will make much difference (presumably the vast majority of your time is spent doing the calculations, not writing out the results), but without profiling your code it's hard to be sure. So, there are two easy ways around this problem.


Using as_completed lets us use the results in whatever order they finish, rather than in the order we queued them. Something like this:

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return i, np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    fs = [x.submit(dostuff, i) for i in xrange(division)]
    for f in futures.as_completed(fs):
        i, row = f.result()
        final_emit[i] = row

Alternatively, we can make the function insert the rows directly, instead of returning them. This means we're now mutating a shared object from multiple threads. So I think we need a lock here, although I'm not positive (numpy's rules are a bit complicated, and I haven't read your code that thoroughly…). But that probably won't hurt performance significantly, and it's easy. So:

import numpy as np
import threading
# etc.
final_emit = np.zeros((division, division, freq_division))
final_emit_lock = threading.Lock()

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    with final_emit_lock:
        final_emit[i] = np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    x.map(dostuff, xrange(division))  # results are ignored; dostuff writes final_emit in place

That max_workers=8 in all of my examples should be tuned for your machine. Too many threads is bad, because they start fighting each other instead of parallelizing; too few threads is even worse, because some of your cores just sit there idle.

If you want this to run on a variety of machines, rather than tuning it for each one, the best guess (for 2.7) is usually:

import multiprocessing
# ...
with futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as x:
    # ...

But if you want to squeeze the max performance out of a specific machine, you should test different values. In particular, for a typical quad-core laptop with hyperthreading, the ideal value can be anywhere from 4 to 8, depending on the exact work you're doing, and it's easier to just try all the values than to try to predict.
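As a sketch of that trial-and-error (a hypothetical helper, not from the original answer; it assumes the dostuff and division definitions from the examples above):

import multiprocessing
import time

def run_job(workers):
    # time one full pass over all rows with the given pool size
    start = time.time()
    with futures.ThreadPoolExecutor(max_workers=workers) as x:
        list(x.map(dostuff, xrange(division)))
    return time.time() - start

timings = dict((w, run_job(w)) for w in range(1, 2 * multiprocessing.cpu_count() + 1))
best = min(timings, key=timings.get)
print("best max_workers: %d (%.2fs)" % (best, timings[best]))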

Solution 3:

I think you can get a small boost in performance by replacing einsum with explicit broadcast multiplication.

import numpy as np
import time

division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1, 10, size=(cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    # broadcasting instead of einsum
    fre_boost = boost_factor[i][:, :, None] * freq_bins[None, None, :]
    sky_by_cap = boost_factor[i][:, :, None] * es[None, :, :]
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division * cd, -1)
    freq_index = None  # free the intermediate
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    # scatter-add each sample into its frequency bin
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time() - time1)

Your code is rather slow at np.add.at, which I believe can be much faster with np.bincount, although I couldn't quite get it to work for the multidimensional arrays you have. Maybe someone here can add to that.
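For reference, here is one way the np.add.at scatter-add could be emulated with np.bincount (a sketch, not from the original answers: it flattens the (row, bin) pairs into a single 1-D index, and assumes every index in freq_index_reshaped is in range):

import numpy as np

def add_at_via_bincount(freq_index_reshaped, sky_by_cap_reshaped):
    # Each row r gets its own block of ncols slots, so the pair (r, c)
    # maps to the flat index r * ncols + c; bincount then sums the
    # weights that fall into each slot.
    nrows, ncols = freq_index_reshaped.shape
    flat_index = (np.arange(nrows)[:, None] * ncols + freq_index_reshaped).ravel()
    out = np.bincount(flat_index,
                      weights=sky_by_cap_reshaped.ravel(),
                      minlength=nrows * ncols)
    return out.reshape(nrows, ncols)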
