feat: only store bucket values for histogram

The previous implementation stored every single recorded value, causing
massive memory usage over time.
mem-usage
Jef Roosens 2023-01-28 17:21:59 +01:00
parent 116b85e039
commit bf1385ee6d
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
6 changed files with 94 additions and 60 deletions

View File

@ -15,27 +15,26 @@ pub mut:
struct Histogram { struct Histogram {
metric Metric metric Metric
pub mut: pub mut:
buckets []f64 total_count u64
} sum f64
buckets []f64
struct FloatSeries { bucket_counts []u64
metric Metric
pub mut:
data []f64
} }
[heap] [heap]
struct DefaultCollector { struct DefaultCollector {
mut: mut:
buckets map[string][]f64
counters shared map[string]&Counter counters shared map[string]&Counter
histograms shared map[string]&FloatSeries histograms shared map[string]&Histogram
gauges shared map[string]&Gauge gauges shared map[string]&Gauge
} }
pub fn new_default_collector() &DefaultCollector { pub fn new_default_collector() &DefaultCollector {
return &DefaultCollector{ return &DefaultCollector{
buckets: map[string][]f64{}
counters: map[string]&Counter{} counters: map[string]&Counter{}
histograms: map[string]&FloatSeries{} histograms: map[string]&Histogram{}
gauges: map[string]&Gauge{} gauges: map[string]&Gauge{}
} }
} }
@ -92,29 +91,52 @@ pub fn (c &DefaultCollector) counters() []Metric {
return metrics return metrics
} }
pub fn (c &DefaultCollector) histogram_record(value f64, metric Metric) { pub fn (mut c DefaultCollector) histogram_buckets_set(name string, buckets []f64) {
lock c.histograms {
c.buckets[name] = buckets
}
}
pub fn (mut c DefaultCollector) histogram_record(value f64, metric Metric) {
lock c.histograms { lock c.histograms {
mut entry := c.histograms[metric.str()] or { mut entry := c.histograms[metric.str()] or {
hist := &FloatSeries{ buckets := c.buckets[metric.name] or { [] }
hist := &Histogram{
metric: metric metric: metric
data: []f64{} buckets: buckets
bucket_counts: []u64{len: buckets.len, init: 0}
} }
c.histograms[metric.str()] = hist c.histograms[metric.str()] = hist
hist hist
} }
entry.data << value entry.sum += value
entry.total_count += 1
mut i := entry.buckets.len - 1
for i >= 0 && value <= entry.buckets[i] {
entry.bucket_counts[i]++
i -= 1
}
} }
} }
pub fn (c &DefaultCollector) histogram_get(metric Metric) ?[]f64 { pub fn (c &DefaultCollector) histogram_get(metric Metric) ?Histogram {
return rlock c.histograms { return rlock c.histograms {
entry := c.histograms[metric.str()] or { return none } entry := c.histograms[metric.str()] or { return none }
// Return a clone of the data to prevent user from altering // Return a clone of the data to prevent user from altering
// internal structure // internal structure
entry.data.clone() Histogram{
metric: metric
total_count: entry.total_count
sum: entry.sum
buckets: entry.buckets.clone()
bucket_counts: entry.bucket_counts.clone()
}
} }
} }

View File

@ -32,10 +32,28 @@ fn test_histogram() {
mut m := new_default_collector() mut m := new_default_collector()
m.histogram_record(5.0, name: 'test') m.histogram_record(5.0, name: 'test')
assert m.histogram_get(name: 'test')? == [5.0]
assert m.histogram_get(name: 'test')? == Histogram{
metric: Metric{
name: 'test'
}
total_count: 1
sum: 5.0
buckets: []
bucket_counts: []
}
m.histogram_record(7.0, name: 'test') m.histogram_record(7.0, name: 'test')
assert m.histogram_get(name: 'test')? == [5.0, 7.0]
assert m.histogram_get(name: 'test')? == Histogram{
metric: Metric{
name: 'test'
}
total_count: 2
sum: 12.0
buckets: []
bucket_counts: []
}
// Test with labels // Test with labels
metric := Metric{ metric := Metric{
@ -43,11 +61,26 @@ fn test_histogram() {
labels: [['hi', 'label']!, ['hi2', 'label2']!] labels: [['hi', 'label']!, ['hi2', 'label2']!]
} }
m.histogram_buckets_set('test2', [10.0])
m.histogram_record(5.0, metric) m.histogram_record(5.0, metric)
assert m.histogram_get(metric)? == [5.0]
assert m.histogram_get(metric)? == Histogram{
metric: metric
total_count: 1
sum: 5.0
buckets: [10.0]
bucket_counts: [u64(1)]
}
m.histogram_record(7.0, metric) m.histogram_record(7.0, metric)
assert m.histogram_get(metric)? == [5.0, 7.0]
assert m.histogram_get(metric)? == Histogram{
metric: metric
total_count: 2
sum: 12.0
buckets: [10.0]
bucket_counts: [u64(2)]
}
} }
fn test_gauge_add() { fn test_gauge_add() {

View File

@ -16,7 +16,7 @@ pub fn (m &Metric) str() string {
pub interface MetricsCollector { pub interface MetricsCollector {
counter_get(metric Metric) ?u64 counter_get(metric Metric) ?u64
counters() []Metric counters() []Metric
histogram_get(metric Metric) ?[]f64 histogram_get(metric Metric) ?Histogram
histograms() []Metric histograms() []Metric
gauge_get(metric Metric) ?f64 gauge_get(metric Metric) ?f64
gauges() []Metric gauges() []Metric

2
null.v
View File

@ -21,7 +21,7 @@ pub fn (c &NullCollector) counters() []Metric {
pub fn (c &NullCollector) histogram_record(value f64, metric Metric) {} pub fn (c &NullCollector) histogram_record(value f64, metric Metric) {}
pub fn (c &NullCollector) histogram_get(metric Metric) ?[]f64 { pub fn (c &NullCollector) histogram_get(metric Metric) ?Histogram {
return none return none
} }

View File

@ -2,19 +2,15 @@ module metrics
import strings import strings
import io import io
import arrays
pub struct PrometheusExporter { pub struct PrometheusExporter {
buckets []f64
mut: mut:
prefix string prefix string
collector &MetricsCollector = unsafe { nil } collector &MetricsCollector = unsafe { nil }
} }
pub fn new_prometheus_exporter(buckets []f64) PrometheusExporter { pub fn new_prometheus_exporter() PrometheusExporter {
return PrometheusExporter{ return PrometheusExporter{}
buckets: buckets
}
} }
pub fn (mut e PrometheusExporter) load(prefix string, collector &MetricsCollector) { pub fn (mut e PrometheusExporter) load(prefix string, collector &MetricsCollector) {
@ -61,45 +57,25 @@ pub fn (mut e PrometheusExporter) export_to_writer(mut writer io.Writer) ! {
} }
for hist in e.collector.histograms() { for hist in e.collector.histograms() {
data := e.collector.histogram_get(hist) or { return error("This can't happen.") } hist_data := e.collector.histogram_get(hist) or { return error("This can't happen.") }
sum := arrays.sum(data) or { 0.0 }
total_count := data.len
mut bucket_counts := []u64{len: e.buckets.len}
mut i := bucket_counts.len - 1
// For each data point, increment all buckets that the value is
// contained in. Because the buckets are sorted, we can stop once we
// encounter one that it doesn't fit in
for val in data {
for i >= 0 && val <= e.buckets[i] {
bucket_counts[i]++
i -= 1
}
i = bucket_counts.len - 1
}
mut m := Metric{ mut m := Metric{
...hist ...hist
name: '${hist.name}_count' name: '${hist.name}_count'
} }
writer.write('${e.serialize_metric(m)} $total_count\n'.bytes())! writer.write('${e.serialize_metric(m)} $hist_data.total_count\n'.bytes())!
m = Metric{ m = Metric{
...hist ...hist
name: '${hist.name}_sum' name: '${hist.name}_sum'
} }
writer.write('${e.serialize_metric(m)} $sum\n'.bytes())! writer.write('${e.serialize_metric(m)} $hist_data.sum\n'.bytes())!
mut le_labels := [][2]string{} mut le_labels := [][2]string{}
le_labels.prepend(hist.labels) le_labels.prepend(hist.labels)
le_labels << ['le', '']! le_labels << ['le', '']!
for j, bucket in e.buckets { for j, bucket in hist_data.buckets {
le_labels[le_labels.len - 1][1] = bucket.str() le_labels[le_labels.len - 1][1] = bucket.str()
m = Metric{ m = Metric{
@ -107,17 +83,19 @@ pub fn (mut e PrometheusExporter) export_to_writer(mut writer io.Writer) ! {
labels: le_labels labels: le_labels
} }
writer.write('${e.serialize_metric(m)} ${bucket_counts[j]}\n'.bytes())! writer.write('${e.serialize_metric(m)} ${hist_data.bucket_counts[j]}\n'.bytes())!
} }
// Always output the +Inf bucket // Always output the +Inf bucket
le_labels[le_labels.len - 1][1] = '+Inf' le_labels[le_labels.len - 1][1] = '+Inf'
m = Metric{ if hist_data.buckets.len > 0 {
name: '${hist.name}_bucket' m = Metric{
labels: le_labels name: '${hist.name}_bucket'
} labels: le_labels
}
writer.write('${e.serialize_metric(m)} $total_count\n'.bytes())! writer.write('${e.serialize_metric(m)} $hist_data.total_count\n'.bytes())!
}
} }
} }

View File

@ -4,7 +4,7 @@ fn test_only_counters() {
mut m := new_default_collector() mut m := new_default_collector()
m.counter_increment(name: 'test') m.counter_increment(name: 'test')
mut e := new_prometheus_exporter([]) mut e := new_prometheus_exporter()
e.load('hi_', m) e.load('hi_', m)
assert e.export_to_string()! == 'hi_test 1\n' assert e.export_to_string()! == 'hi_test 1\n'
@ -23,7 +23,7 @@ fn test_only_gauges() {
mut m := new_default_collector() mut m := new_default_collector()
m.gauge_set(3.25, name: 'test') m.gauge_set(3.25, name: 'test')
mut e := new_prometheus_exporter([]) mut e := new_prometheus_exporter()
e.load('hi_', m) e.load('hi_', m)
assert e.export_to_string()! == 'hi_test 3.25\n' assert e.export_to_string()! == 'hi_test 3.25\n'
@ -40,10 +40,11 @@ fn test_only_gauges() {
fn test_single_histogram() { fn test_single_histogram() {
mut m := new_default_collector() mut m := new_default_collector()
m.histogram_buckets_set('test', [0.5, 5.0])
m.histogram_record(5.0, name: 'test') m.histogram_record(5.0, name: 'test')
m.histogram_record(7.0, name: 'test') m.histogram_record(7.0, name: 'test')
mut e := new_prometheus_exporter([0.5, 5.0]) mut e := new_prometheus_exporter()
e.load('hi_', m) e.load('hi_', m)
assert e.export_to_string()! == 'hi_test_count 2\nhi_test_sum 12.0\nhi_test_bucket{le="0.5"} 0\nhi_test_bucket{le="5.0"} 1\nhi_test_bucket{le="+Inf"} 2\n' assert e.export_to_string()! == 'hi_test_count 2\nhi_test_sum 12.0\nhi_test_bucket{le="0.5"} 0\nhi_test_bucket{le="5.0"} 1\nhi_test_bucket{le="+Inf"} 2\n'