
Loading Several Npz Files In A Multithreaded Way

I have several .npz files. All the .npz files have the same structure: each of them just contains two variables, always with the same variable names. As of now, I simply loop over all the .npz files.

Solution 1:

I was surprised by the comments of @Brent Washburne and decided to try it out myself. I think the general problem is two-fold:

Firstly, reading data is often I/O-bound, so multi-threaded code often does not yield large performance gains. Secondly, shared-memory parallelization in Python is inherently difficult because of the design of the language itself (the global interpreter lock); there is much more overhead compared to native C.
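For comparison, here is a minimal sketch (not from the original answer) of a thread-based variant using multiprocessing.pool.ThreadPool. Whether threads help at all depends on how much of the time inside np.load is spent in I/O and zlib decompression that releases the GIL, so this is only an assumption to test, not a recommendation:

import glob
import os
import numpy as np
from multiprocessing.pool import ThreadPool

def read_x(path):
    # load one .npz file and return its 'x' array
    with np.load(path) as data:
        return data["x"]

def threaded_read(files, n_threads=4):
    # map read_x over the files with a pool of worker threads
    with ThreadPool(n_threads) as pool:
        return pool.map(read_x, files)

# example usage:
# x_list = threaded_read(glob.glob(os.path.join('tmp', 'nptest', '*.npz')))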

But let's see what we can do.

# some imports
import numpy as np
import glob
from multiprocessing import Pool
import os

# creating some temporary data
tmp_dir = os.path.join('tmp', 'nptest')
if not os.path.exists(tmp_dir):
    os.makedirs(tmp_dir)
    for i in range(100):
        x = np.random.rand(10000, 50)
        file_path = os.path.join(tmp_dir, '%05d.npz' % i)
        np.savez_compressed(file_path, x=x)

def read_x(path):
    with np.load(path) as data:
        return data["x"]

def serial_read(files):
    x_list = list(map(read_x, files))
    return x_list

def parallel_read(files):
    with Pool() as pool:
        x_list = pool.map(read_x, files)
    return x_list

Okay, that's enough preparation. Let's get the timings.

files = glob.glob(os.path.join(tmp_dir, '*.npz'))

%timeit x_serial = serial_read(files)
# 1 loops, best of 3: 7.04 s per loop

%timeit x_parallel = parallel_read(files)
# 1 loops, best of 3: 3.56 s per loop

np.allclose(x_serial, x_parallel)
# True

It actually looks like a decent speedup. I am using two real and two hyper-threading cores.
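As a side note (my addition, not part of the original answer): Pool() with no argument starts one worker process per logical CPU, so you can check how many workers will be used like this:

import os
import multiprocessing

# Pool() defaults to one worker per logical CPU reported by the OS
print(os.cpu_count())               # e.g. 4 on two physical cores with hyper-threading
print(multiprocessing.cpu_count())  # same value via the multiprocessing module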


To run and time everything at once, you can execute this script:

from __future__ import print_function
from __future__ import division

# some imports
import numpy as np
import glob
import sys
import multiprocessing
import os
import timeit

# creating some temporary data
tmp_dir = os.path.join('tmp', 'nptest')
if not os.path.exists(tmp_dir):
    os.makedirs(tmp_dir)
    for i in range(100):
        x = np.random.rand(10000, 50)
        file_path = os.path.join(tmp_dir, '%05d.npz' % i)
        np.savez_compressed(file_path, x=x)

def read_x(path):
    data = dict(np.load(path))
    return data['x']

def serial_read(files):
    x_list = list(map(read_x, files))
    return x_list

def parallel_read(files):
    pool = multiprocessing.Pool(processes=4)
    x_list = pool.map(read_x, files)
    return x_list


files = glob.glob(os.path.join(tmp_dir, '*.npz'))
#files = files[0:5] # to test on a subset of the npz files

# Timing:
timeit_runs = 5

timer = timeit.Timer(lambda: serial_read(files))
print('serial_read: {0:.4f} seconds averaged over {1} runs'
      .format(timer.timeit(number=timeit_runs) / timeit_runs,
      timeit_runs))
# 1 loops, best of 3: 7.04 s per loop

timer = timeit.Timer(lambda: parallel_read(files))
print('parallel_read: {0:.4f} seconds averaged over {1} runs'
      .format(timer.timeit(number=timeit_runs) / timeit_runs,
      timeit_runs))
# 1 loops, best of 3: 3.56 s per loop

# Examples of use:
x = serial_read(files)
print('len(x): {0}'.format(len(x)))                      # len(x): 100
print('len(x[0]): {0}'.format(len(x[0])))                # len(x[0]): 10000
print('len(x[0][0]): {0}'.format(len(x[0][0])))          # len(x[0][0]): 50
print('x[0][0]: {0}'.format(x[0][0]))                    # first row of the first array
print('x[0].nbytes: {0} MB'.format(x[0].nbytes / 1e6))   # x[0].nbytes: 4.0 MB
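As a possible follow-up (not shown in the original answer), the list returned by serial_read or parallel_read can be combined into a single NumPy array, assuming every file stores an x of the same shape:

# assuming x = serial_read(files) as above, with 100 arrays of shape (10000, 50)
x_stacked = np.stack(x)        # shape (100, 10000, 50)
x_concat = np.concatenate(x)   # shape (1000000, 50)
print(x_stacked.shape, x_concat.shape)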
