Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,84 +1,91 @@
package io.mosip.registration.processor.stages.executor;

import io.mosip.kernel.core.exception.ExceptionUtils;
import io.mosip.kernel.core.logger.spi.Logger;
import io.mosip.registration.processor.core.abstractverticle.MosipVerticleAPIManager;
import io.mosip.registration.processor.core.logger.RegProcessorLogger;
import io.mosip.registration.processor.stages.executor.config.StagesConfig;
import io.mosip.registration.processor.stages.executor.util.StageClassesUtil;
import org.slf4j.bridge.SLF4JBridgeHandler;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.MutablePropertySources;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.LogManager;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.mosip.registration.processor.core.logger.RegProcessorLogger;
import io.mosip.kernel.core.logger.spi.Logger;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.MutablePropertySources;

import io.mosip.kernel.core.exception.ExceptionUtils;
import io.mosip.registration.processor.core.abstractverticle.MosipVerticleAPIManager;

import io.mosip.registration.processor.stages.executor.config.StagesConfig;
import io.mosip.registration.processor.stages.executor.util.StageClassesUtil;

/**
* The Class MosipStageExecutorApplication.
*/
public class MosipStageExecutorApplication {

/** The Constant regProcLogger. */
private static final Logger regProcLogger = RegProcessorLogger.getLogger(MosipStageExecutorApplication.class);

/**
* main method to launch external stage application.
*
* @param args the arguments
*/
public static void main(String[] args) {
regProcLogger.info("Starting mosip-stage-executor...");
//This context is closed after deploying the stages
try (AnnotationConfigApplicationContext stageInfoApplicationContext = new AnnotationConfigApplicationContext(
new Class<?>[] { StagesConfig.class });) {
StagesConfig stagesConfig = stageInfoApplicationContext.getBean(StagesConfig.class);
MutablePropertySources propertySources = stagesConfig.getCloudPropertySources();

List<String> stageBeansBasePackages = StageClassesUtil.getStageBeansBasePackages(stagesConfig, propertySources);

if(!stageBeansBasePackages.isEmpty()) {

regProcLogger.info("Base packages for stage beans from configuration: {}", stageBeansBasePackages);

List<Class<MosipVerticleAPIManager>> stageClasses = StageClassesUtil.getStageBeanClasses(stageBeansBasePackages);

regProcLogger.info("Stage classes identified: {}", stageClasses.stream().map(Class::getCanonicalName).collect(Collectors.joining(", ")));

Class<?>[] entrypointConfigClasses = Stream.concat(Stream.of(StagesConfig.class), stageClasses.stream())
.toArray(size -> new Class<?>[size]);

//This context should not be closed and to be kept for consumption by the verticles
AnnotationConfigApplicationContext mainApplicationContext = new PropertySourcesCustomizingApplicationContext(
entrypointConfigClasses) {
@Override
public MutablePropertySources getPropertySources() {
return propertySources;
};
};

if (!stageClasses.isEmpty()) {
ExecutorService executorService = Executors.newFixedThreadPool(stageClasses.size());
stageClasses.forEach(stageClass -> executorService.execute(() -> {
try {
regProcLogger.info("Executing Stage: {}", stageClass.getCanonicalName());
MosipVerticleAPIManager stageBean = StageClassesUtil.getStageBean(mainApplicationContext, stageClass);
stageBean.deployVerticle();
} catch (Exception e) {
regProcLogger.error("Exception occured while loading verticles. Please make sure correct verticle name was passed from deployment script. \n {}",
ExceptionUtils.getStackTrace(e));
}
}));
executorService.close();
} else {
regProcLogger.error("No stage class is found. Please make sure correct correct stage class base packages are specified in properties and stages are added to classpath/dependencies.");
}
} else {
regProcLogger.error("No base packages configured for stages.");
}
}
}
/**
* The Constant regProcLogger.
*/
Comment on lines +25 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why these unnecessary changes are there ?

private static final Logger regProcLogger = RegProcessorLogger.getLogger(MosipStageExecutorApplication.class);

/**
* main method to launch external stage application.
*
* @param args the arguments
*/
public static void main(String[] args) {
LogManager.getLogManager().reset();
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
Comment on lines +36 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Why we need to explicitly install the JUL --> SLF4J bride. How it is different as compared to other apps such as packet manager etc. Why we are not able to see these different format there?

regProcLogger.info("Starting mosip-stage-executor...");
//This context is closed after deploying the stages
try (AnnotationConfigApplicationContext stageInfoApplicationContext = new AnnotationConfigApplicationContext(
new Class<?>[]{StagesConfig.class});) {
StagesConfig stagesConfig = stageInfoApplicationContext.getBean(StagesConfig.class);
MutablePropertySources propertySources = stagesConfig.getCloudPropertySources();

List<String> stageBeansBasePackages = StageClassesUtil.getStageBeansBasePackages(stagesConfig, propertySources);

if (!stageBeansBasePackages.isEmpty()) {

regProcLogger.info("Base packages for stage beans from configuration: {}", stageBeansBasePackages);

List<Class<MosipVerticleAPIManager>> stageClasses = StageClassesUtil.getStageBeanClasses(stageBeansBasePackages);

regProcLogger.info("Stage classes identified: {}", stageClasses.stream().map(Class::getCanonicalName).collect(Collectors.joining(", ")));

Class<?>[] entrypointConfigClasses = Stream.concat(Stream.of(StagesConfig.class), stageClasses.stream())
.toArray(size -> new Class<?>[size]);

//This context should not be closed and to be kept for consumption by the verticles
AnnotationConfigApplicationContext mainApplicationContext = new PropertySourcesCustomizingApplicationContext(
entrypointConfigClasses) {
@Override
public MutablePropertySources getPropertySources() {
return propertySources;
}

;
};

if (!stageClasses.isEmpty()) {
ExecutorService executorService = Executors.newFixedThreadPool(stageClasses.size());
stageClasses.forEach(stageClass -> executorService.execute(() -> {
try {
regProcLogger.info("Executing Stage: {}", stageClass.getCanonicalName());
MosipVerticleAPIManager stageBean = StageClassesUtil.getStageBean(mainApplicationContext, stageClass);
stageBean.deployVerticle();
} catch (Exception e) {
regProcLogger.error("Exception occured while loading verticles. Please make sure correct verticle name was passed from deployment script. \n {}",
ExceptionUtils.getStackTrace(e));
}
}));
executorService.close();
} else {
regProcLogger.error("No stage class is found. Please make sure correct correct stage class base packages are specified in properties and stages are added to classpath/dependencies.");
}
} else {
regProcLogger.error("No base packages configured for stages.");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<configuration>
<springProperty scope="context" name="appName" source="spring.application.name"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
<logger name="io.mosip.registration.processor" level="DEBUG"/>
<logger name="brave" level="DEBUG"/>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package io.mosip.registration.processor.core.tracing;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
Expand All @@ -18,12 +12,17 @@
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* Handler to trace events,
* currently methods are written to handle vertx eventbus and kafka
* Note: For events, We use single line b3 header
* <traceid>-<spanid>-<samplingflag>
* <traceid>-<spanid>-<samplingflag>
*/
public class EventTracingHandler {

Expand All @@ -49,6 +48,7 @@ public EventTracingHandler(Tracing tracing, String eventBusType) {
/**
* creates TraceContext based on the provided eventbus Message,
* creates new traceContext if b3 headers are not found else starts the span with existing span
*
* @param carrier
* @return
*/
Expand All @@ -64,6 +64,7 @@ private Span nextSpan(Message carrier) {
/**
* creates TraceContext based on the provided Kafka Header,
* creates new traceContext if b3 headers are not found else starts the span with existing span
*
* @param carrier
* @return
*/
Expand Down Expand Up @@ -97,8 +98,8 @@ public String toString() {
public String get(List<KafkaHeader> headers, String key) {
String value = null;
for (KafkaHeader header : headers) {
if(key.equalsIgnoreCase(header.key()))
value= header.value().toString(StandardCharsets.UTF_8);
if (key.equalsIgnoreCase(header.key()))
value = header.value().toString(StandardCharsets.UTF_8);
}
return value;
}
Expand All @@ -121,8 +122,8 @@ public void readHeaderOnConsume(EventBus eventBus) {
public void writeHeaderOnProduce(EventBus eventBus) {
eventBus.addOutboundInterceptor(deliveryContext -> {
Object tracer = ContextualData.getOrDefault(TracingConstant.TRACER);
Span span = (tracer instanceof TracingHandler) ? ((TracingHandler)tracer).span : (Span)tracer;
if(span == null) {
Span span = (tracer instanceof TracingHandler) ? ((TracingHandler) tracer).span : (Span) tracer;
if (span == null) {
span = nextSpan(deliveryContext.message());
JsonObject body = new JsonObject((String) deliveryContext.message().body());
initializeContextWithTracing(span, body == null ? "-" : (body.getString("rid", "-")));
Expand All @@ -146,29 +147,29 @@ public Span readHeaderOnKafkaConsume(KafkaConsumerRecord<String, String> consume

public void writeHeaderOnKafkaProduce(KafkaProducerRecord<String, String> producerRecord) {
Object tracer = ContextualData.getOrDefault(TracingConstant.TRACER);
Span span = (tracer instanceof TracingHandler) ? ((TracingHandler)tracer).span : (Span)tracer;
if(span == null) {
Span span = (tracer instanceof TracingHandler) ? ((TracingHandler) tracer).span : (Span) tracer;
if (span == null) {
span = nextSpan(producerRecord.headers());
initializeContextWithTracing(span, producerRecord.key());
MDCHelper.addHeadersToMDC();
}
producerRecord.addHeader(TracingConstant.SINGLE_LINE_B3_HEADER,
String.format("%s-%s", span.context().traceIdString(),
span.context().spanIdString()));
span.context().spanIdString()));
producerRecord.addHeader(TracingConstant.RID_KEY, producerRecord.key());
}

public void writeHeaderOnKafkaProduce(KafkaProducerRecord<String, String> producerRecord, Span span) {
producerRecord.addHeader(TracingConstant.SINGLE_LINE_B3_HEADER,
producerRecord.addHeader(TracingConstant.SINGLE_LINE_B3_HEADER,
String.format("%s-%s", span.context().traceIdString(),
span.context().spanIdString()));
producerRecord.addHeader(TracingConstant.RID_KEY, producerRecord.key());
producerRecord.addHeader(TracingConstant.RID_KEY, producerRecord.key());
}

public void closeSpan() {
Object tracer = ContextualData.getOrDefault(TracingConstant.TRACER);
if(tracer instanceof Span) {
((Span)tracer).finish(System.currentTimeMillis());
if (tracer instanceof Span) {
((Span) tracer).finish(System.currentTimeMillis());
}
}

Expand All @@ -180,11 +181,11 @@ public void closeSpan(Span span) {
private void initializeContextWithTracing(Span span, String rid) {
ContextualData.put(TracingConstant.TRACER, span);
ContextualData.put(TracingConstant.TRACE_ID_KEY, span.context().traceIdString());
if (rid == null) {
ContextualData.put(TracingConstant.RID_KEY, "-");
} else {
ContextualData.put(TracingConstant.RID_KEY, rid);
}
if (rid == null) {
ContextualData.put(TracingConstant.RID_KEY, "-");
} else {
ContextualData.put(TracingConstant.RID_KEY, rid);
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/
final class TracingRoutingContextHandlerLocal implements Handler<RoutingContext> {

private static Logger regProcLogger = RegProcessorLogger.getLogger(TracingRoutingContextHandlerLocal.class);
private static Logger regProcLogger = RegProcessorLogger.getLogger(TracingRoutingContextHandlerLocal.class);
static final Propagation.Getter<HttpServerRequest, String> GETTER = new Propagation.Getter<HttpServerRequest, String>() {
public String get(HttpServerRequest carrier, String key) {
return carrier.getHeader(key);
Expand All @@ -46,7 +46,7 @@ public String toString() {
}

public void handle(RoutingContext context) {
TracingHandler tracingHandler = (TracingHandler)context.get(TracingHandler.class.getName());
TracingHandler tracingHandler = (TracingHandler) context.get(TracingHandler.class.getName());
if (tracingHandler != null) {
if (!context.failed())
context.addHeadersEndHandler(tracingHandler);
Expand All @@ -65,16 +65,16 @@ public void handle(RoutingContext context) {
if (ws != null)
ws.close();
} catch (Throwable throwable) {
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), "",
throwable.getMessage() + ExceptionUtils.getStackTrace(throwable));
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), "",
throwable.getMessage() + ExceptionUtils.getStackTrace(throwable));
if (ws != null)
try {
ws.close();
} catch (Throwable throwable1) {
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), "",
throwable1.getMessage() + ExceptionUtils.getStackTrace(throwable1));
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), "",
throwable1.getMessage() + ExceptionUtils.getStackTrace(throwable1));
throwable.addSuppressed(throwable1);
}
throw throwable;
Expand Down