Skip to content

Commit 4a67e7f

Browse files
author
Malte Rohde
committed
Initial import.
0 parents  commit 4a67e7f

File tree

4 files changed

+197
-0
lines changed

4 files changed

+197
-0
lines changed

COPYING

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2014 FlavourSys Technology GmbH
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in
13+
all copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
THE SOFTWARE.

README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Sidekiq::QueuePause
2+
3+
This gem adds a `.pause` functionality to Sidekiq queues. Sidekiq Pro has this feature, so please consider upgrading if you can.
4+
5+
## Usage
6+
7+
Initializer:
8+
9+
```ruby
10+
Sidekiq.configure_server do |config|
11+
Sidekiq.options[:fetch] = Sidekiq::QueuePause::PausingFetch
12+
13+
# Optionally, you may set some unique key identifying the
14+
# Sidekiq process you want to control. This (server) process will
15+
# only be paused/unpaused when the function gets called with
16+
# the corresponding key. See below.
17+
Sidekiq::QueuePause.process_key = 'foo'
18+
19+
# You may also pass in a Proc which is then evaluated when needed.
20+
Sidekiq::QueuePause.process_key { SomeClass.some_method }
21+
22+
# Optionally, you may configure the sleep period (in seconds) after the
23+
# queue lock has been checked. By default, the fetcher will sleep for
24+
# Sidekiq::Fetcher::TIMEOUT, i.e. the same time that the redis fetch
25+
# command may take.
26+
Sidekiq::QueuePause.retry_after = 5
27+
end
28+
```
29+
30+
Pausing a queue:
31+
32+
```ruby
33+
Sidekiq::QueuePause.pause(:example)
34+
```
35+
36+
```ruby
37+
# With process key:
38+
Sidekiq::QueuePause.pause(:example, 'foo')
39+
```
40+
41+
```ruby
42+
# On a queue object:
43+
example = Sidekiq::Queue.new(:example)
44+
example.pause
45+
```
46+
47+
```ruby
48+
# On a queue object with process key:
49+
example.pause('foo')
50+
```
51+
52+
Unpause:
53+
54+
```ruby
55+
Sidekiq::QueuePause.unpause(:example)
56+
```
57+
58+
etc.
59+
60+
Getting pause status:
61+
62+
```ruby
63+
Sidekiq::QueuePause.paused?(:example)
64+
# => true/false
65+
```
66+
67+
etc.
68+
69+
Unpause all queues/processes:
70+
71+
```ruby
72+
Sidekiq::QueuePause.unpause_all
73+
```
74+
75+
## License
76+
77+
The gem is licensed under the [MIT-License](COPYING).

lib/sidekiq-queue-pause.rb

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
require 'celluloid'
2+
require 'sidekiq'
3+
require 'sidekiq/fetch'
4+
5+
module Sidekiq
6+
module QueuePause
7+
PREFIX = 'queue_pause'
8+
9+
class << self
10+
attr_accessor :retry_after
11+
attr_writer :process_key
12+
13+
def process_key(&block)
14+
if block_given?
15+
@process_key = block
16+
else
17+
@process_key.is_a?(Proc) ? @process_key.call : @process_key
18+
end
19+
end
20+
21+
def pause(queue, pkey = nil)
22+
Sidekiq.redis { |it| it.set rkey(queue, pkey), true }
23+
end
24+
25+
def unpause(queue, pkey = nil)
26+
Sidekiq.redis { |it| it.del rkey(queue, pkey) }
27+
end
28+
29+
def paused?(queue, pkey = nil)
30+
Sidekiq.redis { |it| it.exists rkey(queue, pkey) }
31+
end
32+
33+
def unpause_all
34+
Sidekiq.redis { |it| it.keys("#{PREFIX}:*").each { |k| it.del k } }
35+
end
36+
37+
private
38+
39+
def rkey(queue, pkey)
40+
pkey ? "#{PREFIX}:#{queue}:#{pkey}" : "#{PREFIX}:#{queue}"
41+
end
42+
end
43+
44+
class PausingFetch < Sidekiq::BasicFetch
45+
def retrieve_work
46+
qcmd = unpaused_queues_cmd
47+
48+
if qcmd.size > 1
49+
retrieve_work_for_queues qcmd
50+
else
51+
sleep(Sidekiq::QueuePause.retry_after || Sidekiq::Fetcher::TIMEOUT)
52+
nil
53+
end
54+
end
55+
56+
def retrieve_work_for_queues(qcmd)
57+
work = Sidekiq.redis { |conn| conn.brpop(*qcmd) }
58+
UnitOfWork.new(*work) if work
59+
end
60+
61+
def unpaused_queues_cmd
62+
queues = queues_cmd
63+
queues.reject do |q|
64+
q != Sidekiq::Fetcher::TIMEOUT &&
65+
Sidekiq::QueuePause.paused?(q.gsub('queue:', ''), Sidekiq::QueuePause.process_key)
66+
end
67+
end
68+
end
69+
end
70+
71+
class Queue
72+
def pause(pkey = nil)
73+
Sidekiq::QueuePause.pause(name, pkey)
74+
end
75+
76+
def unpause(pkey = nil)
77+
Sidekiq::QueuePause.unpause(name, pkey)
78+
end
79+
80+
def paused?(pkey = nil)
81+
Sidekiq::QueuePause.paused?(name, pkey)
82+
end
83+
end
84+
end

sidekiq-queue-pause.gemspec

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
Gem::Specification.new do |s|
2+
s.name = 'sidekiq-queue-pause'
3+
s.version = '0.0.1'
4+
s.summary = 'Pause a Sidekiq queue'
5+
s.description = 'Let\'s you pause/unpause individual sidekiq queues.'
6+
s.license = 'MIT'
7+
s.authors = ['FlavourSys Technology GmbH']
8+
s.email = ['technology@flavoursys.com']
9+
s.homepage = 'http://github.com/FlavourSys/sidekiq-queue-pause'
10+
11+
s.require_paths = ['lib']
12+
s.files = Dir['lib/**/*rb']
13+
14+
s.add_dependency 'sidekiq', '~> 3.1'
15+
end

0 commit comments

Comments
 (0)