
Orphaned buffer lock

nabokihms opened this issue 2 years ago • 10 comments

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

When I try to reload my vector configuration, it says in the log:

2023-10-12T15:56:16.714921Z ERROR vector::topology: Configuration error. error=Sink "destination/cluster/loki-ebapp": error occurred when building buffer: failed to build individual stage 0: failed to load/create ledger: failed to lock buffer.lock; is another Vector process running and using this buffer?
2023-10-12T15:56:16.714954Z  WARN vector::topology::running: Failed to completely load new configuration. Restoring old configuration.
2023-10-12T15:56:16.714997Z  INFO vector::topology::running: Running healthchecks.
2023-10-12T15:56:16.716437Z  INFO vector::topology::running: Old configuration restored successfully.

It happens because there is a lock file in the directory that was created three days ago.

root@log-shipper-agent-8r87q:/# ls -lah /vector-data/buffer/v2/destination/cluster/loki-ebapp/
total 197M
drwxr-xr-x 2 root root 4.0K Oct  9 21:01 .
drwxr-xr-x 3 root root 4.0K Oct  9 05:54 ..
-rw-r--r-- 1 root root  70M Oct  9 21:00 buffer-data-58.dat
-rw-r--r-- 1 root root 128M Oct  9 21:02 buffer-data-59.dat
-rw-r--r-- 1 root root   24 Oct 10 19:51 buffer.db
-rw-r--r-- 1 root root    0 Oct  9 21:01 buffer.lock

No other Vector processes are running.

Configuration

root@log-shipper-agent-8r87q:/# cat /etc/vector/**/*
{
  "data_dir": "/vector-data",
  "expire_metrics_secs": 60,
  "api" : {
    "address" : "127.0.0.1:8686",
    "enabled" : true,
    "playground" : false
  },
  "log_schema": {
    "host_key": "host",
    "message_key": "message",
    "source_type_key": "source_type",
    "timestamp_key": "timestamp"
  },
  "sources": {
    "internal_metrics": {
      "type": "internal_metrics"
    }
  },
  "sinks": {
    "prometheus_sink": {
      "type": "prometheus_exporter",
      "inputs": [
        "internal_metrics"
      ],
      "address": "127.0.0.1:9090",
      "suppress_timestamp": true
    }
  }
}
{
  "sources": {
    "cluster_logging_config/ebapp:ebapp": {
      "type": "kubernetes_logs",
      "extra_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "extra_field_selector": "metadata.namespace=ebapp,metadata.name!=$VECTOR_SELF_POD_NAME",
      "extra_namespace_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "annotation_fields": {
        "container_image": "image",
        "container_name": "container",
        "pod_ip": "pod_ip",
        "pod_labels": "pod_labels",
        "pod_name": "pod",
        "pod_namespace": "namespace",
        "pod_node_name": "node",
        "pod_owner": "pod_owner"
      },
      "glob_minimum_cooldown_ms": 1000,
      "use_apiserver_cache": true
    },
    "cluster_logging_config/ebapp_app-one:ebapp": {
      "type": "kubernetes_logs",
      "extra_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "extra_field_selector": "metadata.namespace=ebapp,metadata.name!=$VECTOR_SELF_POD_NAME",
      "extra_namespace_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "annotation_fields": {
        "container_image": "image",
        "container_name": "container",
        "pod_ip": "pod_ip",
        "pod_labels": "pod_labels",
        "pod_name": "pod",
        "pod_namespace": "namespace",
        "pod_node_name": "node",
        "pod_owner": "pod_owner"
      },
      "glob_minimum_cooldown_ms": 1000,
      "use_apiserver_cache": true
    },
    "cluster_logging_config/ebapp_app-two:ebapp": {
      "type": "kubernetes_logs",
      "extra_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "extra_field_selector": "metadata.namespace=ebapp,metadata.name!=$VECTOR_SELF_POD_NAME",
      "extra_namespace_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "annotation_fields": {
        "container_image": "image",
        "container_name": "container",
        "pod_ip": "pod_ip",
        "pod_labels": "pod_labels",
        "pod_name": "pod",
        "pod_namespace": "namespace",
        "pod_node_name": "node",
        "pod_owner": "pod_owner"
      },
      "glob_minimum_cooldown_ms": 1000,
      "use_apiserver_cache": true
    }
  },
  "transforms": {
    "transform/destination/loki-ebapp/00_ratelimit": {
      "exclude": "null",
      "inputs": [
        "transform/source/ebapp/02_local_timezone"
      ],
      "threshold": 1000,
      "type": "throttle",
      "window_secs": 60
    },
    "transform/source/ebapp/00_owner_ref": {
      "drop_on_abort": false,
      "inputs": [
        "cluster_logging_config/ebapp:ebapp"
      ],
      "source": "if exists(.pod_owner) {\n    .pod_owner = string!(.pod_owner)\n\n    if starts_with(.pod_owner, \"ReplicaSet/\") {\n        hash = \"-\"\n        if exists(.pod_labels.\"pod-template-hash\") {\n            hash = hash + string!(.pod_labels.\"pod-template-hash\")\n        }\n\n        if hash != \"-\" \u0026\u0026 ends_with(.pod_owner, hash) {\n            .pod_owner = replace(.pod_owner, \"ReplicaSet/\", \"Deployment/\")\n            .pod_owner = replace(.pod_owner, hash, \"\")\n        }\n    }\n\n    if starts_with(.pod_owner, \"Job/\") {\n        if match(.pod_owner, r'-[0-9]{8,11}$') {\n            .pod_owner = replace(.pod_owner, \"Job/\", \"CronJob/\")\n            .pod_owner = replace(.pod_owner, r'-[0-9]{8,11}$', \"\")\n        }\n    }\n}",
      "type": "remap"
    },
    "transform/source/ebapp/01_clean_up": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp/00_owner_ref"
      ],
      "source": "if exists(.pod_labels.\"controller-revision-hash\") {\n    del(.pod_labels.\"controller-revision-hash\")\n}\nif exists(.pod_labels.\"pod-template-hash\") {\n    del(.pod_labels.\"pod-template-hash\")\n}\nif exists(.kubernetes) {\n    del(.kubernetes)\n}\nif exists(.file) {\n    del(.file)\n}",
      "type": "remap"
    },
    "transform/source/ebapp/02_local_timezone": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp/01_clean_up"
      ],
      "source": "if exists(.\"timestamp\") {\n    ts = parse_timestamp!(.\"timestamp\", format: \"%+\")\n    .\"timestamp\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}\n\nif exists(.\"timestamp_end\") {\n    ts = parse_timestamp!(.\"timestamp_end\", format: \"%+\")\n    .\"timestamp_end\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-one/00_owner_ref": {
      "drop_on_abort": false,
      "inputs": [
        "cluster_logging_config/ebapp_app-one:ebapp"
      ],
      "source": "if exists(.pod_owner) {\n    .pod_owner = string!(.pod_owner)\n\n    if starts_with(.pod_owner, \"ReplicaSet/\") {\n        hash = \"-\"\n        if exists(.pod_labels.\"pod-template-hash\") {\n            hash = hash + string!(.pod_labels.\"pod-template-hash\")\n        }\n\n        if hash != \"-\" \u0026\u0026 ends_with(.pod_owner, hash) {\n            .pod_owner = replace(.pod_owner, \"ReplicaSet/\", \"Deployment/\")\n            .pod_owner = replace(.pod_owner, hash, \"\")\n        }\n    }\n\n    if starts_with(.pod_owner, \"Job/\") {\n        if match(.pod_owner, r'-[0-9]{8,11}$') {\n            .pod_owner = replace(.pod_owner, \"Job/\", \"CronJob/\")\n            .pod_owner = replace(.pod_owner, r'-[0-9]{8,11}$', \"\")\n        }\n    }\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-one/01_clean_up": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp_app-one/00_owner_ref"
      ],
      "source": "if exists(.pod_labels.\"controller-revision-hash\") {\n    del(.pod_labels.\"controller-revision-hash\")\n}\nif exists(.pod_labels.\"pod-template-hash\") {\n    del(.pod_labels.\"pod-template-hash\")\n}\nif exists(.kubernetes) {\n    del(.kubernetes)\n}\nif exists(.file) {\n    del(.file)\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-one/02_local_timezone": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp_app-one/01_clean_up"
      ],
      "source": "if exists(.\"timestamp\") {\n    ts = parse_timestamp!(.\"timestamp\", format: \"%+\")\n    .\"timestamp\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}\n\nif exists(.\"timestamp_end\") {\n    ts = parse_timestamp!(.\"timestamp_end\", format: \"%+\")\n    .\"timestamp_end\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-one/03_label_filter": {
      "condition": "if is_boolean(.pod_labels.app) || is_float(.pod_labels.app) {\n    data, err = to_string(.pod_labels.app);\n    if err != null {\n        false;\n    } else {\n        includes([\"ebapp-first\"], data);\n    };\n} else if .pod_labels.app == null {\n    \"null\";\n} else {\n    includes([\"ebapp-first\"], .pod_labels.app);\n}",
      "inputs": [
        "transform/source/ebapp_app-one/02_local_timezone"
      ],
      "type": "filter"
    },
    "transform/source/ebapp_app-two/00_owner_ref": {
      "drop_on_abort": false,
      "inputs": [
        "cluster_logging_config/ebapp_app-two:ebapp"
      ],
      "source": "if exists(.pod_owner) {\n    .pod_owner = string!(.pod_owner)\n\n    if starts_with(.pod_owner, \"ReplicaSet/\") {\n        hash = \"-\"\n        if exists(.pod_labels.\"pod-template-hash\") {\n            hash = hash + string!(.pod_labels.\"pod-template-hash\")\n        }\n\n        if hash != \"-\" \u0026\u0026 ends_with(.pod_owner, hash) {\n            .pod_owner = replace(.pod_owner, \"ReplicaSet/\", \"Deployment/\")\n            .pod_owner = replace(.pod_owner, hash, \"\")\n        }\n    }\n\n    if starts_with(.pod_owner, \"Job/\") {\n        if match(.pod_owner, r'-[0-9]{8,11}$') {\n            .pod_owner = replace(.pod_owner, \"Job/\", \"CronJob/\")\n            .pod_owner = replace(.pod_owner, r'-[0-9]{8,11}$', \"\")\n        }\n    }\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-two/01_clean_up": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp_app-two/00_owner_ref"
      ],
      "source": "if exists(.pod_labels.\"controller-revision-hash\") {\n    del(.pod_labels.\"controller-revision-hash\")\n}\nif exists(.pod_labels.\"pod-template-hash\") {\n    del(.pod_labels.\"pod-template-hash\")\n}\nif exists(.kubernetes) {\n    del(.kubernetes)\n}\nif exists(.file) {\n    del(.file)\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-two/02_local_timezone": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp_app-two/01_clean_up"
      ],
      "source": "if exists(.\"timestamp\") {\n    ts = parse_timestamp!(.\"timestamp\", format: \"%+\")\n    .\"timestamp\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}\n\nif exists(.\"timestamp_end\") {\n    ts = parse_timestamp!(.\"timestamp_end\", format: \"%+\")\n    .\"timestamp_end\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-two/03_label_filter": {
      "condition": "if is_boolean(.pod_labels.app) || is_float(.pod_labels.app) {\n    data, err = to_string(.pod_labels.app);\n    if err != null {\n        false;\n    } else {\n        includes([\"ebapp-second\"], data);\n    };\n} else if .pod_labels.app == null {\n    \"null\";\n} else {\n    includes([\"ebapp-second\"], .pod_labels.app);\n}",
      "inputs": [
        "transform/source/ebapp_app-two/02_local_timezone"
      ],
      "type": "filter"
    }
  },
  "sinks": {
    "destination/cluster/loki-ebapp": {
      "type": "loki",
      "inputs": [
        "transform/destination/loki-ebapp/00_ratelimit"
      ],
      "healthcheck": {
        "enabled": false
      },
      "buffer": {
        "max_size": 268435488,
        "type": "disk",
        "when_full": "drop_newest"
      },
      "encoding": {
        "only_fields": [
          "message"
        ],
        "codec": "text",
        "timestamp_format": "rfc3339"
      },
      "endpoint": "http://loki.d8-monitoring:3100",
      "tls": {
        "verify_hostname": true,
        "verify_certificate": true
      },
      "labels": {
        "container": "{{ container }}",
        "host": "{{ host }}",
        "image": "{{ image }}",
        "namespace": "{{ namespace }}",
        "node": "{{ node }}",
        "pod": "{{ pod }}",
        "pod_ip": "{{ pod_ip }}",
        "pod_labels_*": "{{ pod_labels }}",
        "pod_owner": "{{ pod_owner }}",
        "stream": "{{ stream }}"
      },
      "remove_label_fields": true,
      "out_of_order_action": "rewrite_timestamp"
    }
  }
}

Version

v0.32.0

Debug Output

No response

Example Data

No response

Additional Context

No response

References

No response

nabokihms · Oct 12 '23 16:10

This is probably due to Vector not terminating gracefully. Maybe it would be better to switch to time-based locking for Vector: hold the lock for, say, 5 minutes and try to renew it every two minutes. If the lock is not renewed, another Vector process is allowed to take it.
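
Purely to make that lease idea concrete (this is not how Vector's buffer lock works today), a minimal sketch might look like the following; the lock-file format, TTL, and renewal interval are all assumptions, and a real implementation would also need an atomic check-and-take so two processes cannot grab an expired lease at once.

use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// 5-minute lease, renewed every couple of minutes, per the proposal above.
const LEASE_TTL: Duration = Duration::from_secs(300);

fn now_secs() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
}

// Take the lock if the file is absent, unreadable, or its lease has expired.
fn try_acquire(lock_path: &Path) -> std::io::Result<bool> {
    if let Ok(contents) = fs::read_to_string(lock_path) {
        if let Ok(last_renewed) = contents.trim().parse::<u64>() {
            if now_secs().saturating_sub(last_renewed) < LEASE_TTL.as_secs() {
                return Ok(false); // lease still held by another process
            }
        }
    }
    fs::write(lock_path, now_secs().to_string())?;
    Ok(true)
}

// Called periodically by the holder to keep the lease alive.
fn renew(lock_path: &Path) -> std::io::Result<()> {
    fs::write(lock_path, now_secs().to_string())
}

fn main() -> std::io::Result<()> {
    let lock = Path::new("buffer.lock"); // hypothetical path
    if try_acquire(lock)? {
        renew(lock)?; // in a real process this would run on a timer
    }
    Ok(())
}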

nabokihms · Oct 12 '23 16:10

Yep, this is definitely a limitation and something that isn't handled terribly well with non-graceful shutdown.

We would likely want to make it work somewhat more like a PID file, where any Vector process that wants to take the lock will confirm that, if the lock file already exists, the PID referenced within it is still running... otherwise, it would delete the lock file and take it over.

(Just to comment on it: I would want to avoid a time-based solution because I don't want another process trying to take over the buffer if there was a simple bug in the background task responsible for updating lock liveness. If there's a stalled Vector process, something might still be happening to the disk buffer, so we can only be sure that nothing is touching it if the PID itself is gone: if the process isn't running, it can't possibly be modifying the disk buffer.)
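
To make that PID-file flow concrete, here is a rough sketch; the lock-file contents and the /proc-based liveness check are assumptions for illustration only (and Linux-specific), not Vector's actual implementation.

use std::fs;
use std::path::Path;
use std::process;

// Take the lock if it is free, or if the PID recorded in it is no longer running.
fn try_take_lock(lock_path: &Path) -> std::io::Result<bool> {
    if let Ok(contents) = fs::read_to_string(lock_path) {
        if let Ok(pid) = contents.trim().parse::<u32>() {
            // The recorded PID is still alive, so another process owns the buffer.
            if Path::new(&format!("/proc/{}", pid)).exists() {
                return Ok(false);
            }
        }
        // Stale lock: the owning process is gone, so nothing can be touching the buffer.
        fs::remove_file(lock_path)?;
    }
    fs::write(lock_path, process::id().to_string())?;
    Ok(true)
}

fn main() -> std::io::Result<()> {
    let acquired = try_take_lock(Path::new("buffer.lock"))?; // hypothetical path
    println!("lock acquired: {}", acquired);
    Ok(())
}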

tobz · Oct 16 '23 14:10

Fantastic idea, thanks for clarifying, @tobz.

How will this work in a container? Will Vector need access to the host PID namespace?

nabokihms · Oct 18 '23 15:10

I assume the lock is not required for Vector running in a Kubernetes cluster, because the DaemonSet controller already ensures that two pods are not running on the same node. As a workaround, in our Vector installation we will remove the lock in the entrypoint every time the container starts.
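
For reference, that workaround simply deletes any leftover buffer.lock files before Vector starts. A one-line rm in the shell entrypoint does the job; the sketch below uses Rust only for consistency with the other examples in this thread, with the path taken from the listing above and the helper name made up.

use std::fs;
use std::path::Path;

// Delete leftover buffer.lock files under each sink's buffer directory.
// Only safe because the DaemonSet guarantees a single Vector pod per node
// is using this data dir.
fn remove_stale_locks(buffer_root: &Path) -> std::io::Result<()> {
    for entry in fs::read_dir(buffer_root)? {
        let lock = entry?.path().join("buffer.lock");
        if lock.exists() {
            fs::remove_file(&lock)?;
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    remove_stale_locks(Path::new("/vector-data/buffer/v2/destination/cluster"))
}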

And, of course, it would be nice to finalize the design of this feature to bring more advanced locking capabilities to Vector.

nabokihms · Oct 25 '23 18:10

I was able to verify that this occurs when removing and then re-adding a sink with a disk buffer, and reproduced the behavior with the following sequence:

  1. Start Vector with the config below: VECTOR_SELF_NODE_NAME=abc VECTOR_SELF_POD_NAME=def vector --config config.json
  2. Rename destination/cluster/loki-ebapp to destination/cluster/loki-ebapp-1 in the config.
  3. Reload the config with kill -HUP $(pgrep vector).
  4. Rename destination/cluster/loki-ebapp-1 back to destination/cluster/loki-ebapp in the config.
  5. Reload the config with kill -HUP $(pgrep vector).

This results in:

2023-12-12T16:56:04.063075Z ERROR vector::topology::builder: Configuration error. error=Sink "destination/cluster/loki-ebapp-1": error occurred when building buffer: failed to build individual stage 0: failed to load/create ledger: failed to lock buffer.lock; is another Vector process running and using this buffer?

Notably, this behavior occurs when the sink is loki (and also clickhouse) but not when the sink is blackhole.

{
  "data_dir": "./vector-data",
  "sources": {
    "cluster_logging_config/ebapp:ebapp": {
      "type": "demo_logs",
      "format": "json"
    }
  },
  "sinks": {
    "destination/cluster/loki-ebapp": {
      "type": "loki",
      "inputs": [
        "cluster_logging_config/ebapp:ebapp"
      ],
      "healthcheck": {
        "enabled": false
      },
      "buffer": {
        "max_size": 268435488,
        "type": "disk",
        "when_full": "drop_newest"
      },
      "encoding": {
        "only_fields": [
          "message"
        ],
        "codec": "text",
        "timestamp_format": "rfc3339"
      },
      "endpoint": "http://loki.d8-monitoring:3100",
      "tls": {
        "verify_hostname": true,
        "verify_certificate": true
      },
      "labels": {
        "container": "{{ container }}",
        "host": "{{ host }}",
        "image": "{{ image }}",
        "namespace": "{{ namespace }}",
        "node": "{{ node }}",
        "pod": "{{ pod }}",
        "pod_ip": "{{ pod_ip }}",
        "pod_labels_*": "{{ pod_labels }}",
        "pod_owner": "{{ pod_owner }}",
        "stream": "{{ stream }}"
      },
      "remove_label_fields": true,
      "out_of_order_action": "rewrite_timestamp"
    }
  }
}

This is a bug that we should resolve.

dsmith3197 · Dec 12 '23 17:12

We would likely want to make it work somewhat more like a PID file, where any Vector process that wants to take the lock will confirm that, if the lock file already exists, the PID referenced within it is still running... otherwise, it would delete the lock file and take it over.

Regarding this approach, the fslock crate leverages flock, which automatically releases the lock when all file descriptors are dropped. As such, our implementation already handles non-graceful shutdowns.
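
For anyone following along: flock-style locks are advisory and tied to the open file descriptor, so the kernel drops them when the owning process exits, even after SIGKILL; the zero-byte buffer.lock left on disk (as in the listing above) is therefore expected and not itself the problem. A small usage sketch of the fslock crate, as an illustration rather than Vector's actual buffer code:

use fslock::LockFile;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Path taken from the directory listing earlier in this issue.
    let mut lock = LockFile::open(
        "/vector-data/buffer/v2/destination/cluster/loki-ebapp/buffer.lock",
    )?;

    if lock.try_lock()? {
        println!("lock acquired; the OS releases it when this process exits, even on SIGKILL");
        // ... read/write the disk buffer ...
        lock.unlock()?;
    } else {
        println!("a live process still holds the flock on this file");
    }
    Ok(())
}

This is consistent with the reproduction above: the failure comes from the same running Vector process trying to re-acquire a lock it never released when the sink was removed during reload, not from a stale file left behind by a crashed process.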

dsmith3197 · Dec 12 '23 17:12

@dsmith3197 thanks for the info. Do you have any ideas why this problem occurs when the sink is Loki?

nabokihms · Jan 26 '24 17:01

@dsmith3197 thanks for the info. Do you have any ideas why this problem occurs when the sink is Loki?

What we were able to reproduce is that Vector doesn't correctly release disk buffer locks during reload when components are removed, so if a component is re-added it fails to acquire the lock. Does that match how you ran into this issue? Dropping a component, reloading, re-adding a component with the same name, reloading?

jszwedko · Jan 26 '24 18:01

In my case, a sidecar application composes a config file for Vector and reloads it. A sink with the same id can be removed and added multiple times, so yes, I think this is my case. We rely heavily on reloading.

nabokihms · Jan 27 '24 15:01

In my case, a sidecar application composes a config file for Vector and reloads it. A sink with the same id can be removed and added multiple times, so yes, I think this is my case. We rely heavily on reloading.

👍 Thanks for confirming! I don't think it is specific to the loki sink then. You'd see the same error for any sink that uses a disk buffer if it is removed and re-added via a Vector reload.

jszwedko · Jan 29 '24 14:01