A guide for debugging Vector pipelines

A step-by-step guide for debugging pipelines


Debugging Guide

This guide describes an imaginary use case and shows how to progressively resolve issues using command-line tools and Vector commands.

Imagine we have two sources and want to assign group IDs to logs based on which source they came from. We then want to send those modified logs to an HTTP server for further processing and/or storage.

In the following sections we will examine the tools we have at our disposal.


Vector Tools

Vector top

Use vector top to display topology and metrics in the console. For example, you can compare how many events the sources produce with how many events reach the sinks.

Command:

vector top


The internal metrics source

The internal_metrics source can be used to inspect component metrics in detail.

For example, you can connect this source to a sink and collect the internal metrics:

data_dir: /var/lib/vector/
sources:
  source0:
    type: internal_metrics
    scrape_interval_secs: 1.0

sinks:
  sink0:
    type: prometheus_exporter
    inputs:
      - source0
    address: 0.0.0.0:9598
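
Once this config is running, you can scrape the exporter directly. For example (the grep pattern below is illustrative; exact metric names can vary between Vector versions):

curl -s http://localhost:9598/metrics | grep component_received_events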

Vector Tap

With vector tap you can inspect the inputs and/or outputs of your components. One caveat is worth mentioning: sinks don’t have outputs, so we cannot tap them. This means we cannot inspect the actual payloads that are sent downstream.

Command:

vector tap --outputs-of source_0,transform_0
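
Since sinks don’t have outputs, a practical workaround is to tap a sink’s inputs instead:

vector tap --inputs-of sink_0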


Console

Note that the console sink can also be very useful here. For example:

my_console_sink:
  type: console
  inputs:
    - my_source_0
    - my_source_1
  encoding:
    codec: json
    json:
      pretty: true

Python HTTP Server

The following code implements a Python server that imitates a downstream system to which Vector publishes logs. We will use this server to demonstrate a few scenarios.

Code

👉 Click to expand the script 👈
import http.server
import socketserver
import json
import threading
import time
import zlib
from http import HTTPStatus  # Import HTTP status codes

# Global variable for the response code
response_code = HTTPStatus.OK  # Default to 200 OK

class DebuggingHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
  """
  HTTP server for debugging.

  - Responds with a dynamically controlled status code (default HTTPStatus.OK).
  - Supports GET, POST, PUT, DELETE, and other HTTP methods.
  - Allows changing the response code via a POST request to /set_status.
  - Logs request headers and JSON payloads (supports zlib-compressed payloads).
  """

  def do_GET(self):
      """Handles GET requests by returning the configured response code."""
      self.send_custom_response()

  def do_POST(self):
      """Handles POST requests. Logs request details before checking the path."""
      json_data = self.log_request_details()
      if json_data is None:
          return  # log_request_details already sent an error response
      if self.path == "/set_status":
          self.set_status(json_data)
      elif self.path == "/logs":
          self.send_custom_response()
      else:
          self.send_error(HTTPStatus.NOT_FOUND, "Unknown path")

  def do_PUT(self):
      """Handles PUT requests."""
      self.send_custom_response()

  def do_DELETE(self):
      """Handles DELETE requests."""
      self.send_custom_response()

  def send_custom_response(self):
      """Send an HTTP response with the currently configured status code."""
      global response_code
      self.send_response(response_code)
      self.send_header("Content-type", "application/json")
      self.end_headers()
      response = {"status": response_code, "message": f"Responding with {response_code}"}
      self.wfile.write(json.dumps(response).encode())

  def log_request_details(self):
      """Logs headers and JSON payload for all POST requests before processing."""
      content_length = int(self.headers.get('Content-Length', 0))
      body = self.rfile.read(content_length)

      print("\n📥 Received POST request:")
      print(f"🔗 Path: {self.path}")
      print("📜 Headers:")
      for key, value in self.headers.items():
          print(f"   {key}: {value}")

      if self.headers.get('Content-Encoding') == 'gzip':
          print("❌ Error: Gzip compression is not supported.")
          self.send_response(HTTPStatus.BAD_REQUEST)
          self.send_header("Content-type", "application/json")
          self.end_headers()
          self.wfile.write(json.dumps({"error": "Gzip compression is not supported."}).encode())
          return None

      if self.headers.get('Content-Encoding') == 'deflate':
          try:
              body = zlib.decompress(body)
              print("🗜️ Payload was zlib-compressed. Decompressed successfully.")
          except zlib.error:
              print("❌ Error decompressing zlib payload.")
              self.send_response(HTTPStatus.BAD_REQUEST)
              self.send_header("Content-type", "application/json")
              self.end_headers()
              self.wfile.write(json.dumps({"error": "Invalid zlib-compressed data"}).encode())
              return None

      try:
          json_data = json.loads(body.decode())
          print("📦 JSON Payload:", json.dumps(json_data, indent=2))
          return json_data
      except json.JSONDecodeError:
          print("⚠️  No valid JSON payload received.")
          self.send_response(HTTPStatus.BAD_REQUEST)
          self.send_header("Content-type", "application/json")
          self.end_headers()
          self.wfile.write(json.dumps({"error": "Invalid zlib-compressed data"}).encode())
          return None

  def set_status(self, json_data):
      """Handles POST /set_status to update the response code dynamically."""
      global response_code

      if json_data is None:
          self.send_response(HTTPStatus.BAD_REQUEST)
          response = {"error": "Invalid request format. Send JSON with {'status': <code>}."}
      else:
          try:
              new_status = int(json_data.get("status", HTTPStatus.OK))
              if 100 <= new_status <= 599:
                  response_code = new_status
                  self.send_response(HTTPStatus.OK)
                  response = {"message": f"Response code updated to {response_code}"}
              else:
                  self.send_response(HTTPStatus.BAD_REQUEST)
                  response = {"error": "Invalid status code. Must be between 100 and 599."}
          except ValueError:
              self.send_response(HTTPStatus.BAD_REQUEST)
              response = {"error": "Invalid status code format."}

      self.send_header("Content-type", "application/json")
      self.end_headers()
      self.wfile.write(json.dumps(response).encode())

def run_server(port):
  """Starts the HTTP server on the specified port."""
  handler = DebuggingHTTPRequestHandler
  with socketserver.TCPServer(("", port), handler) as httpd:
      print(f"🚀 Serving on port {port}")
      httpd.serve_forever()

if __name__ == "__main__":
  port = 8000
  server_thread = threading.Thread(target=run_server, args=(port,), daemon=True)
  server_thread.start()

  try:
      while True:
          time.sleep(1)  # Idle instead of busy-waiting; still responsive to Ctrl+C
  except KeyboardInterrupt:
      print("\n🛑 Server shutting down.")
Description

This Python server is a simple HTTP debugging tool that:

  • Responds to all HTTP methods with a configurable status code. For this guide, we focus on POST requests.
  • Supports dynamic status code changes via POST /set_status. This allows us to imitate real world scenarios and demonstrate Vector retries.
  • Logs all POST requests sent to /logs, including headers and JSON payloads.
  • Handles both raw JSON and zlib-compressed JSON payloads. Rejects other payloads.
  • Starts on port 8000 by default and returns HTTP 200 OK unless modified.

Usage

Run command:

python3 fake_server.py

Change return status:

curl -X POST http://localhost:8000/set_status -H "Content-Type: application/json" -d '{"status": 404}'
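
You can also exercise the /logs endpoint directly to confirm the server is up (the payload below is just a sample):

curl -X POST http://localhost:8000/logs -H "Content-Type: application/json" -d '{"key": "a", "property": "foo"}'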

Walkthrough

Initial Vector Config

👉 Click to view the whole config 👈
# version 1
api:
  enabled: true

sources:
  source_0:
    type: demo_logs
    format: shuffle
    lines:
      - '{ "key": "a", "property": "foo" }'
      - '{ "key": "b", "property": "bar" }'
    interval: 10

  source_1:
    type: demo_logs
    format: shuffle
    lines:
      - '{ "key": "c", "property": "some" }'
      - '{ "key": "d", "property": "another" }'
    interval: 10

transforms:
  transform_0:
    type: remap
    inputs:
      - source_*
    source: |
      . = parse_json!(.message)
      if .key == "a" {
        .group = 0
      } else {
        .group = 1
      }      

sinks:
  sink_0:
    inputs:
      - transform_0
    type: http
    uri: http://localhost:8000/logs
    encoding:
      codec: json
      json:
        pretty: true

Globals
api:
  enabled: true

Enabling the API is required by vector tap (and also used by vector top).
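
For reference, the API listens on 127.0.0.1:8686 by default; you can also set the address explicitly:

api:
  enabled: true
  address: 127.0.0.1:8686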

Sources
sources:
  source_0:
    type: demo_logs
    format: shuffle
    lines:
      - '{ "key": "a", "property": "foo" }'
      - '{ "key": "b", "property": "bar" }'
    interval: 10

  source_1:
    type: demo_logs
    format: shuffle
    lines:
      - '{ "key": "c", "property": "some" }'
      - '{ "key": "d", "property": "another" }'
    interval: 10

The above two sources emulate incoming events.

Transforms
transforms:
  transform_0:
    type: remap
    inputs:
      - source_*
    source: |
      . = parse_json!(.message)
      if .key == "a" {
        .group = 0
      } else {
        .group = 1
      }      

This guide doesn’t focus on transforms like remap. At this point, it is worth highlighting https://playground.vrl.dev/ for quickly iterating on VRL programs.

Sinks
  sink_0:
    inputs:
      - transform_0
    type: http
    uri: http://localhost:8000/logs
    encoding:
      codec: json

Vector Config with internal metrics

Full Config Preview
👉 Click to view the whole config 👈
# version 2
api:
  enabled: true

sources:
  source_0:
    type: demo_logs
    format: shuffle
    lines:
      - '{ "key": "a", "property": "foo" }'
      - '{ "key": "b", "property": "bar" }'
    interval: 10

  source_1:
    type: demo_logs
    format: shuffle
    lines:
      - '{ "key": "c", "property": "some" }'
      - '{ "key": "d", "property": "another" }'
    interval: 10

  internal_metrics:
    type: internal_metrics
    scrape_interval_secs: 10

transforms:
  transform_0:
    type: remap
    inputs:
      - source_*
    source: |
      . = parse_json!(.message)
      if .key == "a" {
        .group = 0
      } else {
        .group = 1
      }      

sinks:
  sink_0:
    inputs:
      - transform_0
    type: http
    uri: http://localhost:8000/logs
    encoding:
      codec: json

  sink_1:
    type: console
    inputs:
      - internal_metrics
    encoding:
      codec: json
      json:
        pretty: true
Description

Note that we added a new source:

  internal_metrics:
    type: internal_metrics
    scrape_interval_secs: 10

And a new sink:

  sink_1:
    type: console
    inputs:
      - internal_metrics
    encoding:
      codec: json
      json:
        pretty: true

Now we can observe internal metrics such as how many events our components receive:

{
  "name": "component_received_events_count",
  "namespace": "vector",
  "tags": {
    "component_id": "sink_0",
    "component_kind": "sink",
    "component_type": "http",
    "host": "MY_HOST"
  },
  "timestamp": "2025-02-12T15:58:03.723449Z",
  "kind": "absolute",
  "aggregated_histogram": {
    "buckets": [
      // ...
      {
        "upper_limit": 1.0,
        "count": 2
      },
      // ...
      {
        "upper_limit": "inf",
        "count": 0
      }
    ],
    "count": 2,
    "sum": 2.0
  }
}

Scenarios

In this section we create scenarios in which the sink produces errors and show how to adjust the sink configuration to overcome them.

Scenario 1 - Unsupported Compression

Now that we have something that works, we want to add compression.

sinks:
  sink_0:
    inputs:
      - transform_0
    type: http
    uri: http://localhost:8000/logs
    encoding:
      codec: json
    compression: gzip

However, our downstream server can only decompress zlib payloads.

We now observe the following metric:

{
  "name": "component_errors_total",
  "namespace": "vector",
  "tags": {
    "component_id": "sink_0",
    "component_kind": "sink",
    "component_type": "http",
    "error_type": "request_failed",
    "host": "MY_HOST",
    "stage": "sending"
  },
  "timestamp": "2025-02-12T16:09:51.122851Z",
  "kind": "absolute",
  "counter": {
    "value": 2.0
  }
}

If we had access to the server downstream, we would see the following error:

📥 Received POST request:
🔗 Path: /logs
📜 Headers:
  content-type: application/json
  content-encoding: gzip
  accept-encoding: zstd,gzip,deflate,br
  user-agent: Vector/0.45.0-custom-bac0c2015 (aarch64-apple-darwin debug=full)
  host: localhost:8000
  content-length: 76
❌ Error: Gzip compression is not supported.
  127.0.0.1 - - [12/Feb/2025 11:10:22] "POST /logs HTTP/1.1" 400 -

The fix is now obvious: we can change the compression and reload the Vector config:

compression: zlib

How you reload depends on the deployment, e.g. systemctl kill -s HUP --kill-who=main vector.service. You can read more at https://vector.dev/docs/administration/management.
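
If you run Vector directly rather than under systemd, sending SIGHUP to the process has the same effect (assuming a single local vector process):

kill -SIGHUP $(pgrep -x vector)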

Scenario 2 - Temporary Server Disruptions

Step 1

Run the server and start Vector with the config. Notice that the component_sent_events_total metric for sink_0 increases during normal operation.
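
For example, assuming the config above is saved as vector.yaml:

python3 fake_server.py &
vector --config vector.yaml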

Step 2

Simulate rate limiting:

 curl -X POST http://localhost:8000/set_status -H "Content-Type: application/json" -d '{"status": 429}'

Shortly after, observe this new Vector log:

2025-02-12T19:10:54.562556Z  WARN sink{component_kind="sink" component_id=sink_0 component_type=http}:request{request_id=19}: vector::sinks::util::retries: Retrying after response. reason=too many requests internal_log_rate_limit=true

The component_sent_events_total metric for sink_0 is not increasing anymore. We can also now see 429 in the response status:

{
  "name": "http_client_responses_total",
  "namespace": "vector",
  "tags": {
    "component_id": "sink_0",
    "component_kind": "sink",
    "component_type": "http",
    "host": "MY_HOST",
    "status": "429"
  },
  "timestamp": "2025-02-12T19:15:16.377709Z",
  "kind": "absolute",
  "counter": {
    "value": 41.0
  }
}

It is worth noting that Vector retries these failed requests based on the http sink’s retry policy, which is defined separately for each sink.
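
As a sketch, the retry behavior can be tuned via the sink’s request options; the values below are illustrative, so check the http sink docs for the actual defaults:

  sink_0:
    # ... rest of the sink_0 settings ...
    request:
      retry_attempts: 5
      retry_initial_backoff_secs: 1
      retry_max_duration_secs: 30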

Step 3

Command the server to always respond with OK:

 curl -X POST http://localhost:8000/set_status -H "Content-Type: application/json" -d '{"status": 200}'

Notice how the component_sent_events_total metric for sink_0 is now increasing again!

{
  "name": "component_sent_events_total",
  "namespace": "vector",
  "tags": {
    "component_id": "sink_1",
    "component_kind": "sink",
    "component_type": "console",
    "host": "MY_HOST"
  },
  "timestamp": "2025-02-12T19:14:56.377942Z",
  "kind": "absolute",
  "counter": {
    "value": 3678.0
  }
}

Scenario 3 - Smaller Batching

For this scenario, let’s assume our downstream server enforces a strict limit on the maximum payload size. Also, assume the Vector http sink’s maximum batch size is 10 MB. Note: always refer to the docs as the source of truth for this value.

📥 Received POST request:
🔗 Path: /logs
📜 Headers:
   content-type: application/json
   content-encoding: deflate
   accept-encoding: zstd,gzip,deflate,br
   user-agent: Vector/0.45.0-custom-bac0c2015 (aarch64-apple-darwin debug=full)
   host: localhost:8000
   content-length: 10000000
🗜️ Payload was zlib-compressed. Decompressed successfully.
📦 JSON Payload: [
  {
    "group": 0,
    "key": "a",
    "property": "foo"
  },
  {
    "group": 0,
    "key": "a",
    "property": "foo"
  },
  // Assume a lot more bytes...
]
❌ Error: Maximum payload size exceeded. Rejecting payloads over 8192 bytes.

Hmm, does Vector expose any relevant settings?

Yes, this behavior can be changed using batch settings!

We can add the following to significantly reduce the batch size:

    batch:
      max_events: 4
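
Alternatively, if the downstream limit is expressed in bytes, capping the batch size in bytes may map to it more directly (the value below is illustrative):

    batch:
      max_bytes: 8192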

Final Config

👉 Click to view the whole config 👈
# version 3
api:
  enabled: true

sources:
  source_0:
    type: demo_logs
    format: shuffle
    lines:
      - '{ "key": "a", "property": "foo" }'
      - '{ "key": "b", "property": "bar" }'
    interval: 10

  source_1:
    type: demo_logs
    format: shuffle
    lines:
      - '{ "key": "c", "property": "some" }'
      - '{ "key": "d", "property": "another" }'
    interval: 10

  internal_metrics:
    type: internal_metrics
    scrape_interval_secs: 10

transforms:
  transform_0:
    type: remap
    inputs:
      - source_*
    source: |
      . = parse_json!(.message)
      if .key == "a" {
        .group = 0
      } else {
        .group = 1
      }      

sinks:
  sink_0:
    inputs:
      - transform_0
    type: http
    uri: http://localhost:8000/logs
    encoding:
      codec: json
      json:
        pretty: true
    compression: zlib
    batch:
      max_events: 4

  sink_1:
    type: console
    inputs:
      - internal_metrics
    encoding:
      codec: json
      json:
        pretty: true

Now that we have a final configuration, we can also write Vector configuration unit tests.
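
For example, a minimal unit test for transform_0 could look like the sketch below; see the unit testing docs for the full syntax:

tests:
  - name: assigns group 0 to key a
    inputs:
      - insert_at: transform_0
        type: log
        log_fields:
          message: '{ "key": "a", "property": "foo" }'
    outputs:
      - extract_from: transform_0
        conditions:
          - type: vrl
            source: |
              assert_eq!(.group, 0)

You can then run the tests with vector test /path/to/vector.yaml.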