Maas client consist of two parts:
- maas-client-core - Tiny, pure java client to MaaS
- rabbit-context-propagation - utility classes for propagation and restoration of B/G version context
- maas-client-quarkus - Core client with beans for Quarkus now located at: https://github.com/Netcracker/qubership-core-quarkus-extensions
Thin layer to MaaS with following requirements in mind:
- as simple as it possible
- minimal external dependencies
- stay framework, cloud and platforms agnostic
Important notice to developer: import classes from api packages and avoid explicit importing of impl packages in your code as much as you can.
Developing MaaS client is rules of thumb in mind were:
apipackages will be backward compatible across minor releases and patchesimplpackages may contain changes that can breaks backward compatibility
All changes in API across library releases will be conformed to semantic versioning rules (See https://semver.org/)
Please ensure, that your pod runtime environment variables contains:
NAMESPACE- namespace name in which microservice is deployedCLOUD_SERVICE_NAME- name of microservice
To add missed variables to your pod runtime environment, you need to edit your deployment chart files.
First of all we need to create instance of MaaSAPIClient. Default implementation for this interface is MaaSAPIClientImpl. MaaSAPIClientImpl requires single parameter to instantiate - M2M auth token supplier. This token will be used to:
- interact with maas-agent microservice
- subscribe to tenant-manager tenant activation/deactivation events (tenant-topics feature)
- subscribe to control-plane service to watch on B/G version deploy/promote/rollback events
It's simple constructor call with token from cloud core libraries m2m-manager:
MaaSClient client = new MaaSAPIClientImpl(() -> M2MManager.getInstance().getToken().getToken());All MaaS operations for Kafka is collected in KafkaMaaSClient. To obtain new instance of MaaS Kafka client just call:
KafkaMaaSClient kafkaClient = client.getKafkaClient();To avoid explicit dependency to Kafka clients, maas client only provide various info about Kafka topic:
- brokers addresses
- auth mathod
- name of topic
- topic options and configs
MaaS Client doesn't provide methods to create KafkaProducer or KafkaConsumer. So developer can freely choose more suitable version of kafka client library for his needs. Get or create topic and create KafkaProducer to it:
// search existing or request for new topic by MaaS, address structure contains all
// required info to create connection to Kafka broker instances
var address = kafkaClient.getOrCreateTopic(new Classifier("invoices"), TopicCreateOptions.DEFAULTS);
// transform address to connection properties needed to KafkaProducer/KafkaConsumer instantiation
var props = address.formatConnectionProperties()
.orElseThrow(() -> new IllegalArgumentException("Unable to construct connection properties to Kafka"));
// create KafkaProducer instance
try(var producer = new KafkaProducer<Integer, String>(props, new IntegerSerializer(), new StringSerializer())) {
...
}Produce record for tenant topics:
TopicAddress topicAddress = kafkaClient.getTopic(new Classifier("orders").tenantId(TenantContext.get()))
.orElseThrow(() -> new RuntimeException("Topic `orders' not found. Configuration or deployment processing error?"));
ProducerRecord<Integer, String> record = new ProducerRecord<>(
topicAddress.getTopicName(),
order.getOrderId(),
mapper.writeValueAsString(wrapped));
// example how to create KafkaProducer look at the previous code example
kafkaProducer.send(record);Consumer for tenant-topics is much complex, because of runtime nature of tenants. Tenant may be created and activated of deactivated in runtime. And consumer in microservice have to dynamically subscribe/unsubscribe to topics created to new tenants.
MaaSKafkaClient provide convenient method to manage topic subscriptions on tenants list change. Callback is called at least once on application startup to simplify initial microservice subscriptions code.
kafkaClient.watchTenantTopics("orders", topics -> {
// perform subscribe/unsubscribe to given topics
});It is crucial to save and restore context during message processing. Moreover, its is manadatory requirements to correctly filtering messages in Blue/Green deployment. To serialize current execution context into message headers just call utility method:
import com.netcracker.cloud.maas.client.context.kafka.KafkaContextPropagation;
var record = new ProducerRecord<...>(
topicName,
partition,
messageKey,
messageValue,
KafkaContextPropagation.propagateContext() // dump context to message headers
);Context propagation methods is in class KafkaContextPropagation located in module:
<dependency>
<groupId>com.netcracker.cloud.maas.client</groupId>
<artifactId>kafka-context-propagation</artifactId>
</dependency>
To restore context from received message into current execution thread use:
ConsumerRecord message = ...
KafkaContextPropagation.restoreContext(message.headers());All MaaS operations for RabbitMQ is collected in RabbitMaaSClient. To obtain new instance of MaaS RabbitMQ client just call:
RabbitMaaSClient rabbitClient = client.getRabbitClient();Despite of Kafka approach where classifier is pointed to topic, classifier used for rabbit is pointed to VHost entity in RabbitMQ. So to locate or create VHost you need:
VHost vhost = rabbitClient.getOrCreateVirtualHost(new Classifier("commands"));VHost entity contains exhaustive information about vhost location and credentials needed for connection to RabbitMQ instance.
If you want to just get vhost (without its creation in case it doesn't exist):
VHost vhost = client.getVirtualHost(new Classifier("commands"));In Blue/Green deployments you need to save and restore context information about original request version. Because version exchange
created for versionedEntities rely on version message header, you need to save http X-Version header value to message headers. Also, you need to
restore version value to microservice execution context on message receiver side.
MaaS Client offers utility class to simplify these tasks. Include dependency to:
<dependency>
<groupId>com.netcracker.cloud.maas.client</groupId>
<artifactId>rabbit-context-propagation</artifactId>
<version>{version}</version>
</dependency>To save version context to message you can use:
AMQP.BasicProperties props = new AMQP.BasicProperties();
// save version value to message headers
props = RabbitContextPropagation.propagateContext(props);
channel.basicPublish("my-exchange", routingKey, props, data);To restore context from message to consumer thread:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
....
// restore b/g version context from message headers
RabbitContextPropagation.restoreContext(delivery);
...
}Details here