Skip to content

Conversation

@jamespeerless
Copy link

@jamespeerless jamespeerless commented Jan 8, 2019

When using exec_within_threshold in an environment with multiple threads/processes you can run into an issue where multiple threads read the count for a given subject at the same time. If the read count is right below the current threshold then all threads will be allowed to execute before any of them can perform an add that pushes the count over the threshold.

For example, if you had three threads and a ratelimit with a threshold of 20 in 600 seconds and the current count was at 19. All three threads would read the current count of 19 and the exceeded? check used in exec_within_threshold would evaluate to false so all three threads would execute the block.

The fix requires that we can read and increment the count in an atomic way. There is no way to do this natively with redis out-of-the-box. I had to implement a short Lua Script that can perform the count, check, and increment. Using this new script we are able to implement a threadsafe version of exec_within_threshold that also automatically increments the subject.

I wrote some tests to verify the issue and that this fixes it in a project where we use the ratelimit gem.

The test that exposes the issue where exec_within_threshold allows more than the threshold number of executions in an interval is here:

rlKey = SecureRandom.uuid
rlSubject = SecureRandom.uuid
ratelimit = Ratelimit.new(rlKey,{:redis => $redis})

  statusMap = { stop: 0, sleep: 0, run: 0, finished: 0}

  threads = [
    Thread.new { ratelimit.exec_within_threshold(rlSubject, {interval: 10, threshold: 1 }) { sleep 0.5; ratelimit.add(rlSubject) }},
    Thread.new { ratelimit.exec_within_threshold(rlSubject, {interval: 10, threshold: 1 }) { sleep 0.5; ratelimit.add(rlSubject) }},
    Thread.new { ratelimit.exec_within_threshold(rlSubject, {interval: 10, threshold: 1 }) { sleep 0.5; ratelimit.add(rlSubject) }},
    Thread.new { ratelimit.exec_within_threshold(rlSubject, {interval: 10, threshold: 1 }) { sleep 0.5; ratelimit.add(rlSubject) }},
    Thread.new { ratelimit.exec_within_threshold(rlSubject, {interval: 10, threshold: 1 }) { sleep 0.5; ratelimit.add(rlSubject) }},
    Thread.new { ratelimit.exec_within_threshold(rlSubject, {interval: 10, threshold: 1 }) { sleep 0.5; ratelimit.add(rlSubject) }},
    Thread.new { ratelimit.exec_within_threshold(rlSubject, {interval: 10, threshold: 1 }) { sleep 0.5; ratelimit.add(rlSubject) }}
  ]

  sleep 1

  threads.each do |t| 
    status_symbol = t.status ? t.status.to_sym : :finished
    statusMap[status_symbol] += 1
  end
  
  expect(statusMap[:finished]).to be > 1

The test with the fixed implementation looks like this:

rlKey = SecureRandom.uuid
rlSubject = SecureRandom.uuid
ratelimit = Ratelimit.new(rlKey, {:redis => $redis})

  statusMap = { stop: 0, sleep: 0, run: 0, finished: 0}
  threads = [
    Thread.new { ratelimit.exec_and_increment_within_threshold(rlSubject, {interval: 10, threshold: 1 })},
    Thread.new { ratelimit.exec_and_increment_within_threshold(rlSubject, {interval: 10, threshold: 1 })},
    Thread.new { ratelimit.exec_and_increment_within_threshold(rlSubject, {interval: 10, threshold: 1 })},
    Thread.new { ratelimit.exec_and_increment_within_threshold(rlSubject, {interval: 10, threshold: 1 })},
    Thread.new { ratelimit.exec_and_increment_within_threshold(rlSubject, {interval: 10, threshold: 1 })},
    Thread.new { ratelimit.exec_and_increment_within_threshold(rlSubject, {interval: 10, threshold: 1 })},
    Thread.new { ratelimit.exec_and_increment_within_threshold(rlSubject, {interval: 10, threshold: 1 })}
  ]

 sleep 1

  threads.each do |t| 
    status_symbol = t.status ? t.status.to_sym : :finished
    statusMap[status_symbol] += 1
  end

  expect(statusMap[:finished]).to eq(1)

@coveralls
Copy link

Coverage Status

Coverage decreased (-7.6%) to 90.845% when pulling 509fabc on jamespeerless:threadsafe-exec-within-threshold-no-lock into 0e60d34 on ejfinneran:master.

@coveralls
Copy link

coveralls commented Jan 8, 2019

Coverage Status

Coverage decreased (-8.1%) to 90.278% when pulling dc46e5f on jamespeerless:threadsafe-exec-within-threshold-no-lock into 0e60d34 on ejfinneran:master.

evalScript = 'local a=KEYS[1]local b=tonumber(ARGV[1])local c=tonumber(ARGV[b+2])local d=tonumber(ARGV[b+3])local e=tonumber(ARGV[b+4])local f=tonumber(ARGV[b+5])local g=tonumber(ARGV[b+6])local h=0;local i=false;for j,k in ipairs(redis.call("HMGET",a,unpack(ARGV,2,b+1)))do h=h+(tonumber(k)or 0)end;if h<f then redis.call("HINCRBY",a,c,g)redis.call("HDEL",a,(c+1)%d)redis.call("HDEL",a,(c+2)%d)redis.call("EXPIRE",a,e)i=true end;return i'
evalKeys = [get_key_for_subject(subject)]
evalArgs = [keys.length, *keys, bucket, @bucket_count, @bucket_expiry, options[:threshold], options[:increment]]
redis.eval(evalScript, evalKeys, evalArgs)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not need to send full script to eval every time, waste of bandwidth. Can take sha1 of the script and use that to check if script exists and then execute using only sha1 of script.

@jamespeerless jamespeerless force-pushed the threadsafe-exec-within-threshold-no-lock branch 5 times, most recently from a970c46 to 630d218 Compare January 9, 2019 21:09
…reshold that counts and increments atomically
@feliperaul
Copy link

@jamespeerless James, have you ever gotten around implementing the idea of not evaluating that lua script every time, and instead checking if it was already evaluated using it's SHA?

@mrhaddad
Copy link

@jamespeerless Do you expect to get this merged?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants