
[Bug][Pipeline] The pipeline gets stuck if the tool has no space left on the disk

Open msarzijr opened this issue 1 year ago • 3 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

When too many pipelines run at the same time, the git extractor's temporary files fill the disk. The pipeline then gets stuck and does not even respond to the stop command.


What do you expect to happen

I would expect the cancel pipeline command to stop the pipeline and clean the /tmp folder, so that the pipelines can be rescheduled to different times.

How to reproduce

Run pipelines in parallel that collect git repositories whose combined size is larger than the available disk space.

Anything else

Thanks for reading my post.

Version

v1.0.1-beta9@e061ef2

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

msarzijr avatar Sep 24 '24 13:09 msarzijr

Hey there, @msarzijr! I'm here to help you with any bugs, questions, or contribution guidance you might need. Let's tackle this together!

I found a similar closed issue: [Feature][framework] Throttling total number of pipelines running in parallel, which was resolved by a pull request that implemented running pipelines in an independent process [1].

To address the issue of the pipeline getting stuck when the disk space is full and ensuring the cancel pipeline command stops the pipeline and cleans the /tmp folder, you can implement the following steps:

  1. Monitor Disk Space: Implement a function to monitor disk space and pause the pipeline if the disk space is below a certain threshold.
  2. Clean /tmp Folder on Cancel: Modify the CancelPipeline function to clean the /tmp folder when a pipeline is canceled.

Here is an example implementation:

Monitor Disk Space

Add a function to check disk space and pause the pipeline if necessary:

import (
	"log"
	"syscall"
	"time"
)

func checkDiskSpace(path string, threshold uint64) bool {
	var stat syscall.Statfs_t
	err := syscall.Statfs(path, &stat)
	if err != nil {
		log.Printf("Failed to get disk space: %v", err)
		return false
	}
	// Available blocks * size per block = available space in bytes
	available := stat.Bavail * uint64(stat.Bsize)
	return available > threshold
}

func monitorDiskSpace() {
	const threshold = 10 * 1024 * 1024 * 1024 // 10 GB
	for {
		if !checkDiskSpace("/", threshold) {
			log.Println("Disk space is below threshold, pausing pipeline")
			// Pause the pipeline
			// Implement the logic to pause the pipeline
		}
		time.Sleep(1 * time.Minute) // Check every minute
	}
}
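The "pause the pipeline" step above is left unimplemented. One simple way to wire it up is a shared atomic flag that the monitor sets and workers consult before picking up new work. This is a minimal sketch under that assumption; `paused`, `updatePauseState`, and `shouldRun` are illustrative names, not existing DevLake APIs:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// paused is a hypothetical global flag that pipeline workers consult
// before starting new work.
var paused atomic.Bool

// updatePauseState sets the flag from a disk-space check result.
// In the real monitor loop this would be fed by checkDiskSpace("/", threshold).
func updatePauseState(spaceOK bool) {
	paused.Store(!spaceOK)
}

// shouldRun is what a worker would call before picking up the next task.
func shouldRun() bool {
	return !paused.Load()
}

func main() {
	updatePauseState(true) // disk has space
	fmt.Println(shouldRun())
	updatePauseState(false) // disk below threshold
	fmt.Println(shouldRun())
}
```

The atomic flag only prevents new work from starting; tasks already mid-collection would still need their own cancellation path.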

Clean /tmp Folder on Cancel

Modify the CancelPipeline function to clean the /tmp folder:

import (
	"os"
	"path/filepath"
)

func cleanTmpFolder() error {
	tmpDir := "/tmp"
	err := os.RemoveAll(tmpDir)
	if err != nil {
		return err
	}
	return os.Mkdir(tmpDir, 0755)
}

func CancelPipeline(pipelineId uint64) errors.Error {
	// prevent RunPipelineInQueue from consuming pending pipelines
	pipeline := &models.Pipeline{}
	err := db.First(pipeline, dal.Where("id = ?", pipelineId))
	if err != nil {
		return errors.BadInput.New("pipeline not found")
	}
	if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
		pipeline.Status = models.TASK_CANCELLED
		err = db.Update(pipeline)
		if err != nil {
			return errors.Default.Wrap(err, "failed to update pipeline")
		}
		// now, with RunPipelineInQueue blocked and the target pipeline updated,
		// we should update the related tasks as well
		err = db.UpdateColumn(
			&models.Task{},
			"status", models.TASK_CANCELLED,
			dal.Where("pipeline_id = ?", pipelineId),
		)
		if err != nil {
			return errors.Default.Wrap(err, "failed to update pipeline tasks")
		}
		// Clean the /tmp folder
		if err := cleanTmpFolder(); err != nil {
			return errors.Default.Wrap(err, "failed to clean /tmp folder")
		}
		// the target pipeline is pending, not running, so there is no need to perform the actual cancel operation
		return nil
	}
	pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
	if err != nil {
		return errors.Convert(err)
	}
	if count == 0 {
		return nil
	}
	for _, pendingTask := range pendingTasks {
		_ = CancelTask(pendingTask.ID)
	}
	// Clean the /tmp folder
	if err := cleanTmpFolder(); err != nil {
		return errors.Default.Wrap(err, "failed to clean /tmp folder")
	}
	return errors.Convert(err)
}

Run Disk Space Monitor

Start the disk space monitor in the pipelineServiceInit function:

func pipelineServiceInit() {
	// initialize plugin
	plugin.InitPlugins(basicRes)

	// notification
	var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
	var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
	if strings.TrimSpace(notificationEndpoint) != "" {
		defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
	}

	// standalone mode: reset pipeline status
	if cfg.GetBool("RESUME_PIPELINES") {
		markInterruptedPipelineAs(models.TASK_RESUME)
	} else {
		markInterruptedPipelineAs(models.TASK_FAILED)
	}

	// load cronjobs for blueprints
	errors.Must(ReloadBlueprints())

	var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
	if pipelineMaxParallel < 0 {
		panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL should be a positive integer`))
	}
	if pipelineMaxParallel == 0 {
		globalPipelineLog.Warn(nil, `pipelineMaxParallel=0 means pipeline will be run No Limit`)
		pipelineMaxParallel = 10000
	}
	// run pipeline with independent goroutine
	go RunPipelineInQueue(pipelineMaxParallel)

	// Start disk space monitor
	go monitorDiskSpace()
}

This implementation ensures that the pipeline is paused when disk space is low and the /tmp folder is cleaned when a pipeline is canceled [2].

To continue talking to Dosu, mention @dosu.

dosubot[bot] avatar Sep 24 '24 13:09 dosubot[bot]

Hi @dosu, thanks for the pointers. They are useful, but I believe this behavior should be embedded in the tool for the user. If I click to stop the pipeline, it needs to stop and clean the disk by itself.

That would let users keep going without needing technical intervention; right now the pipelines just get stuck.

msarzijr avatar Sep 24 '24 13:09 msarzijr

Yes, the tool can be modified to automatically stop the pipeline and clean the disk when the user clicks to stop the pipeline. The CancelPipeline function already handles stopping the pipeline. You can extend this function to include disk cleanup.

Here is an example of how you might modify the CancelPipeline function to include disk cleanup:

func CancelPipeline(pipelineId uint64) errors.Error {
	// prevent RunPipelineInQueue from consuming pending pipelines
	pipeline := &models.Pipeline{}
	err := db.First(pipeline, dal.Where("id = ?", pipelineId))
	if err != nil {
		return errors.BadInput.New("pipeline not found")
	}
	if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
		pipeline.Status = models.TASK_CANCELLED
		err = db.Update(pipeline)
		if err != nil {
			return errors.Default.Wrap(err, "failed to update pipeline")
		}
		// now, with RunPipelineInQueue blocked and the target pipeline updated,
		// we should update the related tasks as well
		err = db.UpdateColumn(
			&models.Task{},
			"status", models.TASK_CANCELLED,
			dal.Where("pipeline_id = ?", pipelineId),
		)
		if err != nil {
			return errors.Default.Wrap(err, "failed to update pipeline tasks")
		}
		// the target pipeline is pending, not running, so there is no need to perform the actual cancel operation
		return nil
	}
	pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
	if err != nil {
		return errors.Convert(err)
	}
	if count == 0 {
		return nil
	}
	for _, pendingTask := range pendingTasks {
		_ = CancelTask(pendingTask.ID)
	}

	// Clean up disk
	logsPath, err := getPipelineLogsPath(pipeline)
	if err == nil {
		err = os.RemoveAll(logsPath)
		if err != nil {
			return errors.Default.Wrap(err, "failed to clean up disk")
		}
	} else {
		return errors.Default.Wrap(err, "failed to get logs path for cleanup")
	}

	return errors.Convert(err)
}

Additionally, you can modify the UI to handle the cancel operation and trigger the disk cleanup. In the PipelineInfo component, the handleCancel function can be used to call the API that cancels the pipeline and performs the cleanup:

const handleCancel = async () => {
    const [success] = await operator(() => API.pipeline.remove(id), {
      setOperating,
    });

    if (success) {
      // setVersion((v) => v + 1);
    }
};

This modification ensures that when a pipeline is canceled, the associated disk space is also cleaned up [1][2].

To continue talking to Dosu, mention @dosu.

dosubot[bot] avatar Sep 24 '24 13:09 dosubot[bot]

This issue has been automatically marked as stale because it has been inactive for 60 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Nov 24 '24 00:11 github-actions[bot]

This issue has been closed because it has been inactive for a long time. You can reopen it if you encounter a similar problem in the future.

github-actions[bot] avatar Dec 01 '24 00:12 github-actions[bot]