Skip to content

Commit e51a356

Browse files
authored
prepare 5.0.1 release (#103)
1 parent 8fa518d commit e51a356

File tree

3 files changed

+54
-3
lines changed

3 files changed

+54
-3
lines changed

lib/sse_client/sse_client.rb

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ def on_error(&action)
6161
def close
6262
if @stopped.make_true
6363
@cxn.close if !@cxn.nil?
64+
@cxn = nil
6465
end
6566
end
6667

@@ -77,14 +78,22 @@ def run_stream
7778
@cxn = nil
7879
begin
7980
@cxn = connect
81+
# There's a potential race if close was called in the middle of the previous line, i.e. after we
82+
# connected but before @cxn was set. Checking the variable again is a bit clunky but avoids that.
83+
return if @stopped.value
8084
read_stream(@cxn) if !@cxn.nil?
8185
rescue Errno::EBADF
8286
# don't log this - it probably means we closed our own connection deliberately
8387
rescue StandardError => e
8488
@logger.error { "Unexpected error from event source: #{e.inspect}" }
8589
@logger.debug { "Exception trace: #{e.backtrace}" }
8690
end
87-
@cxn.close if !cxn.nil?
91+
begin
92+
@cxn.close if !@cxn.nil?
93+
rescue StandardError => e
94+
@logger.error { "Unexpected error while closing stream: #{e.inspect}" }
95+
@logger.debug { "Exception trace: #{e.backtrace}" }
96+
end
8897
end
8998
end
9099

lib/sse_client/streaming_http.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
require "concurrent/atomics"
12
require "http_tools"
23
require "socketry"
34

@@ -15,11 +16,14 @@ def initialize(uri, proxy, headers, connect_timeout, read_timeout)
1516
@reader = HTTPResponseReader.new(@socket, read_timeout)
1617
@status = @reader.status
1718
@headers = @reader.headers
19+
@closed = Concurrent::AtomicBoolean.new(false)
1820
end
1921

2022
def close
21-
@socket.close if @socket
22-
@socket = nil
23+
if @closed.make_true
24+
@socket.close if @socket
25+
@socket = nil
26+
end
2327
end
2428

2529
# Generator that returns one line of the response body at a time (delimited by \r, \n,

spec/sse_client/sse_client_spec.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,42 @@ def with_client(client)
136136
end
137137
end
138138
end
139+
140+
it "reconnects if stream returns EOF" do
141+
events_body_1 = <<-EOT
142+
event: go
143+
data: foo
144+
145+
EOT
146+
events_body_2 = <<-EOT
147+
event: go
148+
data: bar
149+
150+
EOT
151+
with_server do |server|
152+
attempt = 0
153+
server.setup_response("/") do |req,res|
154+
attempt += 1
155+
if attempt == 1
156+
res.body = events_body_1
157+
else
158+
res.body = events_body_2
159+
end
160+
res.content_type = "text/event-stream"
161+
res.status = 200
162+
end
163+
164+
event_sink = Queue.new
165+
client = subject.new(server.base_uri,
166+
reconnect_time: 0.25, read_timeout: 0.25) do |c|
167+
c.on_event { |event| event_sink << event }
168+
end
169+
170+
with_client(client) do |client|
171+
expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil))
172+
expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "bar", nil))
173+
expect(attempt).to be >= 2
174+
end
175+
end
176+
end
139177
end

0 commit comments

Comments
 (0)