Skip to content

fix: implement Exception Handling in Kafka Publisher #430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

sancheet230
Copy link

@sancheet230 sancheet230 commented Mar 6, 2025

For issue #431

Description
This pull request enhances the PublisherServiceImpl class by incorporating robust exception handling mechanisms during Kafka message publishing. These improvements aim to ensure reliable message delivery and facilitate effective error management within our Kafka producer implementation.

Prior to this update, the PublisherServiceImpl class lacked comprehensive exception handling during message publication. This omission posed risks of unhandled exceptions leading to application instability and potential message loss. Implementing proper exception handling is crucial for maintaining the reliability and resilience of the Kafka producer.

By integrating these exception handling strategies, the Kafka producer's resilience and reliability are significantly enhanced. These modifications ensure that message delivery failures are appropriately managed, thereby reducing the risk of data loss and improving overall system stability.

…eption handling mechanisms during Kafka message publishing.
Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Welcome to AsyncAPI. Thanks a lot for creating your first pull request. Please check out our contributors guide useful for opening a pull request.
Keep in mind there are also other channels you can use to interact with AsyncAPI community. For more details check out this issue.

@sancheet230 sancheet230 changed the title Implement Exception Handling in Kafka Publisher fix: Implement Exception Handling in Kafka Publisher Mar 6, 2025
@sancheet230 sancheet230 changed the title fix: Implement Exception Handling in Kafka Publisher fix: implement Exception Handling in Kafka Publisher Mar 6, 2025
@sancheet230
Copy link
Author

sancheet230 commented Mar 6, 2025

@Tenischev @derberg @asyncapi-bot-eve Can you please review these changes

@sancheet230
Copy link
Author

sancheet230 commented Mar 20, 2025

@asyncapi-bot @asyncapi-bot-eve Can you check this PR out

@Tenischev
Copy link
Member

Hi @sancheet230
I agree that implementation example could be improved, but please keep in mind that this repository provides the template that should generate code based on provided AsynAPI documentation.
Most of all my review comments related to the changes of template names and parameters to the hardcoded.
Please run tests and adjust for your changes.

@sancheet230
Copy link
Author

@Tenischev Replaced deprecated ListenableFutureCallback with CompletableFuture, improved async handling, added proper exception handling for sync sends, and ensured template-driven dynamic method generation

@sancheet230 sancheet230 requested a review from Tenischev March 20, 2025 13:26
@sancheet230
Copy link
Author

sancheet230 commented Mar 20, 2025

@Tenischev I have done the changes PTAL

Copy link
Member

@Tenischev Tenischev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you finish with update of the PublisherService, please run and update tests

{% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
public void {{methodName}}(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) {
public void {{methodName}}Async(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update method name in the PublisherService interface accordingly

}

private String get{{methodName | upperFirst-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) {
public void {{methodName}}Sync(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add this method to the PublisherService interface

Comment on lines -9 to -15
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is required for payload object of the message

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants