-
Notifications
You must be signed in to change notification settings - Fork 6
Kasper Programming
- Messages, Handlers and Responses
- What is a Message ?
- Handlers
- Responses
- Context is anywhere!
- Programming in command area
- The command area
- Domain Driven Design principle in Kasper
- What is a Domain Driven Design ?
- Aggregates
- Modeling a Concept
- Modeling a Linked Concept
- Modeling a Relation
- Modeling a Value object
- Defining a command
- Defining a command handler
- Defining a repository
- Programming in query area
- The query area
- Defining a query
- Defining a query result
- Defining a query handler
- Caching a query response
- Filtering a query
- Talking with Kasper
- Using gateways
- Through exposure
- Emit an Command through HTTP exposure
- Emit an Query through HTTP exposure
- Emit an Event through HTTP exposure
- Using Kasper client
Most of the requests received by a Kasper application are encapsulated by a Message.
A message is basically a carrying an input with his context. This message will be treated by the related handler that will produce a response.
An handler is nothing more than a class extending com.viadeo.kasper.core.component.Handler that provide several methods allowing to process the message with or without its contains.
package com.viadeo.kasper.core.component;
import com.viadeo.kasper.api.response.KasperResponse;
public interface Handler<MESSAGE extends KasperMessage<INPUT>, RESPONSE extends KasperResponse, INPUT> {
RESPONSE handle(MESSAGE message);
Class<INPUT> getInputClass();
Class<?> getHandlerClass();
}An handler can be implemented by both COMMAND and QUERY architectural area. The message are dispatched to the related handler by the gateways or the event bus.
All handlers return a response indicating the Status of the requested message.
public static enum Status {
OK,
REFUSED,
ERROR,
ACCEPTED,
FAILURE,
SUCCESS
}The responses according to its role can carry only the status of the requested message or carrying a QueryResult.
Inside a Kasper application, the Context is used anywhere. It allows to carry technical information, describing the caller situation, etc...
A context its fully immutable and should be a companion of only one input (Command, Query or Event) inside a message.
The context provide additional information in more of the input itself that must be sufficient.
To ease its instantiation you can use Contexts.
private Contexts() {}
public static Context.Builder newFrom(final Context context) {
return new Context.Builder(checkNotNull(context));
}
public static Context.Builder builder() {
return new Context.Builder();
}
public static Context.Builder builder(UUID kasperCorrelationId) {
return new Context.Builder(kasperCorrelationId, 1);
}
public static Context empty() {
return builder().build();
}
}The child of the initial context must be pass at each rebounds done by the handler. In order to increment properly the sequence without lose any information from the initial context. :warning: A context is not a response don't use it like this!
The command side is focused on processing of commands, maintenance of model coherency and data persistence with ACID properties.
Commands are created by the client application and then sent to the domain layer. Commands are messages that instruct a specific entity to perform a certain action. Commands are named like DoSomethingCommand (for example, ChangeMyNameCommand, DeleteAnOrderCommand ...). They instruct the target entity to do something that might result in different outcomes or fail. Commands are handled by command handlers.
All commands will be sent to the command bus which will delegate each command to the matching command handler. This demonstrates that there is only one entry point into the domain. The responsibility of the command handlers is to execute the appropriate domain behavior. Command handlers should have a connection to the repository to provide the ability to load the needed aggregate (entity) on which behavior will be executed.

The command handler performs the following tasks:
- It receives the Command instance from the messaging infrastructure (Command Bus)
- It validates that the Command is a valid Command
- It locates the aggregate instance that is the target of the Command.
- It invokes the appropriate method on the aggregate instance passing in any parameter from the command.
- It persists the new state of the aggregate to storage (repository).
Using Kasper framework you'll have to define one and only one command handler per defined command.
The command area is accessible and must be accessed by the related gateway.
DDD is an analysis and design approach that encourages you to use models and a ubiquitous language to bridge the gap between the business and the development team by fostering a common understanding of the domain. Of necessity, the DDD approach is oriented towards analyzing behavior rather than just data in the business domain, and this leads to a focus on modeling and implementing behavior in the software. A natural way to implement the domain model in code is to use commands and events.
The model is a set of concepts built up in the heads of people on the project, with terms andr elationships that reflect domain insight. These terms and interrelationships provide the semantics of a language that is tailored to the domain while being precise enough for technical development.
-- Eric Evans, "Domain-Driven Design: Tackling Complexity in the Heart of Software," p23.
This list helps to capture the idea of a model, but is no substitute for reading the book to gain a deeper understanding of the concept:
- Models should be bound to the implementation.
- You should cultivate a language based on the model.
- Models should be knowledge rich.
- You should brainstorm and experiment to develop the model.
In more DDD allows to divide large, complex systems into more manageable units known as bounded contexts. A bounded context defines the context for a model:
Explicitly define the context within which a model applies. Explicitly set boundaries in terms of team organization, usage within specific parts of the application, and physical manifestations such as code bases and database schemas. Keep the model strictly consistent within these bounds, but don't be distracted or confused by issues outside.
--Eric Evans, "Domain-Driven Design: Tackling Complexity in the Heart of Software," p335.
The reasons for identifying context boundaries for your domain models are not necessarily the same reasons for choosing the portions of the system that should use the CQRS pattern. In DDD, a bounded context defines the context for a model and the scope of a ubiquitous language. You should implement the CQRS pattern to gain certain benefits for your application such as scalability, simplicity, and maintainability. Because of these differences, it may make sense to think about applying the CQRS pattern to business components rather than bounded contexts.
In summary, you should not apply the CQRS pattern to the top level of your system. You should clearly identify the different portions of your system that you can design and implement largely independently of each other, and then only apply the CQRS pattern to those portions where there are clear business benefits in doing so.
Kasper aggregates must be written as event-sourced aggregates in order to ensure that every mutation will generate an associated event :
- do not directly mutate the aggregate using a public method (even through constructor)
- use events and event handlers within the aggregate
- apply() will immediately apply the event to the aggregate (eg. calling the declared event handlers within the aggregate)
- once applied the event will be generalized (recorded to the unit of work for further publication)
exemple:
public void mutate(Object value) {
apply(new MutationEvent(value, DateTime.now()));
}
@EventHandler
private void onMutated(MutationEvent event) {
this.value = event.getValue();
this.lastUpdate = event.getLastModificationDate();
}Entity-store repositories (which directly extends Repository instead of EventSourcedRepository) will need to construct aggregates, they can have different strategies for that :
Generate event(s) from the entity store, build an empty aggregate and call their event handlers but the handlers have then to be public (deprecated).
Call a direct constructor of the aggregate which is reserved for this usage
Provide an internal static builder within the aggregate (preferred way).
In case you have to choose the second or third strategy, annotate your constructor with the @XKasperEntityStoreCreator marker.
private Viewer_hasFollowed_Company() { }
@XKasperEntityStoreCreator
public static Member_hasFollowed_Member build(ID followerMemberID, ID followedMemberID, int privacy, boolean followed, DateTime lastModificationDate) {
Member_hasFollowed_Member aggregate = new Member_hasFollowed_Member();
aggregate.setId(new RelationID(followerMemberID, followedMemberID);
aggregate.privacy = privacy;
aggregate.followed = followed;
aggregate.lastModificationDate = lastModificationDate;
return aggregate;
}A concept aggregate root is the base entity of a model. It is a persisted business entity as you can understand it in many other object models.
ex: a Car, a Member, a BlogPost, a Forum, a Job, ...
As being the root of an aggregate, this kind of entity can hold references to component concepts and relations, but must not contain any direct reference to other aggregate roots without a intermediate relation entity.
An aggregate root in Kasper is not necessarily a composition of several entities, it can just be a standalone object, persistable.
@XKasperConcept(domain = Twitter.class, label = "Member", description = "A member")
public class Member extends Concept {...}Never enclose a concept instance within another one.
Use instead a LinkedConcept which is a typed KasperID.
This allows a better identification of links between concepts.
A relation aggregate root is used to connect two concept aggregate roots with some optional metadata.
This implies that the two connected concept aggregate roots can exists within the system independently, without this relation.
A relation is by default unidirectional, a concept root A is connected to a concept root B. Adding the annotation @XBidirectional to a relation makes it understandable as a bidirectional relation.
Kasper encourage to use a specific class names nomenclature for relations :
ex of relation verbs: connectedTo, friendWith, likedBy/likes, shares/sharedBy
@XKasperRelation(
domain = Twitter.class,
verb = "followedBy",
description = "A member has followed another member")
public class Member_hasFollowed_Member extends Relation<Member, Member> {...}Value Object: An object that contains attributes but has no conceptual identity. They should be treated as immutable.
--Wikipedia - DDD value object
Example: When people exchange dollar bills, they generally do not distinguish between each unique bill; they only are concerned about the face value of the dollar bill. In this context, dollar bills are value objects. However, the Federal Reserve may be concerned about each unique bill; in this context each bill would be an entity.
A value object is well.. just a value object..
But Kasper framework propose you two interfaces in order to better identify them and reinforce some good practices and constraints.
A value object is immutable.
If you want to create a value object you can the interface Value.
The Value interface will force you to implement the Serializable interface and propose you to not miss the implementation of the methods toString(), hashCode() and equals().
Sometimes you just want to create a value object around one unique other type (primitive or not) and add management methods to this enclosing value. Kasper framework propose you the EnclosingValue abstract class.
A command is an immutable anemic object (DTO, Data Transfer Object), implementing the interface Command whose class name ends by 'Command'.
A command can optionally declares some metadata using the @XKasperCommand annotation.
A Command is part of a domain API.
The abstract class CreateCommand can be used to define an entity creation command, which contains an id to be used as identifier for the entity to be created.
Two other abstract classes can be used for entity updates and deletion : UpdateCommand and DeleteCommand
@XKasperCommand(description = "Send a response to an existing hello message")
public class NoticeTheWorldCommand implements Command {
@NotNull( message = NOT_PROVIDED_HELLO_MSG )
@Length( min = MIN_HELLO_LENGTH, max = MAX_HELLO_LENGTH, message = BAD_LENGTH_HELLO_MSG )
@Pattern( regexp = REGEX_HELLO, message = INVALID_HELLO_MSG )
private final String notice;
public NoticeTheWorldCommand(final String notice) {
this.notice = notice;
}
public String getNotice() {
return this.notice;
}
}A command handler is an object implementing CommandHandler<Command>, whose class name ends with 'CommandHandler'.
It can also extend BaseCommandHandler<Command> which is the base implementation of an CommandHandler, or extend AutowiredCommandHandler<Command> which contains already the EventBus, the CommandGateway and the RepositoryManager.
A command handler have to declares its owning domain into the annotation @XKasperCommandHandler.
A command handler is part of the COMMAND area.
The interface class EntityCommandHandler<Command, Entity>, implemented by abstracts classes BaseEntityCommandHandler<Command, Entity> and AutowiredEntityCommandHandler<Command, Entity>, can be used to stick a command handler to a specific entity, it defines a method getRepository() used to retrieve easily the repository corresponding to this entity. This interface class must generally be used when defining a command mainly dedicated to create, modify and delete a domain entity.
@XKasperCommandHandler(
domain = HelloDomain.class,
description = "Send a response to an existing Hello message"
)
public class NoticeTheWorldCommandHandler extends AutowiredCommandHandler<NoticeTheWorldCommand> {
private final KasperEventBus eventBus;
@Inject
public NoticeTheWorldCommandHandler(KasperEventBus eventBus) {
this.eventBus = eventBus;
}
@Override
public CommandResponse handle(final NoticeTheWorldCommand command) {
this.eventBus.publish(
new NoticeSentToTheWorldEvent(
command.getNotice()
)
);
return CommandResponse.ok();
}
}If you need to retrieve a different repository, use the method getRepositoryOf :
If you need to send non-domain events from the handler (available with AutowiredCommandHandler and AutowiredEntityCommandHandler), use this.publish(event), do not try to inject the event bus unless your event will not be sent during unit of work commit process.
Repository: methods for retrieving domain objects should delegate to a specialized Repository object such that alternative storage implementations may be easily interchanged.
--Wikipedia - DDD repository
A DDD repository is used to manage with entities persistence, and as the only persistable entities in Kasper framework are the aggregate roots then Kasper repositories are bound to a specific aggregate root.
In order to create a Kasper repository you have to extend Repository<AggregateRoot> annotating it with the @XKasperRepository annotation.
@XKasperRepository(description = "Store Hello entities")
public class HelloRepository extends Repository<Hello> {
final KeyValueStore store = HelloMessagesIndexStore.db;
final KeyValueStore businessStore = HelloMessagesByBuddyBusinessIndexStore.db;
private String businessKey(final Hello aggregate) {
return String.format(
"%d-%d",
aggregate.getForBuddy().hashCode(),
aggregate.getMessage().hashCode()
);
}
@Override
protected Optional<Hello> doLoad(final KasperID aggregateIdentifier, final Long expectedVersion) {
final Optional<Hello> hello = store.get(aggregateIdentifier);
if (hello.isPresent()) {
return Optional.of(hello.get());
} else{
return Optional.absent();
}
}
@Override
protected void doSave(final Hello aggregate) {
store.set(aggregate.getIdentifier(), aggregate);
try {
businessStore.set(businessKey(aggregate), true);
} catch (final Exception e) {
store.del(aggregate.getIdentifier());
throw e;
}
}
@Override
protected void doDelete(final Hello aggregate) {
this.doSave(aggregate);
}
@Override
protected boolean doHas(final KasperID id) {
return store.has(id);
}
public boolean hasTheSameMessage(final Hello aggregate) {
return businessStore.has(businessKey(aggregate));
}
}You can also add new public methods to this repository in order to access to your business indexes (logically hosted in your COMMAND area). These methods can later be accessed from command handlers using (ClientRepository).business()
Repositories are then accessed using the methods load(), get(), has() or add(), generally in command handlers only.
The load() method loads an entity from the repository and marks it so it will be automatically saved (doSave() will be called on your aggregate) on unit of work commit, while the get() method only load the aggregate without marking it to be automatically saved.
The has() method is not implemented by default, you'll have to override the doHas() method on your repository if you want this feature.
There is no delete() method on the repository. To delete an aggregate you have to create a specific method/handler on your loaded aggregate which calls the markDeleted() protected method internally. The aggregate is then marked as deleted, the doDelete() repository method will then be called once the unit of work is commited. You are heavily encouraged to never delete data in your domains by just marking them as deleted. So in major cases doDelete() can just call doSave(), the loading of entities in Kasper repositories will take care of not loading deleted aggregates.
The doSave() method is use for entity creation AND update. If your backend needs to make the difference between a creation or an update, you can :
- test aggregate.getVersion() for nullity in the doSave() method (newly created entities does not have a version)
- or implement the doUpdate() method, so entity creation will be automatically made calling doSave() and updates through doUpdate()
The Repository abstract class mut be considered as an entity store : the current state of entities is stored, then events will be sent by the unit of work once entity is persisted. If you want to apply a real event sourcing strategy, use instead the EventSourcedRepository supplying it an Axon EventStore.
The queries will only contain the methods for getting data. From an architectural point of view these would be all methods that return DTOs (Data Transfer Objects), or QueryResult, that the clients consumes.

The query handler performs the following tasks:
- It receives the Query instance
- It validates that the Query is a valid Query, the default abstract Kasper query handler will validates the Query against JSR-303 annotations
- It invokes the appropriate view to collect data.
- It returns the state at the request moment as result.
The query area is accessible and must be accessed by the related gateway.
A Kasper query is an immutable, anemic object used to request the platform for data, its class name ends with the suffix 'Query' (recommended).
A 'Query' must be a part of a domain API.
It is used by the client in order to send requests and by the query handler to understand the request and filter the indexed data.
A Kasper query has to implement the interface Query and can optionally defines some metadata using the @XKasperQuery annotation.
@XKasperQuery(description = "Retrieve all submitted Hello messages for a specific buddy")
public class GetAllHelloMessagesSentToBuddyQuery implements Query {
@NotNull( message = NOT_PROVIDED_BUDDY_MSG )
@Length( min = MIN_BUDDY_LENGTH, max = MAX_BUDDY_LENGTH, message = BAD_LENGTH_BUDDY_MSG )
@Pattern( regexp = REGEX_BUDDY, message = INVALID_BUDDY_MSG)
private final String forBuddy;
public GetAllHelloMessagesSentToBuddyQuery(final String forBuddy) {
this.forBuddy = checkNotNull(forBuddy);
}
public String getForBuddy() {
return this.forBuddy;
}
}Some sub interfaces are available in standard by Kasper framework :
com.viadeo.kasper.cqrs.query.OrderedQuery : can be implemented when the response can be ordered. com.viadeo.kasper.cqrs.query.PaginatedQuery : can be implemented when the response can be paginated.
A Kasper query result is an immutable, anemic object used by a query handler to send back data to the requesting client, it ends with the suffix ‘QueryResult‘ (recommended).
A 'QueryResult' must be a part of a domain API.
A Kasper query result has to implement the interface QueryResult and can optionally defines some metadata using the @XKasperQueryResult annotation.
@XKasperQueryResult(description = "Result.")
public class HelloMessageResult extends IndexedEntity implements QueryResult {
public static final String ENTITY_NAME = "Hello";
private final String message;
public HelloMessageResult(final KasperID id,
final Long version, final DateTime lastModificationDate,
final String message) {
super(id, ENTITY_NAME, version, lastModificationDate);
this.message = checkNotNull(message);
}
public String getMessage() {
return this.message;
}
}Some sub classes are available in standard by Kasper framework :
PaginatedQueryResult : can be implemented when the response is paginated. EntityQueryResult : contains basis entity fields like the type, the id and the version CollectionQueryResult : a collection of unit query results MapQueryResult : a map of string-indexed unit query results
A Kasper query handler is I/O component using a Query as input and responsible to return a QueryResult.
A Query service is part of the QUERY architectural area.
It has to implements QueryHandler<Query, QueryResult>, or extends the base implementation BaseQueryHandler<Query, QueryResult> or the more complete AutowiredQueryHandler<Query, QueryResult> (containing the EventBus and the QueryGateway) and specify its owning domain with the @XKasperQueryHandler annotation and ends with the 'QueryHandler' suffix (recommended).
usage
By extending AutowiredQueryHandler, you only have to implement at least one handle() method :
@XKasperQueryHandler(domain = HelloDomain.class)
public class GetAllHelloMessagesSentToBuddyQueryHandler extends AutowiredQueryHandler<GetAllHelloMessagesSentToBuddyQuery, HelloMessagesResult> {
private KeyValueStore store = HelloMessagesIndexStore.db;
@Override
@SuppressWarnings("unchecked")
public QueryResponse<HelloMessagesResult> handle(final GetAllHelloMessagesSentToBuddyQuery query) throws KasperQueryException {
final String forBuddy = query.getForBuddy();
Collection<HelloMessageResult> ret = Lists.newArrayList();
if (store.has(forBuddy)) {
/** Index directly contains the structure to be returned, no manipulation needed here */
ret = ((Map<KasperID, HelloMessageResult>) store.get(forBuddy).get()).values();
}
return QueryResponse.of(new HelloMessagesResult(Lists.newArrayList(ret)));
}
}Kasper framework provides a way to cache query responses based on the submitted query, the cache is enabled per QueryHandler and is disabled by default.
It is based on JSR 107 - JCache for selecting a cache implementation. By default no cache implementation is provided by the framework you can use any implementation of JCache (for example using ehcache-jcache).
To enable the cache for a query handler with default configuration, just put @XKasperQueryCache annotation:
The default behaviour will be to use the QueryAttributesKeyGenerator for computing the key of the query and use a ttl of one hour.
QueryAttributesKeyGenerator is using the hashcode of your query if no key is defined, otherwise it will combine the hashcode of the keys.
Use only someField and anotherField in the generated key and have a ttl of 1 minute:
@XKasperQueryCache(keys = {"someField", "anotherField"}, ttl=60)You can also have custom KeyGenerators, to do so just implement QueryCacheKeyGenerator and enable it:
@XKasperQueryCache( keyGenerator = MyKeyGenerator.class )Kasper framework provides a way to add an interceptor to a specific query.
To add interceptor for a query handler, just put @XKasperQueryFilter annotation:
@XKasperQueryFilter({NormalizeBuddyQueryInterceptor.class})From a platform instance of your Kasper application, you can retrieve gateways in order to dispatch your requests to the wanted area or directly publish an event on the bus.
exemple:
Platform platform = Platforms.newSpringPlatformBuilder().build();
platform.getCommandGateway().sendCommand(new NoticeTheWorldCommand("Hello"), Contexts.empty());
QueryResponse<HelloMessagesResult> queryResponse = platform.getQueryGateway().retrieve(new GetAllHelloMessagesSentToBuddyQuery("Chuck"), Contexts.empty());
platform.getEventBus.publish(Contexts.empty(), new NoticeSentToTheWorldEvent("Hello"))From an AutowiredCommandHandler, AutowiredQueryHandler, or AutoriwedEventListener, you can retrieve the related gateway corresponding to area and event bus.
A Kasper application using an exposition allows automatically to expose commands, queries and events. We can request any Command, Query, and Event through these points.
Actually Kasper provide an HTTP exposition allowing to exchange JSON messages with custom headers corresponding to our inputs.
Commands are submitted using POST or PUT requests, there are no query parameters, everything is in the body. Actually only json content is supported as input and output.
$ curl -XPOST -H "X-KASPER-SECURITY-TOKEN: 0031" -H "Content-Type: application/json" -d "{ \"response\": \"Hello\" }" http://localhost:8080/kasper/command/NoticeTheWorld
{"status":"OK","reason":false,"reasons":[]}Queries are requested using POST or PUT requests, there are no query parameters, everything is in the body. Actually only json content is supported as input and output. :warning: the GET request is deprecated
$ curl -XPOST -H "X-KASPER-SECURITY-TOKEN: 0031" -H "Content-Type: application/json" -d "{ \"forBuddy\": \"Chuck\" }" http://localhost:8080/kasper/query/GetAllHelloMessagesSentToBuddy
{"list":[],"empty":true,"count":0}Events can be submitted using POST or PUT requests, there are no query parameters, everything is in the body. Actually only json content is supported as input and output.
$ curl -XPOST -H "Content-Type: application/json" -d "{ \"response\": \"Chuck\" }" http://localhost:8080/kasper/event/NoticeSentToTheWorld
The idea behind the Kasper client library is to provide a very easy and powerful API to hit exposed Kasper implementations. This requires kasper implementations to share their Query and Commands code with JVM based consumers. This allows the library to only require one liner code taking as arguments populated queries and commands (basic POJOs), the library takes care of all the ser/deser stuff, error handling, asynchronous calls and more.
They share some common code in order to ensure that the way in which the communication is done is symmetric (ex: what can be serialized can also be deserialized). For example all the databinding part for query is shared between kasper-client and kasper-exposition.
The main entry point of the library is KasperClient class, it provides all the required methods to communicate with exposed Kasper implementations.
KasperClient is thread safe and should be reused for optimal performances.
Sending a command :
KasperClient client = new KasperClientBuilder().create();
CommandResponse responde = client.send(Context.empty(), new NoticeTheWorldCommand("Hello"));Requesting a query :
KasperClient client = new KasperClientBuilder().create();
QueryResponse<HelloMessagesResult> response = client.emit(Context.empty(), new GetAllHelloMessagesSentToBuddyQuery("Chuck"), TypeToken.of(HelloMessagesResult.class));Emitting an event :
KasperClient client = new KasperClientBuilder().create();
client.emit(Context.empty(), new NoticeSentToTheWorldEvent("Hello"));If an error occurred during query processing on client side a KasperQueryException will be raised, if something goes wrong on server side then a QueryResponse with an error is returned.
Note
By default KasperClient is configured to hit queries at http://localhost:8080/query and commands at http://localhost:8080/command.
This can be configured using KasperClientBuilder.