diff --git a/src/main/java/io/growing/sdk/java/GrowingAPI.java b/src/main/java/io/growing/sdk/java/GrowingAPI.java index c0386ee..f6ce68d 100644 --- a/src/main/java/io/growing/sdk/java/GrowingAPI.java +++ b/src/main/java/io/growing/sdk/java/GrowingAPI.java @@ -6,6 +6,8 @@ import io.growing.sdk.java.exception.GIOSendBeRejectedException; import io.growing.sdk.java.logger.GioLogger; import io.growing.sdk.java.sender.FixThreadPoolSender; +import io.growing.sdk.java.sender.SendResult; +import io.growing.sdk.java.sender.SyncSender; import io.growing.sdk.java.store.StoreStrategy; import io.growing.sdk.java.store.StoreStrategyClient; import io.growing.sdk.java.utils.ConfigUtils; @@ -59,6 +61,19 @@ public void send(GIOMessage msg) { } } + private static final SyncSender SYNC_SENDER = new SyncSender(); + + public SendResult sendSync(GIOMessage msg) { + try { + if (validDefaultConfig && businessVerification(msg)) { + return SYNC_SENDER.sendMsg(msg); + } + } catch (Exception e) { + GioLogger.error("failed to send msg, " + e.toString()); + } + return new SendResult(SendResult.State.UNKNOWN_ERROR, "unknown error"); + } + /** * 添加埋点事件,当队列满时,可以抛出拒绝异常. * diff --git a/src/main/java/io/growing/sdk/java/dto/GIOMessage.java b/src/main/java/io/growing/sdk/java/dto/GIOMessage.java index 3f382b7..fabc906 100644 --- a/src/main/java/io/growing/sdk/java/dto/GIOMessage.java +++ b/src/main/java/io/growing/sdk/java/dto/GIOMessage.java @@ -22,4 +22,6 @@ public String getProjectKey() { public boolean isIllegal() { return false; } + + public abstract Class getMessageClass(); } \ No newline at end of file diff --git a/src/main/java/io/growing/sdk/java/dto/GioCdpEventMessage.java b/src/main/java/io/growing/sdk/java/dto/GioCdpEventMessage.java index e938b26..be6a954 100644 --- a/src/main/java/io/growing/sdk/java/dto/GioCdpEventMessage.java +++ b/src/main/java/io/growing/sdk/java/dto/GioCdpEventMessage.java @@ -44,6 +44,11 @@ public EventV3Dto getMessage() { return event.toBuilder().setProjectKey(projectKey).setDataSourceId(dataSourceId).build(); } + @Override + public Class getMessageClass() { + return GioCdpEventMessage.class; + } + @Override public boolean isIllegal() { if (StringUtils.isBlank(event.getEventName())) { diff --git a/src/main/java/io/growing/sdk/java/dto/GioCdpItemMessage.java b/src/main/java/io/growing/sdk/java/dto/GioCdpItemMessage.java index e4d8247..71c4093 100644 --- a/src/main/java/io/growing/sdk/java/dto/GioCdpItemMessage.java +++ b/src/main/java/io/growing/sdk/java/dto/GioCdpItemMessage.java @@ -38,6 +38,11 @@ public ItemDto getMessage() { return event.toBuilder().setProjectKey(projectKey).setDataSourceId(dataSourceId).build(); } + @Override + public Class getMessageClass() { + return GioCdpItemMessage.class; + } + @Override public boolean isIllegal() { if (event.getId().isEmpty() || event.getKey().isEmpty()) { diff --git a/src/main/java/io/growing/sdk/java/dto/GioCdpUserMappingMessage.java b/src/main/java/io/growing/sdk/java/dto/GioCdpUserMappingMessage.java index 6906d65..78cd7ab 100644 --- a/src/main/java/io/growing/sdk/java/dto/GioCdpUserMappingMessage.java +++ b/src/main/java/io/growing/sdk/java/dto/GioCdpUserMappingMessage.java @@ -36,6 +36,11 @@ public UserMappingDto getMessage() { return event.toBuilder().setProjectKey(projectKey).setDataSourceId(dataSourceId).build(); } + @Override + public Class getMessageClass() { + return GioCdpUserMappingMessage.class; + } + @Override public boolean isIllegal() { if (event.getIdentifiesMap().isEmpty()) { diff --git a/src/main/java/io/growing/sdk/java/dto/GioCdpUserMessage.java b/src/main/java/io/growing/sdk/java/dto/GioCdpUserMessage.java index 37b5b37..0391a83 100644 --- a/src/main/java/io/growing/sdk/java/dto/GioCdpUserMessage.java +++ b/src/main/java/io/growing/sdk/java/dto/GioCdpUserMessage.java @@ -41,6 +41,11 @@ public EventV3Dto getMessage() { return user.toBuilder().setProjectKey(projectKey).setDataSourceId(dataSourceId).build(); } + @Override + public Class getMessageClass() { + return GioCdpUserMessage.class; + } + @Override public boolean isIllegal() { if (user.getAttributesMap().isEmpty()) { diff --git a/src/main/java/io/growing/sdk/java/logger/FileWriter.java b/src/main/java/io/growing/sdk/java/logger/FileWriter.java new file mode 100644 index 0000000..3f40972 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/logger/FileWriter.java @@ -0,0 +1,138 @@ +package io.growing.sdk.java.logger; + +import io.growing.sdk.java.utils.ConfigUtils; + +import java.io.File; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.text.SimpleDateFormat; +import java.util.Date; + +public class FileWriter { + private static final String loggerFilePath; + private static final int loggerFileMaxSize; + private static final int loggerFileMaxDays; + + static { + loggerFilePath = ConfigUtils.getStringValue("logger.file.path", "/logs").trim(); + loggerFileMaxSize = ConfigUtils.getIntValue("logger.file.max_size", 10); + loggerFileMaxDays = ConfigUtils.getIntValue("logger.file.max_days", -1); + } + + private FileWriter() { + } + + private static class SingleInstance { + private static final FileWriter INSTANCE = new FileWriter(); + } + + public static FileWriter getInstance() { + return FileWriter.SingleInstance.INSTANCE; + } + + private FileChannel fileChannel; + private Date lastDate; + private static final SimpleDateFormat fileDateFormat = new SimpleDateFormat("yyyy-MM-dd"); + private static final SimpleDateFormat logDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + public synchronized void log(String message) { + try { + if (lastDate == null || intervalDays(lastDate.getTime(), new Date().getTime()) > 0) { + createFileIfNotExist(); + } + + if (fileChannel != null && fileChannel.isOpen()) { + // 超过单个文件大小限制后直接关闭channel,不再写入日志 + if (fileChannel.size() > ((long) loggerFileMaxSize * 1024L * 1024L)) { + if (fileChannel != null) { + fileChannel.close(); + fileChannel = null; + } + return; + } + + ByteBuffer buffer = ByteBuffer.wrap((logDateFormat.format(new Date()) + ": " + message + "\n").getBytes()); + fileChannel.write(buffer); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void createFileIfNotExist() { + try { + String currentDate = fileDateFormat.format(new Date()); + lastDate = fileDateFormat.parse(currentDate); + String filePath = loggerFilePath + File.separator + currentDate + ".log"; + File file = new File(filePath); + if (!createMissingParentDirectories(file)) { + return; + } + if (file.createNewFile()) { + // 每次创建新的文件时,关闭上一个fileChannel,并且删除过期的日志文件 + if (fileChannel != null) { + fileChannel.close(); + fileChannel = null; + } + deleteExpiredFiles(); + } + + RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rwd"); + randomAccessFile.seek(randomAccessFile.length()); + fileChannel = randomAccessFile.getChannel(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void deleteExpiredFiles() { + try { + if (loggerFileMaxDays > 0) { + File logsDir = new File(loggerFilePath); + File[] logFiles = logsDir.listFiles(); + if (logFiles == null || logFiles.length == 0) { + return; + } + for (File logFile : logFiles) { + String fileName = logFile.getName(); + if (logFile.isFile() && fileName.endsWith(".log") && isExpiredFile(fileName)) { + logFile.delete(); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private Boolean isExpiredFile(String fileName) { + try { + String fileTime = fileName.substring(0, fileName.lastIndexOf(".")); + Date fileDate = fileDateFormat.parse(fileTime); + Date currentDate = new Date(); + long days = intervalDays(fileDate.getTime(), currentDate.getTime()); + if (days > loggerFileMaxDays) { + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + private long intervalDays(long lastTime, long currentTime) { + return (currentTime - lastTime) / (1000 * 60 * 60 * 24); + } + + private boolean createMissingParentDirectories(File file) { + File parent = file.getParentFile(); + if (parent == null) { + return true; + } + + parent.mkdirs(); + return parent.exists(); + } +} diff --git a/src/main/java/io/growing/sdk/java/logger/GioLogger.java b/src/main/java/io/growing/sdk/java/logger/GioLogger.java index 0220d68..119cfc1 100644 --- a/src/main/java/io/growing/sdk/java/logger/GioLogger.java +++ b/src/main/java/io/growing/sdk/java/logger/GioLogger.java @@ -10,10 +10,12 @@ public class GioLogger { private static GioLoggerInterface logger; private static final String loggerLevel; + private static final Boolean loggerFileEnabled; static { String loggerImplName = ConfigUtils.getStringValue("logger.implementation", "io.growing.sdk.java.logger.GioLoggerImpl"); + loggerFileEnabled = ConfigUtils.getBooleanValue("logger.file.enabled", false); loggerLevel = ConfigUtils.getStringValue("logger.level", "error"); try { @@ -44,4 +46,10 @@ public static void debug(String msg) { public static void error(String msg) { logger.error(msg); } + + public static void file(String msg) { + if (loggerFileEnabled) { + FileWriter.getInstance().log(msg); + } + } } \ No newline at end of file diff --git a/src/main/java/io/growing/sdk/java/process/EventProcessorClient.java b/src/main/java/io/growing/sdk/java/process/EventProcessorClient.java index f24ad8c..3126523 100644 --- a/src/main/java/io/growing/sdk/java/process/EventProcessorClient.java +++ b/src/main/java/io/growing/sdk/java/process/EventProcessorClient.java @@ -27,6 +27,10 @@ public static Collection getProcessors(){ return processors.values(); } + public static MessageProcessor getProcessor(Class clazz){ + return processors.get(clazz); + } + public static MessageProcessor getApiInstance(GIOMessage msg) { return processors.get(msg.getClass()); } diff --git a/src/main/java/io/growing/sdk/java/sender/FixThreadPoolSender.java b/src/main/java/io/growing/sdk/java/sender/FixThreadPoolSender.java index db1928a..6cf7601 100644 --- a/src/main/java/io/growing/sdk/java/sender/FixThreadPoolSender.java +++ b/src/main/java/io/growing/sdk/java/sender/FixThreadPoolSender.java @@ -21,15 +21,13 @@ public class FixThreadPoolSender implements MessageSender { private static final ExecutorService sendThread = Executors.newFixedThreadPool(ConfigUtils.getIntValue("send.msg.thread", 3), new GioThreadNamedFactory("gio-sender")); - private static final NetProviderAbstract netProvider = new HttpUrlProvider(); - @Override public void sendMsg(final String projectKey, final List msg) { doSend(projectKey, msg); } public static NetProviderAbstract getNetProvider() { - return netProvider; + return HttpUrlProvider.getInstance(); } public void doSend(final String projectKey, final List msgList) { diff --git a/src/main/java/io/growing/sdk/java/sender/SendResult.java b/src/main/java/io/growing/sdk/java/sender/SendResult.java new file mode 100644 index 0000000..974c6d5 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/SendResult.java @@ -0,0 +1,33 @@ +package io.growing.sdk.java.sender; + +public class SendResult { + public enum State { + SUCCESS, + NETWORK_FAILURE, + EXCEPTION_FAILURE, + UNKNOWN_ERROR + } + private State state; + private String msg; + + public SendResult(State state, String msg) { + this.state = state; + this.msg = msg; + } + + public State getState() { + return state; + } + + public String getMsg() { + return msg; + } + + @Override + public String toString() { + return "SendResult{" + + "state=" + state + + ", msg='" + msg + '\'' + + '}'; + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/SyncSender.java b/src/main/java/io/growing/sdk/java/sender/SyncSender.java new file mode 100644 index 0000000..dee58e1 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/SyncSender.java @@ -0,0 +1,42 @@ +package io.growing.sdk.java.sender; + +import io.growing.sdk.java.com.googlecode.protobuf.format.JsonFormat; +import io.growing.sdk.java.dto.GIOMessage; +import io.growing.sdk.java.dto.GioCDPMessage; +import io.growing.sdk.java.logger.GioLogger; +import io.growing.sdk.java.process.EventProcessorClient; +import io.growing.sdk.java.process.MessageProcessor; +import io.growing.sdk.java.sender.net.HttpUrlProvider; + +import java.util.Arrays; + +public class SyncSender { + + private static final JsonFormat JSON_FORMAT = new JsonFormat(); + + public SendResult sendMsg(final GIOMessage msg) { + String projectKey = msg.getProjectKey(); + + MessageProcessor processor = EventProcessorClient.getProcessor(msg.getMessageClass()); + byte[] processed = processor.process(Arrays.asList(msg)); + + if (processed != null && processed.length > 0) { + RequestDto requestDto = new RequestDto.Builder() + .setUrl(processor.apiHost(projectKey)) + .setContentType(processor.contentType()) + .setBytes(processed) + .setHeaders(processor.headers()) + .build(); + + SendResult result = HttpUrlProvider.getInstance().toSendSync(requestDto); + if (result.getState() != SendResult.State.SUCCESS) { + if (msg instanceof GioCDPMessage) { + GioLogger.file("send failed, result: " + result + ", data: " + JSON_FORMAT.printToString(((GioCDPMessage) msg).getMessage())); + } + } + return result; + } + + return new SendResult(SendResult.State.UNKNOWN_ERROR, "unknown error"); + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/net/HttpUrlProvider.java b/src/main/java/io/growing/sdk/java/sender/net/HttpUrlProvider.java index dd442e5..5dee099 100644 --- a/src/main/java/io/growing/sdk/java/sender/net/HttpUrlProvider.java +++ b/src/main/java/io/growing/sdk/java/sender/net/HttpUrlProvider.java @@ -2,6 +2,7 @@ import io.growing.sdk.java.logger.GioLogger; import io.growing.sdk.java.sender.RequestDto; +import io.growing.sdk.java.sender.SendResult; import io.growing.sdk.java.utils.ConfigUtils; import java.io.DataOutputStream; @@ -20,6 +21,17 @@ */ public class HttpUrlProvider extends NetProviderAbstract { + private HttpUrlProvider() { + } + + private static class SingleInstance { + private static final HttpUrlProvider INSTANCE = new HttpUrlProvider(); + } + + public static HttpUrlProvider getInstance() { + return SingleInstance.INSTANCE; + } + @Override protected int sendPost(RequestDto requestDto) { try { @@ -34,6 +46,20 @@ protected int sendPost(RequestDto requestDto) { } } + @Override + protected SendResult sendPostSync(RequestDto requestDto) { + try { + int responseCode = doSend(requestDto, true); + if (responseCode >= 200 && responseCode < 300) { + return new SendResult(SendResult.State.SUCCESS, "response code: " + responseCode); + } + return new SendResult(SendResult.State.NETWORK_FAILURE, "response code: " + responseCode); + } catch (Exception e) { + GioLogger.debug("failed to send request, cause " + e.getLocalizedMessage()); + return new SendResult(SendResult.State.EXCEPTION_FAILURE, e.getLocalizedMessage()); + } + } + @Override public boolean connectedToGrowingAPIHost() { InputStream inputStream = null; @@ -62,13 +88,22 @@ public boolean connectedToGrowingAPIHost() { } private int doSend(RequestDto requestDto) throws Exception { + return doSend(requestDto, false); + } + + private int doSend(RequestDto requestDto, boolean isSync) throws Exception { HttpURLConnection httpConn = getConnection(requestDto.getUrl()); setHttpConnHeaders(httpConn, requestDto.getHeaders()); httpConn.setRequestProperty("Content-Type", requestDto.getContentType().toString()); httpConn.setUseCaches(false); httpConn.setRequestMethod("POST"); - httpConn.setConnectTimeout(getConnectionTimeout()); - httpConn.setReadTimeout(getReadTimeout()); + if (isSync) { + httpConn.setConnectTimeout(getSyncConnectionTimeout()); + httpConn.setReadTimeout(getSyncReadTimeout()); + } else { + httpConn.setConnectTimeout(getConnectionTimeout()); + httpConn.setReadTimeout(getReadTimeout()); + } httpConn.setRequestProperty("Content-Length", String.valueOf(requestDto.getBytes().length)); httpConn.setDoOutput(true); diff --git a/src/main/java/io/growing/sdk/java/sender/net/NetProviderAbstract.java b/src/main/java/io/growing/sdk/java/sender/net/NetProviderAbstract.java index a596888..da9bdea 100644 --- a/src/main/java/io/growing/sdk/java/sender/net/NetProviderAbstract.java +++ b/src/main/java/io/growing/sdk/java/sender/net/NetProviderAbstract.java @@ -3,6 +3,7 @@ import io.growing.sdk.java.constants.RunMode; import io.growing.sdk.java.logger.GioLogger; import io.growing.sdk.java.sender.RequestDto; +import io.growing.sdk.java.sender.SendResult; import io.growing.sdk.java.utils.ConfigUtils; import java.net.Authenticator; @@ -30,15 +31,33 @@ protected static int getReadTimeout() { return ConfigUtils.getIntValue("read.timeout", 2000); } + protected static int getSyncConnectionTimeout() { + return ConfigUtils.getIntValue("sync.connection.timeout", 1000); + } + + protected static int getSyncReadTimeout() { + return ConfigUtils.getIntValue("sync.read.timeout", 1000); + } + public void toSend(RequestDto requestDto) { + GioLogger.debug(System.currentTimeMillis() + " message sent. " + requestDto.toString()); if (RunMode.isProductionMode()) { sendPost(requestDto); } + } + + public SendResult toSendSync(RequestDto requestDto) { GioLogger.debug(System.currentTimeMillis() + " message sent. " + requestDto.toString()); + if (RunMode.isProductionMode()) { + return sendPostSync(requestDto); + } + return new SendResult(SendResult.State.SUCCESS, "running int mode: Test"); } protected abstract int sendPost(RequestDto requestDto); + protected abstract SendResult sendPostSync(RequestDto requestDto); + protected static class ProxyInfo { private static final Proxy proxy; diff --git a/src/main/java/io/growing/sdk/java/sender/retry/Attempt.java b/src/main/java/io/growing/sdk/java/sender/retry/Attempt.java new file mode 100644 index 0000000..9d220c1 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/Attempt.java @@ -0,0 +1,20 @@ +package io.growing.sdk.java.sender.retry; + +import java.util.concurrent.ExecutionException; + +public interface Attempt { + + int getAttemptNumber(); + + long getDelaySinceFirstAttempt(); + + boolean hasException(); + + Throwable getExceptionCause() throws IllegalStateException; + + boolean hasResult(); + + V getResult() throws IllegalStateException; + + V get() throws ExecutionException; +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/AttemptTimeLimiter.java b/src/main/java/io/growing/sdk/java/sender/retry/AttemptTimeLimiter.java new file mode 100644 index 0000000..5c709f6 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/AttemptTimeLimiter.java @@ -0,0 +1,8 @@ +package io.growing.sdk.java.sender.retry; + +import java.util.concurrent.Callable; + +public interface AttemptTimeLimiter { + + V call(Callable callable) throws Exception; +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/AttemptTimeLimiters.java b/src/main/java/io/growing/sdk/java/sender/retry/AttemptTimeLimiters.java new file mode 100644 index 0000000..6ca8534 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/AttemptTimeLimiters.java @@ -0,0 +1,70 @@ +package io.growing.sdk.java.sender.retry; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class AttemptTimeLimiters { + + private AttemptTimeLimiters() { + } + + public static AttemptTimeLimiter noTimeLimit() { + return new NoAttemptTimeLimit(); + } + + public static AttemptTimeLimiter fixedTimeLimit(long duration, TimeUnit timeUnit) { + if (timeUnit == null) { + timeUnit = TimeUnit.MILLISECONDS; + } + return new FixedAttemptTimeLimit(duration, timeUnit); + } + + public static AttemptTimeLimiter fixedTimeLimit(long duration, TimeUnit timeUnit, ExecutorService executorService) { + if (timeUnit == null) { + timeUnit = TimeUnit.MILLISECONDS; + } + return new FixedAttemptTimeLimit(duration, timeUnit, executorService); + } + + // 没有时间限制的单次调用策略 + private static final class NoAttemptTimeLimit implements AttemptTimeLimiter { + @Override + public V call(Callable callable) throws Exception { + return callable.call(); + } + } + + // 固定时间限制的单次调用策略 + private static final class FixedAttemptTimeLimit implements AttemptTimeLimiter { + + private final SimpleTimeLimiter timeLimiter; + private final long duration; + private final TimeUnit timeUnit; + + public FixedAttemptTimeLimit(long duration, TimeUnit timeUnit) { + this(new SimpleTimeLimiter(), duration, timeUnit); + } + + public FixedAttemptTimeLimit(long duration, TimeUnit timeUnit, ExecutorService executorService) { + this(new SimpleTimeLimiter(executorService), duration, timeUnit); + } + + private FixedAttemptTimeLimit(SimpleTimeLimiter timeLimiter, long duration, TimeUnit timeUnit) { + if (timeLimiter == null) { + throw new IllegalArgumentException("timeLimiter must not be null"); + } + if (timeUnit == null) { + timeUnit = TimeUnit.MILLISECONDS; + } + this.timeLimiter = timeLimiter; + this.duration = duration; + this.timeUnit = timeUnit; + } + + @Override + public V call(Callable callable) throws Exception { + return timeLimiter.callWithTimeout(callable, duration, timeUnit, true); + } + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/BlockStrategies.java b/src/main/java/io/growing/sdk/java/sender/retry/BlockStrategies.java new file mode 100644 index 0000000..9b22502 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/BlockStrategies.java @@ -0,0 +1,20 @@ +package io.growing.sdk.java.sender.retry; + +public class BlockStrategies { + private static final BlockStrategy THREAD_SLEEP_STRATEGY = new ThreadSleepStrategy(); + + private BlockStrategies() { + } + + public static BlockStrategy threadSleepStrategy() { + return THREAD_SLEEP_STRATEGY; + } + + private static class ThreadSleepStrategy implements BlockStrategy { + + @Override + public void block(long sleepTime) throws InterruptedException { + Thread.sleep(sleepTime); + } + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/BlockStrategy.java b/src/main/java/io/growing/sdk/java/sender/retry/BlockStrategy.java new file mode 100644 index 0000000..1b01559 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/BlockStrategy.java @@ -0,0 +1,5 @@ +package io.growing.sdk.java.sender.retry; + +public interface BlockStrategy { + void block(long sleepTime) throws InterruptedException; +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/Predicate.java b/src/main/java/io/growing/sdk/java/sender/retry/Predicate.java new file mode 100644 index 0000000..7d21752 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/Predicate.java @@ -0,0 +1,6 @@ +package io.growing.sdk.java.sender.retry; + +public interface Predicate{ + + boolean apply(T input); +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/Predicates.java b/src/main/java/io/growing/sdk/java/sender/retry/Predicates.java new file mode 100644 index 0000000..a4e3e19 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/Predicates.java @@ -0,0 +1,45 @@ +package io.growing.sdk.java.sender.retry; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class Predicates { + + public static Predicate or(Predicate... components) { + return new OrPredicate(defensiveCopy(components)); + } + + private static List defensiveCopy(T... array) { + return defensiveCopy(Arrays.asList(array)); + } + + static List defensiveCopy(Iterable iterable) { + ArrayList list = new ArrayList(); + for (T element : iterable) { + if (element != null) { + list.add(element); + } + } + return list; + } + + private static class OrPredicate implements Predicate { + private final List> components; + + private OrPredicate(List> components) { + this.components = components; + } + + @Override + public boolean apply(T t) { + // Avoid using the Iterator to avoid generating garbage (issue 820). + for (int i = 0; i < components.size(); i++) { + if (components.get(i).apply(t)) { + return true; + } + } + return false; + } + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/README.md b/src/main/java/io/growing/sdk/java/sender/retry/README.md new file mode 100644 index 0000000..fb30966 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/README.md @@ -0,0 +1,8 @@ +## 重试策略代码来源致谢 + +相关代码主要来源如下 +https://github.com/google/guava +https://github.com/rholder/guava-retrying +https://github.com/google/guava/issues/490 + +考虑到本SDK需要支持到Java 6,尽量减少三方依赖,不使用guava中的Predicate,Predicates以及Java8相关特性 \ No newline at end of file diff --git a/src/main/java/io/growing/sdk/java/sender/retry/RetryException.java b/src/main/java/io/growing/sdk/java/sender/retry/RetryException.java new file mode 100644 index 0000000..b5844f0 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/RetryException.java @@ -0,0 +1,24 @@ +package io.growing.sdk.java.sender.retry; + +public final class RetryException extends Exception { + + private final int numberOfFailedAttempts; + private final Attempt lastFailedAttempt; + + public RetryException(int numberOfFailedAttempts, Attempt lastFailedAttempt) { + this("Retrying failed to complete successfully after " + numberOfFailedAttempts + " attempts.", numberOfFailedAttempts, lastFailedAttempt); + } + + public RetryException(String message, int numberOfFailedAttempts, Attempt lastFailedAttempt) { + super(message, lastFailedAttempt != null && lastFailedAttempt.hasException() ? lastFailedAttempt.getExceptionCause() : null); + this.numberOfFailedAttempts = numberOfFailedAttempts; + this.lastFailedAttempt = lastFailedAttempt; + } + public int getNumberOfFailedAttempts() { + return numberOfFailedAttempts; + } + + public Attempt getLastFailedAttempt() { + return lastFailedAttempt; + } +} \ No newline at end of file diff --git a/src/main/java/io/growing/sdk/java/sender/retry/Retryer.java b/src/main/java/io/growing/sdk/java/sender/retry/Retryer.java new file mode 100644 index 0000000..3a12a42 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/Retryer.java @@ -0,0 +1,147 @@ +package io.growing.sdk.java.sender.retry; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class Retryer { + private final StopStrategy stopStrategy; + private final WaitStrategy waitStrategy; + private final BlockStrategy blockStrategy; + private final AttemptTimeLimiter attemptTimeLimiter; + private final Predicate> rejectionPredicate; + + public Retryer(AttemptTimeLimiter attemptTimeLimiter, + StopStrategy stopStrategy, + WaitStrategy waitStrategy, + BlockStrategy blockStrategy, + Predicate> rejectionPredicate) { + this.attemptTimeLimiter = attemptTimeLimiter; + this.stopStrategy = stopStrategy; + this.waitStrategy = waitStrategy; + this.blockStrategy = blockStrategy; + this.rejectionPredicate = rejectionPredicate; + } + + public V call(Callable callable) throws ExecutionException, RetryException { + long startTime = System.nanoTime(); + for (int attemptNumber = 1; ; attemptNumber++) { + Attempt attempt; + try { + V result = attemptTimeLimiter.call(callable); + attempt = new ResultAttempt(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); + } catch (Throwable t) { + attempt = new ExceptionAttempt(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); + } + + if (!rejectionPredicate.apply(attempt)) { + return attempt.get(); + } + if (stopStrategy.shouldStop(attempt)) { + throw new RetryException(attemptNumber, attempt); + } else { + long sleepTime = waitStrategy.computeSleepTime(attempt); + try { + blockStrategy.block(sleepTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RetryException(attemptNumber, attempt); + } + } + } + } + + static final class ResultAttempt implements Attempt { + private final R result; + private final int attemptNumber; + private final long delaySinceFirstAttempt; + + public ResultAttempt(R result, int attemptNumber, long delaySinceFirstAttempt) { + this.result = result; + this.attemptNumber = attemptNumber; + this.delaySinceFirstAttempt = delaySinceFirstAttempt; + } + + @Override + public R get() throws ExecutionException { + return result; + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public boolean hasException() { + return false; + } + + @Override + public R getResult() throws IllegalStateException { + return result; + } + + @Override + public Throwable getExceptionCause() throws IllegalStateException { + throw new IllegalStateException("The attempt resulted in a result, not in an exception"); + } + + @Override + public int getAttemptNumber() { + return attemptNumber; + } + + @Override + public long getDelaySinceFirstAttempt() { + return delaySinceFirstAttempt; + } + } + + static final class ExceptionAttempt implements Attempt { + private final ExecutionException e; + private final int attemptNumber; + private final long delaySinceFirstAttempt; + + public ExceptionAttempt(Throwable cause, int attemptNumber, long delaySinceFirstAttempt) { + this.e = new ExecutionException(cause); + this.attemptNumber = attemptNumber; + this.delaySinceFirstAttempt = delaySinceFirstAttempt; + } + + @Override + public R get() throws ExecutionException { + throw e; + } + + @Override + public boolean hasResult() { + return false; + } + + @Override + public boolean hasException() { + return true; + } + + @Override + public R getResult() throws IllegalStateException { + throw new IllegalStateException("The attempt resulted in an exception, not in a result"); + } + + @Override + public Throwable getExceptionCause() throws IllegalStateException { + return e.getCause(); + } + + @Override + public int getAttemptNumber() { + return attemptNumber; + } + + @Override + public long getDelaySinceFirstAttempt() { + return delaySinceFirstAttempt; + } + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/RetryerBuilder.java b/src/main/java/io/growing/sdk/java/sender/retry/RetryerBuilder.java new file mode 100644 index 0000000..e830c3f --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/RetryerBuilder.java @@ -0,0 +1,158 @@ +package io.growing.sdk.java.sender.retry; + +public class RetryerBuilder { + + private AttemptTimeLimiter attemptTimeLimiter; + private StopStrategy stopStrategy; + private WaitStrategy waitStrategy; + private BlockStrategy blockStrategy; + private Predicate> rejectionPredicate = new Predicate>() { + @Override + public boolean apply(Attempt input) { + return false; + } + }; + + private RetryerBuilder() { + } + + public static RetryerBuilder newBuilder() { + return new RetryerBuilder(); + } + + public RetryerBuilder withWaitStrategy(WaitStrategy waitStrategy) throws IllegalStateException { + if (waitStrategy == null) { + throw new IllegalArgumentException("waitStrategy must not be null"); + } + if (this.waitStrategy != null) { + throw new IllegalStateException("waitStrategy already set"); + } + this.waitStrategy = waitStrategy; + return this; + } + + public RetryerBuilder withStopStrategy(StopStrategy stopStrategy) throws IllegalStateException { + if (stopStrategy == null) { + throw new IllegalArgumentException("stopStrategy must not be null"); + } + if (this.stopStrategy != null) { + throw new IllegalStateException("stopStrategy already set"); + } + this.stopStrategy = stopStrategy; + return this; + } + + public RetryerBuilder withBlockStrategy(BlockStrategy blockStrategy) throws IllegalStateException { + if (blockStrategy == null) { + throw new IllegalArgumentException("blockStrategy must not be null"); + } + if (this.blockStrategy != null) { + throw new IllegalStateException("blockStrategy already set"); + } + this.blockStrategy = blockStrategy; + return this; + } + + public RetryerBuilder withAttemptTimeLimiter(AttemptTimeLimiter attemptTimeLimiter) { + if (attemptTimeLimiter == null) { + throw new IllegalArgumentException("attemptTimeLimiter must not be null"); + } + this.attemptTimeLimiter = attemptTimeLimiter; + return this; + } + + public RetryerBuilder retryIfException() { + rejectionPredicate = Predicates.or(rejectionPredicate, new ExceptionClassPredicate(Exception.class)); + return this; + } + + public RetryerBuilder retryIfRuntimeException() { + rejectionPredicate = Predicates.or(rejectionPredicate, new ExceptionClassPredicate(RuntimeException.class)); + return this; + } + + public RetryerBuilder retryIfExceptionOfType(Class exceptionClass) { + if (exceptionClass == null) { + throw new IllegalArgumentException("exceptionClass must not be null"); + } + rejectionPredicate = Predicates.or(rejectionPredicate, new ExceptionClassPredicate(exceptionClass)); + return this; + } + + public RetryerBuilder retryIfException(Predicate exceptionPredicate) { + if (exceptionPredicate == null) { + throw new IllegalArgumentException("exceptionPredicate must not be null"); + } + rejectionPredicate = Predicates.or(rejectionPredicate, new ExceptionPredicate(exceptionPredicate)); + return this; + } + + public RetryerBuilder retryIfResult(Predicate resultPredicate) { + if (resultPredicate == null) { + throw new IllegalArgumentException("resultPredicate must not be null"); + } + rejectionPredicate = Predicates.or(rejectionPredicate, new ResultPredicate(resultPredicate)); + return this; + } + + public Retryer build() { + AttemptTimeLimiter theAttemptTimeLimiter = attemptTimeLimiter == null ? AttemptTimeLimiters.noTimeLimit() : attemptTimeLimiter; + StopStrategy theStopStrategy = stopStrategy == null ? StopStrategies.neverStop() : stopStrategy; + WaitStrategy theWaitStrategy = waitStrategy == null ? WaitStrategies.noWait() : waitStrategy; + BlockStrategy theBlockStrategy = blockStrategy == null ? BlockStrategies.threadSleepStrategy() : blockStrategy; + + return new Retryer(theAttemptTimeLimiter, theStopStrategy, theWaitStrategy, theBlockStrategy, rejectionPredicate); + } + + private static final class ExceptionClassPredicate implements Predicate> { + + private Class exceptionClass; + + public ExceptionClassPredicate(Class exceptionClass) { + this.exceptionClass = exceptionClass; + } + + @Override + public boolean apply(Attempt attempt) { + if (!attempt.hasException()) { + return false; + } + return exceptionClass.isAssignableFrom(attempt.getExceptionCause().getClass()); + } + } + + private static final class ExceptionPredicate implements Predicate> { + + private Predicate delegate; + + public ExceptionPredicate(Predicate delegate) { + this.delegate = delegate; + } + + @Override + public boolean apply(Attempt attempt) { + if (!attempt.hasException()) { + return false; + } + return delegate.apply(attempt.getExceptionCause()); + } + } + + private static final class ResultPredicate implements Predicate> { + + private Predicate delegate; + + public ResultPredicate(Predicate delegate) { + this.delegate = delegate; + } + + @Override + public boolean apply(Attempt attempt) { + if (!attempt.hasResult()) { + return false; + } + V result = attempt.getResult(); + return delegate.apply(result); + } + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/SimpleTimeLimiter.java b/src/main/java/io/growing/sdk/java/sender/retry/SimpleTimeLimiter.java new file mode 100644 index 0000000..8d6da9d --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/SimpleTimeLimiter.java @@ -0,0 +1,97 @@ +package io.growing.sdk.java.sender.retry; + +import java.lang.reflect.Array; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +public class SimpleTimeLimiter { + + private final ExecutorService executor; + + public SimpleTimeLimiter(ExecutorService executor) { + if (executor == null) { + executor = Executors.newSingleThreadExecutor(); + } + this.executor = executor; + } + + public SimpleTimeLimiter() { + this(Executors.newSingleThreadExecutor()); + } + + public T callWithTimeout( + Callable callable, long timeoutDuration, TimeUnit timeoutUnit, boolean amInterruptible) + throws Exception { + Future future = executor.submit(callable); + try { + if (amInterruptible) { + try { + return future.get(timeoutDuration, timeoutUnit); + } catch (InterruptedException e) { + future.cancel(true); + throw e; + } + } else { + return getUninterruptibly(future, timeoutDuration, timeoutUnit); + } + } catch (ExecutionException e) { + throw throwCause(e, true); + } catch (TimeoutException e) { + future.cancel(true); + throw e; + } + } + + private static V getUninterruptibly( + Future future, long timeout, TimeUnit unit) throws ExecutionException, TimeoutException { + boolean interrupted = false; + try { + long remainingNanos = unit.toNanos(timeout); + long end = System.nanoTime() + remainingNanos; + + while (true) { + try { + // Future treats negative timeouts just like zero. + return future.get(remainingNanos, NANOSECONDS); + } catch (InterruptedException e) { + interrupted = true; + remainingNanos = end - System.nanoTime(); + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + + private static Exception throwCause(Exception e, boolean combineStackTraces) throws Exception { + Throwable cause = e.getCause(); + if (cause == null) { + throw e; + } + if (combineStackTraces) { + StackTraceElement[] combined = + concat(cause.getStackTrace(), e.getStackTrace(), StackTraceElement.class); + cause.setStackTrace(combined); + } + if (cause instanceof Exception) { + throw (Exception) cause; + } + if (cause instanceof Error) { + throw (Error) cause; + } + // The cause is a weird kind of Throwable, so throw the outer exception. + throw e; + } + + private static T[] concat( + T[] first, T[] second, Class type) { + T[] result = (T[]) Array.newInstance(type, first.length + second.length); + System.arraycopy(first, 0, result, 0, first.length); + System.arraycopy(second, 0, result, first.length, second.length); + return result; + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/StopStrategies.java b/src/main/java/io/growing/sdk/java/sender/retry/StopStrategies.java new file mode 100644 index 0000000..b300ae7 --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/StopStrategies.java @@ -0,0 +1,72 @@ +package io.growing.sdk.java.sender.retry; + +import java.util.concurrent.TimeUnit; + +public final class StopStrategies { + + private static final StopStrategy NEVER_STOP = new NeverStopStrategy(); + + private StopStrategies() { + } + + // 永不停止策略 + public static StopStrategy neverStop() { + return NEVER_STOP; + } + + // 重试次数停止策略 + public static StopStrategy stopAfterAttempt(int attemptNumber) { + return new StopAfterAttemptStrategy(attemptNumber); + } + + // 超时停止策略 + public static StopStrategy stopAfterDelay(long delayInMillis) { + return stopAfterDelay(delayInMillis, TimeUnit.MILLISECONDS); + } + + public static StopStrategy stopAfterDelay(long duration, TimeUnit timeUnit) { + if (timeUnit == null) { + timeUnit = TimeUnit.MILLISECONDS; + } + return new StopAfterDelayStrategy(timeUnit.toMillis(duration)); + } + + private static final class NeverStopStrategy implements StopStrategy { + @Override + public boolean shouldStop(Attempt failedAttempt) { + return false; + } + } + + private static final class StopAfterAttemptStrategy implements StopStrategy { + private final int maxAttemptNumber; + + public StopAfterAttemptStrategy(int maxAttemptNumber) { + if (maxAttemptNumber < 1) { + throw new IllegalArgumentException("maxAttemptNumber must be greater than 0"); + } + this.maxAttemptNumber = maxAttemptNumber; + } + + @Override + public boolean shouldStop(Attempt failedAttempt) { + return failedAttempt.getAttemptNumber() >= maxAttemptNumber; + } + } + + private static final class StopAfterDelayStrategy implements StopStrategy { + private final long maxDelay; + + public StopAfterDelayStrategy(long maxDelay) { + if (maxDelay < 0L) { + throw new IllegalArgumentException("maxDelay must be greater than 0"); + } + this.maxDelay = maxDelay; + } + + @Override + public boolean shouldStop(Attempt failedAttempt) { + return failedAttempt.getDelaySinceFirstAttempt() >= maxDelay; + } + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/StopStrategy.java b/src/main/java/io/growing/sdk/java/sender/retry/StopStrategy.java new file mode 100644 index 0000000..55d2b8d --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/StopStrategy.java @@ -0,0 +1,5 @@ +package io.growing.sdk.java.sender.retry; + +public interface StopStrategy { + boolean shouldStop(Attempt failedAttempt); +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/WaitStrategies.java b/src/main/java/io/growing/sdk/java/sender/retry/WaitStrategies.java new file mode 100644 index 0000000..ac5b6ac --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/WaitStrategies.java @@ -0,0 +1,313 @@ +package io.growing.sdk.java.sender.retry; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +public class WaitStrategies { + + private static final WaitStrategy NO_WAIT_STRATEGY = new FixedWaitStrategy(0L); + + private WaitStrategies() { + } + + // 不等待,直接重试策略 + public static WaitStrategy noWait() { + return NO_WAIT_STRATEGY; + } + + public static WaitStrategy fixedWait(long sleepTime, TimeUnit timeUnit) throws IllegalStateException { + if (timeUnit == null) { + timeUnit = TimeUnit.MILLISECONDS; + } + return new FixedWaitStrategy(timeUnit.toMillis(sleepTime)); + } + + public static WaitStrategy randomWait(long maximumTime, TimeUnit timeUnit) { + if (timeUnit == null) { + timeUnit = TimeUnit.MILLISECONDS; + } + return new RandomWaitStrategy(0L, timeUnit.toMillis(maximumTime)); + } + + public static WaitStrategy randomWait(long minimumTime, + TimeUnit minimumTimeUnit, + long maximumTime, + TimeUnit maximumTimeUnit) { + if (minimumTimeUnit == null) { + minimumTimeUnit = TimeUnit.MILLISECONDS; + } + if (maximumTimeUnit == null) { + maximumTimeUnit = TimeUnit.MILLISECONDS; + } + return new RandomWaitStrategy(minimumTimeUnit.toMillis(minimumTime), + maximumTimeUnit.toMillis(maximumTime)); + } + + public static WaitStrategy incrementingWait(long initialSleepTime, + TimeUnit initialSleepTimeUnit, + long increment, + TimeUnit incrementTimeUnit) { + if (initialSleepTimeUnit == null) { + initialSleepTimeUnit = TimeUnit.MILLISECONDS; + } + if (incrementTimeUnit == null) { + incrementTimeUnit = TimeUnit.MILLISECONDS; + } + return new IncrementingWaitStrategy(initialSleepTimeUnit.toMillis(initialSleepTime), + incrementTimeUnit.toMillis(increment)); + } + + public static WaitStrategy exponentialWait() { + return new ExponentialWaitStrategy(1, Long.MAX_VALUE); + } + + public static WaitStrategy exponentialWait(long multiplier, + long maximumTime, + TimeUnit maximumTimeUnit) { + if (maximumTimeUnit == null) { + maximumTimeUnit = TimeUnit.MILLISECONDS; + } + return new ExponentialWaitStrategy(multiplier, maximumTimeUnit.toMillis(maximumTime)); + } + + public static WaitStrategy fibonacciWait() { + return new FibonacciWaitStrategy(1, Long.MAX_VALUE); + } + + public static WaitStrategy fibonacciWait(long maximumTime, + TimeUnit maximumTimeUnit) { + if (maximumTimeUnit == null) { + maximumTimeUnit = TimeUnit.MILLISECONDS; + } + return new FibonacciWaitStrategy(1, maximumTimeUnit.toMillis(maximumTime)); + } + + public static WaitStrategy fibonacciWait(long multiplier, + long maximumTime, + TimeUnit maximumTimeUnit) { + if (maximumTimeUnit == null) { + maximumTimeUnit = TimeUnit.MILLISECONDS; + } + return new FibonacciWaitStrategy(multiplier, maximumTimeUnit.toMillis(maximumTime)); + } + + public static WaitStrategy exceptionWait(Class exceptionClass, + ExceptionWaitStrategyFunction function) { + if (exceptionClass == null) { + throw new IllegalArgumentException("exceptionClass must not be null"); + } + if (function == null) { + throw new IllegalArgumentException("function must not be null"); + } + return new ExceptionWaitStrategy(exceptionClass, function); + } + + public static WaitStrategy join(WaitStrategy... waitStrategies) { + if (waitStrategies == null || waitStrategies.length == 0) { + throw new IllegalArgumentException("waitStrategies must not be null"); + } + List waitStrategyList = convertToList(waitStrategies); + if (waitStrategyList.contains(null)) { + throw new IllegalArgumentException("waitStrategies must not contain null"); + } + return new CompositeWaitStrategy(waitStrategyList); + } + + private static List convertToList(E... elements) { + List list = new ArrayList(elements.length); + Collections.addAll(list, elements); + return list; + } + + // 固定等待时间策略 + private static final class FixedWaitStrategy implements WaitStrategy { + private final long sleepTime; + + public FixedWaitStrategy(long sleepTime) { + if (sleepTime < 0) { + throw new IllegalArgumentException("sleepTime must be greater than or equal to 0"); + } + this.sleepTime = sleepTime; + } + + @Override + public long computeSleepTime(Attempt failedAttempt) { + return sleepTime; + } + } + + // 随机区间等待时间策略 + private static final class RandomWaitStrategy implements WaitStrategy { + private static final Random RANDOM = new Random(); + private final long minimum; + private final long maximum; + + public RandomWaitStrategy(long minimum, long maximum) { + if (minimum < 0) { + throw new IllegalArgumentException("minimum must be greater than 0"); + } + if (maximum <= minimum) { + throw new IllegalArgumentException("minimum must be less than maximum"); + } + + this.minimum = minimum; + this.maximum = maximum; + } + + @Override + public long computeSleepTime(Attempt failedAttempt) { + long t = Math.abs(RANDOM.nextLong()) % (maximum - minimum); + return t + minimum; + } + } + + // 指定步长增长等待时间策略 + private static final class IncrementingWaitStrategy implements WaitStrategy { + private final long initialSleepTime; + private final long increment; + + public IncrementingWaitStrategy(long initialSleepTime, + long increment) { + if (initialSleepTime < 0) { + throw new IllegalArgumentException("initialSleepTime must be greater than or equal to 0"); + } + this.initialSleepTime = initialSleepTime; + this.increment = increment; + } + + @Override + public long computeSleepTime(Attempt failedAttempt) { + long result = initialSleepTime + (increment * (failedAttempt.getAttemptNumber() - 1)); + return result >= 0L ? result : 0L; + } + } + + // 指数系数增长等待时长策略 + private static final class ExponentialWaitStrategy implements WaitStrategy { + private final long multiplier; + private final long maximumWait; + + public ExponentialWaitStrategy(long multiplier, + long maximumWait) { + if (multiplier <= 0) { + throw new IllegalArgumentException("multiplier must be greater or equal to 0"); + } + if (maximumWait < 0) { + throw new IllegalArgumentException("maximumWait must be greater than 0"); + } + if (multiplier >= maximumWait) { + throw new IllegalArgumentException("multiplier must be less than maximumWait"); + } + + this.multiplier = multiplier; + this.maximumWait = maximumWait; + } + + @Override + public long computeSleepTime(Attempt failedAttempt) { + double exp = Math.pow(2, failedAttempt.getAttemptNumber()); + long result = Math.round(multiplier * exp); + if (result > maximumWait) { + result = maximumWait; + } + return result >= 0L ? result : 0L; + } + } + + // 斐波那契数列系数增长等待时长策略 + private static final class FibonacciWaitStrategy implements WaitStrategy { + private final long multiplier; + private final long maximumWait; + + public FibonacciWaitStrategy(long multiplier, long maximumWait) { + if (multiplier <= 0) { + throw new IllegalArgumentException("multiplier must be greater than 0"); + } + if (maximumWait < 0) { + throw new IllegalArgumentException("maximumWait must be greater than or equal to 0"); + } + if (multiplier >= maximumWait) { + throw new IllegalArgumentException("multiplier must be less than maximumWait"); + } + + this.multiplier = multiplier; + this.maximumWait = maximumWait; + } + + @Override + public long computeSleepTime(Attempt failedAttempt) { + long fib = fib(failedAttempt.getAttemptNumber()); + long result = multiplier * fib; + + if (result > maximumWait || result < 0L) { + result = maximumWait; + } + + return result >= 0L ? result : 0L; + } + + private long fib(long n) { + if (n == 0L) return 0L; + if (n == 1L) return 1L; + + long prevPrev = 0L; + long prev = 1L; + long result = 0L; + + for (long i = 2L; i <= n; i++) { + result = prev + prevPrev; + prevPrev = prev; + prev = result; + } + + return result; + } + } + + // 组合 等待时长策略 + private static final class CompositeWaitStrategy implements WaitStrategy { + private final List waitStrategies; + + public CompositeWaitStrategy(List waitStrategies) { + if (waitStrategies == null || waitStrategies.isEmpty()) { + throw new IllegalArgumentException("waitStrategies must not be null or empty"); + } + this.waitStrategies = waitStrategies; + } + + @Override + public long computeSleepTime(Attempt failedAttempt) { + long waitTime = 0L; + for (WaitStrategy waitStrategy : waitStrategies) { + waitTime += waitStrategy.computeSleepTime(failedAttempt); + } + return waitTime; + } + } + + // 根据触发的异常决定等待时长,未命中异常则返回0L作为等待时长 + private static final class ExceptionWaitStrategy implements WaitStrategy { + private final Class exceptionClass; + private final ExceptionWaitStrategyFunction function; + + public ExceptionWaitStrategy(Class exceptionClass, ExceptionWaitStrategyFunction function) { + this.exceptionClass = exceptionClass; + this.function = function; + } + + @Override + public long computeSleepTime(Attempt lastAttempt) { + if (lastAttempt.hasException()) { + Throwable cause = lastAttempt.getExceptionCause(); + if (exceptionClass.isAssignableFrom(cause.getClass())) { + return function.apply((T) cause); + } + } + return 0L; + } + } + + public interface ExceptionWaitStrategyFunction { + R apply(T t); + } +} diff --git a/src/main/java/io/growing/sdk/java/sender/retry/WaitStrategy.java b/src/main/java/io/growing/sdk/java/sender/retry/WaitStrategy.java new file mode 100644 index 0000000..71abbed --- /dev/null +++ b/src/main/java/io/growing/sdk/java/sender/retry/WaitStrategy.java @@ -0,0 +1,6 @@ +package io.growing.sdk.java.sender.retry; + +public interface WaitStrategy { + + long computeSleepTime(Attempt failedAttempt); +} diff --git a/src/main/resources/gio_default.properties b/src/main/resources/gio_default.properties index a353113..02addc9 100644 --- a/src/main/resources/gio_default.properties +++ b/src/main/resources/gio_default.properties @@ -22,3 +22,22 @@ run.mode=production #connection.timeout=2000 #http 连接读取时间 #read.timeout=2000 +# 带拒绝策略的发送策略,默认不采用,此策略在队列快满时打印出debug日志,并且会使用新的线程(个数同send.msg.thread)加速消费队列元素 +# 但可能仍然消费速度不够,导致抛出GIOSendBeRejectedException异常,为了保险起见,使用者应当捕获该异常。 +# 并且此策略新增了shutdownAwait方法关联了队列状态和JVM关闭钩子,此举旨在防止主线程关闭时,内存队列未消费的元素丢失。 +# msg.store.strategy=abortPolicy +# 队列负载率,当为0.5时,表明,队列中元素达到一半时,会出现debug日志,并会使用新线程加速消费队列。队列负载降低到0.5以下后,恢复 +# 此值越大,队列越接近满状态,加速线程执行的时间越提前。"加速"可能对接口接收服务造成压力,谨慎使用! +# msg.store.queue.load_factor=0.5 +# 是否开启记录失败日志到文件中 +# logger.file.enabled=false +# 文件存储目录 +# logger.file.path=/logs +# 单个文件最大的大小,默认单位为MB +# logger.file.max_size=10 +# 文件留存天数,-1表示永久留存 +# logger.file.max_days=-1 +# 单条事件发送 读取超时时间 +# sync.read.timeout=2000 +# 单条时间发送 连接超时时间 +# sync.connection.timeout=2000 \ No newline at end of file diff --git a/src/test/java/io/growing/sdk/java/test/Case5RetryPolicyTest.java b/src/test/java/io/growing/sdk/java/test/Case5RetryPolicyTest.java new file mode 100644 index 0000000..6be177b --- /dev/null +++ b/src/test/java/io/growing/sdk/java/test/Case5RetryPolicyTest.java @@ -0,0 +1,97 @@ +package io.growing.sdk.java.test; + +import io.growing.sdk.java.sender.retry.*; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.*; + +public class Case5RetryPolicyTest { + + @Test + public void AttemptTest() throws InterruptedException { + int attemptCount = 3; + final CountDownLatch countDownLatch = new CountDownLatch(attemptCount); + Retryer retryer = RetryerBuilder.newBuilder() + .retryIfException() + .retryIfResult(new Predicate() { + @Override + public boolean apply(Boolean input) { + return input; + } + }) + // 最多调用3次 + .withStopStrategy(StopStrategies.stopAfterAttempt(attemptCount)) + .build(); + + try { + retryer.call(new Callable() { + @Override + public Boolean call() throws Exception { + countDownLatch.countDown(); + // 触发重试 + return true; + } + }); + } catch (ExecutionException e) { + } catch (RetryException e) { + } + + countDownLatch.await(); + } + + @Test + public void ExceptionTest() throws InterruptedException { + int attemptCount = 3; + final CountDownLatch countDownLatch = new CountDownLatch(attemptCount); + Retryer retryer = RetryerBuilder.newBuilder() + .retryIfExceptionOfType(IOException.class) + // 最多调用3次 + .withStopStrategy(StopStrategies.stopAfterAttempt(attemptCount)) + .build(); + + try { + retryer.call(new Callable() { + @Override + public Boolean call() throws Exception { + countDownLatch.countDown(); + // 触发重试 + throw new IOException(); + } + }); + } catch (ExecutionException e) { + } catch (RetryException e) { + } + + countDownLatch.await(); + } + + @Test + public void TimeLimitTest() throws InterruptedException { + int attemptCount = 3; + final CountDownLatch countDownLatch = new CountDownLatch(1); + Retryer retryer = RetryerBuilder.newBuilder() + .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(1000, TimeUnit.MILLISECONDS)) + .retryIfExceptionOfType(IOException.class) + // 最多调用3次 + .withStopStrategy(StopStrategies.stopAfterAttempt(attemptCount)) + .build(); + + try { + retryer.call(new Callable() { + @Override + public Boolean call() throws Exception { + Thread.sleep(2000); + // 触发重试 + throw new IOException(); + } + }); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause().getClass().isAssignableFrom(TimeoutException.class)); + countDownLatch.countDown(); + } catch (RetryException e) { + } + countDownLatch.await(); + } +} diff --git a/src/test/java/io/growing/sdk/java/test/Case6SendSyncTest.java b/src/test/java/io/growing/sdk/java/test/Case6SendSyncTest.java new file mode 100644 index 0000000..ffe5043 --- /dev/null +++ b/src/test/java/io/growing/sdk/java/test/Case6SendSyncTest.java @@ -0,0 +1,64 @@ +package io.growing.sdk.java.test; + +import io.growing.sdk.java.GrowingAPI; +import io.growing.sdk.java.dto.GioCdpEventMessage; +import io.growing.sdk.java.sender.SendResult; +import io.growing.sdk.java.sender.retry.*; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; + +public class Case6SendSyncTest { + private static final String PROJECT_KEY = "91eaf9b283361032"; + private static final String DATASOURCE_ID = "a390a68c7b25638c"; + private static GrowingAPI sender = new GrowingAPI.Builder().setDataSourceId(DATASOURCE_ID).setProjectKey(PROJECT_KEY).build(); + + @Test + public void SendSyncTest() throws InterruptedException { + SendResult sendResult = sender.sendSync(new GioCdpEventMessage.Builder() + .eventKey("simple") + .anonymousId("anonymousId") + .addEventVariable("key1", "value1") + .build()); + Assert.assertEquals(sendResult.getState(), SendResult.State.SUCCESS); + } + + @Test + public void CompositeFeatureTest() { + int attemptCount = 3; + final CountDownLatch countDownLatch = new CountDownLatch(1); + Retryer retryer = RetryerBuilder.newBuilder() + .retryIfResult(new Predicate() { + @Override + public boolean apply(Boolean input) { + return input; + } + }) + // 最多调用3次 + .withStopStrategy(StopStrategies.stopAfterAttempt(attemptCount)) + .build(); + + try { + retryer.call(new Callable() { + @Override + public Boolean call() throws Exception { + countDownLatch.countDown(); + SendResult result = sender.sendSync(new GioCdpEventMessage.Builder() + .eventKey("simple") + .anonymousId("anonymousId") + .addEventVariable("key1", "value1") + .build()); + if (result.getState() == SendResult.State.SUCCESS) { + return false; + } + return true; + } + }); + } catch (ExecutionException e) { + } catch (RetryException e) { + } + } +} diff --git a/src/test/resources/gio.properties b/src/test/resources/gio.properties index 68d50bc..077c0f1 100644 --- a/src/test/resources/gio.properties +++ b/src/test/resources/gio.properties @@ -1,6 +1,13 @@ -api.host=http://117.50.105.254:8080 +api.host=https://napi.growingio.com connection.timeout=5000 read.timeout=5000 -run.mode=test +run.mode=production logger.level=debug + +sync.read.timeout=2000 +sync.connection.timeout=2000 +logger.file.enabled=true +logger.file.path=/Users/growingio/logs +logger.file.max_size=10 +logger.file.max_days=30 \ No newline at end of file