Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
aa05865
SAMZA-1498: Support arbitrary system clock timer in operators
xinyuiscool Mar 7, 2018
f7b0d38
SAMZA-1602: Moving class StreamAssert from src/test to src/main
Sanil15 Mar 7, 2018
89dc18e
SAMZA-1607: Handle ZkNoNodeExistsException in zkUtils.readProcessorData
shanthoosh Mar 7, 2018
6492826
SAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.w…
shanthoosh Mar 9, 2018
1971d59
Infinite loop when trying to use SamzaSqlApplicationRunner in yarn mode
srinipunuru Mar 9, 2018
2be7061
SAMZA-1610: Implementation of remote table provider
pdu-mn1 Mar 9, 2018
49e5073
SAMZA-1618: fix HdfsFileSystemAdapter to get files recursively
lhaiesp Mar 13, 2018
3f9b967
SAMZA-1611: BootstrappingChooser should use systemAdmin offsetCompara…
atoomula Mar 14, 2018
77986c2
Samza InMemory working patch
Sanil15 Mar 15, 2018
4a4de27
In Memory backup fixed
Sanil15 Mar 16, 2018
8b06fb8
Basic Working Inmemory module hooked with an integration test
Sanil15 Mar 16, 2018
43c947d
Initial Working Prototype
Sanil15 Mar 21, 2018
8bbb223
Working Module for TestStreamTask module with TaskAssert
Sanil15 Mar 23, 2018
9059f59
General Sync and Async Test Api
Sanil15 Mar 26, 2018
f51c1ab
Changes for LocalApplicationRunner to run Aynsc task
Sanil15 Mar 26, 2018
e7fffde
Async Test
Sanil15 Mar 26, 2018
3f23341
Adding end of stream
Sanil15 Mar 27, 2018
fe729a3
Working state of SamzaAsync Test
Sanil15 Mar 28, 2018
2dfbb48
CI Test Framework High Level Design
Sanil15 Mar 30, 2018
5d98466
Changing Apis using streamId notation
Sanil15 Apr 4, 2018
d81b5c4
Moving all task assert utilities to StreamAssert
Sanil15 Apr 5, 2018
99bf58b
Cleaning up code, changing APIs for create in TestTask and Collection…
Sanil15 Apr 6, 2018
aebd3fd
Final Design Working Prototype of Test Framework
Sanil15 Apr 12, 2018
8c95f8e
Local version of Test Framework
Sanil15 Apr 23, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
593 changes: 308 additions & 285 deletions build.gradle

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
* See {@link InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
*/
@InterfaceStability.Unstable
public interface StreamApplication {
public interface StreamApplication {

/**
* Describes and initializes the transforms for processing message streams and generating results.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.operators;

/**
 * Registry through which operators schedule and cancel epoch-time timer callbacks.
 * See {@link org.apache.samza.operators.functions.TimerFunction} for details.
 *
 * @param <K> type of the timer key
 */
public interface TimerRegistry<K> {

  /**
   * Registers an epoch-time timer under the given key.
   *
   * @param key unique timer key
   * @param timestamp epoch time, in milliseconds, at which the timer will be fired
   */
  void register(K key, long timestamp);

  /**
   * Deletes the timer associated with the provided key.
   *
   * @param key key of the timer to delete
   */
  void delete(K key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.operators.functions;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.task.TaskContext;


/**
* A function that can be initialized before execution.
*
Expand All @@ -41,5 +41,4 @@ public interface InitableFunction {
* @param context the {@link TaskContext} for this task
*/
default void init(Config config, TaskContext context) { }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.operators.functions;

import org.apache.samza.operators.TimerRegistry;

import java.util.Collection;

/**
 * A function that registers keyed timers and is called back when one of them fires.
 * Each key must uniquely identify its timer; the same key is passed to the callback when the timer fires.
 *
 * <p>
 * Example of a {@link FlatMapFunction} with timer:
 * <pre>{@code
 * public class ExampleTimerFn implements FlatMapFunction<String, String>, TimerFunction<String, String> {
 *   public void registerTimer(TimerRegistry<String> timerRegistry) {
 *     long time = System.currentTimeMillis() + 5000; // fire after 5 sec
 *     timerRegistry.register("example-timer", time);
 *   }
 *   public Collection<String> apply(String s) {
 *     ...
 *   }
 *   public Collection<String> onTimer(String key, long timestamp) {
 *     // example-timer fired
 *     ...
 *   }
 * }
 * }</pre>
 *
 * @param <K> type of the key
 * @param <OM> type of the output
 */
public interface TimerFunction<K, OM> {

  /**
   * Registers any epoch-time timers this function needs, using the supplied registry.
   *
   * @param timerRegistry a keyed {@link TimerRegistry}
   */
  void registerTimer(TimerRegistry<K> timerRegistry);

  /**
   * Invoked when the timer with the given key fires; returns the resulting output.
   *
   * @param key timer key
   * @param timestamp time of the epoch-time timer that fired, in milliseconds
   * @return {@link Collection} of output elements
   */
  Collection<OM> onTimer(K key, long timestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.util.Map;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.operators.KV;
import org.apache.samza.task.TaskContext;


/**
Expand All @@ -34,6 +36,14 @@
*/
@InterfaceStability.Unstable
public interface ReadableTable<K, V> extends Table<KV<K, V>> {
/**
* Initializes the table during container initialization.
* Guaranteed to be invoked as the first operation on the table.
* @param containerContext Samza container context
* @param taskContext nullable for global table
*/
default void init(SamzaContainerContext containerContext, TaskContext taskContext) {
}

/**
* Gets the value associated with the specified {@code key}.
Expand All @@ -57,5 +67,4 @@ public interface ReadableTable<K, V> extends Table<KV<K, V>> {
* Close the table and release any resources acquired
*/
void close();

}
18 changes: 11 additions & 7 deletions samza-api/src/main/java/org/apache/samza/table/TableProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Map;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.task.TaskContext;


/**
Expand All @@ -29,6 +31,13 @@
*/
@InterfaceStability.Unstable
public interface TableProvider {
/**
* Initialize TableProvider with container and task context
* @param containerContext Samza container context
* @param taskContext nullable for global table
*/
void init(SamzaContainerContext containerContext, TaskContext taskContext);

/**
* Get an instance of the table for read/write operations
* @return the underlying table
Expand All @@ -46,12 +55,7 @@ public interface TableProvider {
Map<String, String> generateConfig(Map<String, String> config);

/**
* Start the underlying table
*/
void start();

/**
* Stop the underlying table
* Shut down the underlying table
*/
void stop();
void close();
}
19 changes: 19 additions & 0 deletions samza-api/src/main/java/org/apache/samza/task/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,23 @@ default void setUserContext(Object context) { }
default Object getUserContext() {
return null;
}

/**
* Register a keyed timer with a callback of {@link TimerCallback} in this task.
* The callback is invoked in mutual exclusion with all other operations of this task,
* e.g. processing, windowing and commit.
* @param key timer key
* @param timestamp epoch time when the timer will be fired, in milliseconds
* @param callback callback when the timer is fired
* @param <K> type of the key
*/
<K> void registerTimer(K key, long timestamp, TimerCallback<K> callback);

/**
* Delete the keyed timer in this task.
* Deletion only takes effect if the timer has not fired yet; a timer that has already fired is not interrupted.
* @param key timer key
* @param <K> type of the key
*/
<K> void deleteTimer(K key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table;

import org.apache.samza.storage.StorageEngine;

package org.apache.samza.task;

/**
* Interface for tables backed by Samza local stores. The backing stores are
* injected during initialization of the table. Since the lifecycle
* of the underlying stores are already managed by Samza container,
* the table provider will not manage the lifecycle of the backing
* stores.
* The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires.
* @param <K> type of the timer key
*/
public interface LocalStoreBackedTableProvider extends TableProvider {
public interface TimerCallback<K> {
/**
* Initializes the table provider with the backing store
* @param store the backing store
* Invoked when the timer of key fires.
* @param key timer key
* @param collector contains the means of sending message envelopes to the output stream.
* @param coordinator manages execution of tasks.
*/
void init(StorageEngine store);
void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator);
}
20 changes: 4 additions & 16 deletions samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.samza.annotation.InterfaceStability;
Expand All @@ -43,7 +44,6 @@
* <ul>
* <li>Block indefinitely until requested credits become available</li>
* <li>Block for a provided amount of time, then return available credits</li>
* <li>Non-blocking, returns immediately available credits</li>
* </ul>
*
*/
Expand Down Expand Up @@ -79,15 +79,6 @@ public interface RateLimiter extends Serializable {
*/
int acquire(int numberOfCredit, long timeout, TimeUnit unit);

/**
* Attempt to acquire the provided number of credits, returns immediately number of
* credits acquired.
*
* @param numberOfCredit requested number of credits
* @return number of credits acquired
*/
int tryAcquire(int numberOfCredit);

/**
* Attempt to acquire the provided number of credits for a number of tags, blocks indefinitely
* until all requested credits become available
Expand All @@ -110,11 +101,8 @@ public interface RateLimiter extends Serializable {
Map<String, Integer> acquire(Map<String, Integer> tagToCreditMap, long timeout, TimeUnit unit);

/**
* Attempt to acquire the provided number of credits for a number of tags, returns immediately number of
* credits acquired.
*
* @param tagToCreditMap a map of requested number of credits keyed by tag
* @return a map of number of credits acquired keyed by tag
* Get the entire set of tags for which we have configured credits for rate limiting.
* @return set of supported tags
*/
Map<String, Integer> tryAcquire(Map<String, Integer> tagToCreditMap);
Set<String> getSupportedTags();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@

package org.apache.samza.container;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.ReadableMetricsRegistry;
Expand All @@ -32,11 +29,16 @@
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableManager;
import org.apache.samza.task.SystemTimerScheduler;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TimerCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

public class TaskContextImpl implements TaskContext {
private static final Logger LOG = LoggerFactory.getLogger(TaskContextImpl.class);
Expand All @@ -51,6 +53,7 @@ public class TaskContextImpl implements TaskContext {
private final JobModel jobModel;
private final StreamMetadataCache streamMetadataCache;
private final Map<String, Object> objectRegistry = new HashMap<>();
private final SystemTimerScheduler timerScheduler;

private Object userContext = null;

Expand All @@ -62,7 +65,8 @@ public TaskContextImpl(TaskName taskName,
TaskStorageManager storageManager,
TableManager tableManager,
JobModel jobModel,
StreamMetadataCache streamMetadataCache) {
StreamMetadataCache streamMetadataCache,
ScheduledExecutorService timerExecutor) {
this.taskName = taskName;
this.metrics = metrics;
this.containerContext = containerContext;
Expand All @@ -72,6 +76,7 @@ public TaskContextImpl(TaskName taskName,
this.tableManager = tableManager;
this.jobModel = jobModel;
this.streamMetadataCache = streamMetadataCache;
this.timerScheduler = SystemTimerScheduler.create(timerExecutor);
}

@Override
Expand Down Expand Up @@ -129,6 +134,16 @@ public Object getUserContext() {
return userContext;
}

@Override
public <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback) {
timerScheduler.setTimer(key, timestamp, callback);
}

@Override
public <K> void deleteTimer(K key) {
timerScheduler.deleteTimer(key);
}

public void registerObject(String name, Object value) {
objectRegistry.put(name, value);
}
Expand All @@ -144,4 +159,8 @@ public JobModel getJobModel() {
public StreamMetadataCache getStreamMetadataCache() {
return streamMetadataCache;
}

public SystemTimerScheduler getTimerScheduler() {
return timerScheduler;
}
}
Loading