# GreptimeDB Pipeline – OTel journald receiver
#
# Input: log records produced by the OpenTelemetry Collector's journald
#        receiver. The journald JSON entry arrives as a string in the
#        record's `Body` field, so the pipeline first parses that string
#        into an object (stored as `body`) before doing anything else.
#
# Timestamp: the record's `Timestamp` field is used as the time-index
#            column (see the transform section). __MONOTONIC_TIMESTAMP
#            (microseconds since boot) is NOT the time index; it is kept
#            as a plain integer column named `monotonic_timestamp`.
#
# NOTE(review): this pipeline reads the capitalised field names `Body` and
# `Timestamp` from the incoming record — confirm they match what your
# ingestion path actually delivers.
#
# Apply this pipeline by setting the HTTP export header in the OTel config:
#   x-greptime-pipeline-name: journald
#
# Upload via the GreptimeDB API (replace localhost with your server):
#   curl -X POST 'http://localhost:4000/v1/events/pipelines/journald' \
#        -H 'Content-Type: application/x-yaml' \
#        --data-binary @journald.yaml

version: 2

processors:
  # ------------------------------------------------------------------
  # 1. The journald entry is encoded as a JSON string in the `Body`
  #    field. Parse it and store the resulting object as `body` so the
  #    following step can address individual keys as .body.FIELD_NAME.
  #    ignore_missing is false: a record without `Body` is rejected.
  # ------------------------------------------------------------------
  - json_parse:
      fields:
        - Body, body
      ignore_missing: false

  # ------------------------------------------------------------------
  # 2. Move the listed journald / systemd fields from .body.* to the top
  #    level under snake_case names, cast numeric fields to integers,
  #    strip surrounding whitespace from _SELINUX_CONTEXT, lift
  #    __MONOTONIC_TIMESTAMP as an integer, and finally drop .body.
  #
  #    del(.body.FIELD_NAME) returns the value AND removes the key in one
  #    step; for a key that is absent it returns null.
  #
  #    Any journald field that is not listed here is discarded together
  #    with .body at the end of the program.
  # ------------------------------------------------------------------
  - vrl:
      source: |
        .transport = del(.body._TRANSPORT)
        .hostname = del(.body._HOSTNAME)
        .exe = del(.body._EXE)
        .cmdline = del(.body._CMDLINE)
        .runtime_scope = del(.body._RUNTIME_SCOPE)
        .systemd_cgroup = del(.body._SYSTEMD_CGROUP)
        .comm = del(.body._COMM)
        .message = del(.body.MESSAGE)
        .systemd_invocation_id = del(.body._SYSTEMD_INVOCATION_ID)

        # Numeric fields arrive as strings; to_int! aborts the record when
        # a value cannot be coerced to an integer.
        # NOTE(review): per the VRL reference, to_int(null) yields 0, so
        # an absent field is stored as 0 rather than null - confirm this
        # is acceptable for uid / gid / pid.
        .gid = to_int!(del(.body._GID))
        .uid = to_int!(del(.body._UID))
        .priority = to_int!(del(.body.PRIORITY))
        .boot_id = del(.body._BOOT_ID)
        .pid = to_int!(del(.body._PID))
        .seqnum_id = del(.body.__SEQNUM_ID)
        .seqnum = to_int!(del(.body.__SEQNUM))
        .syslog_identifier = del(.body.SYSLOG_IDENTIFIER)
        .stream_id = del(.body._STREAM_ID)

        # _SELINUX_CONTEXT is absent on hosts without SELinux. Only strip
        # it when it is really a string; otherwise store null like every
        # other optional field instead of aborting the whole record.
        selinux_context = del(.body._SELINUX_CONTEXT)
        .selinux_context = if is_string(selinux_context) {
          strip_whitespace(string!(selinux_context))
        } else {
          null
        }

        .systemd_slice = del(.body._SYSTEMD_SLICE)
        .syslog_facility = to_int!(del(.body.SYSLOG_FACILITY))
        .cursor = del(.body.__CURSOR)
        .systemd_unit = del(.body._SYSTEMD_UNIT)
        .cap_effective = del(.body._CAP_EFFECTIVE)
        .machine_id = del(.body._MACHINE_ID)

        # Keep the monotonic clock reading (microseconds since boot) as a
        # plain integer column. It is not used as the time index.
        .monotonic_timestamp = to_int!(del(.body.__MONOTONIC_TIMESTAMP))

        # Drop whatever is left of the parsed entry.
        del(.body)

        # The program's final expression is the record itself.
        .

  # ------------------------------------------------------------------
  # 3. (Disabled.) Earlier revision: parse the monotonic timestamp (µs
  #    since boot) into a typed value and rename it to `timestamp` so it
  #    becomes the time-index column. The transform below uses the
  #    record's `Timestamp` field instead.
  #    NOTE(review): step 2 no longer leaves a top-level
  #    __MONOTONIC_TIMESTAMP key, so this processor would have to read
  #    `monotonic_timestamp` if it were ever re-enabled.
  # ------------------------------------------------------------------
  # - epoch:
  #     fields:
  #       - __MONOTONIC_TIMESTAMP, timestamp
  #     resolution: microsecond
  #     ignore_missing: false

# ------------------------------------------------------------------
# Transform
#
# In version 2, only fields that require a specific type, index, or
# tag annotation need to be listed here. All remaining fields from the
# pipeline context are auto-detected and persisted by the engine.
#
# Resulting schema (auto-detected fields are marked "(auto)"):
#   Timestamp              TimestampMicrosecond  time index
#   message                String                fulltext index
#   systemd_unit           String                inverted index
#   hostname               String                inverted index
#   comm                   String                inverted index
#   syslog_identifier      String                inverted index
#   transport              String                inverted index
#   systemd_slice          String                inverted index
#   priority               Int64                 (auto)
#   syslog_facility        Int64                 (auto)
#   uid                    Int64                 (auto)
#   gid                    Int64                 (auto)
#   pid                    Int64                 (auto)
#   seqnum                 Int64                 (auto)
#   monotonic_timestamp    Int64                 (auto)
#   exe                    String                (auto)
#   cmdline                String                (auto)
#   runtime_scope          String                (auto)
#   systemd_cgroup         String                (auto)
#   systemd_invocation_id  String                (auto)
#   boot_id                String                (auto)
#   seqnum_id              String                (auto)
#   stream_id              String                (auto)
#   selinux_context        String                (auto)
#   cursor                 String                (auto)
#   cap_effective          String                (auto)
#   machine_id             String                (auto)
#
# NOTE(review): any other top-level field of the incoming record that is
# still in the pipeline context is presumably persisted as well — verify
# against a real record.
# ------------------------------------------------------------------
transform:
  # Time index — the record's `Timestamp`, read as epoch microseconds.
  # NOTE(review): confirm the unit of `Timestamp`; `epoch, us` treats the
  # value as microseconds.
  - fields:
      - Timestamp
    type: epoch, us
    index: timestamp

  # Full-text search on the human-readable log body
  - fields:
      - message
    type: string
    index: fulltext

  # Inverted indexes on the fields most commonly used in WHERE / GROUP BY
  - fields:
      - systemd_unit
      - hostname
      - comm
      - syslog_identifier
      - transport
      - systemd_slice
    type: string
    index: inverted