Lucas Sifoni

Periodic averages with overflowing buckets

elixir, programming


I recently needed to keep track of time-windowed averages of values emitted by a periodic source (f = 2 Hz) on a constrained system (1 GHz single-core SoC). The general idea is that a pulse emits a floating-point value every 500 ms, and I must keep track of the average value over the last 1 s, the last 5 s, and so on, up to a two-week time span.

Two weeks of values at 2 Hz means 2 × 86400 × 14 = 2419200 values.

Circular buffer implementation

I wrote a first implementation after being advised to use a circular buffer. This data structure conveniently expires out-of-scope values: once the buffer is full, pushing a new value drops the oldest one.

defmodule TimeValues do
  use Agent

  @bucket_sizes [2, 5, 3, 4, 5, 3, 4, 2, 2, 3, 2, 7]
  @labels ["1 second", "5 seconds", "15 seconds", "1 minute", "5 minutes", "15 minutes", "1 hour", "2 hours", "4 hours", "12 hours", "1 day", "7 days"]
  @sizes @bucket_sizes
    |> Enum.with_index()
    |> Enum.map(fn {_, i} ->
        Enum.reduce(@bucket_sizes |> Enum.slice(Range.new(0, i)), 1, fn (a, m) -> m * a end)
    end)
  @sizes_labels Enum.zip(@sizes, @labels)

I start by pre-computing the time window widths, expressed in number of samples, in the @sizes_labels module attribute:

[
  {2, "1 second"},
  {10, "5 seconds"},
  {30, "15 seconds"},
  {120, "1 minute"},
  {600, "5 minutes"},
  {1800, "15 minutes"},
  {7200, "1 hour"},
  {14400, "2 hours"},
  {28800, "4 hours"},
  {86400, "12 hours"},
  {172800, "1 day"},
  {1209600, "7 days"}
]
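As a side note, the same widths can be obtained with a running product. The following is only a sketch of an equivalent computation, not the code used in the module; the variable names are mine.

```elixir
bucket_sizes = [2, 5, 3, 4, 5, 3, 4, 2, 2, 3, 2, 7]

# Running product: each window width is the previous width times the next factor.
sizes = Enum.scan(bucket_sizes, &(&1 * &2))

# At 2 samples per second, dividing by 2 gives each width in seconds.
seconds = Enum.map(sizes, &div(&1, 2))
```

The last entry of seconds is 604800, which is indeed 7 days.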

Then, the TimeValues module is made an Agent that keeps track of a CircularBuffer (https://hex.pm/packages/circular_buffer) backed by Erlang’s :queue module. Every pulse inserts a new value into the circular buffer by calling push_value.

  # Rest of the TimeValues module (`use Agent` is already declared above).

  def init_value(), do: CircularBuffer.new(2 * 3600 * 24 * 7)

  def start_link(_), do: Agent.start_link(fn () -> init_value() end, name: __MODULE__)

  def push_value(value) do
    Agent.update(__MODULE__, fn (state) ->
      CircularBuffer.insert(state, value)
    end)
  end

  def averages() do
    Agent.get(__MODULE__, fn (state) ->
      # Newest values first, so each window is a prefix of the list.
      list = CircularBuffer.to_list(state) |> Enum.reverse()
      @sizes_labels |> Enum.map(fn ({s, l}) ->
        # Enum.take/2 keeps exactly s values; slicing 0..s would take s + 1.
        {l, (Enum.take(list, s) |> Enum.sum()) / s}
      end)
    end)
  end
end

Getting the averages then amounts to taking the appropriate slices of the circular buffer, using the precomputed window widths.

iex(5)> TimeValues.averages
[
  {"1 second", 1.736297781463606},
  {"5 seconds", 1.2222835758760193},
  {"15 seconds", 1.1009895991799596},
  {"1 minute", 1.1712411620641587},
  {"5 minutes", 1.1525250356686392},
  {"15 minutes", 1.160297458693731},
  {"1 hour", 1.1552837191731788},
  {"2 hours", 1.1556979150632054},
  {"4 hours", 1.1540732856346982},
  {"12 hours", 1.154558061279052},
  {"1 day", 1.154920291828406},
  {"7 days", 1.154455477404886}
]
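To make the slicing concrete, here is a minimal sketch on a plain list standing in for the buffer contents. The WindowSketch module is hypothetical, and unlike the module above it divides by the number of values actually present, which is my own choice for a partially filled window. It assumes at least one value is available.

```elixir
defmodule WindowSketch do
  # `newest_first` is a plain list standing in for the reversed buffer contents.
  def averages(newest_first, sizes_labels) do
    for {size, label} <- sizes_labels do
      # Each window is simply the first `size` values of the list.
      window = Enum.take(newest_first, size)
      {label, Enum.sum(window) / length(window)}
    end
  end
end

result = WindowSketch.averages([4.0, 2.0, 3.0, 1.0], [{2, "last 2"}, {4, "last 4"}])
# result is [{"last 2", 3.0}, {"last 4", 2.5}]
```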

Those averages have the advantage of always being up to date: they are true rolling windows over the set of values. The circular buffer is also pleasant to use. But they’re quite costly to compute, since every call traverses a lot of elements. Doing this computation for every new value is inefficient, especially since a single value has very little impact on the long periods.

I stopped for a while and thought about the pros and cons of another, “instinctive” implementation.

Overflowing buckets of values

Picture a bucket of capacity n: you can push values into it, but once it is full, the next push makes it emit the average of the values it holds and empty its contents.

defmodule Bucket do
  defstruct capacity: 0, items: 0, values: [], average: 0, name: ""

  def make(size, options \\ []) do
    %__MODULE__{
      capacity: size,
      items: 0,
      values: [],
      average: 0,
      name: Keyword.get(options, :name, ""),
    }
  end
end

We keep track of the number of items placed in the bucket to avoid expensive length calls. The stamp function (not shown) adds metadata to every value, which is why the averages are computed on the first element of each stored tuple; it is not really relevant here. We detect the overflow condition by pattern matching: binding both the capacity and items keys to the same variable s only matches when the bucket is full. We also update the bucket’s current average value on every push.

  def push(%__MODULE__{capacity: s, items: s, values: vs} = b, value) do
    avg = (vs |> Enum.map(&elem(&1, 0)) |> Enum.sum()) / s
    {:overflowed,
      %__MODULE__{
        capacity: s,
        items: 1,
        values: [stamp(value)],
        average: avg,
        name: b.name
      },
      avg
    }
  end

  def push(%__MODULE__{capacity: s, items: n, values: vs} = b, value) when n < s do
    avg = ([stamp(value) | vs] |> Enum.map(&elem(&1, 0)) |> Enum.sum()) / (n + 1)
    {:ok,
      %__MODULE__{
        capacity: s,
        items: n + 1,
        values: [stamp(value) | vs],
        average: avg,
        name: b.name
      }
    }
  end
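The overflow clause above relies on a variable appearing twice in the same pattern. A minimal standalone illustration of that matching rule, using a hypothetical FullCheck module and plain maps rather than the Bucket struct:

```elixir
defmodule FullCheck do
  # The variable `n` appears twice in the pattern, so this clause only
  # matches when both keys hold the same value.
  def full?(%{capacity: n, items: n}), do: true
  # Fallback clause: the two values differ.
  def full?(%{capacity: _, items: _}), do: false
end

full = FullCheck.full?(%{capacity: 3, items: 3})
# full is true
not_full = FullCheck.full?(%{capacity: 3, items: 2})
# not_full is false
```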

Usage is then quite easy:

    test "bucket pushing" do
      b = Bucket.make(3) # make a bucket of capacity 3
      {:ok, b} = Bucket.push(b, 1.5) # push a value
      %Bucket{capacity: 3, items: 1} = b # one item stored so far
      {:ok, b} = Bucket.push(b, 1.6) # push two other values
      {:ok, b} = Bucket.push(b, 1.4)
      {:overflowed, b, average} = Bucket.push(b, 1.5) # push a fourth value, overflowing the bucket
      1.5 = average
    end

Chaining buckets

Having defined a single Bucket, we define a BucketChain structure that is responsible for holding multiple buckets.

defmodule BucketChain do
  alias Bucket
  defstruct num_buckets: 0, buckets: []

  def make() do
    %__MODULE__{
      num_buckets: 0,
      buckets: []
    }
  end

  def make(%Bucket{} = b) do
    %__MODULE__{
      num_buckets: 1,
      buckets: [b]
    }
  end

  def make([h|t]) do
    Enum.reduce(t, make(h), fn (b, out) ->
      push_bucket(out, b)
    end)
  end

  def push_bucket(%__MODULE__{} = bs, %Bucket{} = b) do
    %__MODULE__{
      num_buckets: bs.num_buckets + 1,
      buckets: bs.buckets ++ [b]
    }
  end
end

We offer a few convenience functions to create a bucket chain with no buckets, a single bucket, or many buckets.

Pushing a value to the bucket chain then amounts to reducing over our list of buckets: if a bucket overflows, its average is carried as the input to the next bucket. As soon as a bucket does not overflow, we stop carrying values and the remaining buckets are kept as-is.


  def push_value(%__MODULE__{num_buckets: 0}, _value) do
    {:error, :no_buckets}
  end
  def push_value(%__MODULE__{} = bs, value) do
    {_, updated_buckets} = Enum.reduce(bs.buckets, {value, []}, fn (b, {v, out}) ->
      case v do
        nil -> {nil, out ++ [b]}
        a -> case Bucket.push(b, a) do
          {:ok, updated_b} -> {nil, out ++ [updated_b]}
          {:overflowed, updated_b, average} -> {average, out ++ [updated_b]}
        end
      end
    end)
    {:ok, %__MODULE__{
      num_buckets: bs.num_buckets,
      buckets: updated_buckets
    }}
  end
end
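To follow the carry on a small case, here is a simplified sketch of the same idea. It is not the module above: CarrySketch is a hypothetical name, buckets are bare {capacity, values} tuples without stamps, names or stored averages, and the list is built by prepending and reversing once.

```elixir
defmodule CarrySketch do
  # A full bucket emits the average of its contents and restarts with `v`.
  def push({cap, vals}, v) when length(vals) == cap,
    do: {:overflowed, {cap, [v]}, Enum.sum(vals) / cap}

  def push({cap, vals}, v), do: {:ok, {cap, [v | vals]}}

  # Walk the chain left to right, carrying an average only while buckets overflow.
  def push_chain(buckets, value) do
    {_carry, acc} =
      Enum.reduce(buckets, {value, []}, fn
        b, {nil, acc} ->
          {nil, [b | acc]}

        b, {v, acc} ->
          case push(b, v) do
            {:ok, b2} -> {nil, [b2 | acc]}
            {:overflowed, b2, avg} -> {avg, [b2 | acc]}
          end
      end)

    Enum.reverse(acc)
  end
end

chain = [{2, []}, {2, []}]
chain = Enum.reduce([1.0, 3.0, 5.0], chain, &CarrySketch.push_chain(&2, &1))
# chain is now [{2, [5.0]}, {2, [2.0]}]
```

The third push finds the first bucket full, so the average of 1.0 and 3.0 (2.0) is carried into the second bucket while 5.0 starts a new period in the first.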

Using this module, our alternate TimeValues implementation looks like this:

defmodule TimeBuckets do
  alias Bucket
  alias BucketChain
  use Agent

  def init_value() do
    BucketChain.make([
      Bucket.make(2, name: "1 second"),
      Bucket.make(5, name: "5 seconds"),
      Bucket.make(3, name: "15 seconds"),
      Bucket.make(4, name: "1 minute"),
      Bucket.make(5, name: "5 minutes"),
      Bucket.make(3, name: "15 minutes"),
      Bucket.make(4, name: "1 hour"),
      Bucket.make(2, name: "2 hours"),
      Bucket.make(2, name: "4 hours"),
      Bucket.make(3, name: "12 hours"),
      Bucket.make(2, name: "1 day"),
      Bucket.make(7, name: "7 days"),
    ])
  end

  def start_link(_) do
    Agent.start_link(fn () -> init_value() end, name: __MODULE__)
  end

  def push_value(value) do
    Agent.update(__MODULE__, fn (buckets) ->
      {:ok, buckets} = BucketChain.push_value(buckets, value)
      buckets
    end)
  end

  def averages() do
    Agent.get(__MODULE__, fn (buckets) ->
      Enum.map(buckets.buckets, fn b -> {b.name, b.average} end)
    end)
  end
end

With this average-of-averages system, the number of stored values is only the sum of the bucket capacities: we hold 42 values for a week instead of 1209600. The downsides are some loss of floating-point precision at each stage, since we perform many divisions instead of one, and the fact that the upper buckets hold no value until the system has run long enough to fill them. Also, the lower buckets do not show a rolling-window average, but an average over a fixed period that restarts each time the bucket overflows.

These downsides aren’t problematic for my use, but you would have to seriously consider them if implementing a similar solution. I ran tests confirming that the average-of-averages approach doesn’t deviate too much from the average-of-all-values approach over the 7-day or two-week periods.

test "buckets implementation" do
  n_values = 2 * 5 * 3 * 4 * 5 * 3 * 4 * 2 * 2 * 3 * 2 * 7
  # 1..n_values pushes exactly n_values values, matching the divisor below.
  vals = for _ <- 1..n_values do
    v = :rand.uniform() + :rand.uniform() + 0.1542
    TimeBuckets.push_value(v)
    v
  end
  mean = Enum.sum(vals) / n_values
  assert Float.round((TimeBuckets.averages() |> List.last() |> elem(1)), 3) == Float.round(mean, 3)
end
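The reason an average of averages can track the overall average is that all groups at a given stage have the same size. A small check on a toy list, independent of the modules above (the names are mine):

```elixir
values = Enum.map(1..12, &(&1 * 1.0))
mean = fn xs -> Enum.sum(xs) / length(xs) end

# Average of all values at once.
overall = mean.(values)

# Average of the averages of equal-sized groups of 3.
of_means = values |> Enum.chunk_every(3) |> Enum.map(mean) |> mean.()

# Both are 6.5 here; with groups of unequal sizes they would generally differ.
```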

Benchmarking

In my case, an art installation running embedded Elixir, a short computation time at each tick was more important than absolute correctness. Think of the lower-period buckets driving high-frequency effects, and the upper-period buckets driving low-frequency effects.

Getting the averages (runs at each tick):

Name                   ips        average  deviation         median         99th %
time_buckets      627.24 K     0.00159 ms  ±1255.28%     0.00142 ms     0.00200 ms
time_values       0.0218 K       45.89 ms    ±13.32%       46.22 ms       60.72 ms

Comparison:
time_buckets      627.24 K
time_values       0.0218 K - 28782.20x slower +45.89 ms

Pushing a value (runs at each tick):

Name                   ips        average  deviation         median         99th %
time_values       813.54 K        1.23 μs  ±4532.33%        1.04 μs        1.83 μs
time_buckets      437.52 K        2.29 μs   ±890.89%        2.04 μs        4.83 μs

Comparison:
time_values       813.54 K
time_buckets      437.52 K - 1.86x slower +1.06 μs

TimeBuckets is slower when pushing a value, since it has to reduce over the list of buckets, but not by enough for me to rule it out for my purpose.

If you know this technique and know it has a name, please drop me a line at contact@this domain, because I have been unable to find previous examples so far.

