143 lines
6.2 KiB
YAML
143 lines
6.2 KiB
YAML
# GreptimeDB Pipeline – OTel journald receiver
#
# Input:  NDJSON log records produced by the OpenTelemetry Collector's
#         journald receiver. The OTel OTLP exporter wraps the journald
#         JSON entry as a string in the record's `Body` field, so the
#         pipeline first parses that string into an object (`body`)
#         before doing anything else.
#
# Timestamp: the transform section uses the record's `Timestamp` field as
#         the time-index column. __MONOTONIC_TIMESTAMP (microseconds since
#         boot) is kept as the plain integer column `monotonic_timestamp`;
#         it is not the time index. journald keys that step 2 does not
#         list, such as __REALTIME_TIMESTAMP, are discarded together with
#         the body object.
#
# Apply this pipeline by setting the HTTP export header in the OTel config:
#   x-greptime-pipeline-name: journald
#
# Upload via the GreptimeDB API:
#   curl -X POST 'http://<host>:4000/v1/events/pipelines/journald' \
#        -H 'Content-Type: application/x-yaml' \
#        --data-binary @journald.yaml
|
||
|
||
# Pipeline configuration format version. The transform notes in this file
# rely on version 2 behavior: fields that are not listed under `transform`
# are auto-detected and persisted.
version: 2
|
||
|
||
processors:
# ------------------------------------------------------------------
# 1. Decode the journald entry. The OTel OTLP exporter delivers it as
#    a JSON string; json_parse reads that string from the `Body` field
#    and stores the decoded object as `body`, so the following steps
#    can address individual keys as .body.<key>.
#    ignore_missing is false, i.e. a missing `Body` is not tolerated.
# ------------------------------------------------------------------
- json_parse:
    ignore_missing: false
    fields:
    - "Body, body"
|
||
|
||
# ------------------------------------------------------------------
# 2. Move the journald / systemd fields from .body.* to the top level
#    under clean snake_case names:
#      - numeric fields are cast to integers with to_int!;
#      - _SELINUX_CONTEXT has its surrounding whitespace stripped
#        (journald appends a trailing newline to it) and is only
#        emitted when the entry actually carries it as a string;
#      - __MONOTONIC_TIMESTAMP is stored as the integer field
#        monotonic_timestamp;
#      - .body is deleted at the end, which also discards any journald
#        keys that are not listed here.
#
#    del(.body.<key>) returns the value AND removes the key in one step.
#    The program ends with `.` so the modified event is its result.
# ------------------------------------------------------------------
- vrl:
    source: |
      .transport = del(.body._TRANSPORT)
      .hostname = del(.body._HOSTNAME)
      .exe = del(.body._EXE)
      .cmdline = del(.body._CMDLINE)
      .runtime_scope = del(.body._RUNTIME_SCOPE)
      .systemd_cgroup = del(.body._SYSTEMD_CGROUP)
      .comm = del(.body._COMM)
      .message = del(.body.MESSAGE)
      .systemd_invocation_id = del(.body._SYSTEMD_INVOCATION_ID)
      # NOTE(review): VRL documents to_int as mapping null to 0, so an
      # entry that lacks one of the numeric keys would get 0 here;
      # confirm that this is acceptable for uid/gid/pid.
      .gid = to_int!(del(.body._GID))
      .uid = to_int!(del(.body._UID))
      .priority = to_int!(del(.body.PRIORITY))
      .boot_id = del(.body._BOOT_ID)
      .pid = to_int!(del(.body._PID))
      .seqnum_id = del(.body.__SEQNUM_ID)
      .seqnum = to_int!(del(.body.__SEQNUM))
      .syslog_identifier = del(.body.SYSLOG_IDENTIFIER)
      .stream_id = del(.body._STREAM_ID)
      # Entries do not always carry _SELINUX_CONTEXT (e.g. hosts without
      # SELinux). string! aborts the whole program on a null value, so
      # only strip and emit the field when it is really a string.
      selinux_context = del(.body._SELINUX_CONTEXT)
      if is_string(selinux_context) {
        .selinux_context = strip_whitespace(string!(selinux_context))
      }
      .systemd_slice = del(.body._SYSTEMD_SLICE)
      .syslog_facility = to_int!(del(.body.SYSLOG_FACILITY))
      .cursor = del(.body.__CURSOR)
      .systemd_unit = del(.body._SYSTEMD_UNIT)
      .cap_effective = del(.body._CAP_EFFECTIVE)
      .machine_id = del(.body._MACHINE_ID)
      # Keep the monotonic clock reading (µs since boot) as an ordinary
      # integer field at the top level.
      .monotonic_timestamp = to_int!(del(.body.__MONOTONIC_TIMESTAMP))
      # Drop what is left of the parsed entry, including unlisted keys.
      del(.body)
      .
|
||
|
||
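# ------------------------------------------------------------------
# 3. (Disabled) Optional epoch step that would parse the monotonic
#    timestamp (µs since boot) into a typed value and rename it to
#    `timestamp`. It is commented out; the transform section indexes
#    the `Timestamp` field instead.
#    NOTE: step 2 stores the value at the top level as
#    `monotonic_timestamp`, so the field name below has to be updated
#    to that name before this step can be re-enabled.
# ------------------------------------------------------------------
# - epoch:
#     fields:
#       - __MONOTONIC_TIMESTAMP, timestamp
#     resolution: microsecond
#     ignore_missing: false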
|
||
|
||
# ------------------------------------------------------------------
# Transform
#
# In version 2, only fields that require a specific type, index, or
# tag annotation need to be listed here. All remaining fields from the
# pipeline context are auto-detected and persisted by the engine.
#
# Resulting schema for the fields this pipeline produces
# ("(auto)" marks auto-detected fields):
#   Timestamp              TimestampNanosecond   time index
#   message                String                fulltext index
#   systemd_unit           String                inverted index
#   hostname               String                inverted index
#   comm                   String                inverted index
#   syslog_identifier      String                inverted index
#   transport              String                inverted index
#   systemd_slice          String                inverted index
#   priority               Int64                 (auto)
#   syslog_facility        Int64                 (auto)
#   uid                    Int64                 (auto)
#   gid                    Int64                 (auto)
#   pid                    Int64                 (auto)
#   seqnum                 Int64                 (auto)
#   monotonic_timestamp    Int64                 (auto)
#   exe                    String                (auto)
#   cmdline                String                (auto)
#   runtime_scope          String                (auto)
#   systemd_cgroup         String                (auto)
#   systemd_invocation_id  String                (auto)
#   boot_id                String                (auto)
#   seqnum_id              String                (auto)
#   stream_id              String                (auto)
#   selinux_context        String                (auto)
#   cursor                 String                (auto)
#   cap_effective          String                (auto)
#   machine_id             String                (auto)
# ------------------------------------------------------------------
transform:
# Time index: the `Timestamp` field of the log record. For OTLP logs
# this carries time_unix_nano, i.e. nanoseconds since the Unix epoch,
# so the resolution must be `ns`; declaring `us` would misread every
# value by a factor of 1000.
# NOTE(review): a table that was already created with a microsecond
# time index may have to be recreated after this change - confirm.
- fields:
  - Timestamp
  type: epoch, ns
  index: timestamp

# Full-text search on the human-readable log message
- fields:
  - message
  type: string
  index: fulltext

# Inverted indexes on the fields most commonly used in WHERE / GROUP BY
- fields:
  - systemd_unit
  - hostname
  - comm
  - syslog_identifier
  - transport
  - systemd_slice
  type: string
  index: inverted
|