diff --git a/LICENSE b/LICENSE index acfce36..4822dc7 100644 --- a/LICENSE +++ b/LICENSE @@ -1,9 +1,31 @@ -Copyright 2010 Eugene Kirpichov, Dmitry Astapov. All rights reserved. +Copyright (c) 2009, Eugene Kirpichov -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: +All rights reserved. -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. -3. The name of the author may not be used to endorse or promote products derived from this software without specific prior written permission. +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: -THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * The names of contributors may not be used to endorse or promote + products derived from this software without specific prior + written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/greg-clients/haskell/LICENSE b/greg-clients/haskell/LICENSE index dd896ac..a2c3343 100644 --- a/greg-clients/haskell/LICENSE +++ b/greg-clients/haskell/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2009, Eugene Kirpichov +Copyright (c) 2009, Eugene Kirpichov, Dmitry Astapov All rights reserved. diff --git a/greg-clients/java/pom.xml b/greg-clients/java/pom.xml index 09a5f46..758061e 100644 --- a/greg-clients/java/pom.xml +++ b/greg-clients/java/pom.xml @@ -15,6 +15,15 @@ Greg java client jar + + + junit + junit + 4.8.1 + test + + + diff --git a/greg-clients/java/src/main/java/org/greg/client/Configuration.java b/greg-clients/java/src/main/java/org/greg/client/Configuration.java index 15387ac..e273981 100644 --- a/greg-clients/java/src/main/java/org/greg/client/Configuration.java +++ b/greg-clients/java/src/main/java/org/greg/client/Configuration.java @@ -1,18 +1,179 @@ package org.greg.client; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Enumeration; +import java.util.Properties; + public class Configuration { - public final String server = System.getProperty("greg.server", "localhost"); - public final int port = Integer.parseInt(System.getProperty("greg.port", "5676")); - public final int calibrationPort = Integer.parseInt(System.getProperty("greg.calibrationPort", "5677")); - public final int flushPeriodMs = Integer.parseInt(System.getProperty("greg.flushPeriodMs", "1000")); - public final String clientId = System.getProperty("greg.clientId", "unknown"); - public final int maxBufferedRecords = Integer.parseInt(System.getProperty("greg.maxBufferedRecords", "1000000")); - public final boolean useCompression = Boolean.parseBoolean(System.getProperty("greg.useCompression", "true")); - public final int calibrationPeriodSec = Integer.parseInt(System.getProperty("greg.calibrationPeriodSec", "10")); + public static final String SERVER = "greg.server"; + public static final String HOST_NAME = "greg.hostname"; + public static final String PORT = "greg.port"; + public static final String CALIBRATION_PORT = "greg.port"; + public static final String CALIBRATION_PERIOD_SEC = "greg.calibrationPeriodSec"; + public static final String FLUSH_PERIOD_MS = "greg.flushPeriodMs"; + public static final String CLIENT_ID = "greg.clientId"; + public static final String MAX_BUFFERED_RECORDS = "greg.maxBufferedRecords"; + public static final String USE_COMPRESSION = "greg.useCompression"; /** * This field is not final - you can change it if you wish to use * your own configuration mechanism. */ - public static Configuration INSTANCE = new Configuration(); -} + public static final Configuration INSTANCE = new Configuration(); + + private static final String defaultPropertiesPath = "/greg.properties"; + private Properties properties = new Properties(); + private String server; + private String hostname; + private int port; + private int calibrationPort; + private int calibrationPeriodSec; + private int flushPeriodMs; + private String clientId; + private int maxBufferedRecords; + private boolean useCompression; + + public Configuration() { + this(defaultPropertiesPath); + } + + public Configuration(String path) { + this(Configuration.class.getClass().getResourceAsStream(path)); + } + + public Configuration(InputStream is) { + Properties defaultProperties = new Properties(); + if (is != null) { + try { + load(is, defaultProperties); + } catch (IOException e) { + // ignore + } finally { + close(is); + } + } + } + + public String get(String key) { + return properties.getProperty(key); + } + + public String get(String key, String defaultValue) { + return properties.getProperty(key, defaultValue); + } + + public int getInt(String key) { + return Integer.parseInt(get(key)); + } + + public int getInt(String key, int defaultValue) { + return Integer.parseInt(get(key, String.valueOf(defaultValue))); + } + + public boolean getBoolean(String key) { + return Boolean.parseBoolean(get(key)); + } + + public boolean getBoolean(String key, boolean defaultValue) { + return Boolean.parseBoolean(get(key, String.valueOf(defaultValue))); + } + + public String getServer() { + return server; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + public int getCalibrationPort() { + return calibrationPort; + } + + public int getCalibrationPeriodSec() { + return calibrationPeriodSec; + } + + public int getFlushPeriodMs() { + return flushPeriodMs; + } + + public String getClientId() { + return clientId; + } + + public int getMaxBufferedRecords() { + return maxBufferedRecords; + } + + public boolean isUseCompression() { + return useCompression; + } + + private void load(InputStream is, Properties defaultProperties) throws IOException { + properties.load(is); + loadDefaultProperties(defaultProperties); + merge(defaultProperties, properties); + initialize(); + } + + private void loadDefaultProperties(Properties properties) { + properties.setProperty(SERVER, "localhost"); + properties.setProperty(PORT, "5676"); + properties.setProperty(CALIBRATION_PORT, "5677"); + properties.setProperty(CALIBRATION_PERIOD_SEC, "10"); + properties.setProperty(FLUSH_PERIOD_MS, "1000"); + properties.setProperty(CLIENT_ID, "unknown"); + properties.setProperty(MAX_BUFFERED_RECORDS, "1000000"); + properties.setProperty(USE_COMPRESSION, "true"); + properties.setProperty(HOST_NAME, getHostName()); + } + + private void merge(Properties defaultProperties, Properties targetProperties) { + for(Object key : defaultProperties.keySet()) { + if (!targetProperties.containsKey(key)) { + targetProperties.setProperty((String) key, defaultProperties.getProperty((String) key)); + } + } + } + + private void initialize() { + server = get(SERVER); + hostname = get(HOST_NAME); + port = getInt(PORT); + calibrationPort = getInt(CALIBRATION_PORT); + calibrationPeriodSec = getInt(CALIBRATION_PERIOD_SEC); + flushPeriodMs = getInt(FLUSH_PERIOD_MS); + clientId = get(CLIENT_ID); + maxBufferedRecords = getInt(MAX_BUFFERED_RECORDS); + useCompression = getBoolean(USE_COMPRESSION); + } + + private String getHostName() { + String hostname; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new AssertionError("Can't get localhost?"); + } + return hostname; + } + + private void close(Closeable is) { + try { + if (is != null) { + is.close(); + } + } catch (IOException ioe) { + // ignore + } + } +} \ No newline at end of file diff --git a/greg-clients/java/src/main/java/org/greg/client/Greg.java b/greg-clients/java/src/main/java/org/greg/client/Greg.java index 175f940..b9ac915 100644 --- a/greg-clients/java/src/main/java/org/greg/client/Greg.java +++ b/greg-clients/java/src/main/java/org/greg/client/Greg.java @@ -8,7 +8,6 @@ import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; -import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -19,10 +18,9 @@ public class Greg { private static final AtomicInteger numDropped = new AtomicInteger(0); // Don't use ConcurrentLinkedQueue.size() because it's O(n) private static final AtomicInteger numRecords = new AtomicInteger(0); - private static final Configuration conf = Configuration.INSTANCE; + private static final Configuration config = Configuration.INSTANCE; private static final UUID OUR_UUID = UUID.randomUUID(); - private static final String hostname; static { Thread pushMessages = new Thread("GregPushMessages") { @@ -39,16 +37,10 @@ public void run() { }; initCalibration.setDaemon(true); initCalibration.start(); - - try { - hostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - throw new AssertionError("Can't get localhost?"); - } } public static void log(String message) { - if (numRecords.get() < conf.maxBufferedRecords) { + if (numRecords.get() < config.getMaxBufferedRecords()) { numRecords.incrementAndGet(); Record r = new Record(); @@ -84,7 +76,7 @@ private static void pushCurrentMessages() { OutputStream stream = null; try { - client = new Socket(conf.server, conf.port); + client = new Socket(config.getServer(), config.getPort()); Trace.writeLine( "Client connected to " + client.getRemoteSocketAddress() + " from " + client.getLocalSocketAddress()); @@ -93,9 +85,9 @@ private static void pushCurrentMessages() { DataOutput w = new LittleEndianDataOutputStream(bStream); w.writeLong(OUR_UUID.getLeastSignificantBits()); w.writeLong(OUR_UUID.getMostSignificantBits()); - w.writeBoolean(conf.useCompression); + w.writeBoolean(config.isUseCompression()); - stream = new BufferedOutputStream(conf.useCompression ? new GZIPOutputStream(bStream) : bStream, 65536); + stream = new BufferedOutputStream(config.isUseCompression() ? new GZIPOutputStream(bStream) : bStream, 65536); exhausted = writeRecordsBatchTo(stream); } catch (Exception e) { Trace.writeLine("Failed to push messages: " + e); @@ -110,7 +102,7 @@ private static void pushCurrentMessages() { // Only sleep when waiting for new records. if (exhausted) { try { - Thread.sleep(conf.flushPeriodMs); + Thread.sleep(config.getFlushPeriodMs()); } catch (InterruptedException e) { continue; } @@ -141,12 +133,12 @@ private static void close(Socket sock) { private static boolean writeRecordsBatchTo(OutputStream stream) throws IOException { int maxBatchSize = 10000; DataOutput w = new LittleEndianDataOutputStream(stream); - byte[] cidBytes = conf.clientId.getBytes("utf-8"); + byte[] cidBytes = config.getClientId().getBytes("utf-8"); w.writeInt(cidBytes.length); w.write(cidBytes); int recordsWritten = 0; - byte[] machineBytes = hostname.getBytes("utf-8"); + byte[] machineBytes = config.getHostname().getBytes("utf-8"); CharsetEncoder enc = Charset.forName("utf-8").newEncoder(); @@ -189,7 +181,7 @@ private static void initiateCalibration() { while (true) { Socket client = null; try { - client = new Socket(conf.server, conf.calibrationPort); + client = new Socket(config.getServer(), config.getCalibrationPort()); client.setTcpNoDelay(true); exchangeTicksOver(client.getInputStream(), client.getOutputStream()); } catch (Exception e) { @@ -198,7 +190,7 @@ private static void initiateCalibration() { close(client); } try { - Thread.sleep(conf.calibrationPeriodSec * 1000L); + Thread.sleep(config.getCalibrationPeriodSec() * 1000L); } catch (InterruptedException e) { continue; } @@ -221,4 +213,4 @@ private static void exchangeTicksOver(InputStream in, OutputStream out) throws I // Our sample arrives to them after network latency. } } -} +} \ No newline at end of file diff --git a/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java b/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java new file mode 100644 index 0000000..5e23b22 --- /dev/null +++ b/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java @@ -0,0 +1,32 @@ +package org.greg.client; + +import junit.framework.TestCase; + +import java.beans.IntrospectionException; +import java.io.IOException; +import java.util.Properties; + +import static org.greg.client.Configuration.*; + +/** + * @author Anton Panasenko + * Date: 25.12.10 + */ +public class ConfigurationTest extends TestCase { + private String[] keys = {SERVER, HOST_NAME, PORT, CALIBRATION_PORT, CALIBRATION_PERIOD_SEC, FLUSH_PERIOD_MS, + CLIENT_ID, MAX_BUFFERED_RECORDS, USE_COMPRESSION}; + + public void testPropertiesFiles() throws IOException, IntrospectionException { + Properties properties = new Properties(); + properties.load(getClass().getResourceAsStream("/greg.properties")); + + Configuration config = Configuration.INSTANCE; + + for(String key: keys) { + String value = properties.getProperty(key); + if (value != null) { + assertEquals(value, config.get(key)); + } + } + } +} \ No newline at end of file diff --git a/greg-clients/java/src/test/resources/greg.properties b/greg-clients/java/src/test/resources/greg.properties new file mode 100644 index 0000000..d08bad9 --- /dev/null +++ b/greg-clients/java/src/test/resources/greg.properties @@ -0,0 +1,8 @@ +greg.server=127.0.0.1 +greg.port=5676 +greg.calibrationPort=5677 +greg.flushPeriodMs=1000 +greg.clientId=default +greg.maxBufferedRecords=1000000 +greg.useCompression=true +greg.calibrationPeriodSec=10 \ No newline at end of file