diff --git a/assets/js/app.js b/assets/js/app.js index f65de2e..118e3b5 100644 --- a/assets/js/app.js +++ b/assets/js/app.js @@ -277,3 +277,14 @@ return ` ` } + +// Check if local CRDT consistent with server CRDT +var check_btn = document.getElementById("check_btn"); +check_btn.onclick = function() { + channel.push("check", { + value: crdt.toString() + }).receive("ok", resp => { + console.log(`Response: ${resp["flag"]}`) + }) +} + diff --git a/lib/codeshare/application.ex b/lib/codeshare/application.ex index 96f842a..f305107 100644 --- a/lib/codeshare/application.ex +++ b/lib/codeshare/application.ex @@ -14,7 +14,10 @@ defmodule Codeshare.Application do CodeshareWeb.Endpoint, # Starts a worker by calling: Codeshare.Worker.start_link(arg) # {Codeshare.Worker, arg}, - CodeshareWeb.Presence + # Start Presence + CodeshareWeb.Presence, + # Start CRDT Supervisor + Codeshare.CRDT.Supervisor ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/codeshare/crdt/character.ex b/lib/codeshare/crdt/character.ex new file mode 100644 index 0000000..e744e6b --- /dev/null +++ b/lib/codeshare/crdt/character.ex @@ -0,0 +1,170 @@ +defmodule Codeshare.Identifier do + @moduledoc """ + List of identifiers define the 'position' + of a character in CRDT + """ + alias __MODULE__ + + defstruct [ + position: 0, + siteID: -1 + ] + + @doc """ + Helper to convert payload from channel + to struct + """ + def to_struct(identifier_map) do + map = identifier_map + %Identifier{ + position: map["position"], + siteID: map["siteID"] + } + end + + @doc """ + Convert to string representation + """ + def to_string(identifier) do + # Client and server have diff infinity values! + if identifier.siteID == 16777216 do + "[#{identifier.position}, Infinity]" + else + "[#{identifier.position}, #{identifier.siteID}]" + end + end + + @doc """ + Check if two identifiers are equal + """ + def is_equal(id1, id2) do + id1.position == id2.position && id1.siteID == id2.siteID + end + + @doc """ + is `id1` > `id2` ? [Identifiers] + """ + def is_greater(id1, id2) do + if id1.position == id2.position do + id1.siteID > id2.siteID + else + id1.position > id2.position + end + end + + # TODO: Add struct validation? +end + +defmodule Codeshare.Character do + @moduledoc """ + Represensts each 'character' in + the CRDT text + """ + alias __MODULE__ + alias Codeshare.Identifier + + defstruct [ + ch: "", + identifiers: [%Identifier{}] + ] + + @doc """ + Helper to convert payload from channel + to struct + """ + def to_struct(character_map) do + map = character_map + %Character{ + ch: map["ch"], + identifiers: to_identifiers_struct_list(map["identifiers"]) + } + end + + @doc """ + Convert to string representation + """ + def to_string(character) do + "{#{character.ch}: [#{to_string_id_list(character.identifiers)}]}" + end + + @doc """ + Check if two characters are equal + """ + def is_equal(character1, character2) do + if character1.ch != character2.ch || + length(character1.identifiers) != length(character2.identifiers) do + false + else + is_identifier_list_equal(character1.identifiers, + character2.identifiers) + end + end + + @doc """ + is `character1` > `character2` ? + """ + def is_greater(character1, character2) do + is_identifier_list_greater(character1.identifiers, + character2.identifiers) + end + + # Helper functions + + defp to_identifiers_struct_list(identifiers_map_list) do + if identifiers_map_list == [] do + [] + else + [h | t] = identifiers_map_list + [Identifier.to_struct(h) | to_identifiers_struct_list(t)] + end + end + + defp is_identifier_list_equal(id_list1, id_list2) do + if id_list1 == [] && id_list2 == [] do + true + else + [id1 | id_list1] = id_list1 + [id2 | id_list2] = id_list2 + + if Identifier.is_equal(id1, id2) do + is_identifier_list_equal(id_list1, id_list2) + else + false + end + end + end + + defp is_identifier_list_greater(id_list1, id_list2) do + + cond do + id_list1 == [] -> + false + id_list2 == [] -> + true + true -> + [id1 | id_list1] = id_list1 + [id2 | id_list2] = id_list2 + + cond do + Identifier.is_greater(id1, id2) -> + true + Identifier.is_greater(id2, id1) -> + false + Identifier.is_equal(id1, id2) -> + is_identifier_list_greater(id_list1, id_list2) + end + end + + end + + defp to_string_id_list(id_list) do + [first_id | id_list] = id_list + if id_list == [] do + "#{Identifier.to_string(first_id)}" + else + "#{Identifier.to_string(first_id)}, #{to_string_id_list(id_list)}" + end + end + + # TODO: Add struct validation? +end \ No newline at end of file diff --git a/lib/codeshare/crdt/crdt.ex b/lib/codeshare/crdt/crdt.ex new file mode 100644 index 0000000..c2b85fa --- /dev/null +++ b/lib/codeshare/crdt/crdt.ex @@ -0,0 +1,196 @@ +defmodule Codeshare.CRDT do + @moduledoc """ + Manages the server-side CRDT + """ + use Agent + alias Codeshare.{Character, Identifier} + + # [] arg required by supervisor + def start_link(_opts) do + fn -> [[ + %Character{ + ch: "", + identifiers: [%Identifier{ + position: 0, + siteID: -1 + }] + }, + %Character{ + ch: "", + identifiers: [%Identifier{ + position: 1, + siteID: 16777216 #TODO: Infinity for now; think of something + }] + } + ]] + end |> Agent.start_link() + # TODO: Registered with module name, which doesn't allow server crdt per session + # (It allows for only one server crdt process) + # Need another process to map session id with corresponding server crdt pid + end + + @doc """ + Put payload data recived from channel + into CRDT + """ + def put(crdt, payload) do + character = Character.to_struct(payload["character"]) + case Map.get(payload, "type") do + "input" -> + remote_insert(crdt, character) + "delete" -> + remote_delete(crdt, character) + "inputnewline" -> + remote_insert_newline(crdt, character) + "deletenewline" -> + remote_delete_newline(crdt, character) + end + end + + @doc """ + Get CRDT data + """ + def get(crdt) do + Agent.get(crdt, & &1) + end + + @doc """ + Get CRDT string representation + """ + def get_string(crdt) do + Agent.get(crdt, & convert_to_string(&1)) + end + + def stop(crdt) do + Agent.stop(crdt) + end + + # Helper functions + + defp remote_insert(crdt, character) do + Agent.update(crdt, fn crdt -> insert_character(crdt, character) end) + end + + defp remote_delete(crdt, character) do + Agent.update(crdt, fn crdt -> delete_character(crdt, character) end) + end + + defp remote_insert_newline(crdt, character) do + Agent.update(crdt, fn crdt -> insert_newline(crdt, character) end) + end + + defp remote_delete_newline(crdt, character) do + Agent.update(crdt, fn crdt -> delete_newline(crdt, character) end) + end + + defp insert_character(crdt, character) do + + [first_line | crdt] = crdt + last_ch = List.last(first_line) # NOTE: Takes linear time :( + + if Character.is_greater(last_ch, character) do + [insert_character_on_line(first_line, character) | crdt] + else + [first_line | insert_character(crdt, character)] + end + end + + defp insert_character_on_line(line, character) do + + [first_ch | line] = line + + if Character.is_greater(first_ch, character) do + [character | [ first_ch | line] ] + else + [first_ch | insert_character_on_line(line, character)] + end + end + + defp delete_character(crdt, character) do + + [first_line | crdt] = crdt + last_ch = List.last(first_line) # NOTE: Takes linear time :( + + if Character.is_greater(last_ch, character) do + [delete_character_on_line(first_line, character) | crdt] + else + [first_line | delete_character(crdt, character)] + end + end + + defp delete_character_on_line(line, character) do + + [first_ch | line] = line + + if Character.is_equal(first_ch, character) do + line + else + [first_ch | delete_character_on_line(line, character)] + end + end + + defp insert_newline(crdt, character) do + + [first_line | crdt] = crdt + last_ch = List.last(first_line) # NOTE: Takes linear time :( + + if Character.is_greater(last_ch, character) do + insert_newline_on_line(first_line, character) ++ crdt + else + [first_line | insert_newline(crdt, character)] + end + end + + defp insert_newline_on_line(line, character) do + + [first_ch | line] = line + + if Character.is_greater(first_ch, character) do + delimeter_ch = %Character{ + ch: "", + identifiers: character.identifiers + } + line1 = [delimeter_ch] + line2 = [delimeter_ch | [first_ch | line] ] + [line1, line2] + else + lines = insert_newline_on_line(line, character) + [line1 | [line2]] = lines + line1 = [first_ch | line1] + [line1 | [line2]] + end + end + + defp delete_newline(crdt, character) do + + [first_line | crdt] = crdt + last_ch = List.last(first_line) # NOTE: Takes linear time :( + + if Character.is_equal(last_ch, character) do + line1 = first_line |> Enum.reverse |> tl |> Enum.reverse + [[_ | line2] | crdt] = crdt + [line1++line2 | crdt] + else + [first_line | delete_newline(crdt, character)] + end + end + + defp convert_to_string(crdt) do + if crdt == [] do + "" + else + [first_line | crdt] = crdt + "#{convert_line_to_string(first_line)}\n#{convert_to_string(crdt)}" + end + end + + defp convert_line_to_string(line) do + [first_ch | line] = line + if line == [] do + "#{Character.to_string(first_ch)}" + else + "#{Character.to_string(first_ch)}, #{convert_line_to_string(line)}" + end + end + +end \ No newline at end of file diff --git a/lib/codeshare/crdt/registry.ex b/lib/codeshare/crdt/registry.ex new file mode 100644 index 0000000..03ec107 --- /dev/null +++ b/lib/codeshare/crdt/registry.ex @@ -0,0 +1,97 @@ +defmodule Codeshare.CRDT.Registry do + @moduledoc """ + Registry mapping session id -> crdt process + """ + alias __MODULE__ + use GenServer + alias Codeshare.CRDT + + ## API to accessing CRDT + + @doc """ + Start the registry + """ + def start_link(opts) do + GenServer.start_link(__MODULE__, :ok, opts) + end + + def create(server, session_id) do + GenServer.cast(server, {:create, session_id}) + end + + def put(server, session_id, payload) do + GenServer.cast(server, {:put, session_id, payload}) + end + + def stop(server, session_id) do + GenServer.cast(server, {:stop, session_id}) + end + + def get_string(server, session_id) do + GenServer.call(server, {:get_string, session_id}) + end + + # Debugging function + def get_sessions(server) do + GenServer.call(server, {:get_sessions}) + end + + ## Defining GenSever Callback + + @impl true + def init(:ok) do + sessions = %{} + refs = %{} + {:ok, {sessions, refs}} + end + + @impl true + def handle_cast({:create, session_id}, {sessions, refs}) do + if Map.has_key?(sessions, session_id) do + {:noreply, {sessions, refs}} + else + {:ok, crdt} = CRDT.start_link([]) + ref = Process.monitor(crdt) + refs = Map.put(refs, ref, session_id) + sessions = Map.put(sessions, session_id, crdt) + {:noreply, {sessions, refs}} + end + end + + @impl true + def handle_cast({:put, session_id, payload}, {sessions, refs}) do + {:ok, crdt} = Map.fetch(sessions, session_id) + CRDT.put(crdt, payload) + {:noreply, {sessions, refs}} + end + + @impl true + def handle_cast({:stop, session_id}, {sessions, refs}) do + {:ok, crdt} = Map.fetch(sessions, session_id) + CRDT.stop(crdt) + {:noreply, {sessions, refs}} + end + + @impl true + def handle_call({:get_string, session_id}, _from, {sessions, refs}) do + {:ok, crdt} = Map.fetch(sessions, session_id) + {:reply, CRDT.get_string(crdt), {sessions, refs}} + end + + @impl true + def handle_call({:get_sessions}, _from, {sessions, refs}) do + {:reply, sessions, {sessions, refs}} + end + + @impl true + def handle_info({:DOWN, ref, :process, _pid, _reason}, {sessions, refs}) do + {session_id, refs} = Map.pop(refs, ref) + sessions = Map.delete(sessions, session_id) + {:noreply, {sessions, refs}} + end + + @impl true + def handle_info(_msg, state) do + {:noreply, state} + end +end \ No newline at end of file diff --git a/lib/codeshare/crdt/supervisor.ex b/lib/codeshare/crdt/supervisor.ex new file mode 100644 index 0000000..9ef89d9 --- /dev/null +++ b/lib/codeshare/crdt/supervisor.ex @@ -0,0 +1,22 @@ +defmodule Codeshare.CRDT.Supervisor do + @moduledoc """ + Supervise CRDT Registry + and CRDT processes + """ + use Supervisor + alias Codeshare.CRDT + + def start_link(opts) do + Supervisor.start_link(__MODULE__, :ok, opts) + end + + @impl true + def init(:ok) do + children = [ + {CRDT.Registry, name: CRDT.Registry} + ] + + Supervisor.init(children, strategy: :one_for_one) + end + +end \ No newline at end of file diff --git a/lib/codeshare_web/channels/room_channel.ex b/lib/codeshare_web/channels/room_channel.ex index 961cb2c..7c22366 100644 --- a/lib/codeshare_web/channels/room_channel.ex +++ b/lib/codeshare_web/channels/room_channel.ex @@ -2,9 +2,18 @@ defmodule CodeshareWeb.RoomChannel do use CodeshareWeb, :channel import Ecto.Query, only: [from: 2] alias CodeshareWeb.Presence + alias Codeshare.CRDT def join("room:" <> room_id, payload, socket) do if authorized?(payload) do + # Add CRDT process + CRDT.Registry.create(CRDT.Registry, room_id) + + # Print active sessions + sessions = CRDT.Registry.get_sessions(CRDT.Registry) + IO.puts "Active sessions" + IO.inspect sessions + query = from entry in "editor_state", select: entry.data, where: entry.room_id == ^(room_id) @@ -44,6 +53,11 @@ defmodule CodeshareWeb.RoomChannel do # It is also common to receive messages from the client and # broadcast to everyone in the current topic (room:lobby). def handle_in("shout", payload, socket) do + + # Update server-side CRDT + CRDT.Registry.put(CRDT.Registry, socket.assigns.room_id, payload) + # IO.inspect CRDT.get() + editor = %Codeshare.Editor{} changeset = Codeshare.Editor.changeset(editor, %{data: payload, room_id: socket.assigns.room_id}) Codeshare.Repo.insert(changeset) @@ -57,6 +71,19 @@ defmodule CodeshareWeb.RoomChannel do {:noreply, socket} end + def handle_in("check", payload, socket) do + IO.puts "CRDT: #{socket.assigns.user_id}:" + IO.puts payload["value"] + IO.puts "CRDT: Server:" + IO.puts CRDT.Registry.get_string(CRDT.Registry, socket.assigns.room_id) + + if payload["value"] == CRDT.Registry.get_string(CRDT.Registry, socket.assigns.room_id) do + {:reply, {:ok, %{"flag" => true}}, socket} + else + {:reply, {:ok, %{"flag" => false}}, socket} + end + end + def terminate(_ , socket) do # When last active channel is terminating, # empty the database @@ -64,6 +91,10 @@ defmodule CodeshareWeb.RoomChannel do query = from editor in "editor_state", select: editor.data Codeshare.Repo.delete_all(query) + + # Stop CRDT process + CRDT.Registry.stop(CRDT.Registry, socket.assigns.room_id) + IO.puts "Session #{String.slice(socket.assigns.room_id, 0..6)} terminated" end :ok end diff --git a/lib/codeshare_web/templates/page/index.html.eex b/lib/codeshare_web/templates/page/index.html.eex index 7fa74ff..07ea8cc 100644 --- a/lib/codeshare_web/templates/page/index.html.eex +++ b/lib/codeshare_web/templates/page/index.html.eex @@ -45,7 +45,7 @@ function updateMode() { - + Current user: