Skip to content

Commit 045973e

Browse files
authored
Fix/get host context (#93)
* Fix `getHostContext` method accessibility This commit works around a recent change to jruby where calls to `java_send` to package private accessible Java methods will fail with `(NoMethodError) undefined method `getHostContext'` errors. This commit works around this by using Java reflection to make the method accessible. I have verified that this is the only package private method that we call on that class. Fixes #91
1 parent c12bb2c commit 045973e

File tree

5 files changed

+46
-17
lines changed

5 files changed

+46
-17
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 1.4.9
2+
- Fixed issue with `getHostContext` method accessibility, causing plugin not to be able to run [#93](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/93)
3+
14
## 1.4.8
25
- Fixed connection placeholder replacements errors with Logstash `8.15.1` and `8.15.2` [#92](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/92)
36

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.4.8
1+
1.4.9

lib/logstash-input-azure_event_hubs.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55
require_jar('com.microsoft.azure', 'qpid-proton-j-extensions', '1.2.4')
66
require_jar('com.microsoft.azure', 'azure-eventhubs-eph', '2.5.2')
77
require_jar('com.microsoft.azure', 'azure-storage', '8.6.6')
8-
require_jar('com.google.code.gson', 'gson', '2.8.5')
8+
require_jar('com.google.code.gson', 'gson', '2.10.1')
99
require_jar('org.apache.qpid', 'proton-j', '0.33.9')

lib/logstash/inputs/azure_event_hubs.rb

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -411,20 +411,7 @@ def run(queue)
411411
scheduled_executor_service)
412412
else
413413
@logger.warn("You have NOT specified a `storage_connection_string` for #{event_hub_name}. This configuration is only supported for a single Logstash instance.")
414-
checkpoint_manager = InMemoryCheckpointManager.new
415-
lease_manager = InMemoryLeaseManager.new
416-
event_processor_host = EventProcessorHost.new(
417-
EventProcessorHost.createHostName('logstash'),
418-
event_hub_name,
419-
event_hub['consumer_group'],
420-
event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
421-
checkpoint_manager,
422-
lease_manager,
423-
scheduled_executor_service,
424-
nil)
425-
#using java_send to avoid naming conflicts with 'initialize' method
426-
lease_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
427-
checkpoint_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
414+
event_processor_host = create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
428415
end
429416
options = EventProcessorOptions.new
430417
options.setMaxBatchSize(max_batch_size)
@@ -496,4 +483,38 @@ def run(queue)
496483
@logger.debug("interrupted while waiting to close executor service, this can generally be ignored", :exception => e, :backtrace => e.backtrace) if e
497484
end
498485
end
486+
487+
def create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
488+
checkpoint_manager = InMemoryCheckpointManager.new
489+
lease_manager = InMemoryLeaseManager.new
490+
event_processor_host = EventProcessorHost.new(
491+
EventProcessorHost.createHostName('logstash'),
492+
event_hub_name,
493+
event_hub['consumer_group'],
494+
event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
495+
checkpoint_manager,
496+
lease_manager,
497+
scheduled_executor_service,
498+
nil)
499+
host_context = get_host_context(event_processor_host)
500+
#using java_send to avoid naming conflicts with 'initialize' method
501+
lease_manager.java_send :initialize, [HostContext], host_context
502+
checkpoint_manager.java_send :initialize, [HostContext], host_context
503+
event_processor_host
504+
end
505+
506+
private
507+
508+
# This method is used to get around the fact that recent versions of jruby do not
509+
# allow access to the package private protected method `getHostContext`
510+
def get_host_context(event_processor_host)
511+
call_private(event_processor_host, 'getHostContext')
512+
end
513+
514+
def call_private(clazz, method)
515+
method = clazz.java_class.declared_method(method)
516+
method.accessible = true
517+
method.invoke(clazz)
518+
end
519+
499520
end

spec/inputs/azure_event_hub_spec.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,10 @@
207207
register_counter = java.util.concurrent.atomic.AtomicInteger.new
208208
unregister_counter = java.util.concurrent.atomic.AtomicInteger.new
209209
assertion_count = java.util.concurrent.atomic.AtomicInteger.new
210+
allow(input).to receive(:get_host_context) {mock_host_context}
210211
allow_any_instance_of(InMemoryLeaseManager).to receive(:java_send)
211212
allow_any_instance_of(InMemoryCheckpointManager).to receive(:java_send)
212213

213-
allow(mock_host).to receive(:getHostContext) {mock_host_context}
214214
allow(mock_host_context).to receive(:getEventHubPath) {"foo"}
215215

216216
expect(mock_host).to receive(:registerEventProcessorFactory).at_most(3).times {
@@ -256,6 +256,11 @@
256256
expect(assertion_count.get).to be == 3
257257
end
258258

259+
it "can create an in memory EPH" do
260+
#event_hub, event_hub_name, scheduled_executor_service
261+
exploded_config = input.event_hubs_exploded
262+
input.create_in_memory_event_processor_host(exploded_config[0], exploded_config[0]['event_hubs'].first, nil)
263+
end
259264
end
260265

261266
describe "Bad Basic Config" do

0 commit comments

Comments
 (0)