
[BUG] Inconsistent Training AUC after Saving/Loading Parquet Data

Open · stupidoge opened this issue 7 months ago · 0 comments

SynapseML version

synapseml_2.12:1.0.11

System information

  • Language version (e.g. python 3.8, scala 2.12): Scala 2.12
  • Spark Version (e.g. 3.2.3): Spark 3.5.0
  • Spark Platform (e.g. Synapse, Databricks): Databricks

Describe the problem

I'm writing to report a reproducibility issue with LightGBM training in SynapseML that significantly impacts model reliability in production. While developing a model for a binary classification task on highly imbalanced data (an initial positive:negative ratio of 1:3804, downsampled to 1:1000), we observed training results that differ depending on whether we train on the freshly processed DataFrame or on the same data reloaded from Parquet.

The core issue manifests as dramatically different training AUC on logically identical datasets:

  1. Training directly on the downsampled DataFrame, before any persistence, yields the expected training AUC of about 0.92 and a validation AUC of 0.91.
  2. Saving that identical dataset to Parquet and reloading it for training inexplicably pushes the training AUC to 0.99 (a near-perfect score suggesting severe overfitting), while the validation AUC remains stable at 0.91.

This inconsistency persists despite:

  1. Using the same cluster configuration and notebook environment
  2. Verifying identical data content through schema validation and row-count checks
  3. Setting seed=777 and deterministic=True during training
  4. Making multiple attempts to force partition uniformity through repartition()

Could you help us figure this out? The inconsistency suggests there may be a bug inside the API.
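
One further diagnostic worth noting (an editor's sketch, not a step from the report above): materializing the sampled DataFrame before training rules out lazy re-evaluation of the random sample as a confounder, since Spark otherwise recomputes the sampling plan on every action.

# force a single evaluation of the sampled plan so fit() and transform()
# see the same physical rows (assumes the sample fits in cluster memory)
sampled_train = sampled_train.cache()
sampled_train.count()  # an action that materializes the cache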

Code to reproduce issue



# define downsampling function
from pyspark.sql import functions as F
from pyspark.sql.functions import rand

def random_undersampling(train_data, val_data, negative_positive_ratio=10, seed=21):
    """
    Randomly undersample the negative class.

    Args:
        train_data: training DataFrame with a binary "conversion_flag" label
        val_data: validation DataFrame, passed through unchanged
        negative_positive_ratio: target number of negatives per positive
        seed: random seed for sampling the negative class
    """
    # get positive samples
    pos_data = train_data.filter(F.col("conversion_flag") == 1)
    neg_data = train_data.filter(F.col("conversion_flag") == 0)
    
    # calculate sampling ratio
    pos_count = pos_data.count()
    target_neg_count = pos_count * negative_positive_ratio
    sampling_fraction = target_neg_count / neg_data.count()
    
    # random sampling
    sampled_neg = neg_data.sample(False, sampling_fraction, seed)
    # merge
    sampled_train = pos_data.unionAll(sampled_neg)
    sampled_train_shuffle = sampled_train.orderBy(rand())
        
    return sampled_train_shuffle, val_data
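
# Editor's note (an assumption, not in the original report): rand() above is
# unseeded, so the shuffle order is only fixed within a single plan; passing
# an explicit seed makes the ordering reproducible across re-evaluations:
#     sampled_train_shuffle = sampled_train.orderBy(rand(seed))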


# run the downsampling function
sampled_train, val_data = random_undersampling(
    train_data=train_data,
    val_data=val_data,
    negative_positive_ratio=1000,
    seed=299
)

sampled_train.write.format("parquet") \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .save("xxxxx")

sampled_train_reloaded = spark.read.parquet("xxxxx")
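
A sanity check one could add at this point (an editor's sketch, not part of the original report) to compare the physical layout and exact row content of the two logically identical DataFrames:

# partition layout typically differs between an in-memory plan and a Parquet scan
print("partitions before reload:", sampled_train.rdd.getNumPartitions())
print("partitions after reload:", sampled_train_reloaded.rdd.getNumPartitions())

# exceptAll compares full row multisets; both diffs empty => identical content
print(sampled_train.exceptAll(sampled_train_reloaded).isEmpty())
print(sampled_train_reloaded.exceptAll(sampled_train).isEmpty())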



from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

# first run: train on the freshly downsampled DataFrame (before save/reload)

# Create VectorAssembler
lgbm_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
lgbm = LightGBMClassifier(
    featuresCol="features",
    labelCol="conversion_flag",
    objective='binary', 
    isProvideTrainingMetric=True,
    numIterations=685,
    learningRate=0.09801426202661519,
    numLeaves=91,
    lambdaL1=0.7071587286417129,
    lambdaL2=0.13903538813177022,
    maxDepth=9,
    minSumHessianInLeaf=0.48631862969629164,
    featuresShapCol="shap_values",
    featureFraction=0.5229432743203031,
    baggingFraction=0.9417315628234632,
    baggingFreq=9,
    passThroughArgs="scale_pos_weight=4.0",
    deterministic=True,
    seed=777,
    baggingSeed=521,
    useBarrierExecutionMode=True,
    dataTransferMode="streaming"
)
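
# Editor's note (an assumption, not from the original report): re-running with
# dataTransferMode="bulk" would show whether the discrepancy is specific to
# the streaming data-transfer path.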

evaluator_auc = BinaryClassificationEvaluator(labelCol="conversion_flag", rawPredictionCol="rawPrediction", metricName="areaUnderROC", numBins=0)
evaluator_pr = BinaryClassificationEvaluator(labelCol="conversion_flag", rawPredictionCol="rawPrediction", metricName="areaUnderPR", numBins=0)
evaluator_rmse = RegressionEvaluator(labelCol="conversion_flag", predictionCol="prediction", metricName="rmse")

# lgbm.setPassThroughArgs("print_every_n_iterations=5")
pipeline = Pipeline(stages=[lgbm_assembler, lgbm])
model = pipeline.fit(sampled_train)
predictions_train = model.transform(sampled_train)
predictions = model.transform(val_data)

train_auc = evaluator_auc.evaluate(predictions_train)
train_auprc = evaluator_pr.evaluate(predictions_train)  
val_auc = evaluator_auc.evaluate(predictions)
val_auprc = evaluator_pr.evaluate(predictions)  

# report train/validation metrics
print('-----------------------')
print('training AUC:', train_auc)
print('training AUPRC:', train_auprc)
print('validation AUC:', val_auc)
print('validation AUPRC:', val_auprc)


#%%
# then re-run the same pipeline using sampled_train_reloaded
# Create VectorAssembler
lgbm_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
lgbm = LightGBMClassifier(
    featuresCol="features",
    labelCol="conversion_flag",
    objective='binary', 
    isProvideTrainingMetric=True,
    numIterations=685,
    learningRate=0.09801426202661519,
    numLeaves=91,
    lambdaL1=0.7071587286417129,
    lambdaL2=0.13903538813177022,
    maxDepth=9,
    minSumHessianInLeaf=0.48631862969629164,
    featuresShapCol="shap_values",
    featureFraction=0.5229432743203031,
    baggingFraction=0.9417315628234632,
    baggingFreq=9,
    passThroughArgs="scale_pos_weight=4.0",
    deterministic=True,
    seed=777,
    baggingSeed=521,
    useBarrierExecutionMode=True,
    dataTransferMode="streaming"
)

evaluator_auc = BinaryClassificationEvaluator(labelCol="conversion_flag", rawPredictionCol="rawPrediction", metricName="areaUnderROC", numBins=0)
evaluator_pr = BinaryClassificationEvaluator(labelCol="conversion_flag", rawPredictionCol="rawPrediction", metricName="areaUnderPR", numBins=0)
evaluator_rmse = RegressionEvaluator(labelCol="conversion_flag", predictionCol="prediction", metricName="rmse")

# lgbm.setPassThroughArgs("print_every_n_iterations=5")
pipeline = Pipeline(stages=[lgbm_assembler, lgbm])
model = pipeline.fit(sampled_train_reloaded)
predictions_train = model.transform(sampled_train_reloaded)
predictions = model.transform(val_data)

train_auc = evaluator_auc.evaluate(predictions_train)
train_auprc = evaluator_pr.evaluate(predictions_train)  
val_auc = evaluator_auc.evaluate(predictions)
val_auprc = evaluator_pr.evaluate(predictions)  

# report train/validation metrics
print('-----------------------')
print('training AUC:', train_auc)
print('training AUPRC:', train_auprc)
print('validation AUC:', val_auc)
print('validation AUPRC:', val_auprc)
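
Since the two runs differ only in the input DataFrame, they could also be collapsed into one helper to keep the A/B comparison free of copy-paste drift (an editor's sketch built from the code above):

def train_and_eval(train_df, val_df, tag):
    """Fit the pipeline on train_df and report train/validation metrics."""
    model = pipeline.fit(train_df)
    pred_train = model.transform(train_df)
    pred_val = model.transform(val_df)
    print('---', tag, '---')
    print('training AUC:', evaluator_auc.evaluate(pred_train))
    print('training AUPRC:', evaluator_pr.evaluate(pred_train))
    print('validation AUC:', evaluator_auc.evaluate(pred_val))
    print('validation AUPRC:', evaluator_pr.evaluate(pred_val))

train_and_eval(sampled_train, val_data, "before save/reload")
train_and_eval(sampled_train_reloaded, val_data, "after save/reload")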

Other info / logs

Before save/reload:
training AUC: 0.9223300884299345
training AUPRC: 0.8062838605214455
validation AUC: 0.916019539596558
validation AUPRC: 0.7558880508858233

After save/reload:
training AUC: 0.9999999963765849
training AUPRC: 0.9999963845879573
validation AUC: 0.9082770248549157
validation AUPRC: 0.7598785340365525

What component(s) does this bug affect?

  • [ ] area/cognitive: Cognitive project
  • [ ] area/core: Core project
  • [ ] area/deep-learning: DeepLearning project
  • [ ] area/lightgbm: Lightgbm project
  • [ ] area/opencv: Opencv project
  • [ ] area/vw: VW project
  • [ ] area/website: Website
  • [ ] area/build: Project build system
  • [ ] area/notebooks: Samples under notebooks folder
  • [ ] area/docker: Docker usage
  • [ ] area/models: models related issue

What language(s) does this bug affect?

  • [ ] language/scala: Scala source code
  • [ ] language/python: Pyspark APIs
  • [ ] language/r: R APIs
  • [ ] language/csharp: .NET APIs
  • [ ] language/new: Proposals for new client languages

What integration(s) does this bug affect?

  • [ ] integrations/synapse: Azure Synapse integrations
  • [ ] integrations/azureml: Azure ML integrations
  • [ ] integrations/databricks: Databricks integrations

stupidoge · Jun 25 '25 15:06