Stop using Parallel for SparkFeatureUnion #69

Open — wants to merge 1 commit into base: master
12 changes: 7 additions & 5 deletions splearn/pipeline.py
```diff
@@ -5,9 +5,8 @@
 import numpy as np
 import scipy.sparse as sp
 from sklearn.externals import six
-from sklearn.externals.joblib import Parallel, delayed
 from sklearn.pipeline import FeatureUnion, Pipeline, _name_estimators
-from splearn.rdd import ArrayRDD, DictRDD
+from splearn.rdd import DictRDD


 class SparkPipeline(Pipeline):
@@ -218,6 +217,7 @@ class SparkFeatureUnion(FeatureUnion):
         List of transformer objects to be applied to the data. The first
         half of each tuple is the name of the transformer.
     n_jobs: int, optional
+        Ignored on spark, useful after conversion to scikit object
         Number of jobs to run in parallel (default 1).
     transformer_weights: dict, optional
         Multiplicative weights for features per transformer.
@@ -239,9 +239,11 @@ def fit(self, Z, **fit_params):
             step, param = pname.split('__', 1)
             fit_params_steps[step][param] = pval

-        transformers = Parallel(n_jobs=self.n_jobs, backend="threading")(
-            delayed(_fit_one_transformer)(trans, Z, **fit_params_steps[name])
-            for name, trans in self.transformer_list)
+        transformers = [_fit_one_transformer(trans,
+                                             Z,
+                                             **fit_params_steps[name])
+                        for name, trans in self.transformer_list]

         self._update_transformer_list(transformers)
         return self
```
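The core of the change is swapping a joblib `Parallel(backend="threading")` call for a plain list comprehension: when each transformer's `fit` is itself a distributed Spark job, driver-side threads add scheduling overhead without extra parallelism. The sketch below illustrates that pattern in isolation; `SequentialFeatureUnion` and `Scaler` are hypothetical stand-ins for demonstration (splearn's real class is `SparkFeatureUnion` operating on RDDs), but the comprehension mirrors the diff.

```python
# Minimal sketch of the pattern adopted in this PR. ``Scaler`` and
# ``SequentialFeatureUnion`` are toy stand-ins, not splearn classes.
from collections import defaultdict


def _fit_one_transformer(transformer, X, **fit_params):
    # Same contract as sklearn's helper: fit and return the transformer.
    return transformer.fit(X, **fit_params)


class Scaler:
    # Toy transformer: remembers a scaling factor and the data maximum.
    def fit(self, X, factor=1.0):
        self.factor_ = factor
        self.max_ = max(X)
        return self


class SequentialFeatureUnion:
    def __init__(self, transformer_list):
        self.transformer_list = transformer_list

    def fit(self, X, **fit_params):
        # Route 'name__param' keyword arguments to the right step,
        # just as SparkFeatureUnion.fit does in the diff above.
        fit_params_steps = defaultdict(dict)
        for pname, pval in fit_params.items():
            step, param = pname.split('__', 1)
            fit_params_steps[step][param] = pval
        # The replacement for Parallel(...)(delayed(...) ...): fit the
        # transformers one after another in a list comprehension.
        fitted = [_fit_one_transformer(trans, X, **fit_params_steps[name])
                  for name, trans in self.transformer_list]
        # Mimics _update_transformer_list: keep names, swap in fitted objects.
        self.transformer_list = [(name, t) for (name, _), t
                                 in zip(self.transformer_list, fitted)]
        return self
```

Usage: `SequentialFeatureUnion([('s', Scaler())]).fit(data, s__factor=2.0)` fits each named transformer in turn, passing it only its own routed parameters.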