From d0b82bb0d90e20f257cef651ec27c55c0fdc7f32 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Sat, 25 Dec 2010 18:05:25 +0300 Subject: [PATCH 1/3] LICENSE fix --- LICENSE | 34 ++++++++++++++++++++++++++++------ greg-clients/haskell/LICENSE | 2 +- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/LICENSE b/LICENSE index acfce36..4822dc7 100644 --- a/LICENSE +++ b/LICENSE @@ -1,9 +1,31 @@ -Copyright 2010 Eugene Kirpichov, Dmitry Astapov. All rights reserved. +Copyright (c) 2009, Eugene Kirpichov -Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: +All rights reserved. -1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. -3. The name of the author may not be used to endorse or promote products derived from this software without specific prior written permission. +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: -THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * The names of contributors may not be used to endorse or promote + products derived from this software without specific prior + written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/greg-clients/haskell/LICENSE b/greg-clients/haskell/LICENSE index dd896ac..a2c3343 100644 --- a/greg-clients/haskell/LICENSE +++ b/greg-clients/haskell/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2009, Eugene Kirpichov +Copyright (c) 2009, Eugene Kirpichov, Dmitry Astapov All rights reserved. From 6a50190ae0e3142856cf626eaf497937a631dfa5 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Sat, 25 Dec 2010 19:59:19 +0300 Subject: [PATCH 2/3] Added int configurations from properties file (grep.properties) --- greg-clients/java/pom.xml | 9 ++ .../java/org/greg/client/Configuration.java | 147 ++++++++++++++++-- .../src/main/java/org/greg/client/Greg.java | 19 ++- .../org/greg/client/ConfigurationTest.java | 29 ++++ .../java/src/test/resources/greg.properties | 8 + 5 files changed, 193 insertions(+), 19 deletions(-) create mode 100644 greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java create mode 100644 greg-clients/java/src/test/resources/greg.properties diff --git a/greg-clients/java/pom.xml b/greg-clients/java/pom.xml index 09a5f46..758061e 100644 --- a/greg-clients/java/pom.xml +++ b/greg-clients/java/pom.xml @@ -15,6 +15,15 @@ Greg java client jar + + + junit + junit + 4.8.1 + test + + + diff --git a/greg-clients/java/src/main/java/org/greg/client/Configuration.java b/greg-clients/java/src/main/java/org/greg/client/Configuration.java index 15387ac..3b23efb 100644 --- a/greg-clients/java/src/main/java/org/greg/client/Configuration.java +++ b/greg-clients/java/src/main/java/org/greg/client/Configuration.java @@ -1,18 +1,147 @@ package org.greg.client; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + public class Configuration { - public final String server = System.getProperty("greg.server", "localhost"); - public final int port = Integer.parseInt(System.getProperty("greg.port", "5676")); - public final int calibrationPort = Integer.parseInt(System.getProperty("greg.calibrationPort", "5677")); - public final int flushPeriodMs = Integer.parseInt(System.getProperty("greg.flushPeriodMs", "1000")); - public final String clientId = System.getProperty("greg.clientId", "unknown"); - public final int maxBufferedRecords = Integer.parseInt(System.getProperty("greg.maxBufferedRecords", "1000000")); - public final boolean useCompression = Boolean.parseBoolean(System.getProperty("greg.useCompression", "true")); - public final int calibrationPeriodSec = Integer.parseInt(System.getProperty("greg.calibrationPeriodSec", "10")); + public static final String SERVER = "greg.server"; + public static final String PORT = "greg.port"; + public static final String CALIBRATION_PORT = "greg.port"; + public static final String CALIBRATION_PERIOD_SEC = "greg.calibrationPeriodSec"; + public static final String FLUSH_PERIOD_MS = "greg.flushPeriodMs"; + public static final String CLIENT_ID = "greg.clientId"; + public static final String MAX_BUFFERED_RECORDS = "greg.maxBufferedRecords"; + public static final String USE_COMPRESSION = "greg.useCompression"; /** * This field is not final - you can change it if you wish to use * your own configuration mechanism. */ - public static Configuration INSTANCE = new Configuration(); + public static final Configuration INSTANCE = new Configuration(); + + private static final String defaultPropertiesPath = "/greg.properties"; + private Properties properties = new Properties(); + private String server; + private int port; + private int calibrationPort; + private int calibrationPeriodSec; + private int flushPeriodMs; + private String clientId; + private int maxBufferedRecords; + private boolean useCompression; + + public Configuration() { + this(defaultPropertiesPath); + if (properties.isEmpty()) { + loadDefaultProperties(properties); + initialize(); + } + } + + public Configuration(String path) { + this(Configuration.class.getClass().getResourceAsStream(path)); + } + + public Configuration(InputStream is) { + if (is != null) { + try { + properties.load(is); + initialize(); + } catch (IOException e) { + // ignore + } finally { + close(is); + } + } + } + + public String get(String key) { + return properties.getProperty(key); + } + + public String get(String key, String defaultValue) { + return properties.getProperty(key, defaultValue); + } + + public int getInt(String key) { + return Integer.parseInt(get(key)); + } + + public int getInt(String key, int defaultValue) { + return Integer.parseInt(get(key, String.valueOf(defaultValue))); + } + + public boolean getBoolean(String key) { + return Boolean.parseBoolean(get(key)); + } + + public boolean getBoolean(String key, boolean defaultValue) { + return Boolean.parseBoolean(get(key, String.valueOf(defaultValue))); + } + + public String getServer() { + return server; + } + + public int getPort() { + return port; + } + + public int getCalibrationPort() { + return calibrationPort; + } + + public int getCalibrationPeriodSec() { + return calibrationPeriodSec; + } + + public int getFlushPeriodMs() { + return flushPeriodMs; + } + + public String getClientId() { + return clientId; + } + + public int getMaxBufferedRecords() { + return maxBufferedRecords; + } + + public boolean isUseCompression() { + return useCompression; + } + + private void initialize() { + server = get(SERVER); + port = getInt(PORT); + calibrationPort = getInt(CALIBRATION_PORT); + calibrationPeriodSec = getInt(CALIBRATION_PERIOD_SEC); + flushPeriodMs = getInt(FLUSH_PERIOD_MS); + clientId = get(CLIENT_ID); + maxBufferedRecords = getInt(MAX_BUFFERED_RECORDS); + useCompression = getBoolean(USE_COMPRESSION); + } + + private void loadDefaultProperties(Properties properties) { + properties.setProperty(SERVER, "localhost"); + properties.setProperty(PORT, "5676"); + properties.setProperty(CALIBRATION_PORT, "5677"); + properties.setProperty(CALIBRATION_PERIOD_SEC, "10"); + properties.setProperty(FLUSH_PERIOD_MS, "1000"); + properties.setProperty(CLIENT_ID, "unknown"); + properties.setProperty(MAX_BUFFERED_RECORDS, "1000000"); + properties.setProperty(USE_COMPRESSION, "true"); + } + + private void close(Closeable is) { + try { + if (is != null) { + is.close(); + } + } catch (IOException ioe) { + // ignore + } + } } diff --git a/greg-clients/java/src/main/java/org/greg/client/Greg.java b/greg-clients/java/src/main/java/org/greg/client/Greg.java index 175f940..dca00f7 100644 --- a/greg-clients/java/src/main/java/org/greg/client/Greg.java +++ b/greg-clients/java/src/main/java/org/greg/client/Greg.java @@ -8,7 +8,6 @@ import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; -import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -19,7 +18,7 @@ public class Greg { private static final AtomicInteger numDropped = new AtomicInteger(0); // Don't use ConcurrentLinkedQueue.size() because it's O(n) private static final AtomicInteger numRecords = new AtomicInteger(0); - private static final Configuration conf = Configuration.INSTANCE; + private static final Configuration config = Configuration.INSTANCE; private static final UUID OUR_UUID = UUID.randomUUID(); private static final String hostname; @@ -48,7 +47,7 @@ public void run() { } public static void log(String message) { - if (numRecords.get() < conf.maxBufferedRecords) { + if (numRecords.get() < config.getMaxBufferedRecords()) { numRecords.incrementAndGet(); Record r = new Record(); @@ -84,7 +83,7 @@ private static void pushCurrentMessages() { OutputStream stream = null; try { - client = new Socket(conf.server, conf.port); + client = new Socket(config.getServer(), config.getPort()); Trace.writeLine( "Client connected to " + client.getRemoteSocketAddress() + " from " + client.getLocalSocketAddress()); @@ -93,9 +92,9 @@ private static void pushCurrentMessages() { DataOutput w = new LittleEndianDataOutputStream(bStream); w.writeLong(OUR_UUID.getLeastSignificantBits()); w.writeLong(OUR_UUID.getMostSignificantBits()); - w.writeBoolean(conf.useCompression); + w.writeBoolean(config.isUseCompression()); - stream = new BufferedOutputStream(conf.useCompression ? new GZIPOutputStream(bStream) : bStream, 65536); + stream = new BufferedOutputStream(config.isUseCompression() ? new GZIPOutputStream(bStream) : bStream, 65536); exhausted = writeRecordsBatchTo(stream); } catch (Exception e) { Trace.writeLine("Failed to push messages: " + e); @@ -110,7 +109,7 @@ private static void pushCurrentMessages() { // Only sleep when waiting for new records. if (exhausted) { try { - Thread.sleep(conf.flushPeriodMs); + Thread.sleep(config.getFlushPeriodMs()); } catch (InterruptedException e) { continue; } @@ -141,7 +140,7 @@ private static void close(Socket sock) { private static boolean writeRecordsBatchTo(OutputStream stream) throws IOException { int maxBatchSize = 10000; DataOutput w = new LittleEndianDataOutputStream(stream); - byte[] cidBytes = conf.clientId.getBytes("utf-8"); + byte[] cidBytes = config.getClientId().getBytes("utf-8"); w.writeInt(cidBytes.length); w.write(cidBytes); int recordsWritten = 0; @@ -189,7 +188,7 @@ private static void initiateCalibration() { while (true) { Socket client = null; try { - client = new Socket(conf.server, conf.calibrationPort); + client = new Socket(config.getServer(), config.getCalibrationPort()); client.setTcpNoDelay(true); exchangeTicksOver(client.getInputStream(), client.getOutputStream()); } catch (Exception e) { @@ -198,7 +197,7 @@ private static void initiateCalibration() { close(client); } try { - Thread.sleep(conf.calibrationPeriodSec * 1000L); + Thread.sleep(config.getCalibrationPeriodSec() * 1000L); } catch (InterruptedException e) { continue; } diff --git a/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java b/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java new file mode 100644 index 0000000..a280629 --- /dev/null +++ b/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java @@ -0,0 +1,29 @@ +package org.greg.client; + +import junit.framework.TestCase; + +import java.beans.IntrospectionException; +import java.io.IOException; +import java.util.Properties; + +import static org.greg.client.Configuration.*; + +/** + * @author Anton Panasenko + * Date: 25.12.10 + */ +public class ConfigurationTest extends TestCase { + private String[] keys = {SERVER, PORT, CALIBRATION_PORT, CALIBRATION_PERIOD_SEC, FLUSH_PERIOD_MS, + CLIENT_ID, MAX_BUFFERED_RECORDS, USE_COMPRESSION}; + + public void testPropertiesFiles() throws IOException, IntrospectionException { + Properties properties = new Properties(); + properties.load(getClass().getResourceAsStream("/greg.properties")); + + Configuration config = Configuration.INSTANCE; + + for(String key: keys) { + assertEquals(properties.getProperty(key), config.get(key)); + } + } +} diff --git a/greg-clients/java/src/test/resources/greg.properties b/greg-clients/java/src/test/resources/greg.properties new file mode 100644 index 0000000..d08bad9 --- /dev/null +++ b/greg-clients/java/src/test/resources/greg.properties @@ -0,0 +1,8 @@ +greg.server=127.0.0.1 +greg.port=5676 +greg.calibrationPort=5677 +greg.flushPeriodMs=1000 +greg.clientId=default +greg.maxBufferedRecords=1000000 +greg.useCompression=true +greg.calibrationPeriodSec=10 \ No newline at end of file From 68ac562a8a2527b53555fa6fe1fe8d4d14f6daa1 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Sat, 25 Dec 2010 21:16:57 +0300 Subject: [PATCH 3/3] Move hostname field in Configuration class --- .../java/org/greg/client/Configuration.java | 64 ++++++++++++++----- .../src/main/java/org/greg/client/Greg.java | 11 +--- .../org/greg/client/ConfigurationTest.java | 11 ++-- 3 files changed, 57 insertions(+), 29 deletions(-) diff --git a/greg-clients/java/src/main/java/org/greg/client/Configuration.java b/greg-clients/java/src/main/java/org/greg/client/Configuration.java index 3b23efb..e273981 100644 --- a/greg-clients/java/src/main/java/org/greg/client/Configuration.java +++ b/greg-clients/java/src/main/java/org/greg/client/Configuration.java @@ -3,10 +3,14 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Enumeration; import java.util.Properties; public class Configuration { public static final String SERVER = "greg.server"; + public static final String HOST_NAME = "greg.hostname"; public static final String PORT = "greg.port"; public static final String CALIBRATION_PORT = "greg.port"; public static final String CALIBRATION_PERIOD_SEC = "greg.calibrationPeriodSec"; @@ -24,6 +28,7 @@ public class Configuration { private static final String defaultPropertiesPath = "/greg.properties"; private Properties properties = new Properties(); private String server; + private String hostname; private int port; private int calibrationPort; private int calibrationPeriodSec; @@ -34,10 +39,6 @@ public class Configuration { public Configuration() { this(defaultPropertiesPath); - if (properties.isEmpty()) { - loadDefaultProperties(properties); - initialize(); - } } public Configuration(String path) { @@ -45,10 +46,10 @@ public Configuration(String path) { } public Configuration(InputStream is) { + Properties defaultProperties = new Properties(); if (is != null) { try { - properties.load(is); - initialize(); + load(is, defaultProperties); } catch (IOException e) { // ignore } finally { @@ -85,6 +86,10 @@ public String getServer() { return server; } + public String getHostname() { + return hostname; + } + public int getPort() { return port; } @@ -113,15 +118,11 @@ public boolean isUseCompression() { return useCompression; } - private void initialize() { - server = get(SERVER); - port = getInt(PORT); - calibrationPort = getInt(CALIBRATION_PORT); - calibrationPeriodSec = getInt(CALIBRATION_PERIOD_SEC); - flushPeriodMs = getInt(FLUSH_PERIOD_MS); - clientId = get(CLIENT_ID); - maxBufferedRecords = getInt(MAX_BUFFERED_RECORDS); - useCompression = getBoolean(USE_COMPRESSION); + private void load(InputStream is, Properties defaultProperties) throws IOException { + properties.load(is); + loadDefaultProperties(defaultProperties); + merge(defaultProperties, properties); + initialize(); } private void loadDefaultProperties(Properties properties) { @@ -133,6 +134,37 @@ private void loadDefaultProperties(Properties properties) { properties.setProperty(CLIENT_ID, "unknown"); properties.setProperty(MAX_BUFFERED_RECORDS, "1000000"); properties.setProperty(USE_COMPRESSION, "true"); + properties.setProperty(HOST_NAME, getHostName()); + } + + private void merge(Properties defaultProperties, Properties targetProperties) { + for(Object key : defaultProperties.keySet()) { + if (!targetProperties.containsKey(key)) { + targetProperties.setProperty((String) key, defaultProperties.getProperty((String) key)); + } + } + } + + private void initialize() { + server = get(SERVER); + hostname = get(HOST_NAME); + port = getInt(PORT); + calibrationPort = getInt(CALIBRATION_PORT); + calibrationPeriodSec = getInt(CALIBRATION_PERIOD_SEC); + flushPeriodMs = getInt(FLUSH_PERIOD_MS); + clientId = get(CLIENT_ID); + maxBufferedRecords = getInt(MAX_BUFFERED_RECORDS); + useCompression = getBoolean(USE_COMPRESSION); + } + + private String getHostName() { + String hostname; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new AssertionError("Can't get localhost?"); + } + return hostname; } private void close(Closeable is) { @@ -144,4 +176,4 @@ private void close(Closeable is) { // ignore } } -} +} \ No newline at end of file diff --git a/greg-clients/java/src/main/java/org/greg/client/Greg.java b/greg-clients/java/src/main/java/org/greg/client/Greg.java index dca00f7..b9ac915 100644 --- a/greg-clients/java/src/main/java/org/greg/client/Greg.java +++ b/greg-clients/java/src/main/java/org/greg/client/Greg.java @@ -21,7 +21,6 @@ public class Greg { private static final Configuration config = Configuration.INSTANCE; private static final UUID OUR_UUID = UUID.randomUUID(); - private static final String hostname; static { Thread pushMessages = new Thread("GregPushMessages") { @@ -38,12 +37,6 @@ public void run() { }; initCalibration.setDaemon(true); initCalibration.start(); - - try { - hostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - throw new AssertionError("Can't get localhost?"); - } } public static void log(String message) { @@ -145,7 +138,7 @@ private static boolean writeRecordsBatchTo(OutputStream stream) throws IOExcepti w.write(cidBytes); int recordsWritten = 0; - byte[] machineBytes = hostname.getBytes("utf-8"); + byte[] machineBytes = config.getHostname().getBytes("utf-8"); CharsetEncoder enc = Charset.forName("utf-8").newEncoder(); @@ -220,4 +213,4 @@ private static void exchangeTicksOver(InputStream in, OutputStream out) throws I // Our sample arrives to them after network latency. } } -} +} \ No newline at end of file diff --git a/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java b/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java index a280629..5e23b22 100644 --- a/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java +++ b/greg-clients/java/src/test/java/org/greg/client/ConfigurationTest.java @@ -13,8 +13,8 @@ * Date: 25.12.10 */ public class ConfigurationTest extends TestCase { - private String[] keys = {SERVER, PORT, CALIBRATION_PORT, CALIBRATION_PERIOD_SEC, FLUSH_PERIOD_MS, - CLIENT_ID, MAX_BUFFERED_RECORDS, USE_COMPRESSION}; + private String[] keys = {SERVER, HOST_NAME, PORT, CALIBRATION_PORT, CALIBRATION_PERIOD_SEC, FLUSH_PERIOD_MS, + CLIENT_ID, MAX_BUFFERED_RECORDS, USE_COMPRESSION}; public void testPropertiesFiles() throws IOException, IntrospectionException { Properties properties = new Properties(); @@ -23,7 +23,10 @@ public void testPropertiesFiles() throws IOException, IntrospectionException { Configuration config = Configuration.INSTANCE; for(String key: keys) { - assertEquals(properties.getProperty(key), config.get(key)); + String value = properties.getProperty(key); + if (value != null) { + assertEquals(value, config.get(key)); + } } } -} +} \ No newline at end of file