Skip to content

Commit 6589acd

Browse files
andseljsvd
andauthored
Update Azure event Hub client library to version 3.3.0 (#96)
Updates azure-eventhubs to 3.3.0 and all its dependencies. - adapt the instantiation of EventProcessorHost using the EventProcessorHostBuilder instead of the now private constructor - updates the mocking in unit tests to respect the switch from constructor to builder. --------- Co-authored-by: João Duarte <jsvd@users.noreply.github.com>
1 parent 045973e commit 6589acd

File tree

6 files changed

+160
-66
lines changed

6 files changed

+160
-66
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 1.5.0
2+
- Updated Azure Event Hub client library to version 3.3.0 [#96](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/96)
3+
14
## 1.4.9
25
- Fixed issue with `getHostContext` method accessibility, causing plugin not to be able to run [#93](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/93)
36

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.4.9
1+
1.5.0

build.gradle

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,43 @@ repositories {
1919
dependencies {
2020
testImplementation 'junit:junit:4.12'
2121

22-
implementation 'com.microsoft.azure:azure-eventhubs:2.3.2'
22+
implementation 'com.microsoft.azure:azure-eventhubs:3.3.0'
23+
2324
implementation 'com.microsoft.azure:qpid-proton-j-extensions:1.2.4'
24-
implementation 'com.microsoft.azure:azure-eventhubs-eph:2.5.2'
25+
implementation 'com.microsoft.azure:azure-eventhubs-eph:3.3.0'
2526
implementation 'com.microsoft.azure:azure-storage:8.6.6'
26-
implementation 'com.google.code.gson:gson:2.10.1'
27-
implementation 'org.apache.qpid:proton-j:0.33.9'
27+
implementation 'com.google.code.gson:gson:2.8.9'
28+
implementation 'org.apache.qpid:proton-j:0.33.8'
29+
30+
implementation 'com.microsoft.azure:azure-keyvault-core:1.2.4'
31+
implementation 'com.microsoft.azure:adal4j:1.6.4'
32+
implementation 'com.microsoft.azure:azure-annotations:1.10.0'
33+
implementation 'com.microsoft.azure:azure-client-authentication:1.7.3'
34+
implementation 'com.microsoft.azure:azure-client-runtime:1.7.3'
35+
implementation 'com.microsoft.rest:client-runtime:1.7.3'
36+
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.12.7'
37+
implementation 'com.fasterxml.jackson.core:jackson-core:2.12.7'
38+
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.7.1'
39+
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-joda:2.12.7'
40+
implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
41+
implementation 'com.google.guava:guava:32.0.1-jre'
42+
implementation 'com.nimbusds:lang-tag:1.7'
43+
implementation 'com.nimbusds:nimbus-jose-jwt:9.37.2'
44+
implementation 'com.nimbusds:oauth2-oidc-sdk:6.5'
45+
implementation 'com.squareup.okhttp3:logging-interceptor:3.12.2'
46+
implementation 'com.squareup.okhttp3:okhttp-urlconnection:3.12.2'
47+
implementation 'com.squareup.okhttp3:okhttp:3.14.7'
48+
implementation 'com.squareup.okio:okio:1.17.6'
49+
implementation 'com.squareup.retrofit2:adapter-rxjava:2.7.2'
50+
implementation 'com.squareup.retrofit2:converter-jackson:2.7.2'
51+
implementation 'com.squareup.retrofit2:retrofit:2.7.2'
52+
implementation 'com.sun.mail:javax.mail:1.6.1'
53+
implementation 'commons-codec:commons-codec:1.11'
54+
implementation 'io.reactivex:rxjava:1.3.8'
55+
implementation 'javax.activation:activation:1.1'
56+
implementation 'net.minidev:json-smart:2.4.9'
57+
implementation 'org.checkerframework:checker-compat-qual:2.0.0'
58+
implementation 'org.codehaus.mojo:animal-sniffer-annotations:1.14'
2859
compileOnly 'org.apache.logging.log4j:log4j-api:2.17.0' // provided by Logstash
2960
testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.0' // provided by Logstash
3061
}
@@ -47,11 +78,16 @@ task vendor {
4778
doLast {
4879
String vendorPathPrefix = "vendor/jar-dependencies"
4980
configurations.runtimeClasspath.allDependencies.each { dep ->
50-
File f = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}" + File.separator + "${dep.name}" + File.separator + "${dep.version}") }.singleFile
51-
String groupPath = dep.group.replaceAll('\\.', '/')
52-
File newJarFile = file("${vendorPathPrefix}" + File.separator + "${groupPath}" + File.separator + "${dep.name}"+ File.separator + "${dep.version}" + File.separator + "${dep.name}-${dep.version}.jar")
53-
newJarFile.mkdirs()
54-
Files.copy(f.toPath(), newJarFile.toPath(), REPLACE_EXISTING)
81+
FileCollection fileCollection = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}" + File.separator + "${dep.name}" + File.separator + "${dep.version}") }
82+
if (fileCollection.isEmpty()) {
83+
println "runtimeClasspath is empty for dependency ${dep.group}" + File.separator + "${dep.name}" + File.separator + "${dep.version}"
84+
} else {
85+
File f = fileCollection.singleFile
86+
String groupPath = dep.group.replaceAll('\\.', '/')
87+
File newJarFile = file("${vendorPathPrefix}" + File.separator + "${groupPath}" + File.separator + "${dep.name}" + File.separator + "${dep.version}" + File.separator + "${dep.name}-${dep.version}.jar")
88+
newJarFile.mkdirs()
89+
Files.copy(f.toPath(), newJarFile.toPath(), REPLACE_EXISTING)
90+
}
5591
}
5692
}
5793
}
Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,38 @@
11
# AUTOGENERATED BY THE GRADLE SCRIPT. DO NOT EDIT.
22

33
require 'jar_dependencies'
4-
require_jar('com.microsoft.azure', 'azure-eventhubs', '2.3.2')
4+
require_jar('com.microsoft.azure', 'azure-eventhubs', '3.3.0')
55
require_jar('com.microsoft.azure', 'qpid-proton-j-extensions', '1.2.4')
6-
require_jar('com.microsoft.azure', 'azure-eventhubs-eph', '2.5.2')
6+
require_jar('com.microsoft.azure', 'azure-eventhubs-eph', '3.3.0')
77
require_jar('com.microsoft.azure', 'azure-storage', '8.6.6')
8-
require_jar('com.google.code.gson', 'gson', '2.10.1')
9-
require_jar('org.apache.qpid', 'proton-j', '0.33.9')
8+
require_jar('com.google.code.gson', 'gson', '2.8.9')
9+
require_jar('org.apache.qpid', 'proton-j', '0.33.8')
10+
require_jar('com.microsoft.azure', 'azure-keyvault-core', '1.2.4')
11+
require_jar('com.microsoft.azure', 'adal4j', '1.6.4')
12+
require_jar('com.microsoft.azure', 'azure-annotations', '1.10.0')
13+
require_jar('com.microsoft.azure', 'azure-client-authentication', '1.7.3')
14+
require_jar('com.microsoft.azure', 'azure-client-runtime', '1.7.3')
15+
require_jar('com.microsoft.rest', 'client-runtime', '1.7.3')
16+
require_jar('com.fasterxml.jackson.core', 'jackson-annotations', '2.12.7')
17+
require_jar('com.fasterxml.jackson.core', 'jackson-core', '2.12.7')
18+
require_jar('com.fasterxml.jackson.core', 'jackson-databind', '2.12.7.1')
19+
require_jar('com.fasterxml.jackson.datatype', 'jackson-datatype-joda', '2.12.7')
20+
require_jar('com.github.stephenc.jcip', 'jcip-annotations', '1.0-1')
21+
require_jar('com.google.guava', 'guava', '32.0.1-jre')
22+
require_jar('com.nimbusds', 'lang-tag', '1.7')
23+
require_jar('com.nimbusds', 'nimbus-jose-jwt', '9.37.2')
24+
require_jar('com.nimbusds', 'oauth2-oidc-sdk', '6.5')
25+
require_jar('com.squareup.okhttp3', 'logging-interceptor', '3.12.2')
26+
require_jar('com.squareup.okhttp3', 'okhttp-urlconnection', '3.12.2')
27+
require_jar('com.squareup.okhttp3', 'okhttp', '3.14.7')
28+
require_jar('com.squareup.okio', 'okio', '1.17.6')
29+
require_jar('com.squareup.retrofit2', 'adapter-rxjava', '2.7.2')
30+
require_jar('com.squareup.retrofit2', 'converter-jackson', '2.7.2')
31+
require_jar('com.squareup.retrofit2', 'retrofit', '2.7.2')
32+
require_jar('com.sun.mail', 'javax.mail', '1.6.1')
33+
require_jar('commons-codec', 'commons-codec', '1.11')
34+
require_jar('io.reactivex', 'rxjava', '1.3.8')
35+
require_jar('javax.activation', 'activation', '1.1')
36+
require_jar('net.minidev', 'json-smart', '2.4.9')
37+
require_jar('org.checkerframework', 'checker-compat-qual', '2.0.0')
38+
require_jar('org.codehaus.mojo', 'animal-sniffer-annotations', '1.14')

lib/logstash/inputs/azure_event_hubs.rb

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -401,14 +401,17 @@ def run(queue)
401401
@logger.info("Event Hub #{event_hub_name} is initializing... ")
402402
begin
403403
if event_hub['storage_connection']
404-
event_processor_host = EventProcessorHost.new(
405-
EventProcessorHost.createHostName('logstash'),
406-
event_hub_name,
407-
event_hub['consumer_group'],
408-
event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
409-
event_hub['storage_connection'].value,
410-
event_hub.fetch('storage_container', event_hub_name),
411-
scheduled_executor_service)
404+
event_processor_host = EventProcessorHost::EventProcessorHostBuilder.newBuilder(EventProcessorHost.createHostName('logstash'), event_hub['consumer_group'])
405+
.useAzureStorageCheckpointLeaseManager(
406+
event_hub['storage_connection'].value,
407+
event_hub.fetch('storage_container', event_hub_name),
408+
nil
409+
)
410+
.useEventHubConnectionString(
411+
event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
412+
)
413+
.setExecutor(scheduled_executor_service)
414+
.build
412415
else
413416
@logger.warn("You have NOT specified a `storage_connection_string` for #{event_hub_name}. This configuration is only supported for a single Logstash instance.")
414417
event_processor_host = create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
@@ -487,15 +490,11 @@ def run(queue)
487490
def create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
488491
checkpoint_manager = InMemoryCheckpointManager.new
489492
lease_manager = InMemoryLeaseManager.new
490-
event_processor_host = EventProcessorHost.new(
491-
EventProcessorHost.createHostName('logstash'),
492-
event_hub_name,
493-
event_hub['consumer_group'],
494-
event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
495-
checkpoint_manager,
496-
lease_manager,
497-
scheduled_executor_service,
498-
nil)
493+
event_processor_host = EventProcessorHost::EventProcessorHostBuilder.newBuilder(EventProcessorHost.createHostName('logstash'), event_hub['consumer_group'])
494+
.useUserCheckpointAndLeaseManagers(checkpoint_manager, lease_manager)
495+
.useEventHubConnectionString(event_hub['event_hub_connections'].first.value) #there will only be one in this array by the time it gets here
496+
.setExecutor(scheduled_executor_service)
497+
.build
499498
host_context = get_host_context(event_processor_host)
500499
#using java_send to avoid naming conflicts with 'initialize' method
501500
lease_manager.java_send :initialize, [HostContext], host_context

spec/inputs/azure_event_hub_spec.rb

Lines changed: 62 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -84,25 +84,39 @@
8484
unregister_counter.incrementAndGet
8585
completable_future
8686
}
87-
expect(EventProcessorHost).to receive(:new).at_most(2).times {|host_name, event_hub_name, consumer_group, event_hub_connection, storage_connection, container, executor|
88-
case event_hub_name
89-
when 'event_hub_name0'
9087

91-
assertion_count.incrementAndGet
88+
build_step_mock = double("final build step")
89+
expect(build_step_mock).to receive(:build).at_most(2).times.and_return(mock_host)
90+
91+
executor_step = double("executor step")
92+
expect(executor_step).to receive(:setExecutor).at_most(2).times.and_return(build_step_mock)
93+
94+
mock_connection_string = double("connection string")
95+
expect(mock_connection_string).to receive(:useEventHubConnectionString).at_most(2).times {|event_hub_connection|
96+
case event_hub_connection
97+
when /.*event_hub_name0$/
9298
expect(event_hub_connection).to eql(config['event_hub_connections'][0])
93-
expect(container).to eql('event_hub_name0') # default
99+
when /.*event_hub_name1$/
100+
expect(event_hub_connection).to eql(config['event_hub_connections'][1])
101+
end
102+
executor_step
103+
}
94104

105+
mock_builder = double("storage and lease managers")
106+
expect(mock_builder).to receive(:useAzureStorageCheckpointLeaseManager).at_most(2).times {|storage_connection_str, storage_container, storage_blob_prefix|
107+
case storage_container
108+
when 'event_hub_name0'
109+
assertion_count.incrementAndGet
95110
when 'event_hub_name1'
96111
assertion_count.incrementAndGet
97-
expect(host_name).to start_with('logstash')
98-
expect(event_hub_connection).to eql(config['event_hub_connections'][1])
99-
expect(container).to eql('event_hub_name1') # default
100112
end
101-
expect(host_name).to start_with('logstash')
102-
expect(storage_connection).to eql(config['storage_connection'])
113+
mock_connection_string
114+
}
103115

116+
expect(EventProcessorHost::EventProcessorHostBuilder).to receive(:newBuilder).at_most(2).times {|host_name, consumer_group|
117+
expect(host_name).to start_with('logstash')
104118
host_counter.incrementAndGet
105-
mock_host
119+
mock_builder
106120
}
107121
# signal the stop first since the run method blocks until stop is called.
108122
input.do_stop
@@ -126,8 +140,6 @@
126140
expect(exploded_config[0]['event_hub_connections'][0].value).to eql('Endpoint=sb://logstash/;SharedAccessKeyName=activity-log-readonly;SharedAccessKey=something;EntityPath=event_hub1')
127141
end
128142
end
129-
130-
131143
end
132144

133145
describe "Advanced Config" do
@@ -221,32 +233,45 @@
221233
unregister_counter.incrementAndGet
222234
completable_future
223235
}
224-
expect(EventProcessorHost).to receive(:new).at_most(3).times {|host_name, event_hub_name, consumer_group, event_hub_connection, storage_connection, container, executor|
225-
case event_hub_name
236+
237+
build_step_mock = double("final build step")
238+
expect(build_step_mock).to receive(:build).at_most(3).times.and_return(mock_host)
239+
240+
executor_step = double("executor step")
241+
expect(executor_step).to receive(:setExecutor).at_most(3).times.and_return(build_step_mock)
242+
243+
mock_connection_string = double("connection string")
244+
expect(mock_connection_string).to receive(:useEventHubConnectionString).at_most(3).times {|event_hub_connection|
245+
case event_hub_connection
246+
when /.*event_hub_name0$/
247+
expect(event_hub_connection).to eql(config['event_hubs'][0]['event_hub_name0']['event_hub_connections'][0].value)
248+
when /.*event_hub_name1$/
249+
expect(event_hub_connection).to eql(config['event_hubs'][1]['event_hub_name1']['event_hub_connections'][0].value)
250+
end
251+
executor_step
252+
}
253+
254+
managers_mock = double("checkpoint and lease managers")
255+
expect(managers_mock).to receive(:useUserCheckpointAndLeaseManagers).at_most(3).times {|checkpoint_mngr, lease_mngr|
256+
assertion_count.incrementAndGet
257+
mock_connection_string
258+
}
259+
expect(managers_mock).to receive(:useAzureStorageCheckpointLeaseManager).at_most(3).times {|storage_connection_str, storage_container, storage_blob_prefix|
260+
case storage_container
226261
when 'event_hub_name0'
227-
if consumer_group.eql?('cg')
228-
assertion_count.incrementAndGet
229-
expect(host_name).to start_with('logstash')
230-
expect(event_hub_connection).to eql(config['event_hubs'][0]['event_hub_name0']['event_hub_connections'][0].value)
231-
expect(storage_connection).to eql(config['event_hubs'][0]['event_hub_name0']['storage_connection'].value)
232-
expect(container).to eql('event_hub_name0') # default
233-
elsif consumer_group.eql?('ls')
234-
assertion_count.incrementAndGet
235-
expect(event_hub_connection).to eql(config['event_hubs'][2]['event_hub_name0']['event_hub_connections'][0].value)
236-
# in this mode, storage connection and container are replaced with in memory offset management
237-
expect(storage_connection).to be_kind_of(InMemoryCheckpointManager)
238-
expect(container).to be_kind_of(InMemoryLeaseManager)
239-
end
240-
when 'event_hub_name1'
241262
assertion_count.incrementAndGet
242-
expect(host_name).to start_with('logstash')
243-
expect(event_hub_connection).to eql(config['event_hubs'][1]['event_hub_name1']['event_hub_connections'][0].value)
244-
expect(storage_connection).to eql(config['event_hubs'][1]['event_hub_name1']['storage_connection'].value)
245-
expect(container).to eql(config['event_hubs'][1]['event_hub_name1']['storage_container'])
263+
when 'alt_container'
264+
assertion_count.incrementAndGet
246265
end
266+
mock_connection_string
267+
}
268+
269+
expect(EventProcessorHost::EventProcessorHostBuilder).to receive(:newBuilder).at_most(3).times {|host_name, consumer_group|
270+
expect(host_name).to start_with('logstash')
247271
host_counter.incrementAndGet
248-
mock_host
272+
managers_mock
249273
}
274+
250275
# signal the stop first since the run method blocks until stop is called.
251276
input.do_stop
252277
input.run(mock_queue)
@@ -259,12 +284,14 @@
259284
it "can create an in memory EPH" do
260285
#event_hub, event_hub_name, scheduled_executor_service
261286
exploded_config = input.event_hubs_exploded
287+
# During build step Azure libraries does a syntax check of EventHub connection string, so needs to be a pseudo real
288+
exploded_config[0]['event_hub_connections'] = [::LogStash::Util::Password.new("Endpoint=sb://logstash.windows.net/;SharedAccessKeyName=activity-log-read-only;SharedAccessKey=blabla;EntityPath=ops-logs")]
262289
input.create_in_memory_event_processor_host(exploded_config[0], exploded_config[0]['event_hubs'].first, nil)
263290
end
264291
end
265292

266293
describe "Bad Basic Config" do
267-
describe "Offset overwritting" do
294+
describe "Offset overwriting" do
268295
let(:config) do
269296
{
270297
'event_hub_connections' => ['Endpoint=sb://...;EntityPath=event_hub_name0', 'Endpoint=sb://...;EntityPath=event_hub_name0'],

0 commit comments

Comments
 (0)