When using pandas, performance is usually not an issue as long as you stick to its well-optimized internal functions. Sometimes, however, you have to perform a lot of column-wise calculations on a large dataframe. I recently ran into this issue while calculating time series features, and I increased the speed of the calculation 5x by chunking the dataframe and using parallel processing with Python's multiprocessing library.
import numpy as np
import pandas as pd
Create a test dataset. The real dataset I am working on is a set of daily satellite measurements (from Copernicus Sentinel-1) ranging from ca. -25 to 0.
ts_df = pd.DataFrame(np.random.random(size=(365, 3000)))
I want to calculate a number of temporal features to be used as input for a regression analysis. These will be calculated for each column. The features themselves are straightforward: multi-temporal statistics such as percentiles, the same percentiles on lagged versions of the time series, and a few based on the Fourier transform.
def feature_calculation(df):
    # create result DataFrame and populate it with the standard deviation
    result = pd.DataFrame(df.std(axis=0))
    result.columns = ["stdDev"]
    # mean
    result["mean"] = df.mean(axis=0)
    # percentiles
    for i in [0.1, 0.25, 0.5, 0.75, 0.9]:
        result[str(int(i*100)) + "perc"] = df.quantile(q=i)
    # percentile differences / amplitudes
    result["diff_90perc10perc"] = (result["90perc"] - result["10perc"])
    result["diff_75perc25perc"] = (result["75perc"] - result["25perc"])
    # percentiles of the lagged time series
    for lag in [10, 20, 30, 40, 50]:
        for i in [0.1, 0.25, 0.5, 0.75, 0.9]:
            result["lag" + str(lag) + "_" + str(int(i*100)) + "perc"] = (df - df.shift(lag)).quantile(q=i)
    # fft
    df_fft = np.fft.fft(df, axis=0)  # Fourier transform along the time axis only
    result["fft_angle_mean"] = np.mean(np.angle(df_fft, deg=True), axis=0)
    result["fft_angle_min"] = np.min(np.angle(df_fft, deg=True), axis=0)
    result["fft_angle_max"] = np.max(np.angle(df_fft, deg=True), axis=0)
    return result
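As an aside, the lagged percentiles build on df.shift: shifting by lag rows and subtracting gives the change over lag time steps, with the first lag rows becoming NaN (which quantile skips by default). A minimal sketch of the idea:

s = pd.Series([1.0, 2.0, 4.0, 7.0])
s - s.shift(2)  # NaN, NaN, 3.0, 5.0 - the change over two time steps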
Testing how long the calculation takes for a small test dataset.
%%timeit -n 3 -r 3
ts_features = feature_calculation(ts_df)
11.4 s ± 86.3 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
The calculation takes quite some time and increases linearly with the number of columns. My real dataset has more than 700k columns instead of the 3000 used here. I started looking into optimizing the feature calculation after I found out that my script spent 70% of its time on it.
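If you want to find such hotspots yourself, Python's built-in cProfile module (or the %prun magic in Jupyter) gives a per-function breakdown; a minimal sketch:

import cProfile
cProfile.run("feature_calculation(ts_df)", sort="cumtime")  # show cumulative time per function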
During the calculation only one core is used. Since the calculation is performed independently for each column, we can split the dataframe into a number of subsets and use multiple cores to calculate the features - making this an embarrassingly parallel problem.
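The column-wise splitting itself is a one-liner with np.array_split, which also handles column counts that are not evenly divisible by the number of partitions. For example:

chunks = np.array_split(ts_df, 4, axis=1)  # split into 4 column-wise chunks
[chunk.shape for chunk in chunks]          # [(365, 750), (365, 750), (365, 750), (365, 750)]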
from multiprocessing import Pool

def parallel_feature_calculation(df, partitions=10, processes=4):
    # calculate features in parallel by splitting the dataframe into partitions and using parallel processes
    pool = Pool(processes)
    df_split = np.array_split(df, partitions, axis=1)  # split dataframe into partitions column-wise
    df = pd.concat(pool.map(feature_calculation, df_split))
    pool.close()
    pool.join()
    return df
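One caveat if you run this as a standalone script rather than in a notebook: on platforms that spawn worker processes (e.g. Windows), the multiprocessing entry point must sit behind a __main__ guard so the workers do not re-execute the whole script; a sketch:

if __name__ == "__main__":
    ts_df = pd.DataFrame(np.random.random(size=(365, 3000)))
    print(parallel_feature_calculation(ts_df, partitions=14, processes=7))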
%%timeit -n 3 -r 3
ts_features_parallel = parallel_feature_calculation(ts_df, partitions=14, processes=7)
2.06 s ± 15.4 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
Compare the two outputs to make sure both feature calculation functions return identical results. They should, because pool.map returns the partial results in the order of the input chunks, so pd.concat reassembles them in the original column order.
ts_features.equals(ts_features_parallel)
True
Using a simple parallelization routine, the time series features are now calculated about 5 times faster - a significant time saving when working with large dataframes.
Update - 2017-12-11:
Jonas (@j08lue) pointed out:
You might save some extra overhead replacing multiprocessing.Pool with concurrent.futures.ProcessPoolExecutor …
Let's try it, but this time using a 10x larger dataset. First, re-run the multiprocessing.Pool version to get a baseline.
ts_df = pd.DataFrame(np.random.random(size=(365, 30000)))
%%timeit -n 3 -r 3
ts_features_parallel = parallel_feature_calculation(ts_df, partitions=100, processes=7)
19.6 s ± 618 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
Define the new function using ProcessPoolExecutor.
# using a different pool implementation to save overhead
from concurrent.futures import ProcessPoolExecutor

def parallel_feature_calculation_ppe(df, partitions=10, processes=4):
    # calculate features in parallel by splitting the dataframe into partitions and using parallel processes
    df_split = np.array_split(df, partitions, axis=1)  # split dataframe into partitions column-wise
    with ProcessPoolExecutor(processes) as pool:
        df = pd.concat(pool.map(feature_calculation, df_split))
    return df
%%timeit -n 3 -r 3
ts_features_parallel_ppe = parallel_feature_calculation_ppe(ts_df, partitions=100, processes=7)
18.4 s ± 376 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
Employing this tip we can indeed save about 6% of the execution time. Just keep in mind that concurrent.futures requires Python 3.2 or newer.
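As before, it is worth verifying that both parallel variants return identical features:

ts_features_parallel.equals(ts_features_parallel_ppe)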