From 532bb45b4b37a76c7c983d40b629fd2afd32bc91 Mon Sep 17 00:00:00 2001 From: Shanthanu Date: Sun, 12 Jan 2020 22:03:02 +0530 Subject: [PATCH 1/9] lib/codeshare: Add Server CRDT skeleton Add Identifier and Character struct. Add basic server crdt Agent logic. --- lib/codeshare/crdt/character.ex | 47 ++++++++++++++++++++++++ lib/codeshare/crdt/crdt.ex | 63 +++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 lib/codeshare/crdt/character.ex create mode 100644 lib/codeshare/crdt/crdt.ex diff --git a/lib/codeshare/crdt/character.ex b/lib/codeshare/crdt/character.ex new file mode 100644 index 0000000..88bc6f0 --- /dev/null +++ b/lib/codeshare/crdt/character.ex @@ -0,0 +1,47 @@ +defmodule Codeshare.Identifier do + alias __MODULE__ + + defstruct [ + position: 0, + siteId: -1 + ] + + def to_struct(identifier_map) do + map = identifier_map + %Identifier{ + position: map["position"], + siteId: map["siteId"] + } + end + + # TODO: Add struct validation? +end + +defmodule Codeshare.Character do + alias __MODULE__ + alias Codeshare.Identifier + + defstruct [ + ch: "", + identifiers: [%Identifier{}] + ] + + def to_struct(character_map) do + map = character_map + %Character{ + ch: map["ch"], + identifiers: to_identifiers_struct_list(map["identifiers"]) + } + end + + defp to_identifiers_struct_list(identifiers_map_list) do + if identifiers_map_list == [] do + [] + else + [h | t] = identifiers_map_list + [Identifier.to_struct(h) | to_identifiers_struct_list(t)] + end + end + + # TODO: Add struct validation? +end \ No newline at end of file diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex new file mode 100644 index 0000000..5952001 --- /dev/null +++ b/lib/codeshare/crdt/crdt.ex @@ -0,0 +1,63 @@ +defmodule Codeshare.CRDT do + use Agent + alias Codeshare.{Character, Identifier} + + def start_link() do + fn -> [[ + %Character{ + ch: "", + identifiers: [%Identifier{ + position: 0, + siteId: -1 + }] + }, + %Character{ + ch: "", + identifiers: [%Identifier{ + position: 1, + siteId: 16777216 #TODO: Infinity for now; think of something + }] + } + ]] + end |> Agent.start_link(name: __MODULE__) + # TODO: Registered with module name, which doesn't allow server crdt per session + # (It allows for only one server crdt process) + # Need another process to map session id with corresponding server crdt pid + end + + def put(payload) do + character = Character.to_struct(payload["character"]) + case Map.get(payload, "type") do + "input" -> + remote_insert(character) + "delete" -> + remote_delete(character) + "inputnewline" -> + remote_insert_newline(character) + "deletenewline" -> + remote_delete_newline(character) + end + # Agent.update(__MODULE__, fn list -> [payload | list] end) + end + + def show() do + IO.inspect Agent.get(__MODULE__, & &1) + end + + defp remote_insert(character) do + "insert" + end + + defp remote_delete(character) do + "delete" + end + + defp remote_insert_newline(character) do + "inputnewline" + end + + defp remote_delete_newline(character) do + "deletenewline" + end + +end \ No newline at end of file From 1b1ddaa0c23523ba662212486c93ea38732152ed Mon Sep 17 00:00:00 2001 From: Shanthanu Date: Sun, 8 Mar 2020 18:15:35 +0530 Subject: [PATCH 2/9] server_crdt: Add primitive identifier and character functions --- lib/codeshare/crdt/character.ex | 69 +++++++++++++++++++++- lib/codeshare/crdt/crdt.ex | 4 +- lib/codeshare_web/channels/room_channel.ex | 1 + 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/lib/codeshare/crdt/character.ex b/lib/codeshare/crdt/character.ex index 88bc6f0..955f84e 100644 --- a/lib/codeshare/crdt/character.ex +++ b/lib/codeshare/crdt/character.ex @@ -3,17 +3,29 @@ defmodule Codeshare.Identifier do defstruct [ position: 0, - siteId: -1 + siteID: -1 ] def to_struct(identifier_map) do map = identifier_map %Identifier{ position: map["position"], - siteId: map["siteId"] + siteID: map["siteID"] } end + def is_equal(id1, id2) do + id1.position == id2.position && id1.siteID == id2.siteID + end + + def is_greater(id1, id2) do + if id1.position == id2.position do + id1.siteID > id2.siteID + else + id1.position > id2.position + end + end + # TODO: Add struct validation? end @@ -41,6 +53,59 @@ defmodule Codeshare.Character do [h | t] = identifiers_map_list [Identifier.to_struct(h) | to_identifiers_struct_list(t)] end + end + + def is_equal(character1, character2) do + if character1.ch != character2.ch || + length(character1.identifiers) != length(character2.identifiers) do + false + else + is_identifier_list_equal(character1.identifiers, + character2.identifiers) + end + end + + defp is_identifier_list_equal(id_list1, id_list2) do + if id_list1 == [] && id_list2 == [] do + true + else + [id1 | id_list1] = id_list1 + [id2 | id_list2] = id_list2 + + if Identifier.is_equal(id1, id2) do + is_identifier_list_equal(id_list1, id_list2) + else + false + end + end + end + + def is_greater(character1, character2) do + is_identifier_list_greater(character1.identifiers, + character2.identifiers) + end + + defp is_identifier_list_greater(id_list1, id_list2) do + + cond do + id_list1 == [] -> + false + id_list2 == [] -> + true + true -> + [id1 | id_list1] = id_list1 + [id2 | id_list2] = id_list2 + + cond do + Identifier.is_greater(id1, id2) -> + true + Identifier.is_greater(id2, id1) -> + false + Identifier.is_equal(id1, id2) -> + is_identifier_list_greater(id_list1, id_list2) + end + end + end # TODO: Add struct validation? diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex index 5952001..b3db4f3 100644 --- a/lib/codeshare/crdt/crdt.ex +++ b/lib/codeshare/crdt/crdt.ex @@ -8,14 +8,14 @@ defmodule Codeshare.CRDT do ch: "", identifiers: [%Identifier{ position: 0, - siteId: -1 + siteID: -1 }] }, %Character{ ch: "", identifiers: [%Identifier{ position: 1, - siteId: 16777216 #TODO: Infinity for now; think of something + siteID: 16777216 #TODO: Infinity for now; think of something }] } ]] diff --git a/lib/codeshare_web/channels/room_channel.ex b/lib/codeshare_web/channels/room_channel.ex index ae8bf1e..5de6e00 100644 --- a/lib/codeshare_web/channels/room_channel.ex +++ b/lib/codeshare_web/channels/room_channel.ex @@ -2,6 +2,7 @@ defmodule CodeshareWeb.RoomChannel do use CodeshareWeb, :channel import Ecto.Query, only: [from: 2] alias CodeshareWeb.Presence + alias Codeshare.{Identifier, Character} def join("room:" <> room_id, payload, socket) do if authorized?(payload) do From 1735603f83459445dfd999f4bb865f79cb5e9de5 Mon Sep 17 00:00:00 2001 From: Shanthanu Date: Mon, 16 Mar 2020 12:33:51 +0530 Subject: [PATCH 3/9] server_crdt: Implement remote_insert --- lib/codeshare/application.ex | 3 +- lib/codeshare/crdt/crdt.ex | 35 ++++++++++++++++++---- lib/codeshare_web/channels/room_channel.ex | 7 ++++- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/lib/codeshare/application.ex b/lib/codeshare/application.ex index 96f842a..d62a5ca 100644 --- a/lib/codeshare/application.ex +++ b/lib/codeshare/application.ex @@ -14,7 +14,8 @@ defmodule Codeshare.Application do CodeshareWeb.Endpoint, # Starts a worker by calling: Codeshare.Worker.start_link(arg) # {Codeshare.Worker, arg}, - CodeshareWeb.Presence + CodeshareWeb.Presence, + Codeshare.CRDT ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex index b3db4f3..bd96d14 100644 --- a/lib/codeshare/crdt/crdt.ex +++ b/lib/codeshare/crdt/crdt.ex @@ -1,8 +1,10 @@ defmodule Codeshare.CRDT do + alias __MODULE__ use Agent alias Codeshare.{Character, Identifier} - def start_link() do + # [] arg required by supervisor + def start_link([]) do fn -> [[ %Character{ ch: "", @@ -18,7 +20,7 @@ defmodule Codeshare.CRDT do siteID: 16777216 #TODO: Infinity for now; think of something }] } - ]] + ]] end |> Agent.start_link(name: __MODULE__) # TODO: Registered with module name, which doesn't allow server crdt per session # (It allows for only one server crdt process) @@ -40,12 +42,12 @@ defmodule Codeshare.CRDT do # Agent.update(__MODULE__, fn list -> [payload | list] end) end - def show() do - IO.inspect Agent.get(__MODULE__, & &1) + def get() do + Agent.get(__MODULE__, & &1) end defp remote_insert(character) do - "insert" + Agent.update(__MODULE__, fn crdt -> insert_character(crdt, character) end) end defp remote_delete(character) do @@ -60,4 +62,27 @@ defmodule Codeshare.CRDT do "deletenewline" end + defp insert_character(crdt, character) do + + [ first_line | crdt] = crdt + last_ch = List.last(first_line) # NOTE: Takes linear time :( + + if Character.is_greater(last_ch, character) do + [insert_character_on_line(first_line, character) | crdt] + else + [first_line | insert_character(crdt, character)] + end + end + + defp insert_character_on_line(line, character) do + + [first_ch | line] = line + + if Character.is_greater(first_ch, character) do + [character | [ first_ch | line] ] + else + [first_ch | insert_character_on_line(line, character)] + end + end + end \ No newline at end of file diff --git a/lib/codeshare_web/channels/room_channel.ex b/lib/codeshare_web/channels/room_channel.ex index 5de6e00..d9c6d6f 100644 --- a/lib/codeshare_web/channels/room_channel.ex +++ b/lib/codeshare_web/channels/room_channel.ex @@ -2,7 +2,7 @@ defmodule CodeshareWeb.RoomChannel do use CodeshareWeb, :channel import Ecto.Query, only: [from: 2] alias CodeshareWeb.Presence - alias Codeshare.{Identifier, Character} + alias Codeshare.CRDT def join("room:" <> room_id, payload, socket) do if authorized?(payload) do @@ -31,6 +31,11 @@ defmodule CodeshareWeb.RoomChannel do # It is also common to receive messages from the client and # broadcast to everyone in the current topic (room:lobby). def handle_in("shout", payload, socket) do + + # Update server-side CRDT + CRDT.put(payload) + IO.inspect CRDT.get() + editor = %Codeshare.Editor{} changeset = Codeshare.Editor.changeset(editor, %{data: payload}) Codeshare.Repo.insert(changeset) From 98ff260679c2b3e09feaf37ed8a6dfebec19e70b Mon Sep 17 00:00:00 2001 From: Shanthanu Date: Mon, 16 Mar 2020 20:33:43 +0530 Subject: [PATCH 4/9] server_crdt: Implement all remote functions Testing of consistency of client and server CRDTs not done yet. --- lib/codeshare/crdt/crdt.ex | 77 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex index bd96d14..9c4d0d1 100644 --- a/lib/codeshare/crdt/crdt.ex +++ b/lib/codeshare/crdt/crdt.ex @@ -51,20 +51,20 @@ defmodule Codeshare.CRDT do end defp remote_delete(character) do - "delete" + Agent.update(__MODULE__, fn crdt -> delete_character(crdt, character) end) end defp remote_insert_newline(character) do - "inputnewline" + Agent.update(__MODULE__, fn crdt -> insert_newline(crdt, character) end) end defp remote_delete_newline(character) do - "deletenewline" + Agent.update(__MODULE__, fn crdt -> delete_newline(crdt, character) end) end defp insert_character(crdt, character) do - [ first_line | crdt] = crdt + [first_line | crdt] = crdt last_ch = List.last(first_line) # NOTE: Takes linear time :( if Character.is_greater(last_ch, character) do @@ -85,4 +85,73 @@ defmodule Codeshare.CRDT do end end + defp delete_character(crdt, character) do + + [first_line | crdt] = crdt + last_ch = List.last(first_line) # NOTE: Takes linear time :( + + if Character.is_greater(last_ch, character) do + [delete_character_on_line(first_line, character) | crdt] + else + [first_line | delete_character(crdt, character)] + end + end + + defp delete_character_on_line(line, character) do + + [first_ch | line] = line + + if Character.is_equal(first_ch, character) do + line + else + [first_ch | delete_character_on_line(line, character)] + end + end + + defp insert_newline(crdt, character) do + + [first_line | crdt] = crdt + last_ch = List.last(first_line) # NOTE: Takes linear time :( + + if Character.is_greater(last_ch, character) do + insert_newline_on_line(first_line, character) ++ crdt + else + [first_line | insert_character(crdt, character)] + end + end + + defp insert_newline_on_line(line, character) do + + [first_ch | line] = line + + if Character.is_greater(first_ch, character) do + delimeter_ch = %Character{ + ch: "", + identifiers: character.identifiers + } + line1 = [delimeter_ch] + line2 = [delimeter_ch | [first_ch | line] ] + [line1, line2] + else + lines = insert_newline_on_line(line, character) + [line1 | [line2]] = lines + line1 = [first_ch | line1] + [line1 | [line2]] + end + end + + defp delete_newline(crdt, character) do + + [first_line | crdt] = crdt + last_ch = List.last(first_line) # NOTE: Takes linear time :( + + if Character.is_equal(last_ch, character) do + line1 = first_line |> Enum.reverse |> tl |> Enum.reverse + [[_ | line2] | crdt] = crdt + [line1++line2 | crdt] + else + [first_line | insert_character(crdt, character)] + end + end + end \ No newline at end of file From 1b55090065b083b5f398bc06ad199fc7c79685d0 Mon Sep 17 00:00:00 2001 From: Shanthanu Date: Tue, 17 Mar 2020 11:33:32 +0530 Subject: [PATCH 5/9] server_crdt: Implement 'check' btn Check button implemented to check the consistency between client and server crdt Fix bug in insert_newline and delete_newline --- assets/js/app.js | 11 ++++++++ lib/codeshare/crdt/character.ex | 22 ++++++++++++++++ lib/codeshare/crdt/crdt.ex | 26 +++++++++++++++++-- lib/codeshare_web/channels/room_channel.ex | 15 ++++++++++- .../templates/page/index.html.eex | 2 ++ 5 files changed, 73 insertions(+), 3 deletions(-) diff --git a/assets/js/app.js b/assets/js/app.js index 41856f9..ff28326 100644 --- a/assets/js/app.js +++ b/assets/js/app.js @@ -245,3 +245,14 @@ return ` ` } + +// Check if local CRDT consistent with server CRDT +var check_btn = document.getElementById("check_btn"); +check_btn.onclick = function() { + channel.push("check", { + value: crdt.toString() + }).receive("ok", resp => { + console.log(`Response: ${resp["flag"]}`) + }) +} + diff --git a/lib/codeshare/crdt/character.ex b/lib/codeshare/crdt/character.ex index 955f84e..4340864 100644 --- a/lib/codeshare/crdt/character.ex +++ b/lib/codeshare/crdt/character.ex @@ -14,6 +14,15 @@ defmodule Codeshare.Identifier do } end + def to_string(identifier) do + # Client and server have diff infinity values! + if identifier.siteID == 16777216 do + "[#{identifier.position}, Infinity]" + else + "[#{identifier.position}, #{identifier.siteID}]" + end + end + def is_equal(id1, id2) do id1.position == id2.position && id1.siteID == id2.siteID end @@ -46,6 +55,10 @@ defmodule Codeshare.Character do } end + def to_string(character) do + "{#{character.ch}: [#{to_string_id_list(character.identifiers)}]}" + end + defp to_identifiers_struct_list(identifiers_map_list) do if identifiers_map_list == [] do [] @@ -108,5 +121,14 @@ defmodule Codeshare.Character do end + defp to_string_id_list(id_list) do + [first_id | id_list] = id_list + if id_list == [] do + "#{Identifier.to_string(first_id)}" + else + "#{Identifier.to_string(first_id)}, #{to_string_id_list(id_list)}" + end + end + # TODO: Add struct validation? end \ No newline at end of file diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex index 9c4d0d1..9dff959 100644 --- a/lib/codeshare/crdt/crdt.ex +++ b/lib/codeshare/crdt/crdt.ex @@ -46,6 +46,10 @@ defmodule Codeshare.CRDT do Agent.get(__MODULE__, & &1) end + def get_string() do + Agent.get(__MODULE__, & convert_to_string(&1)) + end + defp remote_insert(character) do Agent.update(__MODULE__, fn crdt -> insert_character(crdt, character) end) end @@ -116,7 +120,7 @@ defmodule Codeshare.CRDT do if Character.is_greater(last_ch, character) do insert_newline_on_line(first_line, character) ++ crdt else - [first_line | insert_character(crdt, character)] + [first_line | insert_newline(crdt, character)] end end @@ -150,7 +154,25 @@ defmodule Codeshare.CRDT do [[_ | line2] | crdt] = crdt [line1++line2 | crdt] else - [first_line | insert_character(crdt, character)] + [first_line | delete_newline(crdt, character)] + end + end + + defp convert_to_string(crdt) do + if crdt == [] do + "" + else + [first_line | crdt] = crdt + "#{convert_line_to_string(first_line)}\n#{convert_to_string(crdt)}" + end + end + + defp convert_line_to_string(line) do + [first_ch | line] = line + if line == [] do + "#{Character.to_string(first_ch)}" + else + "#{Character.to_string(first_ch)}, #{convert_line_to_string(line)}" end end diff --git a/lib/codeshare_web/channels/room_channel.ex b/lib/codeshare_web/channels/room_channel.ex index d9c6d6f..8990559 100644 --- a/lib/codeshare_web/channels/room_channel.ex +++ b/lib/codeshare_web/channels/room_channel.ex @@ -34,7 +34,7 @@ defmodule CodeshareWeb.RoomChannel do # Update server-side CRDT CRDT.put(payload) - IO.inspect CRDT.get() + # IO.inspect CRDT.get() editor = %Codeshare.Editor{} changeset = Codeshare.Editor.changeset(editor, %{data: payload}) @@ -49,6 +49,19 @@ defmodule CodeshareWeb.RoomChannel do {:noreply, socket} end + def handle_in("check", payload, socket) do + IO.puts "CRDT: #{socket.assigns.user_id}:" + IO.puts payload["value"] + IO.puts "CRDT: Server:" + IO.puts CRDT.get_string() + + if payload["value"] == CRDT.get_string() do + {:reply, {:ok, %{"flag" => true}}, socket} + else + {:reply, {:ok, %{"flag" => false}}, socket} + end + end + def terminate(_ , socket) do # When last active channel is terminating, # empty the database diff --git a/lib/codeshare_web/templates/page/index.html.eex b/lib/codeshare_web/templates/page/index.html.eex index 0fa96e3..454b926 100644 --- a/lib/codeshare_web/templates/page/index.html.eex +++ b/lib/codeshare_web/templates/page/index.html.eex @@ -45,6 +45,8 @@ function updateMode() { } + + Current user:
From 73eac068cfba43eae1b13e6958fd1ac19cd4cdf1 Mon Sep 17 00:00:00 2001 From: Shanthanu Date: Sun, 22 Mar 2020 21:35:41 +0530 Subject: [PATCH 6/9] server_crdt: Reorganise code and add documentation --- lib/codeshare/crdt/character.ex | 64 +++++++++++++++++++++++++-------- lib/codeshare/crdt/crdt.ex | 15 ++++++++ 2 files changed, 65 insertions(+), 14 deletions(-) diff --git a/lib/codeshare/crdt/character.ex b/lib/codeshare/crdt/character.ex index 4340864..e744e6b 100644 --- a/lib/codeshare/crdt/character.ex +++ b/lib/codeshare/crdt/character.ex @@ -1,4 +1,8 @@ defmodule Codeshare.Identifier do + @moduledoc """ + List of identifiers define the 'position' + of a character in CRDT + """ alias __MODULE__ defstruct [ @@ -6,6 +10,10 @@ defmodule Codeshare.Identifier do siteID: -1 ] + @doc """ + Helper to convert payload from channel + to struct + """ def to_struct(identifier_map) do map = identifier_map %Identifier{ @@ -14,6 +22,9 @@ defmodule Codeshare.Identifier do } end + @doc """ + Convert to string representation + """ def to_string(identifier) do # Client and server have diff infinity values! if identifier.siteID == 16777216 do @@ -23,10 +34,16 @@ defmodule Codeshare.Identifier do end end + @doc """ + Check if two identifiers are equal + """ def is_equal(id1, id2) do id1.position == id2.position && id1.siteID == id2.siteID end + @doc """ + is `id1` > `id2` ? [Identifiers] + """ def is_greater(id1, id2) do if id1.position == id2.position do id1.siteID > id2.siteID @@ -39,6 +56,10 @@ defmodule Codeshare.Identifier do end defmodule Codeshare.Character do + @moduledoc """ + Represensts each 'character' in + the CRDT text + """ alias __MODULE__ alias Codeshare.Identifier @@ -47,6 +68,10 @@ defmodule Codeshare.Character do identifiers: [%Identifier{}] ] + @doc """ + Helper to convert payload from channel + to struct + """ def to_struct(character_map) do map = character_map %Character{ @@ -55,19 +80,16 @@ defmodule Codeshare.Character do } end + @doc """ + Convert to string representation + """ def to_string(character) do "{#{character.ch}: [#{to_string_id_list(character.identifiers)}]}" end - defp to_identifiers_struct_list(identifiers_map_list) do - if identifiers_map_list == [] do - [] - else - [h | t] = identifiers_map_list - [Identifier.to_struct(h) | to_identifiers_struct_list(t)] - end - end - + @doc """ + Check if two characters are equal + """ def is_equal(character1, character2) do if character1.ch != character2.ch || length(character1.identifiers) != length(character2.identifiers) do @@ -78,6 +100,25 @@ defmodule Codeshare.Character do end end + @doc """ + is `character1` > `character2` ? + """ + def is_greater(character1, character2) do + is_identifier_list_greater(character1.identifiers, + character2.identifiers) + end + + # Helper functions + + defp to_identifiers_struct_list(identifiers_map_list) do + if identifiers_map_list == [] do + [] + else + [h | t] = identifiers_map_list + [Identifier.to_struct(h) | to_identifiers_struct_list(t)] + end + end + defp is_identifier_list_equal(id_list1, id_list2) do if id_list1 == [] && id_list2 == [] do true @@ -93,11 +134,6 @@ defmodule Codeshare.Character do end end - def is_greater(character1, character2) do - is_identifier_list_greater(character1.identifiers, - character2.identifiers) - end - defp is_identifier_list_greater(id_list1, id_list2) do cond do diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex index 9dff959..d34dd53 100644 --- a/lib/codeshare/crdt/crdt.ex +++ b/lib/codeshare/crdt/crdt.ex @@ -1,4 +1,7 @@ defmodule Codeshare.CRDT do + @moduledoc """ + Manages the server-side CRDT + """ alias __MODULE__ use Agent alias Codeshare.{Character, Identifier} @@ -27,6 +30,10 @@ defmodule Codeshare.CRDT do # Need another process to map session id with corresponding server crdt pid end + @doc """ + Put payload data recived from channel + into CRDT + """ def put(payload) do character = Character.to_struct(payload["character"]) case Map.get(payload, "type") do @@ -42,14 +49,22 @@ defmodule Codeshare.CRDT do # Agent.update(__MODULE__, fn list -> [payload | list] end) end + @doc """ + Get CRDT data + """ def get() do Agent.get(__MODULE__, & &1) end + @doc """ + Get CRDT string representation + """ def get_string() do Agent.get(__MODULE__, & convert_to_string(&1)) end + # Helper functions + defp remote_insert(character) do Agent.update(__MODULE__, fn crdt -> insert_character(crdt, character) end) end From ba0d4cac0d3e969ea130cdbfcf0dad32d19936e9 Mon Sep 17 00:00:00 2001 From: Shanthanu Date: Mon, 23 Mar 2020 14:46:26 +0530 Subject: [PATCH 7/9] server_crdt: Add CRDT Registry and Supervisor Each session now has it's own CRDT process. The mapping session_id -> crdt pid is handled by CRDT.Registry. CRDT.Supervisor currently just supervises the Registry (should also dynmically supervise CRDT pids) --- lib/codeshare/application.ex | 4 +- lib/codeshare/crdt/crdt.ex | 40 +++++++-------- lib/codeshare/crdt/registry.ex | 60 ++++++++++++++++++++++ lib/codeshare/crdt/supervisor.ex | 22 ++++++++ lib/codeshare_web/channels/room_channel.ex | 9 ++-- 5 files changed, 110 insertions(+), 25 deletions(-) create mode 100644 lib/codeshare/crdt/registry.ex create mode 100644 lib/codeshare/crdt/supervisor.ex diff --git a/lib/codeshare/application.ex b/lib/codeshare/application.ex index d62a5ca..f305107 100644 --- a/lib/codeshare/application.ex +++ b/lib/codeshare/application.ex @@ -14,8 +14,10 @@ defmodule Codeshare.Application do CodeshareWeb.Endpoint, # Starts a worker by calling: Codeshare.Worker.start_link(arg) # {Codeshare.Worker, arg}, + # Start Presence CodeshareWeb.Presence, - Codeshare.CRDT + # Start CRDT Supervisor + Codeshare.CRDT.Supervisor ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex index d34dd53..cb5c44e 100644 --- a/lib/codeshare/crdt/crdt.ex +++ b/lib/codeshare/crdt/crdt.ex @@ -2,12 +2,11 @@ defmodule Codeshare.CRDT do @moduledoc """ Manages the server-side CRDT """ - alias __MODULE__ use Agent alias Codeshare.{Character, Identifier} # [] arg required by supervisor - def start_link([]) do + def start_link(_opts) do fn -> [[ %Character{ ch: "", @@ -24,7 +23,7 @@ defmodule Codeshare.CRDT do }] } ]] - end |> Agent.start_link(name: __MODULE__) + end |> Agent.start_link() # TODO: Registered with module name, which doesn't allow server crdt per session # (It allows for only one server crdt process) # Need another process to map session id with corresponding server crdt pid @@ -34,51 +33,50 @@ defmodule Codeshare.CRDT do Put payload data recived from channel into CRDT """ - def put(payload) do + def put(crdt, payload) do character = Character.to_struct(payload["character"]) case Map.get(payload, "type") do "input" -> - remote_insert(character) + remote_insert(crdt, character) "delete" -> - remote_delete(character) + remote_delete(crdt, character) "inputnewline" -> - remote_insert_newline(character) + remote_insert_newline(crdt, character) "deletenewline" -> - remote_delete_newline(character) + remote_delete_newline(crdt, character) end - # Agent.update(__MODULE__, fn list -> [payload | list] end) end @doc """ Get CRDT data """ - def get() do - Agent.get(__MODULE__, & &1) + def get(crdt) do + Agent.get(crdt, & &1) end @doc """ Get CRDT string representation """ - def get_string() do - Agent.get(__MODULE__, & convert_to_string(&1)) + def get_string(crdt) do + Agent.get(crdt, & convert_to_string(&1)) end # Helper functions - defp remote_insert(character) do - Agent.update(__MODULE__, fn crdt -> insert_character(crdt, character) end) + defp remote_insert(crdt, character) do + Agent.update(crdt, fn crdt -> insert_character(crdt, character) end) end - defp remote_delete(character) do - Agent.update(__MODULE__, fn crdt -> delete_character(crdt, character) end) + defp remote_delete(crdt, character) do + Agent.update(crdt, fn crdt -> delete_character(crdt, character) end) end - defp remote_insert_newline(character) do - Agent.update(__MODULE__, fn crdt -> insert_newline(crdt, character) end) + defp remote_insert_newline(crdt, character) do + Agent.update(crdt, fn crdt -> insert_newline(crdt, character) end) end - defp remote_delete_newline(character) do - Agent.update(__MODULE__, fn crdt -> delete_newline(crdt, character) end) + defp remote_delete_newline(crdt, character) do + Agent.update(crdt, fn crdt -> delete_newline(crdt, character) end) end defp insert_character(crdt, character) do diff --git a/lib/codeshare/crdt/registry.ex b/lib/codeshare/crdt/registry.ex new file mode 100644 index 0000000..d90c032 --- /dev/null +++ b/lib/codeshare/crdt/registry.ex @@ -0,0 +1,60 @@ +defmodule Codeshare.CRDT.Registry do + @moduledoc """ + Registry mapping session id -> crdt process + """ + alias __MODULE__ + use GenServer + alias Codeshare.CRDT + + ## API to accessing CRDT + + @doc """ + Start the registry + """ + def start_link(opts) do + GenServer.start_link(__MODULE__, :ok, opts) + end + + def create(server, session_id) do + GenServer.cast(server, {:create, session_id}) + end + + def put(server, session_id, payload) do + GenServer.cast(server, {:put, session_id, payload}) + end + + def get_string(server, session_id) do + GenServer.call(server, {:get_string, session_id}) + end + + ## Defining GenSever Callback + + @impl true + def init(:ok) do + {:ok, %{}} + end + + @impl true + def handle_cast({:create, session_id}, sessions) do + if Map.has_key?(sessions, session_id) do + {:noreply, sessions} + else + {:ok, crdt} = CRDT.start_link([]) + {:noreply, Map.put(sessions, session_id, crdt)} + end + end + + @impl true + def handle_cast({:put, session_id, payload}, sessions) do + {:ok, crdt} = Map.fetch(sessions, session_id) + CRDT.put(crdt, payload) + {:noreply, sessions} + end + + @impl true + def handle_call({:get_string, session_id}, _from, sessions) do + {:ok, crdt} = Map.fetch(sessions, session_id) + {:reply, CRDT.get_string(crdt), sessions} + end + +end \ No newline at end of file diff --git a/lib/codeshare/crdt/supervisor.ex b/lib/codeshare/crdt/supervisor.ex new file mode 100644 index 0000000..9ef89d9 --- /dev/null +++ b/lib/codeshare/crdt/supervisor.ex @@ -0,0 +1,22 @@ +defmodule Codeshare.CRDT.Supervisor do + @moduledoc """ + Supervise CRDT Registry + and CRDT processes + """ + use Supervisor + alias Codeshare.CRDT + + def start_link(opts) do + Supervisor.start_link(__MODULE__, :ok, opts) + end + + @impl true + def init(:ok) do + children = [ + {CRDT.Registry, name: CRDT.Registry} + ] + + Supervisor.init(children, strategy: :one_for_one) + end + +end \ No newline at end of file diff --git a/lib/codeshare_web/channels/room_channel.ex b/lib/codeshare_web/channels/room_channel.ex index 983863c..245f40f 100644 --- a/lib/codeshare_web/channels/room_channel.ex +++ b/lib/codeshare_web/channels/room_channel.ex @@ -6,6 +6,9 @@ defmodule CodeshareWeb.RoomChannel do def join("room:" <> room_id, payload, socket) do if authorized?(payload) do + # Add CRDT process + CRDT.Registry.create(CRDT.Registry, room_id) + query = from entry in "editor_state", select: entry.data, where: entry.room_id == ^(room_id) @@ -47,7 +50,7 @@ defmodule CodeshareWeb.RoomChannel do def handle_in("shout", payload, socket) do # Update server-side CRDT - CRDT.put(payload) + CRDT.Registry.put(CRDT.Registry, socket.assigns.room_id, payload) # IO.inspect CRDT.get() editor = %Codeshare.Editor{} @@ -67,9 +70,9 @@ defmodule CodeshareWeb.RoomChannel do IO.puts "CRDT: #{socket.assigns.user_id}:" IO.puts payload["value"] IO.puts "CRDT: Server:" - IO.puts CRDT.get_string() + IO.puts CRDT.Registry.get_string(CRDT.Registry, socket.assigns.room_id) - if payload["value"] == CRDT.get_string() do + if payload["value"] == CRDT.Registry.get_string(CRDT.Registry, socket.assigns.room_id) do {:reply, {:ok, %{"flag" => true}}, socket} else {:reply, {:ok, %{"flag" => false}}, socket} From d0f5e30d2f0cb3bf29c44ce0392c2e2b2fdccc6f Mon Sep 17 00:00:00 2001 From: Shanthanu Rai Date: Wed, 1 Apr 2020 19:44:24 +0530 Subject: [PATCH 8/9] server_crdt: Registry monitors CRDT processes Registry now knows when a CRDT process is stopped. Hence, this CRDT process can be removed from the Registry's map. --- lib/codeshare/crdt/crdt.ex | 4 +++ lib/codeshare/crdt/registry.ex | 53 +++++++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex index cb5c44e..c2b85fa 100644 --- a/lib/codeshare/crdt/crdt.ex +++ b/lib/codeshare/crdt/crdt.ex @@ -61,6 +61,10 @@ defmodule Codeshare.CRDT do Agent.get(crdt, & convert_to_string(&1)) end + def stop(crdt) do + Agent.stop(crdt) + end + # Helper functions defp remote_insert(crdt, character) do diff --git a/lib/codeshare/crdt/registry.ex b/lib/codeshare/crdt/registry.ex index d90c032..03ec107 100644 --- a/lib/codeshare/crdt/registry.ex +++ b/lib/codeshare/crdt/registry.ex @@ -23,38 +23,75 @@ defmodule Codeshare.CRDT.Registry do GenServer.cast(server, {:put, session_id, payload}) end + def stop(server, session_id) do + GenServer.cast(server, {:stop, session_id}) + end + def get_string(server, session_id) do GenServer.call(server, {:get_string, session_id}) end + # Debugging function + def get_sessions(server) do + GenServer.call(server, {:get_sessions}) + end + ## Defining GenSever Callback @impl true def init(:ok) do - {:ok, %{}} + sessions = %{} + refs = %{} + {:ok, {sessions, refs}} end @impl true - def handle_cast({:create, session_id}, sessions) do + def handle_cast({:create, session_id}, {sessions, refs}) do if Map.has_key?(sessions, session_id) do - {:noreply, sessions} + {:noreply, {sessions, refs}} else {:ok, crdt} = CRDT.start_link([]) - {:noreply, Map.put(sessions, session_id, crdt)} + ref = Process.monitor(crdt) + refs = Map.put(refs, ref, session_id) + sessions = Map.put(sessions, session_id, crdt) + {:noreply, {sessions, refs}} end end @impl true - def handle_cast({:put, session_id, payload}, sessions) do + def handle_cast({:put, session_id, payload}, {sessions, refs}) do {:ok, crdt} = Map.fetch(sessions, session_id) CRDT.put(crdt, payload) - {:noreply, sessions} + {:noreply, {sessions, refs}} + end + + @impl true + def handle_cast({:stop, session_id}, {sessions, refs}) do + {:ok, crdt} = Map.fetch(sessions, session_id) + CRDT.stop(crdt) + {:noreply, {sessions, refs}} end @impl true - def handle_call({:get_string, session_id}, _from, sessions) do + def handle_call({:get_string, session_id}, _from, {sessions, refs}) do {:ok, crdt} = Map.fetch(sessions, session_id) - {:reply, CRDT.get_string(crdt), sessions} + {:reply, CRDT.get_string(crdt), {sessions, refs}} end + @impl true + def handle_call({:get_sessions}, _from, {sessions, refs}) do + {:reply, sessions, {sessions, refs}} + end + + @impl true + def handle_info({:DOWN, ref, :process, _pid, _reason}, {sessions, refs}) do + {session_id, refs} = Map.pop(refs, ref) + sessions = Map.delete(sessions, session_id) + {:noreply, {sessions, refs}} + end + + @impl true + def handle_info(_msg, state) do + {:noreply, state} + end end \ No newline at end of file From 943489d54e97e56a63cc1039bf24c792fb00012f Mon Sep 17 00:00:00 2001 From: Shanthanu Rai Date: Wed, 1 Apr 2020 19:55:02 +0530 Subject: [PATCH 9/9] room_channel.ex: Stop CRDT process on session termination --- lib/codeshare_web/channels/room_channel.ex | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/codeshare_web/channels/room_channel.ex b/lib/codeshare_web/channels/room_channel.ex index 245f40f..7c22366 100644 --- a/lib/codeshare_web/channels/room_channel.ex +++ b/lib/codeshare_web/channels/room_channel.ex @@ -8,6 +8,11 @@ defmodule CodeshareWeb.RoomChannel do if authorized?(payload) do # Add CRDT process CRDT.Registry.create(CRDT.Registry, room_id) + + # Print active sessions + sessions = CRDT.Registry.get_sessions(CRDT.Registry) + IO.puts "Active sessions" + IO.inspect sessions query = from entry in "editor_state", select: entry.data, @@ -86,6 +91,10 @@ defmodule CodeshareWeb.RoomChannel do query = from editor in "editor_state", select: editor.data Codeshare.Repo.delete_all(query) + + # Stop CRDT process + CRDT.Registry.stop(CRDT.Registry, socket.assigns.room_id) + IO.puts "Session #{String.slice(socket.assigns.room_id, 0..6)} terminated" end :ok end