diff --git a/registration-processor/mosip-stage-executor/src/main/java/io/mosip/registration/processor/stages/executor/MosipStageExecutorApplication.java b/registration-processor/mosip-stage-executor/src/main/java/io/mosip/registration/processor/stages/executor/MosipStageExecutorApplication.java index c762b100fab..b915fe8ba93 100644 --- a/registration-processor/mosip-stage-executor/src/main/java/io/mosip/registration/processor/stages/executor/MosipStageExecutorApplication.java +++ b/registration-processor/mosip-stage-executor/src/main/java/io/mosip/registration/processor/stages/executor/MosipStageExecutorApplication.java @@ -1,84 +1,91 @@ package io.mosip.registration.processor.stages.executor; +import io.mosip.kernel.core.exception.ExceptionUtils; +import io.mosip.kernel.core.logger.spi.Logger; +import io.mosip.registration.processor.core.abstractverticle.MosipVerticleAPIManager; +import io.mosip.registration.processor.core.logger.RegProcessorLogger; +import io.mosip.registration.processor.stages.executor.config.StagesConfig; +import io.mosip.registration.processor.stages.executor.util.StageClassesUtil; +import org.slf4j.bridge.SLF4JBridgeHandler; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.core.env.MutablePropertySources; + import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.logging.LogManager; import java.util.stream.Collectors; import java.util.stream.Stream; -import io.mosip.registration.processor.core.logger.RegProcessorLogger; -import io.mosip.kernel.core.logger.spi.Logger; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.core.env.MutablePropertySources; - -import io.mosip.kernel.core.exception.ExceptionUtils; -import io.mosip.registration.processor.core.abstractverticle.MosipVerticleAPIManager; - -import io.mosip.registration.processor.stages.executor.config.StagesConfig; -import io.mosip.registration.processor.stages.executor.util.StageClassesUtil; - /** * The Class MosipStageExecutorApplication. */ public class MosipStageExecutorApplication { - - /** The Constant regProcLogger. */ - private static final Logger regProcLogger = RegProcessorLogger.getLogger(MosipStageExecutorApplication.class); - /** - * main method to launch external stage application. - * - * @param args the arguments - */ - public static void main(String[] args) { - regProcLogger.info("Starting mosip-stage-executor..."); - //This context is closed after deploying the stages - try (AnnotationConfigApplicationContext stageInfoApplicationContext = new AnnotationConfigApplicationContext( - new Class[] { StagesConfig.class });) { - StagesConfig stagesConfig = stageInfoApplicationContext.getBean(StagesConfig.class); - MutablePropertySources propertySources = stagesConfig.getCloudPropertySources(); - - List stageBeansBasePackages = StageClassesUtil.getStageBeansBasePackages(stagesConfig, propertySources); - - if(!stageBeansBasePackages.isEmpty()) { - - regProcLogger.info("Base packages for stage beans from configuration: {}", stageBeansBasePackages); - - List> stageClasses = StageClassesUtil.getStageBeanClasses(stageBeansBasePackages); - - regProcLogger.info("Stage classes identified: {}", stageClasses.stream().map(Class::getCanonicalName).collect(Collectors.joining(", "))); - - Class[] entrypointConfigClasses = Stream.concat(Stream.of(StagesConfig.class), stageClasses.stream()) - .toArray(size -> new Class[size]); - - //This context should not be closed and to be kept for consumption by the verticles - AnnotationConfigApplicationContext mainApplicationContext = new PropertySourcesCustomizingApplicationContext( - entrypointConfigClasses) { - @Override - public MutablePropertySources getPropertySources() { - return propertySources; - }; - }; - - if (!stageClasses.isEmpty()) { - ExecutorService executorService = Executors.newFixedThreadPool(stageClasses.size()); - stageClasses.forEach(stageClass -> executorService.execute(() -> { - try { - regProcLogger.info("Executing Stage: {}", stageClass.getCanonicalName()); - MosipVerticleAPIManager stageBean = StageClassesUtil.getStageBean(mainApplicationContext, stageClass); - stageBean.deployVerticle(); - } catch (Exception e) { - regProcLogger.error("Exception occured while loading verticles. Please make sure correct verticle name was passed from deployment script. \n {}", - ExceptionUtils.getStackTrace(e)); - } - })); - executorService.close(); - } else { - regProcLogger.error("No stage class is found. Please make sure correct correct stage class base packages are specified in properties and stages are added to classpath/dependencies."); - } - } else { - regProcLogger.error("No base packages configured for stages."); - } - } - } + /** + * The Constant regProcLogger. + */ + private static final Logger regProcLogger = RegProcessorLogger.getLogger(MosipStageExecutorApplication.class); + + /** + * main method to launch external stage application. + * + * @param args the arguments + */ + public static void main(String[] args) { + LogManager.getLogManager().reset(); + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + regProcLogger.info("Starting mosip-stage-executor..."); + //This context is closed after deploying the stages + try (AnnotationConfigApplicationContext stageInfoApplicationContext = new AnnotationConfigApplicationContext( + new Class[]{StagesConfig.class});) { + StagesConfig stagesConfig = stageInfoApplicationContext.getBean(StagesConfig.class); + MutablePropertySources propertySources = stagesConfig.getCloudPropertySources(); + + List stageBeansBasePackages = StageClassesUtil.getStageBeansBasePackages(stagesConfig, propertySources); + + if (!stageBeansBasePackages.isEmpty()) { + + regProcLogger.info("Base packages for stage beans from configuration: {}", stageBeansBasePackages); + + List> stageClasses = StageClassesUtil.getStageBeanClasses(stageBeansBasePackages); + + regProcLogger.info("Stage classes identified: {}", stageClasses.stream().map(Class::getCanonicalName).collect(Collectors.joining(", "))); + + Class[] entrypointConfigClasses = Stream.concat(Stream.of(StagesConfig.class), stageClasses.stream()) + .toArray(size -> new Class[size]); + + //This context should not be closed and to be kept for consumption by the verticles + AnnotationConfigApplicationContext mainApplicationContext = new PropertySourcesCustomizingApplicationContext( + entrypointConfigClasses) { + @Override + public MutablePropertySources getPropertySources() { + return propertySources; + } + + ; + }; + + if (!stageClasses.isEmpty()) { + ExecutorService executorService = Executors.newFixedThreadPool(stageClasses.size()); + stageClasses.forEach(stageClass -> executorService.execute(() -> { + try { + regProcLogger.info("Executing Stage: {}", stageClass.getCanonicalName()); + MosipVerticleAPIManager stageBean = StageClassesUtil.getStageBean(mainApplicationContext, stageClass); + stageBean.deployVerticle(); + } catch (Exception e) { + regProcLogger.error("Exception occured while loading verticles. Please make sure correct verticle name was passed from deployment script. \n {}", + ExceptionUtils.getStackTrace(e)); + } + })); + executorService.close(); + } else { + regProcLogger.error("No stage class is found. Please make sure correct correct stage class base packages are specified in properties and stages are added to classpath/dependencies."); + } + } else { + regProcLogger.error("No base packages configured for stages."); + } + } + } } diff --git a/registration-processor/mosip-stage-executor/src/main/resources/logback.xml b/registration-processor/mosip-stage-executor/src/main/resources/logback.xml index c9058695db6..86353b17e78 100644 --- a/registration-processor/mosip-stage-executor/src/main/resources/logback.xml +++ b/registration-processor/mosip-stage-executor/src/main/resources/logback.xml @@ -1,10 +1,10 @@ - + + - - + \ No newline at end of file diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/tracing/EventTracingHandler.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/tracing/EventTracingHandler.java index 7b8bb228786..51643314d19 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/tracing/EventTracingHandler.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/tracing/EventTracingHandler.java @@ -1,11 +1,5 @@ package io.mosip.registration.processor.core.tracing; -import java.nio.charset.StandardCharsets; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import brave.Span; import brave.Tracer; import brave.Tracing; @@ -18,12 +12,17 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import io.vertx.kafka.client.producer.KafkaHeader; import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; /** * Handler to trace events, * currently methods are written to handle vertx eventbus and kafka * Note: For events, We use single line b3 header - * -- + * -- */ public class EventTracingHandler { @@ -49,6 +48,7 @@ public EventTracingHandler(Tracing tracing, String eventBusType) { /** * creates TraceContext based on the provided eventbus Message, * creates new traceContext if b3 headers are not found else starts the span with existing span + * * @param carrier * @return */ @@ -64,6 +64,7 @@ private Span nextSpan(Message carrier) { /** * creates TraceContext based on the provided Kafka Header, * creates new traceContext if b3 headers are not found else starts the span with existing span + * * @param carrier * @return */ @@ -97,8 +98,8 @@ public String toString() { public String get(List headers, String key) { String value = null; for (KafkaHeader header : headers) { - if(key.equalsIgnoreCase(header.key())) - value= header.value().toString(StandardCharsets.UTF_8); + if (key.equalsIgnoreCase(header.key())) + value = header.value().toString(StandardCharsets.UTF_8); } return value; } @@ -121,8 +122,8 @@ public void readHeaderOnConsume(EventBus eventBus) { public void writeHeaderOnProduce(EventBus eventBus) { eventBus.addOutboundInterceptor(deliveryContext -> { Object tracer = ContextualData.getOrDefault(TracingConstant.TRACER); - Span span = (tracer instanceof TracingHandler) ? ((TracingHandler)tracer).span : (Span)tracer; - if(span == null) { + Span span = (tracer instanceof TracingHandler) ? ((TracingHandler) tracer).span : (Span) tracer; + if (span == null) { span = nextSpan(deliveryContext.message()); JsonObject body = new JsonObject((String) deliveryContext.message().body()); initializeContextWithTracing(span, body == null ? "-" : (body.getString("rid", "-"))); @@ -146,29 +147,29 @@ public Span readHeaderOnKafkaConsume(KafkaConsumerRecord consume public void writeHeaderOnKafkaProduce(KafkaProducerRecord producerRecord) { Object tracer = ContextualData.getOrDefault(TracingConstant.TRACER); - Span span = (tracer instanceof TracingHandler) ? ((TracingHandler)tracer).span : (Span)tracer; - if(span == null) { + Span span = (tracer instanceof TracingHandler) ? ((TracingHandler) tracer).span : (Span) tracer; + if (span == null) { span = nextSpan(producerRecord.headers()); initializeContextWithTracing(span, producerRecord.key()); MDCHelper.addHeadersToMDC(); } producerRecord.addHeader(TracingConstant.SINGLE_LINE_B3_HEADER, String.format("%s-%s", span.context().traceIdString(), - span.context().spanIdString())); + span.context().spanIdString())); producerRecord.addHeader(TracingConstant.RID_KEY, producerRecord.key()); } public void writeHeaderOnKafkaProduce(KafkaProducerRecord producerRecord, Span span) { - producerRecord.addHeader(TracingConstant.SINGLE_LINE_B3_HEADER, + producerRecord.addHeader(TracingConstant.SINGLE_LINE_B3_HEADER, String.format("%s-%s", span.context().traceIdString(), span.context().spanIdString())); - producerRecord.addHeader(TracingConstant.RID_KEY, producerRecord.key()); + producerRecord.addHeader(TracingConstant.RID_KEY, producerRecord.key()); } public void closeSpan() { Object tracer = ContextualData.getOrDefault(TracingConstant.TRACER); - if(tracer instanceof Span) { - ((Span)tracer).finish(System.currentTimeMillis()); + if (tracer instanceof Span) { + ((Span) tracer).finish(System.currentTimeMillis()); } } @@ -180,11 +181,11 @@ public void closeSpan(Span span) { private void initializeContextWithTracing(Span span, String rid) { ContextualData.put(TracingConstant.TRACER, span); ContextualData.put(TracingConstant.TRACE_ID_KEY, span.context().traceIdString()); - if (rid == null) { - ContextualData.put(TracingConstant.RID_KEY, "-"); - } else { - ContextualData.put(TracingConstant.RID_KEY, rid); - } + if (rid == null) { + ContextualData.put(TracingConstant.RID_KEY, "-"); + } else { + ContextualData.put(TracingConstant.RID_KEY, rid); + } } diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/tracing/TracingRoutingContextHandlerLocal.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/tracing/TracingRoutingContextHandlerLocal.java index 56fc691fa72..8f3ba1a3502 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/tracing/TracingRoutingContextHandlerLocal.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/tracing/TracingRoutingContextHandlerLocal.java @@ -22,7 +22,7 @@ */ final class TracingRoutingContextHandlerLocal implements Handler { - private static Logger regProcLogger = RegProcessorLogger.getLogger(TracingRoutingContextHandlerLocal.class); + private static Logger regProcLogger = RegProcessorLogger.getLogger(TracingRoutingContextHandlerLocal.class); static final Propagation.Getter GETTER = new Propagation.Getter() { public String get(HttpServerRequest carrier, String key) { return carrier.getHeader(key); @@ -46,7 +46,7 @@ public String toString() { } public void handle(RoutingContext context) { - TracingHandler tracingHandler = (TracingHandler)context.get(TracingHandler.class.getName()); + TracingHandler tracingHandler = (TracingHandler) context.get(TracingHandler.class.getName()); if (tracingHandler != null) { if (!context.failed()) context.addHeadersEndHandler(tracingHandler); @@ -65,16 +65,16 @@ public void handle(RoutingContext context) { if (ws != null) ws.close(); } catch (Throwable throwable) { - regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), - LoggerFileConstant.REGISTRATIONID.toString(), "", - throwable.getMessage() + ExceptionUtils.getStackTrace(throwable)); + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), + LoggerFileConstant.REGISTRATIONID.toString(), "", + throwable.getMessage() + ExceptionUtils.getStackTrace(throwable)); if (ws != null) try { ws.close(); } catch (Throwable throwable1) { - regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), - LoggerFileConstant.REGISTRATIONID.toString(), "", - throwable1.getMessage() + ExceptionUtils.getStackTrace(throwable1)); + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), + LoggerFileConstant.REGISTRATIONID.toString(), "", + throwable1.getMessage() + ExceptionUtils.getStackTrace(throwable1)); throwable.addSuppressed(throwable1); } throw throwable;