A worm in my Erlang cluster, and adventures in microfluidics
Just to be clear, it should maybe be “minifluidics” or “millifluidics” but I’ll ask for forgiveness on this one, because microfluidics reads better. We’ll see what this has to do with my Erlang cluster a bit later.
Sparsely connected Erlang clusters
An Erlang cluster, by default, is fully meshed, meaning that every node maintains a connection to all the others. Since this can lead to excessive chatter and an explosion of edges, it is possible to leave an Erlang cluster partially connected, and to connect some peers only to select peers. This means that instead of having a full mesh, you can cut sub-meshes in an Erlang (or Elixir) cluster and connect them together via bridges, aka single (or sparse) connections.
We will use this notation for graphs from now on, which can be interpreted by .dot visualizers :
graph G {
a -- b;
b -- c;
a -- c;
}
The above graph describes a fully-meshed 3-node graph, while the below one describes a 4-node cluster where the “d” node only connects to “c”, meaning the nodes form a triangle with a tail.
graph G {
a -- b;
b -- c;
a -- c;
d -- c;
}
An Erlang node can list the nodes it sees by calling :erlang.nodes() or Node.list() in Elixir. My question is then : how can you map an arbitrary cluster with arbitrary connections, from a single node ?
Walking an Erlang cluster like a graph
My answer is : by asking all the nodes for their neighbours, and comparing the answers with the neighbours I can see.
In the above graph, if I am on node a, I will ask :
- b, who answers [:a, :c]
- c, who answers [:a, :b, :d]
My own neighbors are [:b, :c]. If I take the difference between my own neighbors and the answers, node c gives me new knowledge : there is [:d] that only c can reach.
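The comparison described above can be sketched with plain data, without any cluster. This is a minimal illustration, assuming the answers from b and c were already collected into a map (the variable names are made up for the example) :

```elixir
# Hypothetical answers collected by node a in the triangle-with-tail graph.
my_neighbors = MapSet.new([:b, :c])

answers = %{
  b: [:a, :c],
  c: [:a, :b, :d]
}

# Everything I already know about: myself plus my direct neighbors.
known = MapSet.put(my_neighbors, :a)

# For each neighbor, keep only the nodes that are new to me.
discoveries =
  Map.new(answers, fn {neighbor, seen} ->
    {neighbor, seen |> MapSet.new() |> MapSet.difference(known) |> MapSet.to_list()}
  end)

IO.inspect(discoveries)
# prints %{b: [], c: [:d]}
```

Only c contributes something new, which is the cue to keep digging through c.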
But what if d itself has neighbors ? I can ask c to ask d to query their own neighbors. d would report to c who would report to me. And if d finds that some of its neighbors have neighbors it cannot see ? This must continue.
We need to flood-fill the graph no matter its topology.
I’ve promised I’d talk about fluids : the graph traversal illustrations here and below were done by moving ink in channels, and there are a few details about that in the child “behind the scenes” post.
And to do this, I would like to not ship code to more than a single node.
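Before involving real nodes, the flood fill itself can be sketched on an in-memory graph. This is only a simulation under stated assumptions : the module name FloodFillSketch is hypothetical, and a plain map of node => neighbors stands in for what each node would answer when asked who it can see.

```elixir
defmodule FloodFillSketch do
  # `graph` is a map of node => list of neighbors, standing in for the
  # answers real nodes would give to "who can you see?".
  def map_from(graph, start) do
    {adjacency, _visited} = walk(graph, start, %{}, MapSet.new())
    adjacency
  end

  defp walk(graph, current, adjacency, visited) do
    neighbors = Map.get(graph, current, [])
    adjacency = Map.put(adjacency, current, neighbors)
    visited = MapSet.put(visited, current)

    # Visit each neighbor depth-first, threading the visited set along
    # so that no node is asked twice.
    Enum.reduce(neighbors, {adjacency, visited}, fn neighbor, {adj, seen} ->
      if MapSet.member?(seen, neighbor) do
        {adj, seen}
      else
        walk(graph, neighbor, adj, seen)
      end
    end)
  end
end

# The triangle with a tail from earlier in the post.
graph = %{a: [:b, :c], b: [:a, :c], c: [:a, :b, :d], d: [:c]}
discovered = FloodFillSketch.map_from(graph, :a)
IO.inspect(discovered == graph)
# prints true
```

Starting from any node of a connected graph, the walk ends up with the full adjacency map, whatever the topology.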
The need for self-propagating code in the cluster
My goal is now clear : I want to build a mapping tool that works with any Erlang cluster and reports its full topology, no matter how sparse or dense the connections are, and I want this tool to be a single file that I can ship or paste to a single node. This matters because clustered nodes have no obligation to share code, and hot-loading mechanisms only load code on one node. So, if you have a Probe module loaded on your node, you can’t :erpc.call(neighbor, Probe, :run, []) on a neighbor that does not have this module.
Thankfully, Erlang has tools for us :
- :code.load_binary(module, filename, binary) loads object code into the code server. The filename is only there to tag the newly created module, and does not map to a filesystem operation.
- :code.get_object_code(module) gives us the object code for a loaded module, but is unable to recover the object code for an in-memory-loaded module.
If you paste :
defmodule Probe do
def run() do
...
end
end
into IEx, and you call :code.get_object_code(Probe), despite the module being defined, you get an :error.
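The behaviour of these two calls can be reproduced outside of IEx. The sketch below is not the route taken in this post : it uses Code.compile_string/1, which returns the compiled binary directly, only to show that a module living purely in memory has no object code to look up, while a binary we hold on to can be loaded at will. InMemoryProbe is a hypothetical name used for the example.

```elixir
# Compile a throwaway module in memory. Code.compile_string/1 returns a list
# of {module, binary} tuples and loads the module on the local node.
[{module, binary}] =
  Code.compile_string("""
  defmodule InMemoryProbe do
    def run(), do: :ok
  end
  """)

# No .beam file exists in the code path for this module, so the code
# server cannot hand the object code back.
lookup = :code.get_object_code(module)

# The binary we kept can still be loaded again; the file name is only a tag.
reload = :code.load_binary(module, ~c"in_memory.tag", binary)

IO.inspect({lookup, reload, module.run()})
# prints {:error, {:module, InMemoryProbe}, :ok}
```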
So, our first task is to create a module that we can paste in a first node, and that gives this first node access to the module binary. I settled on Kernel.ParallelCompiler.compile_to_path/2, which produces a .beam file, and on adding the temporary compilation path to the code server via Code.append_path/1.
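defmodule ProbeWrapper do
  def load() do
    # Source of the real probe, kept as a string so it can be written to disk.
    payload = """
    defmodule ActualProbe do
      def run() do
        :ok
      end
    end
    """

    # Write the source under a random file name in the temporary directory.
    tmp = System.tmp_dir!()
    name = (:crypto.strong_rand_bytes(16) |> Base.encode16()) <> ".ex"
    path = Path.join(tmp, name)
    File.write!(path, payload)

    # Compile to a .beam file and make that directory visible to the code server.
    Kernel.ParallelCompiler.compile_to_path([path], tmp)
    Code.append_path(tmp)
  end
end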
When you paste this, you get :
{:module, ProbeWrapper,
<<70, 79, 82, 49, 0, 0, 8, 100, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 1, 113,
0, 0, 0, 34, 19, 69, 108, 105, 120, 105, 114, 46, 80, 114, 111, 98, 101, 87,
114, 97, 112, 112, 101, 114, 8, 95, 95, ...>>, {:load, 0}}
Then you can run ActualProbe, but also access its code :
iex(2)> ProbeWrapper.load
true
iex(3)> ActualProbe.run
:ok
iex(4)> :code.get_object_code(ActualProbe)
{ActualProbe,
<<70, 79, 82, 49, 0, 0, 5, 224, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 0, 176,
0, 0, 0, 18, 18, 69, 108, 105, 120, 105, 114, 46, 65, 99, 116, 117, 97, 108,
80, 114, 111, 98, 101, 8, 95, 95, 105, 110, ...>>,
~c"/var/folders/28/fbhkc24n215b3jkjvkwq5bs80000gn/T/Elixir.ActualProbe.beam"}
And, now that we have an expression that returns the module binary, we can load it again :
iex(5)> {module, binary, _path} = :code.get_object_code(ActualProbe)
...
iex(6)> :code.load_binary(ActualProbe, ~c"some.fakename", binary)
{:module, ActualProbe}
So we have a first step : a pastable module that gives us access to object code that we can ship around and load. We can re-engineer ActualProbe to contain its own binary access.
defmodule ProbeWrapper do
  def load() do
    payload = """
    defmodule ActualProbe do
      def run() do
        :ok
      end

      def self_code() do
        {_, bin, _} = :code.get_object_code(ActualProbe)
        bin
      end
    end
    """

    tmp = System.tmp_dir!()
    name = (:crypto.strong_rand_bytes(16) |> Base.encode16()) <> ".ex"
    path = Path.join(tmp, name)
    File.write!(path, payload)
    Kernel.ParallelCompiler.compile_to_path([path], tmp)
    Code.append_path(tmp)
  end
end
We then need a graph traversal, a utility to kick it off, a way to keep track of visited nodes, and a way to ship the probe module to each node :
# Entry point on the first node: nothing has been visited yet.
def run_probe(binary) do
  IO.puts("Launching from #{node()}")
  traverse(node(), MapSet.new(), binary)
end

# Entry point on every other node: the caller passes what it already visited.
def run_probe(binary, visited) do
  IO.puts("Hello from #{node()}")
  traverse(node(), visited, binary)
end

def traverse(my_node, visited_nodes, binary) do
  visible_neighbors = :erlang.nodes()
  # Accumulator: {adjacency map, visited set}, seeded with the current node.
  initial = {Map.put(%{}, my_node, visible_neighbors), MapSet.put(visited_nodes, my_node)}

  {adjacent, _} =
    Enum.reduce(visible_neighbors, initial, fn neighbor, {adj, visited} ->
      if MapSet.member?(visited, neighbor) do
        {adj, visited}
      else
        # Recurse into the neighbor, then merge what it discovered.
        subgraph = visit(neighbor, visited, binary)
        {Map.merge(adj, subgraph), MapSet.union(visited, MapSet.new(Map.keys(subgraph)))}
      end
    end)

  adjacent
end

# Ship the probe to the remote node, then run it there.
def visit(node, visited_nodes, binary) do
  :erpc.call(node, :code, :load_binary, [__MODULE__, ~c"actual_probe.module", binary])
  :erpc.call(node, __MODULE__, :run_probe, [binary, visited_nodes])
end

def self_code() do
  {_, bin, _} = :code.get_object_code(ActualProbe)
  bin
end
With this in place, we can run it :
iex(3)> ActualProbe.run_probe(ActualProbe.self_code)
%{nonode@nohost: []}
Well. We don’t have a cluster yet. Before we build a cluster, note that the current worm only discovers nodes, but it could also perform work on each node. We could very well add a parameter to pass a function to be run.
We are also doing a sequential traversal. We could go parallel, with :erpc.multicall for example, but we would either have to accept duplicate work, or engineer a distributed data structure over a non-fully connected cluster to prevent nodes from being visited multiple times. Said otherwise, we are building a cluster discovery mechanism where we only ask each node “who can you see”, but you could add parameters to answer other questions about the cluster (or even load code over the whole cluster) without caring about its topology.
Building a cluster to worm in
To build our cluster, Erlang provides tools like the :peer module, which allows us to spawn new Erlang peers. The node spawning other nodes is called the origin, and we will connect through stdio instead of full EPMD. We will also pass connect_all with a false value to mesh the cluster ourselves instead of getting the default, fully-connected graph.
{:ok, pid, node} =
  :peer.start(%{
    name: :"some_name",
    connection: :standard_io,
    args: [~c"-connect_all", ~c"false"]
  })
We will create a module that allows us to track the peers, nodes, and edges, and to set up all the peers.
defmodule LocalCluster do
  defstruct [:peers, :edges, :nodes]

  def start(string) when is_binary(string) do
    start(parse_graph(string))
  end

  def start(edges) do
    nodes = edges_to_nodes(edges)

    peers =
      for node <- nodes, into: %{} do
        {node, start_peer(node)}
      end

    local_cluster = %__MODULE__{
      nodes: nodes,
      edges: edges,
      peers: peers
    }

    mesh(local_cluster)
    local_cluster
  end

  def start_peer(node) do
    {:ok, pid, node2} =
      :peer.start(%{
        name: :"#{node}",
        connection: :standard_io,
        args: [~c"-connect_all", ~c"false"]
      })

    :peer.call(pid, :code, :add_pathsa, [:code.get_path()])
    %{name: node, node: node2, pid: pid}
  end

  def run_on(cluster, name, {m, f, a}) do
    :peer.call(cluster.peers[name].pid, m, f, a)
  end

  def mesh(%__MODULE__{peers: peers, edges: edges} = cluster) do
    for {n1, n2} <- unique_edges(edges) do
      run_on(cluster, n1, {:net_kernel, :connect_node, [peers[n2].node]})
    end
  end

  def sample() do
    """
    graph G {
    a -- b;
    b -- c;
    a -- c;
    b -- d;
    d -- e;
    e -- f;
    f -- d;
    }
    """
  end

  def parse_graph(graph) do
    graph
    |> String.split("\n")
    |> Enum.reduce([], fn line, out ->
      case Regex.run(~r/([^\s]+)\s*--\s*([^\s;]+);/, line) do
        nil -> out
        [_, n1, n2] -> [{:"#{n1}", :"#{n2}"} | out]
      end
    end)
  end
end
A few utility functions (graph, unique_edges, edges_to_nodes…) are removed so we can focus on the mechanisms, but the full code is at the end of the post. When we paste it into IEx, we can start a sample cluster :
{:module, LocalCluster,
<<70, 79, 82, 49, 0, 0, 29, 4, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 3, 24, 0,
0, 0, 80, 19, 69, 108, 105, 120, 105, 114, 46, 76, 111, 99, 97, 108, 67, 108,
117, 115, 116, 101, 114, 8, 95, 95, ...>>, {:unique_edges, 1}}
iex(2)> LocalCluster.start(LocalCluster.sample())
%LocalCluster{
peers: %{
c: %{name: :c, node: :c@mac, pid: #PID<0.120.0>},
f: %{name: :f, node: :f@mac, pid: #PID<0.115.0>},
a: %{name: :a, node: :a@mac, pid: #PID<0.119.0>},
d: %{name: :d, node: :d@mac, pid: #PID<0.116.0>},
e: %{name: :e, node: :e@mac, pid: #PID<0.117.0>},
b: %{name: :b, node: :b@mac, pid: #PID<0.118.0>}
},
edges: [f: :d, e: :f, d: :e, b: :d, a: :c, b: :c, a: :b],
nodes: [:f, :d, :e, :b, :a, :c]
}
[true, true, true, true, true, true, true]
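The helpers left out of the LocalCluster listing are small. What follows is not the original code, only a guess at what edges_to_nodes and unique_edges could look like, placed in a hypothetical LocalClusterHelpersSketch module so it can run on its own :

```elixir
defmodule LocalClusterHelpersSketch do
  # Flatten edges into the list of distinct node names, in order of appearance.
  def edges_to_nodes(edges) do
    edges
    |> Enum.flat_map(fn {n1, n2} -> [n1, n2] end)
    |> Enum.uniq()
  end

  # Treat {:a, :b} and {:b, :a} as the same undirected edge and keep the first.
  def unique_edges(edges) do
    Enum.uniq_by(edges, fn {n1, n2} -> Enum.sort([n1, n2]) end)
  end
end

# The edge list printed by the sample cluster above.
edges = [f: :d, e: :f, d: :e, b: :d, a: :c, b: :c, a: :b]

IO.inspect(LocalClusterHelpersSketch.edges_to_nodes(edges))
# prints [:f, :d, :e, :b, :a, :c]

IO.inspect(LocalClusterHelpersSketch.unique_edges([{:a, :b}, {:b, :a}, {:a, :c}]))
# prints [a: :b, a: :c]
```

Deduplicating edges before meshing avoids asking two nodes to connect to each other twice when the graph description lists an edge in both directions.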
Given that we own the list of nodes thanks to the accumulation in a struct, we can observe its topology by directly asking each node which neighbors it sees :
iex> cl = LocalCluster.start(LocalCluster.sample())
iex> LocalCluster.graph(cl)
"graph G {
a@mac -- c@mac;
b@mac -- c@mac;
d@mac -- f@mac;
e@mac -- f@mac;
a@mac -- b@mac;
d@mac -- e@mac;
b@mac -- d@mac;
}"
We see that we get the edges we described in the sample, but by virtue of observation.
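LocalCluster.graph is not shown in this post. As a rough sketch of its rendering half only, assuming the neighbors of every node were already gathered into a map of node => neighbors, the .dot text could be produced like this (DotSketch and to_dot are hypothetical names, and the edge order is simply sorted, not the order shown above) :

```elixir
defmodule DotSketch do
  # `adjacency` is a map of node => list of visible neighbors.
  def to_dot(adjacency) do
    lines =
      adjacency
      |> Enum.flat_map(fn {n, neighbors} ->
        # Sort each pair so that both directions collapse into one edge.
        Enum.map(neighbors, fn neighbor -> Enum.sort([n, neighbor]) end)
      end)
      |> Enum.uniq()
      |> Enum.sort()
      |> Enum.map(fn [n1, n2] -> "#{n1} -- #{n2};" end)

    Enum.join(["graph G {"] ++ lines ++ ["}"], "\n")
  end
end

adjacency = %{a: [:b, :c], b: [:a, :c], c: [:a, :b, :d], d: [:c]}
IO.puts(DotSketch.to_dot(adjacency))
# prints:
# graph G {
# a -- b;
# a -- c;
# b -- c;
# c -- d;
# }
```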
Worming the graph
The full files as I ran them :
Now, in a fresh iex session, we can, in order :
- paste the LocalCluster module
- start the sample cluster with cl = LocalCluster.start(LocalCluster.sample())
- paste the ProbeWrapper module
- call ProbeWrapper.load
- call bin = ActualProbe.self_code()
- call LocalCluster.run_on(cl, :a, {:code, :load_binary, [ActualProbe, ~c"some.name", bin]})
- call LocalCluster.run_on(cl, :a, {ActualProbe, :run_probe, [bin]})
iex(10)> LocalCluster.run_on(cl, :a, {ActualProbe, :run_probe, [bin]})
%{
f@mac: [:d@mac, :e@mac],
d@mac: [:f@mac, :e@mac, :b@mac],
e@mac: [:f@mac, :d@mac],
b@mac: [:d@mac, :c@mac, :a@mac],
a@mac: [:c@mac, :b@mac],
c@mac: [:a@mac, :b@mac]
}
Now we can flood fill complex graphs :-) .
Flood filling a complex graph to traverse it from a single point of entry : see the behind the scenes post for details.
A few gotchas
- I’ve forgotten more than once to call :peer.call(pid, :code, :add_pathsa, [:code.get_path()]) to load the Elixir stdlib on the bare Erlang nodes. Without it, MapSet as used in the probe cannot run.
- Everything looked fine until my first execution of the whole sequence : in the bowtie-shaped cluster, the worm reported a connection from a to f. This is because :erpc.call runs the remote function with the group leader of the calling process. This meant that a pid from a rides along the propagation, and nodes have to grab a connection to a. By default, the VM uses dist_auto_connect: :always, which is orthogonal to connect_all that only prevents a full meshing. So all nodes could see a during the worm propagation despite not being actually connected, as seen in the LocalCluster.graph call. The answer was to reset the group leader to the current node by calling :erlang.group_leader(:erlang.whereis(:init), self()). But this is interfering with the cluster in a way. Can we really perform a measurement without interference ? Your guess ;-) .
This post is inspired by Joe Armstrong’s post “My favorite erlang program”, which I read a few years ago. It came back to my mind recently, and after those years in the VM, I feel that I am only starting to truly grasp why this code was so elegant to him.