Periodic averages with overflowing buckets
I recently needed to keep track of time-windowed averages of values emitted by a periodic system (f = 2 Hz), on a constrained system (1 GHz single-core SoC). The general idea is that a pulse emits a floating-point value every 500 ms, and I must keep track of the average value over the last 1 s, the last 5 s, and so on, up to a one-week time span.
One week of values at 2 Hz means 1,209,600 values.
Circular buffer implementation
A first implementation came together after I was advised to use a circular buffer. This data structure automatically expires out-of-scope values: when the buffer is full and a new value is pushed, the oldest one is evicted.
defmodule TimeValues do
  use Agent

  @bucket_sizes [2, 5, 3, 4, 5, 3, 4, 2, 2, 3, 2, 7]
  @labels ["1 second", "5 seconds", "15 seconds", "1 minute", "5 minutes", "15 minutes", "1 hour", "2 hours", "4 hours", "12 hours", "1 day", "7 days"]

  @sizes @bucket_sizes
         |> Enum.with_index()
         |> Enum.map(fn {_, i} ->
           # running product of the bucket sizes up to index i
           Enum.reduce(Enum.slice(@bucket_sizes, 0..i), 1, fn a, m -> m * a end)
         end)

  @sizes_labels Enum.zip(@sizes, @labels)
I start by pre-computing the time-window widths (in number of samples) in the @sizes_labels module attribute:
[
{2, "1 second"},
{10, "5 seconds"},
{30, "15 seconds"},
{120, "1 minute"},
{600, "5 minutes"},
{1800, "15 minutes"},
{7200, "1 hour"},
{14400, "2 hours"},
{28800, "4 hours"},
{86400, "12 hours"},
{172800, "1 day"},
{1209600, "7 days"}
]
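Since each width is just the running product of the bucket sizes, the same list can be expressed more directly with Enum.scan/2. A minimal equivalent sketch (not the original code):
# Enum.scan/2 carries the running product along the list,
# emitting each intermediate result: [2, 10, 30, 120, ...].
@sizes Enum.scan(@bucket_sizes, &(&1 * &2))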
Then, the TimeValues module is made an Agent, keeping track of a CircularBuffer (https://hex.pm/packages/circular_buffer) backed by Erlang's :queue module. Every pulse inserts a new value into the circular buffer by calling push_value.
  def init_value(), do: CircularBuffer.new(2 * 3600 * 24 * 7)

  # Agent.start_link/2 (rather than Agent.start/2) links the agent to its
  # caller, matching the start_link/1 name a supervisor expects.
  def start_link(_), do: Agent.start_link(fn -> init_value() end, name: __MODULE__)

  def push_value(value) do
    Agent.update(__MODULE__, fn state ->
      CircularBuffer.insert(state, value)
    end)
  end

  def averages() do
    Agent.get(__MODULE__, fn state ->
      list = CircularBuffer.to_list(state) |> Enum.reverse()

      @sizes_labels
      |> Enum.reduce([], fn {s, l}, out ->
        # Enum.slice/3 takes exactly s elements; slicing 0..s would take s + 1.
        [{l, (Enum.slice(list, 0, s) |> Enum.sum()) / s} | out]
      end)
      |> Enum.reverse()
    end)
  end
end
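Since use Agent defines a default child_spec/1, the module can be dropped straight into a supervision tree. A minimal sketch (not shown in the original post):
# Hypothetical application start-up: the supervisor calls
# TimeValues.start_link([]) for us.
children = [TimeValues]
Supervisor.start_link(children, strategy: :one_for_one)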
Getting the averages then amounts to summing the appropriate slices of the circular buffer, using the precomputed window widths.
iex(5)> TimeValues.averages
[
{"1 second", 1.736297781463606},
{"5 seconds", 1.2222835758760193},
{"15 seconds", 1.1009895991799596},
{"1 minute", 1.1712411620641587},
{"5 minutes", 1.1525250356686392},
{"15 minutes", 1.160297458693731},
{"1 hour", 1.1552837191731788},
{"2 hours", 1.1556979150632054},
{"4 hours", 1.1540732856346982},
{"12 hours", 1.154558061279052},
{"1 day", 1.154920291828406},
{"7 days", 1.154455477404886}
]
Those averages have the advantage of always being up-to-date, rolling windows over the set of values, and the circular buffer keeps the code simple. But they are quite costly to compute: each call to averages/0 slices and sums every window, and the window widths add up to roughly 1.5 million elements. Doing this computation at every tick is inefficient, especially since a single new value has very little impact on the long periods.
I stopped for a while and thought about the pros and cons of another, “instinctive” implementation.
Overflowing buckets of values
Picture a bucket of capacity n: you can push values into it, but when it's full, it emits the average of the values it holds and empties its contents.
defmodule Bucket do
  defstruct capacity: 0, items: 0, values: [], average: 0, name: ""

  def make(size, options \\ []) do
    %__MODULE__{
      capacity: size,
      items: 0,
      values: [],
      average: 0,
      name: Keyword.get(options, :name, "")
    }
  end
end
We keep track of the number of items placed in the bucket, to avoid expensive length calls. The stamp function adds metadata to every value, but that's not really relevant here. We detect the overflow condition by pattern matching capacity against the number of items, binding both keys to the same variable s. We also update the bucket's current average value.
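stamp/1 itself is not shown here; a minimal version consistent with the elem(&1, 0) calls below could simply pair each value with a timestamp (hypothetical, since the post leaves it out):
# Hypothetical stamp/1: the raw value must stay at position 0,
# because push/2 extracts it with elem(&1, 0).
defp stamp(value), do: {value, System.monotonic_time(:millisecond)}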
# Bucket is full: capacity and items both match s.
def push(%__MODULE__{capacity: s, items: s, values: vs} = b, value) do
  avg = (vs |> Enum.map(&elem(&1, 0)) |> Enum.sum()) / s

  {:overflowed,
   %__MODULE__{
     capacity: s,
     items: 1,
     values: [stamp(value)],
     average: avg,
     name: b.name
   }, avg}
end

def push(%__MODULE__{capacity: s, items: n, values: vs} = b, value) when n < s do
  # Stamp once, so the averaged tuple and the stored tuple are the same.
  values = [stamp(value) | vs]
  avg = (values |> Enum.map(&elem(&1, 0)) |> Enum.sum()) / (n + 1)

  {:ok,
   %__MODULE__{
     capacity: s,
     items: n + 1,
     values: values,
     average: avg,
     name: b.name
   }}
end
Usage is then quite easy:
test "bucket pushing" do
b = Bucket.make(3) # make a bucket of capacity 3
{:ok, b} = Bucket.push(b, 1.5) # push a value
q = [1.5]
%Bucket{capacity: 3, items: 1, values: q} = b
{:ok, b} = Bucket.push(b, 1.6) # push two other values
{:ok, b} = Bucket.push(b, 1.4)
{:overflowed, b, average} = Bucket.push(b, 1.5) # push a fourth value, overflowing the bucket
1.5 = average
end
Chaining buckets
After having defined a single Bucket, we define a BucketChain structure that is responsible for holding multiple buckets.
defmodule BucketChain do
  alias Bucket

  defstruct num_buckets: 0, buckets: []

  def make() do
    %__MODULE__{num_buckets: 0, buckets: []}
  end

  def make(%Bucket{} = b) do
    %__MODULE__{num_buckets: 1, buckets: [b]}
  end

  def make([h | t]) do
    Enum.reduce(t, make(h), fn b, out ->
      push_bucket(out, b)
    end)
  end

  def push_bucket(%__MODULE__{} = bs, %Bucket{} = b) do
    %__MODULE__{
      num_buckets: bs.num_buckets + 1,
      buckets: bs.buckets ++ [b]
    }
  end
end
We offer a few convenience functions to create a bucket chain with either no buckets, a single bucket, or many buckets.
Pushing a value to the bucket chain then reduces over our list of buckets, and if a bucket overflows, its average is carried as input to the next bucket. As soon as a bucket does not overflow, we stop carrying values and the remaining buckets are kept as-is.
  def push_value(%__MODULE__{num_buckets: 0}, _value) do
    {:error, :no_buckets}
  end

  def push_value(%__MODULE__{} = bs, value) do
    {_, updated_buckets} =
      Enum.reduce(bs.buckets, {value, []}, fn b, {v, out} ->
        case v do
          # nothing left to carry: keep the bucket untouched
          nil -> {nil, out ++ [b]}

          a ->
            case Bucket.push(b, a) do
              {:ok, updated_b} -> {nil, out ++ [updated_b]}
              {:overflowed, updated_b, average} -> {average, out ++ [updated_b]}
            end
        end
      end)

    {:ok, %__MODULE__{num_buckets: bs.num_buckets, buckets: updated_buckets}}
  end
end
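To make the carrying concrete, here is a small test sketch (not from the original post) with a chain of two buckets of capacity 2: the third push overflows the first bucket, and its average of 1.5 lands in the second.
test "cascading overflow" do
  chain = BucketChain.make([Bucket.make(2), Bucket.make(2)])
  {:ok, chain} = BucketChain.push_value(chain, 1.0)
  {:ok, chain} = BucketChain.push_value(chain, 2.0)
  # third push overflows bucket 1, carrying (1.0 + 2.0) / 2 into bucket 2
  {:ok, chain} = BucketChain.push_value(chain, 3.0)
  [_first, second] = chain.buckets
  assert Enum.map(second.values, &elem(&1, 0)) == [1.5]
end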
Using this module, our alternate TimeValues implementation looks like this:
defmodule TimeBuckets do
  alias Bucket
  alias BucketChain

  use Agent

  def init_value() do
    BucketChain.make([
      Bucket.make(2, name: "1 second"),
      Bucket.make(5, name: "5 seconds"),
      Bucket.make(3, name: "15 seconds"),
      Bucket.make(4, name: "1 minute"),
      Bucket.make(5, name: "5 minutes"),
      Bucket.make(3, name: "15 minutes"),
      Bucket.make(4, name: "1 hour"),
      Bucket.make(2, name: "2 hours"),
      Bucket.make(2, name: "4 hours"),
      Bucket.make(3, name: "12 hours"),
      Bucket.make(2, name: "1 day"),
      Bucket.make(7, name: "7 days")
    ])
  end

  def start_link(_) do
    Agent.start_link(fn -> init_value() end, name: __MODULE__)
  end

  def push_value(value) do
    Agent.update(__MODULE__, fn buckets ->
      {:ok, buckets} = BucketChain.push_value(buckets, value)
      buckets
    end)
  end

  def averages() do
    Agent.get(__MODULE__, fn buckets ->
      Enum.map(buckets.buckets, fn b -> {b.name, b.average} end)
    end)
  end
end
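A quick session sketch (hypothetical output) shows the API, and also hints at the warm-up issue discussed below: buckets that have not received any value yet keep their initial average of 0.
iex> TimeBuckets.start_link([])
{:ok, #PID<0.142.0>}
iex> TimeBuckets.push_value(1.5)
:ok
iex> TimeBuckets.averages()
[{"1 second", 1.5}, {"5 seconds", 0}, ...]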
By playing with this average-of-averages system, we only add up the capacities of the buckets, and hold 42 values for a week instead of 1,209,600. The downsides are some loss of floating-point precision at each stage, since many divisions happen along the way instead of one; the fact that the upper buckets hold no value at all until the system has run long enough to fill them; and the fact that the buckets do not show a rolling-window average, but a tumbling one: each average covers the period since the bucket last overflowed, rather than a sliding window over the most recent values.
These downsides aren't problematic for my use case, but you would have to seriously consider them before implementing a similar solution. I ran tests confirming that the average-of-averages approach doesn't deviate too much from the average-of-all-values approach over the full 7-day period.
test "buckets implementation" do
n_values = 2 * 5 * 3 * 4 * 5 * 3 * 4 * 2 * 2 * 3 * 2 * 7
vals = for _ <- 0..n_values do
v = :rand.uniform() + :rand.uniform() + 0.1542
TimeBuckets.push_value(v)
v
end
mean = Enum.sum(vals) / n_values
assert Float.round((TimeBuckets.averages() |> List.last() |> elem(1)), 3) == Float.round(mean, 3)
end
Benchmarking
In my case, an artistic installation running embedded Elixir, a short computation time at each tick mattered more than absolute correctness. Think of the lower-period buckets driving high-frequency effects, and the upper-period buckets driving low-frequency effects.
Getting the averages (runs at each tick):

Name                ips        average    deviation        median        99th %
time_buckets   627.24 K     0.00159 ms   ±1255.28%    0.00142 ms    0.00200 ms
time_values    0.0218 K       45.89 ms     ±13.32%      46.22 ms      60.72 ms

Comparison:
time_buckets   627.24 K
time_values    0.0218 K - 28782.20x slower +45.89 ms
Pushing a value (runs at each tick):

Name                ips        average    deviation        median        99th %
time_values    813.54 K        1.23 μs   ±4532.33%       1.04 μs       1.83 μs
time_buckets   437.52 K        2.29 μs    ±890.89%       2.04 μs       4.83 μs

Comparison:
time_values    813.54 K
time_buckets   437.52 K - 1.86x slower +1.06 μs
TimeBuckets is slower when pushing a value, since it has to reduce over the list of buckets, but not so much slower that I would rule it out for my purpose.
If you know this technique and know it has a name, please drop me a line at contact@this domain, because I have been unable to find prior examples so far.