diff --git a/apps/aecore/config/dev.exs b/apps/aecore/config/dev.exs index a8612f40..ede8d3bb 100644 --- a/apps/aecore/config/dev.exs +++ b/apps/aecore/config/dev.exs @@ -53,3 +53,7 @@ config :aecore, :pow, version: 1, difficulty_target: 1 } + +config :aecore, :peers, + peers_target_count: 2, + peers_max_count: 4 diff --git a/apps/aecore/config/prod.exs b/apps/aecore/config/prod.exs index a896ff53..11dc49d2 100644 --- a/apps/aecore/config/prod.exs +++ b/apps/aecore/config/prod.exs @@ -53,3 +53,7 @@ config :aecore, :pow, version: 1, difficulty_target: 1 } + +config :aecore, :peers, + peers_target_count: 25, + peers_max_count: 50 diff --git a/apps/aecore/config/test.exs b/apps/aecore/config/test.exs index dce2fdae..79603cc3 100644 --- a/apps/aecore/config/test.exs +++ b/apps/aecore/config/test.exs @@ -51,3 +51,8 @@ config :aecore, :pow, version: 1, difficulty_target: 1 } + +config :aecore, :peers, + peers_target_count: 2, + peers_max_count: 4 + diff --git a/apps/aecore/lib/aecore/peers/scheduler.ex b/apps/aecore/lib/aecore/peers/scheduler.ex deleted file mode 100644 index 357bf7ae..00000000 --- a/apps/aecore/lib/aecore/peers/scheduler.ex +++ /dev/null @@ -1,26 +0,0 @@ -defmodule Aecore.Peers.Scheduler do - use GenServer - - alias Aecore.Peers.Worker, as: Peers - - @check_time 60_000 - - def start_link(_args) do - GenServer.start_link(__MODULE__, %{}) - end - - def init(state) do - schedule_work() - {:ok, state} - end - - def handle_info(:work, state) do - Peers.check_peers() - schedule_work() - {:noreply, state} - end - - defp schedule_work() do - Process.send_after(self(), :work, @check_time) - end -end diff --git a/apps/aecore/lib/aecore/peers/sync.ex b/apps/aecore/lib/aecore/peers/sync.ex new file mode 100644 index 00000000..76e80dc3 --- /dev/null +++ b/apps/aecore/lib/aecore/peers/sync.ex @@ -0,0 +1,108 @@ +defmodule Aecore.Peers.Sync do + + @check_time 60_000 + @peers_target_count Application.get_env(:aecore, :peers)[:peers_target_count] + + alias Aecore.Peers.Worker, as: Peers + alias Aehttpclient.Client, as: HttpClient + + use GenServer + + require Logger + + def start_link(_args) do + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + def init(state) do + Process.send_after(self(), :work, 5_000) + {:ok, state} + end + + def handle_info(:work, state) do + check_peers() + introduce_variety() + refill() + schedule_work() + {:noreply, state} + end + + defp schedule_work() do + Process.send_after(self(), :work, @check_time) + end + + defp check_peers do + Peers.check_peers() + end + + #To make sure no peer is more popular in network then others, + #we remove one peer at random if we have at least target_count of peers. + @spec introduce_variety :: :ok + defp introduce_variety do + peers_count = map_size(Peers.all_peers()) + if peers_count >= @peers_target_count do + random_peer = Enum.random(Map.keys(Peers.all_peers())) + Logger.info(fn -> "Removing #{random_peer} to introduce variety" end) + Peers.remove_peer(random_peer) + :ok + else + :ok + end + end + + #If our peer count is lower then @peers_target_count, + #we request peers list from all known peers and choose at random + #min(peers_we_need_to_have_target_count, peers_we_currently_have) + #new peers to add. + @spec refill :: :ok | {:error, term()} + defp refill do + peers_count = map_size(Peers.all_peers()) + cond do + peers_count == 0 -> + Logger.error(fn -> "No peers" end) + {:error, "No peers"} + peers_count < @peers_target_count -> + all_peers = Map.keys(Peers.all_peers()) + new_count = get_newpeers_and_add(all_peers) + if new_count > 0 do + Logger.info(fn -> "Aquired #{new_count} new peers" end) + :ok + else + Logger.error(fn -> "No new peers added when trying to refill peers" end) + {:error, "No new peers added"} + end + true -> + :ok + end + end + + defp get_newpeers_and_add(known) do + known_count = length(known) + known_set = MapSet.new(known) + known + |> Enum.shuffle + |> Enum.take(@peers_target_count - known_count) + |> Enum.reduce([], fn(peer, acc) -> + case (HttpClient.get_peers(peer)) do + {:ok, list} -> Enum.concat(list, acc) + :error -> acc + end + end) + |> Enum.reduce([], fn(peer, acc) -> + if MapSet.member?(known_set, peer) do + acc + else + [peer | acc] + end + end) + |> Enum.shuffle + |> Enum.take(Enum.min([@peers_target_count - known_count, known_count])) + |> Enum.reduce(0, fn(peer, acc) -> + case Peers.add_peer(peer) do + :ok -> acc+1 + _ -> acc + end + end) + end +end + diff --git a/apps/aecore/lib/aecore/peers/worker.ex b/apps/aecore/lib/aecore/peers/worker.ex index b620e4e6..32093561 100644 --- a/apps/aecore/lib/aecore/peers/worker.ex +++ b/apps/aecore/lib/aecore/peers/worker.ex @@ -15,6 +15,9 @@ defmodule Aecore.Peers.Worker do require Logger @mersenne_prime 2147483647 + @peers_max_count Application.get_env(:aecore, :peers)[:peers_max_count] + @probability_of_peer_remove_when_max 0.5 + def start_link(_args) do GenServer.start_link(__MODULE__, %{peers: %{}, nonce: :rand.uniform(@mersenne_prime)}, name: __MODULE__) @@ -72,28 +75,34 @@ defmodule Aecore.Peers.Worker do {:ok, initial_peers} end -def handle_call({:add_peer,uri}, _from, %{peers: peers, nonce: own_nonce} = state) do - case(Client.get_info(uri)) do - {:ok, info} -> - case own_nonce == info.peer_nonce do - false -> - if(info.genesis_block_hash == genesis_block_header_hash()) do - updated_peers = Map.put(peers, uri, info.current_block_hash) - Logger.info(fn -> "Added #{uri} to the peer list" end) - {:reply, :ok, %{state | peers: updated_peers}} - else - Logger.error(fn -> - "Failed to add #{uri}, genesis header hash not valid" end) - {:reply, {:error, "Genesis header hash not valid"}, %{state | peers: peers}} - end - true -> - Logger.debug(fn -> - "Failed to add #{uri}, equal peer nonces" end) - {:reply, {:error, "Equal peer nonces"}, %{state | peers: peers}} - end - :error -> - Logger.error("GET /info request error") - {:reply, :error, %{state | peers: peers}} + def handle_call({:add_peer,uri}, _from, %{peers: peers, nonce: own_nonce} = state) do + if Map.has_key?(peers, uri) do + Logger.debug(fn -> + "Skipped adding #{uri}, already known" end) + {:reply, {:error, "Peer already known"}, state} + else + case check_peer(uri, own_nonce) do + {:ok, info} -> + if should_a_peer_be_added(map_size(peers)) do + peers_update1 = + if map_size(peers) >= @peers_max_count do + random_peer = Enum.random(Map.keys(peers)) + Logger.debug(fn -> "Max peers reached. #{random_peer} removed" end) + Map.delete(peers, random_peer) + else + peers + end + updated_peers = Map.put(peers_update1, uri, info.current_block_hash) + Logger.info(fn -> "Added #{uri} to the peer list" end) + {:reply, :ok, %{state | peers: updated_peers}} + else + Logger.debug(fn -> "Max peers reached. #{uri} not added" end) + {:reply, :ok, state} + end + {:error, reason} -> + Logger.error(fn -> "Failed to add peer. reason=#{reason}" end) + {:reply, {:error, reason}, state} + end end end @@ -161,7 +170,31 @@ def handle_call({:add_peer,uri}, _from, %{peers: peers, nonce: own_nonce} = stat end end + defp check_peer(uri, own_nonce) do + case(Client.get_info(uri)) do + {:ok, info} -> + case own_nonce == info.peer_nonce do + false -> + if(info.genesis_block_hash == genesis_block_header_hash()) do + {:ok, info} + else + {:error, "Genesis header hash not valid"} + end + true -> + {:error, "Equal peer nonces"} + end + :error -> + {:error, "Request error"} + end + end + + defp should_a_peer_be_added peers_count do + peers_count < @peers_max_count + || :rand.uniform() < @probability_of_peer_remove_when_max + end + defp prep_data(:new_tx, %{}=data), do: Serialization.tx(data, :serialize) defp prep_data(:new_block, %{}=data), do: Serialization.block(data, :serialize) end + diff --git a/apps/aecore/lib/aecore/peers/worker/supervisor.ex b/apps/aecore/lib/aecore/peers/worker/supervisor.ex index 5d593f2e..fba0b3b7 100644 --- a/apps/aecore/lib/aecore/peers/worker/supervisor.ex +++ b/apps/aecore/lib/aecore/peers/worker/supervisor.ex @@ -8,7 +8,7 @@ defmodule Aecore.Peers.Worker.Supervisor do def init(:ok) do children = [ Aecore.Peers.Worker, - Aecore.Peers.Scheduler + Aecore.Peers.Sync ] Supervisor.init(children, strategy: :one_for_one) diff --git a/apps/aecore/test/aecore_peers_test.exs b/apps/aecore/test/aecore_peers_test.exs index 41d8a5e7..3353afb5 100644 --- a/apps/aecore/test/aecore_peers_test.exs +++ b/apps/aecore/test/aecore_peers_test.exs @@ -10,7 +10,7 @@ defmodule AecorePeersTest do end test "add peer, get all peers, check peers and remove the peer" do - assert :error = Peers.add_peer("localhost:4000") + assert {:error, _} = Peers.add_peer("localhost:4000") assert Enum.count(Peers.all_peers) == 0 assert :ok = Peers.check_peers assert {:error, "Peer not found"} =