Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
5d6e588
Initial commit for getting Samza to work with Java 11
james-deee Aug 18, 2022
1dbd8a0
Upgrade Hadoop to 3.3.4
james-deee Aug 18, 2022
e67cc59
Fix test
james-deee Aug 18, 2022
332ca88
Update gradle.properties
james-deee Aug 18, 2022
c38e2d9
Merge branch 'master' of github.com:james-deee/samza into jtd/get-sam…
james-deee Sep 17, 2022
f2971be
Merge branch 'jtd/get-samza-to-build-with-java11-real' of github.com:…
james-deee Sep 17, 2022
2cd5b5a
Move yarn back to 2.10.1 by default.
james-deee Sep 20, 2022
380486d
Upgrade gradle, remove yarn 3 change
james-deee Sep 20, 2022
bb5080b
Make a samza-yarn3 project that can be included given the right yarn …
james-deee Sep 21, 2022
4e1b7a0
SAMZA-2758: Upgrade vulnerable versions jetty and jackson (#1630)
perkss Sep 19, 2022
8058a51
Make StreamAppender extensible for sending event to SystemProducer (#…
jia-gao Sep 19, 2022
540a528
Apply patch from Yi
james-deee Sep 27, 2022
cad892e
PR comments and revert of powermock changes
james-deee Oct 3, 2022
6d24d92
Merge branch 'master' of github.com:james-deee/samza into jtd/get-sam…
james-deee Oct 3, 2022
5545325
Another powermock revert and adding in conditional to run script for …
james-deee Oct 3, 2022
5d9c790
A couple more reverts
james-deee Oct 3, 2022
bcddf68
Revert the yarn2 changes for samza-yarn/src/test/java/org/apache/samz…
james-deee Oct 3, 2022
a18e6d1
Remove commented out lines
james-deee Oct 3, 2022
473d39b
missed backtick
james-deee Oct 3, 2022
d1760cf
Add a samza-shell-yarn3 module
james-deee Oct 5, 2022
50e1baf
Get the actual distribution to be built :facepalm:
james-deee Oct 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,28 @@ To build Samza from a source release, it is first necessary to download the grad

After the bootstrap script has completed, the regular gradlew instructions below are available.

### Java Version Support

This project is built with Java 8 and can run in a Java 8 runtime enviornment. Additionally, it also supports running in a Java 11 runtime environment.
If you intend to use Samza in a Java 11 runtime environment, it means you will also need to use YARN 3.3.4+ and in which case, you should also use
the `samza-yarn3` module (built with YARN 3.3.4) instead of the `samza-yarn` (built with YARN 2.10.1). There is also a `samza-shell-yarn3` that
depends on the `samza-yarn3` module, so use that shell module if you intend on using Yarn 3.

#### Scala and YARN

Samza builds with [Scala](http://www.scala-lang.org/) 2.11 or 2.12 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.6.1, by default. Use the -PscalaSuffix switches to change Scala versions. Samza supports building Scala with 2.11 and 2.12.
Samza builds with [Scala](http://www.scala-lang.org/) 2.11 or 2.12 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.10.1, by default.
Use the -PscalaSuffix switches to change Scala versions. Samza supports building Scala with 2.11 and 2.12 and provides a YARN 2 module (`samze-yarn`) and a YARN 3 module (`samza-yarn3`).

NOTE: Some modules currently do **not** officially support Java 11 Runtime and are still using the YARN 2.10.1 dependency:
* `samza-yarn`
* `samza-shell`
* `samza-test`
* `samza-hdfs`


./gradlew -PscalaSuffix=2.12 clean build

./gradlew -PscalaSuffix=2.11 clean build
Also, you can make use of `bin/check-all.sh` in order to test multiple variants of Java JDKs, Scala, and Yarn.

### Testing Samza

Expand Down
138 changes: 138 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,76 @@ project(":samza-yarn_$scalaSuffix") {
jar.dependsOn("lesscss")
}

project(":samza-yarn3_$scalaSuffix") {
apply plugin: 'scala'
apply plugin: 'lesscss'

// Force scala joint compilation
sourceSets.main.scala.srcDir "src/main/java"
sourceSets.test.scala.srcDir "src/test/java"

// Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
// tasks.compileTestJava.enabled = false
sourceSets.main.java.srcDirs = []
sourceSets.test.java.srcDirs = []

dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaSuffix")
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.scala-lang:scala-compiler:$scalaVersion"
compile "com.google.guava:guava:$guavaVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion"
compile "org.apache.httpcomponents:httpclient:$httpClientVersion"
compile("org.apache.hadoop:hadoop-yarn-api:$yarn3Version") {
}
compile("org.apache.hadoop:hadoop-yarn-common:$yarn3Version") {
exclude module: 'servlet-api'
}
compile("org.apache.hadoop:hadoop-yarn-client:$yarn3Version") {
exclude module: 'servlet-api'
}
compile("org.apache.hadoop:hadoop-common:$yarn3Version") {
exclude module: 'servlet-api'
// Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4.
exclude module: 'zookeeper'
}
compile("org.apache.hadoop:hadoop-hdfs:$yarn3Version") {
exclude module: 'servlet-api'
}
compile("org.scalatra:scalatra_$scalaSuffix:$scalatraVersion") {
exclude module: 'scala-compiler'
exclude module: 'slf4j-api'
}
compile("org.scalatra:scalatra-scalate_$scalaSuffix:$scalatraVersion") {
exclude module: 'scala-compiler'
exclude module: 'slf4j-api'
}
compile "joda-time:joda-time:$jodaTimeVersion"
compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
testCompile project(":samza-core_$scalaSuffix").sourceSets.test.output
testCompile "org.scalatest:scalatest_$scalaSuffix:$scalaTestVersion"
}

repositories {
maven {
url "https://repo.typesafe.com/typesafe/releases"
}
}

lesscss {
source = fileTree('src/main/less') {
include 'main.less'
}
dest = "$buildDir/resources/main/scalate/css"
}

jar.dependsOn("lesscss")
}

project(":samza-shell") {
apply plugin: 'java'

Expand Down Expand Up @@ -590,6 +660,73 @@ project(":samza-shell") {
}
}

project(":samza-shell-yarn3") {
apply plugin: 'java'

configurations {
gradleShell
}

dependencies {
gradleShell project(":samza-core_$scalaSuffix")
gradleShell project(":samza-kafka_$scalaSuffix")
gradleShell project(":samza-test_$scalaSuffix")
gradleShell project(":samza-yarn3_$scalaSuffix")
}

task shellTarGz(type: Tar) {
compression = Compression.GZIP
classifier = 'dist'
from(project(":samza-shell").file("./src/main/bash"))
from(project(":samza-shell").file("./src/main/resources"))
from(project(":samza-shell").file("./src/main/visualizer"))
}

artifacts {
archives(shellTarGz) {
name 'samza-shell-yarn3'
classifier 'dist'
}
}

// Usage: ./gradlew samza-shell:runJob \
// -PconfigPath=/path/to/job/config.properties
task runJob(type:JavaExec) {
description 'To run a job (defined in a properties file)'
main = 'org.apache.samza.job.JobRunner'
classpath = configurations.gradleShell
if (project.hasProperty('configPath')) args += [
'--config', 'job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory',
'--config', 'job.config.loader.properties.path=' + configPath]
jvmArgs = ["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
}

// Usage: ./gradlew samza-shell:checkpointTool \
// -PconfigPath=/path/to/job/config.properties -PnewOffsets=/path/to/new/offsets.properties
task checkpointTool(type:JavaExec) {
description 'Command-line tool to inspect and manipulate the job’s checkpoint'
main = 'org.apache.samza.checkpoint.CheckpointTool'
classpath = configurations.gradleShell
if (project.hasProperty('configPath')) args += [
'--config', 'job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory',
'--config', 'job.config.loader.properties.path=' + configPath]
if (project.hasProperty('newOffsets')) args += ['--new-offsets', newOffsets]
jvmArgs = ["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
}

// Usage: ./gradlew samza-shell:kvPerformanceTest
// -PconfigPath=/path/to/job/config.properties
task kvPerformanceTest(type:JavaExec) {
description 'Command-line tool to run key-value performance tests'
main = 'org.apache.samza.test.performance.TestKeyValuePerformance'
classpath = configurations.gradleShell
if (project.hasProperty('configPath')) args += [
'--config', 'job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory',
'--config', 'job.config.loader.properties.path=' + configPath]
jvmArgs = ["-Dlog4j.configurationFile=file:src/main/resources/log4j2-console.xml"]
}
}

project(":samza-kv_$scalaSuffix") {
apply plugin: 'scala'

Expand Down Expand Up @@ -710,6 +847,7 @@ project(":samza-rest_$scalaSuffix") {
compile "org.glassfish.jersey.media:jersey-media-moxy:$jerseyVersion"
compile "org.eclipse.jetty.aggregate:jetty-all:$jettyVersion"
compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion"
compile "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$jacksonVersion"
compile("org.apache.hadoop:hadoop-yarn-client:$yarnVersion") {
exclude module: 'servlet-api'
exclude group: 'com.sun.jersey'
Expand Down
5 changes: 4 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
group=org.apache.samza
version=1.7.0-SNAPSHOT
scalaSuffix=2.12
yarnVersion=2.10.1
# This version of YARN supports Java11 and allows Samza to be run in a Java11 runtime environment
yarn3Version=3.3.4

# after changing this value, run `$ ./gradlew wrapper` and commit the resulting changed files
gradleVersion=5.2.1
gradleVersion=6.9.2

org.gradle.jvmargs="-XX:MaxPermSize=512m"

Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@
jnaVersion = "4.5.1"
couchbaseClientVersion = "2.7.2"
couchbaseMockVersion = "1.5.22"
yarn3Version = "3.3.4"
}
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.9.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import java.util.Set;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@

package org.apache.samza.system.hdfs.partitioner;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -29,16 +38,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.annotation.Nullable;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import static org.apache.samza.system.hdfs.partitioner.FileSystemAdapter.FileMetadata;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,16 +366,14 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
val properties = new Properties()

if (isStreamMode) {
properties.putAll(ImmutableMap.of(
"cleanup.policy", "compact",
"segment.bytes", String.valueOf(segmentBytes),
"max.message.bytes", String.valueOf(maxMessageBytes)))
properties.put("cleanup.policy", "compact")
properties.put("segment.bytes", String.valueOf(segmentBytes))
properties.put("max.message.bytes", String.valueOf(maxMessageBytes))
} else {
properties.putAll(ImmutableMap.of(
"cleanup.policy", "compact,delete",
"retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
"segment.bytes", String.valueOf(segmentBytes),
"max.message.bytes", String.valueOf(maxMessageBytes)))
properties.put("cleanup.policy", "compact,delete")
properties.put("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
properties.put("segment.bytes", String.valueOf(segmentBytes))
properties.put("max.message.bytes", String.valueOf(maxMessageBytes))
}
properties
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang.StringUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.samza.rest;

import java.util.Collection;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.rest.resources.DefaultResourceFactory;
import org.apache.samza.rest.resources.ResourceFactory;
import org.apache.samza.util.ReflectionUtil;
import org.codehaus.jackson.jaxrs.JacksonJsonProvider;

import org.glassfish.jersey.server.ResourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
package org.apache.samza.rest.proxy.task;

import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.rest.proxy.installation.InstallationFinder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang.StringUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.rest.proxy.job.JobInstance;
import org.apache.samza.rest.proxy.task.TaskProxyFactory;
Expand Down
13 changes: 12 additions & 1 deletion samza-shell/src/main/bash/run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ function check_and_enable_64_bit_mode {
fi
}

# Try and use the -XX:+PrintGCDateStamps jvm argument. Java11 will fail
function check_and_enable_print_gc_datestamps {
`$JAVA -XX:+PrintGCDateStamps -version`
if [ $? -eq 0 ] ; then
JAVA_OPTS="$JAVA_OPTS -XX:+PrintGCDateStamps"
fi
}

### Inherit JVM_OPTS from task.opts configuration, and initialize defaults ###

# Make the MDC inheritable to child threads by setting the system property to true if config not explicitly specified
Expand Down Expand Up @@ -153,14 +161,17 @@ fi
[[ $JAVA_OPTS != *-Xmx* ]] && JAVA_OPTS="$JAVA_OPTS -Xmx768M"

# Check if the GC related flags are specified. If not - add the respective flags to JVM_OPTS.
[[ $JAVA_OPTS != *PrintGCDateStamps* && $JAVA_OPTS != *-Xloggc* ]] && JAVA_OPTS="$JAVA_OPTS -XX:+PrintGCDateStamps -Xloggc:$SAMZA_LOG_DIR/gc.log"
[[ $JAVA_OPTS != *-Xloggc* ]] && JAVA_OPTS="$JAVA_OPTS -Xloggc:$SAMZA_LOG_DIR/gc.log"

# Check if GC log rotation is already enabled. If not - add the respective flags to JVM_OPTS
[[ $JAVA_OPTS != *UseGCLogFileRotation* ]] && check_and_enable_gc_log_rotation

# Check if 64 bit is set. If not - try and set it if it's supported
[[ $JAVA_OPTS != *-d64* ]] && check_and_enable_64_bit_mode

# Check if we can use PrintGCDateStamps. Java 11 will fail if this is provided, Java 8 is fine
[[ $JAVA_OPTS != *PrintGCDateStamps* ]] && check_and_enable_print_gc_datestamps

# HADOOP_CONF_DIR should be supplied to classpath explicitly for Yarn to parse configs
echo $JAVA $JAVA_OPTS -cp $HADOOP_CONF_DIR:$PATHING_JAR_FILE "$@"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
Expand Down
Loading