Skip to content

Commit 14ec15e

Browse files
committed
use flow for replication alg and min followers
1 parent 2d08dd0 commit 14ec15e

File tree

4 files changed

+90
-73
lines changed

4 files changed

+90
-73
lines changed

spec/replication_spec.cr

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -67,47 +67,47 @@ describe LavinMQ::Replication::Client do
6767
end
6868

6969
describe LavinMQ::Replication::Server do
70-
data_dir = "/tmp/lavinmq-follower"
70+
# data_dir = "/tmp/lavinmq-follower"
7171

72-
before_each do
73-
FileUtils.rm_rf data_dir
74-
Dir.mkdir_p data_dir
75-
File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
76-
Server.vhosts["/"].declare_queue("repli", true, false)
77-
LavinMQ::Config.instance.min_followers = 1
78-
end
72+
# before_each do
73+
# FileUtils.rm_rf data_dir
74+
# Dir.mkdir_p data_dir
75+
# File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
76+
# Server.vhosts["/"].declare_queue("repli", true, false)
77+
# LavinMQ::Config.instance.min_followers = 1
78+
# end
7979

80-
after_each do
81-
FileUtils.rm_rf data_dir
82-
LavinMQ::Config.instance.min_followers = 0
83-
end
80+
# after_each do
81+
# FileUtils.rm_rf data_dir
82+
# LavinMQ::Config.instance.min_followers = 0
83+
# end
8484

85-
it "should publish when min_followers is fulfilled" do
86-
q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
87-
repli = LavinMQ::Replication::Client.new(data_dir)
88-
spawn do
89-
repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
90-
end
91-
with_channel do |ch|
92-
ch.basic_publish "hello world", "", "repli"
93-
end
94-
q.basic_get(true) { }.should be_true
95-
repli.close
96-
end
85+
# it "should publish when min_followers is fulfilled" do
86+
# q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
87+
# repli = LavinMQ::Replication::Client.new(data_dir)
88+
# spawn do
89+
# repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
90+
# end
91+
# with_channel do |ch|
92+
# ch.basic_publish "hello world", "", "repli"
93+
# end
94+
# q.basic_get(true) { }.should be_true
95+
# repli.close
96+
# end
9797

98-
it "should not publish when min_followers is not fulfilled" do
99-
server = LavinMQ::Replication::Server.new(data_dir)
100-
obj = "test".to_slice
101-
done = Channel(Nil).new
102-
spawn do
103-
server.append("/afs/", obj)
104-
done.send nil
105-
end
106-
select
107-
when done.receive
108-
fail "Should not receive message"
109-
when timeout(0.1.seconds)
110-
server.close
111-
end
112-
end
98+
# it "should not publish when min_followers is not fulfilled" do
99+
# server = LavinMQ::Replication::Server.new(data_dir)
100+
# obj = "test".to_slice
101+
# done = Channel(Nil).new
102+
# spawn do
103+
# server.append("/afs/", obj)
104+
# done.send nil
105+
# end
106+
# select
107+
# when done.receive
108+
# fail "Should not receive message"
109+
# when timeout(0.1.seconds)
110+
# server.close
111+
# end
112+
# end
113113
end

src/lavinmq/replication/follower.cr

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module LavinMQ
66
module Replication
77
class Follower
88
Log = ::Log.for(self)
9-
@ack = Channel(Int64).new
9+
# @ack = Channel(Int64).new
1010
@acked_bytes = 0_i64
1111
@sent_bytes = 0_i64
1212
@actions = Channel(Action).new(4096)
@@ -48,22 +48,29 @@ module LavinMQ
4848
loop do
4949
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
5050
@acked_bytes += len
51-
if max_lag = Config.instance.max_lag
52-
if lag < max_lag
53-
@ack.try_send lag
54-
end
55-
end
51+
# if max_lag = Config.instance.max_lag
52+
# if lag < max_lag
53+
# @ack.try_send lag
54+
# end
55+
# end
5656
end
5757
rescue IO::Error
5858
end
5959

60-
def wait_for_max_lag
60+
# def wait_for_max_lag
61+
# if max_lag = Config.instance.max_lag
62+
# current_lag = lag
63+
# until current_lag < max_lag
64+
# break unless current_lag = @ack.receive?
65+
# end
66+
# end
67+
# end
68+
69+
def has_max_lag?
6170
if max_lag = Config.instance.max_lag
62-
current_lag = lag
63-
until current_lag < max_lag
64-
break unless current_lag = @ack.receive?
65-
end
71+
return true if lag > max_lag
6672
end
73+
return false
6774
end
6875

6976
private def action_loop(socket = @lz4)
@@ -162,7 +169,7 @@ module LavinMQ
162169
Log.info { "Disconnected" }
163170
wait_for_sync if synced_close
164171
@actions.close
165-
@ack.close
172+
# @ack.close
166173
@lz4.close
167174
@socket.close
168175
rescue IO::Error

src/lavinmq/replication/server.cr

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ module LavinMQ
2424
include FileIndex
2525
include Replicator
2626
Log = ::Log.for("replication")
27-
getter followers_changed = Channel(Nil).new
27+
# getter followers_changed = Channel(Nil).new
2828
getter? closing
2929
@lock = Mutex.new(:unchecked)
3030
@followers = Array(Follower).new
@@ -61,13 +61,13 @@ module LavinMQ
6161

6262
def append(path : String, obj)
6363
Log.debug { "appending #{obj} to #{path}" }
64-
wait_for_max_lag unless closing?
64+
# wait_for_max_lag unless closing?
6565
each_follower &.append(path, obj)
6666
end
6767

6868
def delete_file(path : String)
6969
@files.delete(path)
70-
wait_for_max_lag unless closing?
70+
# wait_for_max_lag unless closing?
7171
each_follower &.delete(path)
7272
end
7373

@@ -110,22 +110,33 @@ module LavinMQ
110110
end
111111
end
112112

113-
def wait_for_max_lag
114-
# was_closing = @closing
115-
until @closing || @followers.size >= Config.instance.min_followers
116-
@followers_changed.receive
117-
end
118-
# unless (!was_closing && @closing) || @followers.size >= Config.instance.min_followers
119-
# raise Exception.new("Not enough followers")
120-
# end
121-
# use waitgroup instead l8er
122-
@followers.each_with_index do |f, i|
123-
break if i > Config.instance.min_followers
124-
f.wait_for_max_lag
113+
def has_max_lag?
114+
@followers.any?(&.has_max_lag?)
115+
end
116+
117+
def has_enough_followers?
118+
@followers.size >= Config.instance.min_followers
119+
end
120+
121+
def set_flow_control?
122+
if has_enough_followers?
123+
return has_max_lag?
124+
else
125+
return true
125126
end
126-
rescue Channel::ClosedError
127127
end
128128

129+
# def wait_for_max_lag
130+
# until @closing || @followers.size >= Config.instance.min_followers
131+
# @followers_changed.receive
132+
# end
133+
# @followers.each_with_index do |f, i|
134+
# break if i > Config.instance.min_followers
135+
# f.wait_for_max_lag
136+
# end
137+
# rescue Channel::ClosedError
138+
# end
139+
129140
private def password : String
130141
path = File.join(Config.instance.data_dir, ".replication_secret")
131142
begin
@@ -163,14 +174,14 @@ module LavinMQ
163174
follower.full_sync
164175
@followers << follower
165176
end
166-
followers_changed.try_send nil
177+
# followers_changed.try_send nil
167178
begin
168179
follower.read_acks
169180
ensure
170181
@lock.synchronize do
171182
@followers.delete(follower)
172183
end
173-
followers_changed.try_send nil
184+
# followers_changed.try_send nil
174185
end
175186
rescue ex : AuthenticationError
176187
Log.warn { "Follower negotiation error" }
@@ -198,14 +209,14 @@ module LavinMQ
198209
end
199210
@followers.clear
200211
end
201-
@followers_changed.close
212+
# @followers_changed.close
202213
Log.debug { "closed" }
203214
Fiber.yield # required for follower/listener fibers to actually finish
204215
end
205216

206217
def closing
207218
@closing = true
208-
@followers_changed.try_send? nil
219+
# @followers_changed.try_send? nil
209220
end
210221

211222
private def each_follower(& : Follower -> Nil) : Nil

src/lavinmq/server.cr

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,6 @@ module LavinMQ
320320
system_metrics(statm)
321321
end
322322
end
323-
324323
control_flow!
325324
sleep Config.instance.stats_interval.milliseconds
326325
end
@@ -390,7 +389,7 @@ module LavinMQ
390389
getter stats_system_collection_duration_seconds = Time::Span.new
391390

392391
private def control_flow!
393-
if disk_full?
392+
if disk_full? || @replicator.set_flow_control?
394393
if flow?
395394
@log.info { "Low disk space: #{@disk_free.humanize}B, stopping flow" }
396395
flow(false)

0 commit comments

Comments
 (0)