Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions app/controllers/blogs_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ def destroy

def import
  file = params[:attachment]

  if file.present?
    # Save the file temporarily in `tmp/` so the background job can access it
    # after the request cycle ends. Use a numeric timestamp plus a random
    # suffix: bare "#{Time.now}" puts spaces/colons in the filename and two
    # concurrent uploads in the same second would collide.
    temp_path = Rails.root.join('tmp', "import_#{Time.now.to_i}_#{SecureRandom.hex(4)}.csv")
    File.open(temp_path, 'wb') { |f| f.write(file.read) }

    # Enqueue the background job with the file path and user ID.
    # Pass the path as a String: Rails.root.join returns a Pathname, which
    # ActiveJob argument serialization rejects.
    BulkImportBlogsJob.perform_later(10000, temp_path.to_s, current_user.id)

    redirect_to blogs_path, notice: "Blog upload started. You will be notified when it's done."
  else
    redirect_to blogs_path, alert: "Please upload a valid CSV file."
  end
end

private
Expand Down
25 changes: 25 additions & 0 deletions app/jobs/blogs_api_response_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
class BlogsApiResponseJob < ApplicationJob
  queue_as :default

  # Fetches (simulated) API responses for the given blog ids and persists
  # them all with one bulk insert.
  #
  # @param blog_ids [Array<Integer>] ids of blogs to build responses for
  def perform(blog_ids)
    rows = blog_ids.map do |blog_id|
      # Simulate some network latency
      sleep(0.1)

      # One attributes hash per blog for the bulk insert below.
      {
        api_response_id: "blog-#{SecureRandom.hex}-#{blog_id}",
        api_status: ApiResponse.api_statuses.keys.sample,
        blog_id: blog_id,
        created_at: Time.now,
        updated_at: Time.now
      }
    end

    # Single round-trip insert; skipped entirely when there is nothing to save.
    ApiResponse.insert_all(rows) unless rows.empty?
  end
end
16 changes: 16 additions & 0 deletions app/jobs/bulk_import_blogs_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class BulkImportBlogsJob < ApplicationJob
  queue_as :default

  # Processes an uploaded blogs CSV in the background.
  #
  # @param batch_size [Integer] rows accumulated per bulk insert
  # @param file_path  [String]  path to the temp CSV written by the controller
  # @param user_id    [Integer] owner assigned to every imported blog
  def perform(batch_size, file_path, user_id)
    unless File.exist?(file_path)
      Rails.logger.error "File not found: #{file_path}"
      return
    end

    begin
      # Delegate the actual CSV streaming/validation/insertion to the service.
      BulkImportBlogsService.new(batch_size, file_path, user_id).process
    ensure
      # Always delete the temp file — even when processing raises — so
      # failed imports don't accumulate in tmp/.
      File.delete(file_path) if File.exist?(file_path)
    end
  end
end
49 changes: 16 additions & 33 deletions app/jobs/memory_leak_job.rb
Original file line number Diff line number Diff line change
@@ -1,47 +1,30 @@
class MemoryLeakJob < ApplicationJob
  queue_as :default

  # Walks every Blog in fixed-size batches, collects the ids of valid ones,
  # and enqueues a BlogsApiResponseJob per batch to fetch and save the API
  # responses. Batching keeps memory flat regardless of table size.
  #
  # @param batch_size [Integer] number of records loaded per batch
  def perform(batch_size = 1000)
    # Bug fix: honor the batch_size argument instead of hard-coding 1000,
    # which made the parameter dead weight.
    Blog.find_in_batches(batch_size: batch_size) do |batch|
      # A fresh local array per batch — no shared buffer to clear, and no
      # risk of mutating an array a previous enqueue still references.
      valid_ids = []

      batch.each do |blog|
        if blog_valid?(blog)
          valid_ids << blog.id
        else
          Rails.logger.info "Invalid blog: #{blog.id}"
        end
      end

      # Fan out one follow-up job per batch with just the ids (small payload);
      # skip the enqueue entirely when the batch produced no valid blogs.
      BlogsApiResponseJob.perform_later(valid_ids) unless valid_ids.empty?
    end
  end

  private

  # A blog is valid when both title and body are present.
  def blog_valid?(blog)
    blog.title.present? && blog.body.present?
  end
end
2 changes: 2 additions & 0 deletions app/models/blog.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
class Blog < ApplicationRecord
  belongs_to :user
  # NOTE(review): no dependent: option — api_responses rows are orphaned when
  # a blog is destroyed. Consider dependent: :destroy or :delete_all; confirm
  # the intended lifecycle with the team.
  has_many :api_responses

  # Both fields are required; the bulk-import path relies on this validation.
  validates :title, :body, presence: true
end
68 changes: 68 additions & 0 deletions app/services/bulk_import_blogs_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
require 'csv'

# Streams a CSV of blogs, validating each row, bulk-inserting valid records
# in batches, and appending invalid rows to a per-run error log file.
class BulkImportBlogsService

  # @param batch_size [Integer] rows accumulated before each bulk insert
  # @param file_path  [String]  CSV file to import (expects a header row)
  # @param user_id    [Integer] owner assigned to every imported blog
  def initialize(batch_size, file_path, user_id)
    @batch_size = batch_size
    @file_path = file_path
    @user_id = user_id
    # Fixed once per run: computing the timestamped name inside each flush
    # created a brand-new log file per batch instead of one per import.
    @error_log_file = Rails.root.join('log', "invalid_blogs_#{Time.current.to_i}.csv")
  end

  def process
    blogs_data = []
    invalid_blogs = []

    # Read the CSV row by row instead of loading the whole file into memory.
    # 'UTF-8' is the registered encoding name; 'utf8' is not recognized by Ruby.
    CSV.foreach(@file_path, headers: true, encoding: 'UTF-8') do |row|
      if check_blog_valid?(row)
        blogs_data << row.to_h.merge(
          user_id: @user_id,
          created_at: Time.now,
          updated_at: Time.now
        )
      else
        # Keep invalid records around for logging.
        invalid_blogs << row.to_h
      end
      # Flush both buffers whenever the insert buffer reaches the batch size.
      handle_bulk_insert_and_logging(blogs_data, invalid_blogs) if blogs_data.size >= @batch_size
    end

    # Flush whatever remains after the final partial batch.
    handle_bulk_insert_and_logging(blogs_data, invalid_blogs)
  end

  private

  # True when the row would produce a valid Blog for this user.
  def check_blog_valid?(row)
    Blog.new(row.to_h.merge(user_id: @user_id)).valid?
  end

  # Bulk-inserts the accumulated valid rows in one transaction, then empties
  # the buffer so the caller can keep reusing it.
  def bulk_insert_blogs(blogs_data)
    ActiveRecord::Base.transaction do
      Blog.insert_all(blogs_data)
    end
    blogs_data.clear
  end

  # Appends invalid rows to this run's error log, then empties the buffer.
  def log_invalid_blogs(invalid_blogs)
    CSV.open(@error_log_file, "a") do |csv|
      # Write headers only when the file is still empty (first flush).
      csv << ["Title", "Body", "User ID", "Errors"] if File.zero?(@error_log_file)
      # Bug fix: CSV#<< expects an Array (or CSV::Row), not a Hash — write
      # the row's values so each record lands on one well-formed line.
      invalid_blogs.each { |record| csv << record.values }
    end
    invalid_blogs.clear
  end

  # Handles both bulk inserting valid blogs and logging invalid ones.
  def handle_bulk_insert_and_logging(blogs_data, invalid_blogs)
    bulk_insert_blogs(blogs_data) unless blogs_data.empty?
    log_invalid_blogs(invalid_blogs) unless invalid_blogs.empty?
  end

end