[Bug][Pipeline] The pipeline gets stuck if the tool has no space left on the disk
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
When we have too many pipelines running at the same time, the git extractor fills up the disk. The pipeline gets stuck and does not even respond to the stop command.
What do you expect to happen
I expect the cancel pipeline command to stop the pipeline and clean the /tmp folder, so that the pipelines can be rescheduled to different times.
How to reproduce
Run pipelines in parallel that collect git repos whose combined size is bigger than the available disk space.
Anything else
Thanks for reading my post.
Version
v1.0.1-beta9@e061ef2
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Hey there, @msarzijr! I'm here to help you with any bugs, questions, or contribution guidance you might need. Let's tackle this together!
I found a similar closed issue: [Feature][framework] Throttling total number of pipelines running in parallel, which was resolved by a pull request that implemented running pipelines in an independent process [1].
To address the issue of the pipeline getting stuck when the disk space is full, and to ensure the cancel pipeline command stops the pipeline and cleans the /tmp folder, you can implement the following steps:
- Monitor Disk Space: Implement a function to monitor disk space and pause the pipeline if the disk space is below a certain threshold.
- Clean /tmp Folder on Cancel: Modify the CancelPipeline function to clean the /tmp folder when a pipeline is canceled.
Here is an example implementation:
Monitor Disk Space
Add a function to check disk space and pause the pipeline if necessary:
import (
    "log"
    "syscall"
    "time"
)

// checkDiskSpace reports whether the filesystem containing path still has
// more than threshold bytes available.
func checkDiskSpace(path string, threshold uint64) bool {
    var stat syscall.Statfs_t
    err := syscall.Statfs(path, &stat)
    if err != nil {
        log.Printf("Failed to get disk space: %v", err)
        return false
    }
    // Available blocks * size per block = available space in bytes
    available := stat.Bavail * uint64(stat.Bsize)
    return available > threshold
}

func monitorDiskSpace() {
    const threshold = 10 * 1024 * 1024 * 1024 // 10 GB
    for {
        if !checkDiskSpace("/", threshold) {
            log.Println("Disk space is below threshold, pausing pipeline")
            // Pause the pipeline
            // Implement the logic to pause the pipeline
        }
        time.Sleep(1 * time.Minute) // Check every minute
    }
}
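The "pause the pipeline" placeholder above needs a concrete mechanism. A minimal sketch, assuming a package-level flag that the queue loop checks before dispatching the next pipeline (both pipelinePaused and waitWhilePaused are hypothetical names for illustration, not existing DevLake code):

import (
    "sync/atomic"
    "time"
)

// pipelinePaused is a hypothetical shared flag; true means "do not start new pipelines".
var pipelinePaused atomic.Bool

// The monitor loop would set or clear the flag in place of the placeholder above:
//     pipelinePaused.Store(!checkDiskSpace("/", threshold))

// waitWhilePaused blocks until the monitor clears the flag. The queue loop
// (e.g. RunPipelineInQueue) would call it before dispatching each pipeline.
func waitWhilePaused() {
    for pipelinePaused.Load() {
        time.Sleep(10 * time.Second)
    }
}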
Clean /tmp Folder on Cancel
Modify the CancelPipeline function to clean the /tmp folder:
import (
    "os"
)

// cleanTmpFolder removes everything under /tmp and recreates the directory.
// Note: a real /tmp is normally world-writable with the sticky bit (mode 1777).
func cleanTmpFolder() error {
    tmpDir := "/tmp"
    err := os.RemoveAll(tmpDir)
    if err != nil {
        return err
    }
    return os.Mkdir(tmpDir, 0755)
}

func CancelPipeline(pipelineId uint64) errors.Error {
    // prevent RunPipelineInQueue from consuming pending pipelines
    pipeline := &models.Pipeline{}
    err := db.First(pipeline, dal.Where("id = ?", pipelineId))
    if err != nil {
        return errors.BadInput.New("pipeline not found")
    }
    if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
        pipeline.Status = models.TASK_CANCELLED
        err = db.Update(pipeline)
        if err != nil {
            return errors.Default.Wrap(err, "failed to update pipeline")
        }
        // now, with RunPipelineInQueue blocked and the target pipeline updated,
        // we should update the related tasks as well
        err = db.UpdateColumn(
            &models.Task{},
            "status", models.TASK_CANCELLED,
            dal.Where("pipeline_id = ?", pipelineId),
        )
        if err != nil {
            return errors.Default.Wrap(err, "failed to update pipeline tasks")
        }
        // Clean the /tmp folder
        if err := cleanTmpFolder(); err != nil {
            return errors.Default.Wrap(err, "failed to clean /tmp folder")
        }
        // the target pipeline is pending, not running, so there is no need to perform the actual cancel operation
        return nil
    }
    pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
    if err != nil {
        return errors.Convert(err)
    }
    if count == 0 {
        return nil
    }
    for _, pendingTask := range pendingTasks {
        _ = CancelTask(pendingTask.ID)
    }
    // Clean the /tmp folder
    if err := cleanTmpFolder(); err != nil {
        return errors.Default.Wrap(err, "failed to clean /tmp folder")
    }
    return errors.Convert(err)
}
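One caveat on the sketch above: os.RemoveAll("/tmp") deletes everything in the directory, including files owned by other processes on the host. If the git extractor clones into a dedicated subdirectory, removing only that subdirectory is safer. The path below is a hypothetical example, not the directory DevLake actually uses:

import "os"

// cleanRepoTmpDir removes only the tool's own scratch directory and recreates it.
// "/tmp/devlake-git" is a hypothetical path; substitute whatever directory the
// git extractor actually clones into.
func cleanRepoTmpDir() error {
    repoTmpDir := "/tmp/devlake-git"
    if err := os.RemoveAll(repoTmpDir); err != nil {
        return err
    }
    return os.MkdirAll(repoTmpDir, 0755)
}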
Run Disk Space Monitor
Start the disk space monitor in the pipelineServiceInit function:
func pipelineServiceInit() {
    // initialize plugin
    plugin.InitPlugins(basicRes)
    // notification
    var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
    var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
    if strings.TrimSpace(notificationEndpoint) != "" {
        defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
    }
    // standalone mode: reset pipeline status
    if cfg.GetBool("RESUME_PIPELINES") {
        markInterruptedPipelineAs(models.TASK_RESUME)
    } else {
        markInterruptedPipelineAs(models.TASK_FAILED)
    }
    // load cronjobs for blueprints
    errors.Must(ReloadBlueprints())
    var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
    if pipelineMaxParallel < 0 {
        panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL should be a positive integer`))
    }
    if pipelineMaxParallel == 0 {
        globalPipelineLog.Warn(nil, `pipelineMaxParallel=0 means pipeline will be run No Limit`)
        pipelineMaxParallel = 10000
    }
    // run pipeline with independent goroutine
    go RunPipelineInQueue(pipelineMaxParallel)
    // Start disk space monitor
    go monitorDiskSpace()
}
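The 10 GB threshold in monitorDiskSpace above is hard-coded. Since the surrounding settings already come from cfg, the threshold could be read the same way; a minimal sketch, assuming monitorDiskSpace is changed to take the threshold as a parameter and a new DISK_SPACE_THRESHOLD_GB key (the key is an assumption, not an existing DevLake setting):

// Hypothetical DISK_SPACE_THRESHOLD_GB config key, defaulting to 10 GB
// when unset or invalid; this would replace the go monitorDiskSpace() call above.
thresholdGB := cfg.GetInt64("DISK_SPACE_THRESHOLD_GB")
if thresholdGB <= 0 {
    thresholdGB = 10
}
go monitorDiskSpace(uint64(thresholdGB) * 1024 * 1024 * 1024)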
This implementation ensures that the pipeline is paused when disk space is low and the /tmp folder is cleaned when a pipeline is canceled [2].
To continue talking to Dosu, mention @dosu.
Hi @dosu, thanks for the pointers. They are useful, but as a user I believe this behavior should be embedded in the tool. If I click to stop the pipeline, it needs to stop and clean the disk by itself.
That would let the user keep going without technical intervention; as it is now, the pipelines just get stuck.
Yes, the tool can be modified to automatically stop the pipeline and clean the disk when the user clicks to stop the pipeline. The CancelPipeline function already handles stopping the pipeline. You can extend this function to include disk cleanup.
Here is an example of how you might modify the CancelPipeline function to include disk cleanup:
func CancelPipeline(pipelineId uint64) errors.Error {
    // prevent RunPipelineInQueue from consuming pending pipelines
    pipeline := &models.Pipeline{}
    err := db.First(pipeline, dal.Where("id = ?", pipelineId))
    if err != nil {
        return errors.BadInput.New("pipeline not found")
    }
    if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
        pipeline.Status = models.TASK_CANCELLED
        err = db.Update(pipeline)
        if err != nil {
            return errors.Default.Wrap(err, "failed to update pipeline")
        }
        // now, with RunPipelineInQueue blocked and the target pipeline updated,
        // we should update the related tasks as well
        err = db.UpdateColumn(
            &models.Task{},
            "status", models.TASK_CANCELLED,
            dal.Where("pipeline_id = ?", pipelineId),
        )
        if err != nil {
            return errors.Default.Wrap(err, "failed to update pipeline tasks")
        }
        // the target pipeline is pending, not running, so there is no need to perform the actual cancel operation
        return nil
    }
    pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
    if err != nil {
        return errors.Convert(err)
    }
    if count == 0 {
        return nil
    }
    for _, pendingTask := range pendingTasks {
        _ = CancelTask(pendingTask.ID)
    }
    // Clean up disk: remove this pipeline's log directory
    logsPath, err := getPipelineLogsPath(pipeline)
    if err != nil {
        return errors.Default.Wrap(err, "failed to get logs path for cleanup")
    }
    // os.RemoveAll returns a plain error, so wrap it separately rather than
    // assigning it to the errors.Error variable above
    if rmErr := os.RemoveAll(logsPath); rmErr != nil {
        return errors.Default.Wrap(rmErr, "failed to clean up disk")
    }
    return nil
}
Additionally, you can modify the UI to handle the cancel operation and trigger the disk cleanup. In the PipelineInfo component, the handleCancel function can be used to call the API that cancels the pipeline and performs the cleanup:
const handleCancel = async () => {
  const [success] = await operator(() => API.pipeline.remove(id), {
    setOperating,
  });
  if (success) {
    // setVersion((v) => v + 1);
  }
};
This modification ensures that when a pipeline is canceled, the associated disk space is also cleaned up [1][2].
To continue talking to Dosu, mention @dosu.
This issue has been automatically marked as stale because it has been inactive for 60 days. It will be closed in the next 7 days if no further activity occurs.
This issue has been closed because it has been inactive for a long time. You can reopen it if you encounter a similar problem in the future.