automl icon indicating copy to clipboard operation
automl copied to clipboard

New update with mlflow experiments causing _pickle.PicklingError

Open hatMatch opened this issue 3 years ago • 0 comments

Since the update to mlflow integration with hyperopt where names are automatically assigned to experiments (such as smiling-worm-674), I began getting the following error consistently when running a previously working mlflow experiment with SparkTrials().

ERROR:hyperopt-spark:trial task 0 failed, exception is
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 405.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 405.0 (TID 1472) (10.143.252.81 executor 0):
org.apache.spark.api.python.PythonException: '_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range'

However, my experiment is not doing any pickling and my code is not referenced in the full traceback, so I am not exactly sure what the issue is. I can confirm that the experiment works when using hyperopt.Trials() rather than hyperopt.SparkTrials(). Apologies for such a lengthy issue, and sorry if the issue is some simple mistake on my end!

Here is the full traceback:

Full Traceback
Traceback (most recent call last):
 File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
   return Pickler.dump(self, obj)
 File "/databricks/python/lib/python3.9/site-packages/patsy/origin.py", line 117, in __getstate__
   raise NotImplementedError
NotImplementedError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
 File "/databricks/spark/python/pyspark/serializers.py", line 527, in dumps
   return cloudpickle.dumps(obj, pickle_protocol)
 File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
   cp.dump(obj)
 File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 604, in dump
   if "recursion" in e.args[0]:
IndexError: tuple index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
 File "/databricks/spark/python/pyspark/worker.py", line 876, in main
   process()
 File "/databricks/spark/python/pyspark/worker.py", line 868, in process
   serializer.dump_stream(out_iter, outfile)
 File "/databricks/spark/python/pyspark/serializers.py", line 329, in dump_stream
   bytes = self.serializer.dumps(vs)
 File "/databricks/spark/python/pyspark/serializers.py", line 537, in dumps
   raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range

	  at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:692)
	  at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:902)
	  at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:884)
	  at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:645)
	  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	  at scala.collection.Iterator.foreach(Iterator.scala:943)
	  at scala.collection.Iterator.foreach$(Iterator.scala:943)
	  at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	  at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	  at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	  at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	  at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	  at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	  at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	  at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	  at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	  at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	  at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	  at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	  at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1029)
	  at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	  at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	  at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
	  at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
	  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	  at org.apache.spark.scheduler.Task.run(Task.scala:96)
	  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:889)
	  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1692)
	  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:892)
	  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:747)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3257)
	  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3189)
	  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3180)
	  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3180)
	  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1414)
	  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1414)
	  at scala.Option.foreach(Option.scala:407)
	  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1414)
	  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3466)
	  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3407)
	  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3395)
	  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
	  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1166)
	  at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2702)
	  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1027)
	  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
	  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
	  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	  at org.apache.spark.rdd.RDD.withScope(RDD.scala:411)
	  at org.apache.spark.rdd.RDD.collect(RDD.scala:1025)
	  at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:282)
	  at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	  at sun.reflect.GeneratedMethodAccessor282.invoke(Unknown Source)
	  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	  at java.lang.reflect.Method.invoke(Method.java:498)
	  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	  at py4j.Gateway.invoke(Gateway.java:306)
	  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	  at py4j.commands.CallCommand.execute(CallCommand.java:79)
	  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	  at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	  at java.lang.Thread.run(Thread.java:748)

The following is the code that is being run in the experiments:

mlflow.start_run()
spark_trials = SparkTrials(parallelism=16)

with mlflow.start_run(run_name='test_experiment'):
  best_result = fmin(
    fn=objective, 
    space=space,
    algo=tpe.suggest,
    max_evals=1024,
    trials=spark_trials)
Hyperopt Optimization Function
def objective(args):
    
    # Initialize model pipeline
    pipe = Pipeline(steps=[
        ('selection', args['selection'])
    ])
    
    pipe.set_params(**args['params']) # Model parameters will be set here
    pipe.fit(X, y)
    penalty = pipe['selection'].penalty_
    try:
        residual = np.sum(pipe['selection']._resid) / len(pipe['selection']._resid)
    except AttributeError:
        residual = -10000
    r2 = r2_score(y, pipe.predict(X))
    score = 1 - r2
    mean_square = mean_squared_error(y, pipe.predict(X))
    mlflow.log_metric('avg_residual', residual)
    mlflow.log_metric('mean_squared_error', mean_square)
    mlflow.log_metric('penalty', penalty)
    mlflow.log_metric('r2', r2)

    print(f"Model Name: {args['selection']}: ", score)
          
    # Since we have to minimize the score, we return 1- score.
    return {'loss': score, 'status': STATUS_OK}

Here are the parameters and parameter space:

Params and Parameter Space
params = {
  'selection__fixed': hp.choice('selection.fixed', fixed_arrs),
  'selection__random': hp.choice('selection.random', random_arrs),
  'selection__intercept': hp.choice('selection.intercept', (0, 1)),
  'selection__cov': hp.choice('selection.cov', (0, 1))
  }

space = hp.choice('regressors', [
    {
    'selection':LMEBaseRegressor(group=['panel'],
                                 dependent=dependent,
                                 media=media_cols),
    'params': params
    }
  ]
)

And finally here is the regressor I am using (including because its a custom class built ontop of sklearn):

LMEBaseRegressor Class
class LMEBaseRegressor(BaseEstimator, RegressorMixin):
    """Implementation of an LME Regression for scikit."""

    def __init__(self, random=None, fixed=None,
                 group=['panel'], dependent=None,
                 intercept=0, cov=0, media=None):
        self.random = random
        self.fixed = fixed
        self.group = group
        self.dependent = dependent
        self.intercept = intercept
        self.cov = cov
        self.media = media

    def fit(self, X, y):
        """Fit the model with LME."""
        str_dep = self.dependent[0]
        str_fixed = ' + '.join(self.fixed)
        str_random = ' + '.join(self.random)
        data = pd.concat([X, y], axis=1)
        self.penalty_ = 0
        print(f"{str_dep} ~ {self.intercept} + {str_fixed}")
        print(f"{self.cov} + {str_random}")
        try:
            mixed = smf.mixedlm(f"{str_dep} ~ {self.intercept} + {str_fixed}",
                                data,
                                re_formula=f"~ {self.cov} + {str_random}",
                                groups=data['panel'],
                                use_sqrt=True)\
                .fit(method=['lbfgs'])
            self._model = mixed
            self._resid = mixed.resid
            self.coef_ = mixed.params[0:len(self.fixed)]                    
        
        except(ValueError):
            print("Cannot predict random effects from singular covariance structure.")
            self.penalty_ = 100

        except(np.linalg.LinAlgError):
            print("Linear Algebra Error: recheck base model fit or try using fewer variables.")
            self.penalty_  = 100
        return self

    def predict(self, X):
        """Take the coefficients provided from fit and multiply them by X."""
        if self.penalty_ != 0:
            return np.ones(len(X)) * -100 * self.penalty_
        return self._model.predict(X)

hatMatch avatar Oct 28 '22 18:10 hatMatch