From daf3a3de3fbfec5febf201afbd6aec8341164670 Mon Sep 17 00:00:00 2001 From: Thomas Aynaud Date: Tue, 30 Aug 2016 18:13:15 +0200 Subject: [PATCH] Stop using Parallel for SparkFeatureUnion See https://issues.apache.org/jira/browse/SPARK-12717 The n_jobs parameter is kept so that the object converted via to_scikit() can still use it --- splearn/pipeline.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/splearn/pipeline.py b/splearn/pipeline.py index 0d1df9f..1f55157 100644 --- a/splearn/pipeline.py +++ b/splearn/pipeline.py @@ -5,9 +5,8 @@ import numpy as np import scipy.sparse as sp from sklearn.externals import six -from sklearn.externals.joblib import Parallel, delayed from sklearn.pipeline import FeatureUnion, Pipeline, _name_estimators -from splearn.rdd import ArrayRDD, DictRDD +from splearn.rdd import DictRDD class SparkPipeline(Pipeline): @@ -218,6 +217,7 @@ class SparkFeatureUnion(FeatureUnion): List of transformer objects to be applied to the data. The first half of each tuple is the name of the transformer. n_jobs: int, optional + Ignored on spark, useful after conversion to scikit object Number of jobs to run in parallel (default 1). transformer_weights: dict, optional Multiplicative weights for features per transformer. @@ -239,9 +239,11 @@ def fit(self, Z, **fit_params): step, param = pname.split('__', 1) fit_params_steps[step][param] = pval - transformers = Parallel(n_jobs=self.n_jobs, backend="threading")( - delayed(_fit_one_transformer)(trans, Z, **fit_params_steps[name]) - for name, trans in self.transformer_list) + transformers = [_fit_one_transformer(trans, + Z, + **fit_params_steps[name]) + for name, trans in self.transformer_list] + self._update_transformer_list(transformers) return self