Skip to content

Commit 5da5180

Browse files
committed
Process in parallel
1 parent 206d848 commit 5da5180

File tree

3 files changed

+144
-64
lines changed

3 files changed

+144
-64
lines changed

lib/diff/application.ex

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,9 @@ defmodule Diff.Application do
1212
children = [
1313
goth_spec(),
1414
{Task.Supervisor, name: Diff.Tasks},
15-
# Start the PubSub system
1615
{Phoenix.PubSub, name: Diff.PubSub},
17-
# Start the endpoint when the application starts
18-
DiffWeb.Endpoint,
19-
# Starts a worker by calling: Diff.Worker.start_link(arg)
20-
# {Diff.Worker, arg},
21-
Diff.Package.Supervisor
16+
Diff.Package.Supervisor,
17+
DiffWeb.Endpoint
2218
]
2319

2420
# See https://hexdocs.pm/elixir/Supervisor.html

lib/diff_web/live/diff_live_view.ex

Lines changed: 141 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ defmodule DiffWeb.DiffLiveView do
8282
metadata: metadata,
8383
all_diff_ids: diff_ids,
8484
loaded_diffs: initial_diffs,
85+
loaded_diff_content: %{},
8586
remaining_diffs: remaining,
8687
loading: true,
8788
generating: false,
@@ -103,6 +104,7 @@ defmodule DiffWeb.DiffLiveView do
103104
metadata: %{files_changed: 0, total_additions: 0, total_deletions: 0},
104105
all_diff_ids: [],
105106
loaded_diffs: [],
107+
loaded_diff_content: %{},
106108
remaining_diffs: [],
107109
loading: false,
108110
generating: true,
@@ -162,6 +164,7 @@ defmodule DiffWeb.DiffLiveView do
162164
metadata: metadata,
163165
all_diff_ids: diff_ids,
164166
loaded_diffs: initial_diffs,
167+
loaded_diff_content: %{},
165168
remaining_diffs: remaining,
166169
generating: false,
167170
loading: true,
@@ -186,54 +189,114 @@ defmodule DiffWeb.DiffLiveView do
186189
end
187190

188191
def handle_info({:load_diffs_and_update, diff_ids}, socket) do
189-
# Simply add new diffs to loaded_diffs - no server memory management needed
190-
# diffs are loaded on-demand during rendering from storage
192+
# Load diffs in parallel and add to existing loaded content
193+
new_loaded_content =
194+
load_diffs_in_parallel(
195+
socket.assigns.package,
196+
socket.assigns.from,
197+
socket.assigns.to,
198+
diff_ids
199+
)
200+
201+
existing_content = Map.get(socket.assigns, :loaded_diff_content, %{})
202+
all_loaded_content = Map.merge(existing_content, new_loaded_content)
203+
191204
new_loaded_diffs = socket.assigns.loaded_diffs ++ diff_ids
192205

193206
socket =
194207
socket
195208
|> assign(
196209
loaded_diffs: new_loaded_diffs,
210+
loaded_diff_content: all_loaded_content,
197211
loading: false
198212
)
199213

200214
{:noreply, socket}
201215
end
202216

203-
def handle_info({:load_diffs, _diff_ids}, socket) do
204-
# With on-demand loading, we just need to mark loading as complete
205-
# diff content will be loaded during rendering
206-
socket = assign(socket, loading: false)
217+
def handle_info({:load_diffs, diff_ids}, socket) do
218+
# Load initial diffs in parallel
219+
loaded_content =
220+
load_diffs_in_parallel(
221+
socket.assigns.package,
222+
socket.assigns.from,
223+
socket.assigns.to,
224+
diff_ids
225+
)
226+
227+
socket =
228+
socket
229+
|> assign(
230+
loaded_diff_content: loaded_content,
231+
loading: false
232+
)
233+
207234
{:noreply, socket}
208235
end
209236

210237
defp process_stream_to_diffs(package, from, to, stream) do
211-
diff_index = 0
212-
213-
metadata = %{
238+
initial_metadata = %{
214239
total_diffs: 0,
215240
total_additions: 0,
216241
total_deletions: 0,
217242
files_changed: 0
218243
}
219244

220-
{final_metadata, diff_ids, _} =
221-
Enum.reduce(stream, {metadata, [], diff_index}, fn element,
222-
{acc_metadata, acc_diff_ids, index} ->
223-
case element do
224-
{:ok, {raw_diff, path_from, path_to}} ->
225-
# Store raw git diff output with base paths for relative conversion
226-
diff_id = "diff-#{index}"
245+
# Process stream elements in parallel with indices
246+
results =
247+
stream
248+
|> Stream.with_index()
249+
|> Task.async_stream(
250+
Diff.Tasks,
251+
fn {element, index} ->
252+
process_stream_element(package, from, to, element, index)
253+
end,
254+
max_concurrency: 10,
255+
timeout: 30_000,
256+
ordered: true
257+
)
258+
|> Enum.reduce({initial_metadata, []}, fn
259+
{:ok, {:ok, diff_id, metadata_update}}, {acc_metadata, acc_diff_ids} ->
260+
updated_metadata = merge_metadata(acc_metadata, metadata_update)
261+
{updated_metadata, acc_diff_ids ++ [diff_id]}
262+
263+
{:ok, {:error, _}}, acc ->
264+
acc
265+
266+
{:error, _}, acc ->
267+
acc
268+
end)
269+
270+
case results do
271+
{final_metadata, diff_ids} ->
272+
case Diff.Storage.put_metadata(package, from, to, final_metadata) do
273+
:ok ->
274+
{:ok, final_metadata, diff_ids}
275+
276+
{:error, reason} ->
277+
Logger.error("Failed to store metadata: #{inspect(reason)}")
278+
{:error, reason}
279+
end
280+
end
281+
catch
282+
:throw, {:diff, :invalid_diff} ->
283+
{:error, :invalid_diff}
284+
end
227285

228-
diff_data =
229-
Jason.encode!(%{
230-
"diff" => DiffWeb.LiveView.sanitize_utf8(raw_diff),
231-
"path_from" => path_from,
232-
"path_to" => path_to
233-
})
286+
defp process_stream_element(package, from, to, element, index) do
287+
case element do
288+
{:ok, {raw_diff, path_from, path_to}} ->
289+
diff_id = "diff-#{index}"
234290

235-
Diff.Storage.put_diff(package, from, to, diff_id, diff_data)
291+
diff_data =
292+
Jason.encode!(%{
293+
"diff" => DiffWeb.LiveView.sanitize_utf8(raw_diff),
294+
"path_from" => path_from,
295+
"path_to" => path_to
296+
})
236297

298+
case Diff.Storage.put_diff(package, from, to, diff_id, diff_data) do
299+
:ok ->
237300
# Count additions and deletions from raw diff (exclude +++ and --- headers)
238301
lines = String.split(raw_diff, "\n")
239302

@@ -247,51 +310,56 @@ defmodule DiffWeb.DiffLiveView do
247310
String.starts_with?(line, "-") and not String.starts_with?(line, "---")
248311
end)
249312

250-
updated_metadata = %{
251-
acc_metadata
252-
| total_diffs: acc_metadata.total_diffs + 1,
253-
total_additions: acc_metadata.total_additions + additions,
254-
total_deletions: acc_metadata.total_deletions + deletions,
255-
files_changed: acc_metadata.files_changed + 1
313+
metadata_update = %{
314+
total_diffs: 1,
315+
total_additions: additions,
316+
total_deletions: deletions,
317+
files_changed: 1
256318
}
257319

258-
{updated_metadata, acc_diff_ids ++ [diff_id], index + 1}
320+
{:ok, diff_id, metadata_update}
259321

260-
{:too_large, file_path} ->
261-
# Store raw too_large data directly
262-
too_large_data = Jason.encode!(%{type: "too_large", file: file_path})
263-
diff_id = "diff-#{index}"
264-
265-
Diff.Storage.put_diff(package, from, to, diff_id, too_large_data)
322+
{:error, reason} ->
323+
Logger.error("Failed to store diff #{diff_id}: #{inspect(reason)}")
324+
{:error, reason}
325+
end
266326

267-
updated_metadata = %{
268-
acc_metadata
269-
| total_diffs: acc_metadata.total_diffs + 1,
270-
files_changed: acc_metadata.files_changed + 1
327+
{:too_large, file_path} ->
328+
diff_id = "diff-#{index}"
329+
too_large_data = Jason.encode!(%{type: "too_large", file: file_path})
330+
331+
case Diff.Storage.put_diff(package, from, to, diff_id, too_large_data) do
332+
:ok ->
333+
metadata_update = %{
334+
total_diffs: 1,
335+
total_additions: 0,
336+
total_deletions: 0,
337+
files_changed: 1
271338
}
272339

273-
{updated_metadata, acc_diff_ids ++ [diff_id], index + 1}
340+
{:ok, diff_id, metadata_update}
274341

275-
{:error, error} ->
276-
Logger.error(
277-
"Failed to process diff #{index} for #{package} #{from}..#{to} with: #{inspect(error)}"
278-
)
279-
280-
{acc_metadata, acc_diff_ids, index}
342+
{:error, reason} ->
343+
Logger.error("Failed to store too_large diff #{diff_id}: #{inspect(reason)}")
344+
{:error, reason}
281345
end
282-
end)
283346

284-
case Diff.Storage.put_metadata(package, from, to, final_metadata) do
285-
:ok ->
286-
{:ok, final_metadata, diff_ids}
347+
{:error, error} ->
348+
Logger.error(
349+
"Failed to process diff #{index} for #{package} #{from}..#{to} with: #{inspect(error)}"
350+
)
287351

288-
{:error, reason} ->
289-
Logger.error("Failed to store metadata: #{inspect(reason)}")
290-
{:error, reason}
352+
{:error, error}
291353
end
292-
catch
293-
:throw, {:diff, :invalid_diff} ->
294-
{:error, :invalid_diff}
354+
end
355+
356+
defp merge_metadata(acc_metadata, update) do
357+
%{
358+
total_diffs: acc_metadata.total_diffs + update.total_diffs,
359+
total_additions: acc_metadata.total_additions + update.total_additions,
360+
total_deletions: acc_metadata.total_deletions + update.total_deletions,
361+
files_changed: acc_metadata.files_changed + update.files_changed
362+
}
295363
end
296364

297365
defp parse_versions(input) do
@@ -336,4 +404,20 @@ defmodule DiffWeb.DiffLiveView do
336404
end
337405

338406
defp build_url(app, from, to), do: "/diff/#{app}/#{from}..#{to}"
407+
408+
defp load_diffs_in_parallel(package, from, to, diff_ids) do
409+
diff_ids
410+
|> Task.async_stream(
411+
Diff.Tasks,
412+
fn diff_id ->
413+
{diff_id, DiffWeb.LiveView.load_diff_content(package, from, to, diff_id)}
414+
end,
415+
max_concurrency: 10,
416+
timeout: 30_000
417+
)
418+
|> Enum.reduce(%{}, fn
419+
{:ok, {diff_id, content}}, acc -> Map.put(acc, diff_id, content)
420+
{:error, _}, acc -> acc
421+
end)
422+
end
339423
end

lib/diff_web/templates/live/diff.html.leex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<div class="ghd-container" id="diff-list">
4242
<%= for diff_id <- @loaded_diffs do %>
4343
<div id="diff-<%= diff_id %>">
44-
<%= raw(load_diff_content(@package, @from, @to, diff_id)) %>
44+
<%= raw(Map.get(@loaded_diff_content, diff_id, "<div class='diff-error'>Diff content not loaded</div>")) %>
4545
</div>
4646
<% end %>
4747

0 commit comments

Comments
 (0)