diff --git a/java/pom.xml b/java/pom.xml
index 88571ce5..e97406ca 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -13,6 +13,7 @@
2.0.17.Final
9.2.10.v20150310
2.7.2
+    <hiveVersion>2.1.1</hiveVersion>
UNKNOWN
@@ -60,6 +61,9 @@
*:*
META-INF/maven/**
+                  <exclude>META-INF/*.SF</exclude>
+                  <exclude>META-INF/*.DSA</exclude>
+                  <exclude>META-INF/*.RSA</exclude>
google/**
skein.proto
@@ -390,6 +394,20 @@
provided
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${hiveVersion}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-webhcat-java-client</artifactId>
+      <version>${hiveVersion}</version>
+      <scope>provided</scope>
+    </dependency>
+
junit
diff --git a/java/src/main/java/com/anaconda/skein/DelegationTokenManager.java b/java/src/main/java/com/anaconda/skein/DelegationTokenManager.java
new file mode 100644
index 00000000..a8a3f8e6
--- /dev/null
+++ b/java/src/main/java/com/anaconda/skein/DelegationTokenManager.java
@@ -0,0 +1,165 @@
+package com.anaconda.skein;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.jdbc.HiveConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.commons.lang.SystemUtils.USER_NAME;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+
+public class DelegationTokenManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenManager.class);
+ private Credentials credentials;
+ private final List delegationTokenProviders;
+
+ public DelegationTokenManager() { this.delegationTokenProviders=new LinkedList<>(); }
+
+ public void addTokenProvider(Model.DelegationTokenProvider p) { this.delegationTokenProviders.add(p); }
+
+ public void initializeCredentials(Credentials credentials) { this.credentials = credentials; }
+
+ public void obtainTokensHDFS(YarnClient yarnClient, FileSystem fs, Model.ApplicationSpec spec,
+ Configuration conf) throws IOException, YarnException {
+ // Collect security tokens as needed
+ LOG.debug("Collecting filesystem delegation tokens");
+
+ List l= spec.getFileSystems();
+ l.add(0, new Path(fs.getUri()));
+
+ TokenCache.obtainTokensForNamenodes(this.credentials,
+ l.toArray(new Path[l.size()]), conf);
+
+ boolean hasRMToken = false;
+ for (Token> token: this.credentials.getAllTokens()) {
+ if (token.getKind().equals(RMDelegationTokenIdentifier.KIND_NAME)) {
+ LOG.debug("RM delegation token already acquired");
+ hasRMToken = true;
+ break;
+ }
+ }
+ if (!hasRMToken) {
+ LOG.debug("Adding RM delegation token");
+ Text rmDelegationTokenService = ClientRMProxy.getRMDelegationTokenService(conf);
+ String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
+ org.apache.hadoop.yarn.api.records.Token rmDelegationToken =
+ yarnClient.getRMDelegationToken(new Text(tokenRenewer));
+ Token rmToken = ConverterUtils.convertFromYarn(
+ rmDelegationToken, rmDelegationTokenService
+ );
+ this.credentials.addToken(rmDelegationTokenService, rmToken);
+ }
+ }
+
+ private Text getUniqueAlias(Token> token) {
+ return new Text(String.format("%s_%s_%d", token.getKind().toString(),
+ token.getService().toString(), System.currentTimeMillis()));
+ }
+
+ // Delegation token based connection is explained here:
+ // https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-Multi-UserScenariosandProgrammaticLogintoKerberosKDC
+ // This method is inspired from org.apache.oozie.action.hadoop.Hive2Credentials which does the same thing for Oozie.
+ public void obtainTokensHive(Map config, String user) {
+ String jdbcUrl = config.get("hive.jdbc.url");
+ String principal = config.get("hive.jdbc.principal");
+ String fullUrl = jdbcUrl + ";principal=" + principal + ";hive.server2.proxy.user=" + user;
+
+ try {
+ // load the driver
+ Class.forName("org.apache.hive.jdbc.HiveDriver");
+
+ Connection con = null;
+ String tokenStr = null;
+ try {
+ con = DriverManager.getConnection(fullUrl);
+ LOG.info("Connected successfully to " + fullUrl);
+ tokenStr = ((HiveConnection)con).getDelegationToken(user, principal);
+ } finally {
+ if (con != null) { con.close(); }
+ }
+ LOG.info("Got Hive Server token from " + fullUrl);
+
+ Token hiveToken = new Token();
+ hiveToken.decodeFromUrlString(tokenStr);
+ credentials.addToken(getUniqueAlias(hiveToken), hiveToken);
+ } catch (IOException | SQLException | ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ // This method is inspired from org.apache.oozie.action.hadoop.HCatCredentials which does the same thing for Oozie.
+ public void obtainTokensHCat(Map config, String user){
+ String principal = config.get("hcat.metastore.principal");
+ String server = config.get("hcat.metastore.uri");
+
+ String protection = "authentication"; // default value
+ if(config.containsKey(HADOOP_RPC_PROTECTION))
+ protection = config.get(HADOOP_RPC_PROTECTION);
+
+ try {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.set("hive.metastore.sasl.enabled", "true");
+ hiveConf.set("hive.metastore.kerberos.principal", principal);
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, server);
+ hiveConf.set(HADOOP_RPC_PROTECTION, protection);
+
+ HCatClient hiveClient = HCatClient.create(hiveConf);
+ String tokenStrForm = hiveClient.getDelegationToken(user, principal);
+ Token hcatToken = new Token<>();
+ hcatToken.decodeFromUrlString(tokenStrForm);
+ credentials.addToken(getUniqueAlias(hcatToken), hcatToken);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ // For some systems (like Hive) to obtain the delegation token we need to do it
+ // while we are authenticated with kerberos, before we impersonate the user.
+ public void obtainTokensWithoutImpersonation(String userWeAuthenticateFor) throws IllegalArgumentException {
+ for (Model.DelegationTokenProvider p : this.delegationTokenProviders) {
+ if(p.getName().equals("hive")) {
+ this.obtainTokensHive(p.getConfig(), userWeAuthenticateFor);
+ } else if(p.getName().equals("hcat")) {
+ this.obtainTokensHCat(p.getConfig(), userWeAuthenticateFor);
+ } else {
+ throw new IllegalArgumentException("The Provider for Delegation Token was not found");
+ }
+ }
+ }
+
+ public ByteBuffer toBytes() throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ this.credentials.writeTokenStorageToStream(dob);
+ return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ }
+}
diff --git a/java/src/main/java/com/anaconda/skein/Driver.java b/java/src/main/java/com/anaconda/skein/Driver.java
index 9010d9b4..023b7169 100644
--- a/java/src/main/java/com/anaconda/skein/Driver.java
+++ b/java/src/main/java/com/anaconda/skein/Driver.java
@@ -1,7 +1,6 @@
package com.anaconda.skein;
import com.google.common.base.Strings;
-import com.google.common.collect.ObjectArrays;
import com.google.protobuf.ByteString;
import io.grpc.Server;
@@ -20,13 +19,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -43,33 +36,23 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
public class Driver {
@@ -359,9 +342,14 @@ private void killApplicationInner(YarnClient yarnClient, FileSystem fs,
/** Start a new application. **/
public ApplicationId submitApplication(final Model.ApplicationSpec spec)
throws IOException, YarnException, InterruptedException {
+ DelegationTokenManager tokenManager = spec.getDelegationTokenManager();
+ tokenManager.initializeCredentials(UserGroupInformation.getCurrentUser().getCredentials());
+
if (spec.getUser().isEmpty()) {
return submitApplicationInner(defaultYarnClient, defaultFileSystem, spec);
- } else {
+ }
+ else {
+ tokenManager.obtainTokensWithoutImpersonation(spec.getUser());
return UserGroupInformation.createProxyUser(spec.getUser(), ugi).doAs(
new PrivilegedExceptionAction() {
public ApplicationId run() throws IOException, YarnException {
@@ -371,8 +359,7 @@ public ApplicationId run() throws IOException, YarnException {
}
}
- private ApplicationId submitApplicationInner(YarnClient yarnClient,
- FileSystem fs, Model.ApplicationSpec spec) throws IOException, YarnException {
+ private ApplicationId submitApplicationInner(YarnClient yarnClient, FileSystem fs, Model.ApplicationSpec spec) throws IOException, YarnException {
// First validate the spec request
spec.validate();
@@ -414,12 +401,17 @@ private ApplicationId submitApplicationInner(YarnClient yarnClient,
+ appDir
+ " >" + logdir + "/application.master.log 2>&1"));
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
ByteBuffer fsTokens = null;
if (UserGroupInformation.isSecurityEnabled()) {
- fsTokens = collectTokens(yarnClient, fs, spec);
- } else {
- env.put("HADOOP_USER_NAME", ugi.getUserName());
+ DelegationTokenManager tokenManager = spec.getDelegationTokenManager();
+ tokenManager.obtainTokensHDFS(yarnClient, fs, spec, conf);
+ fsTokens = tokenManager.toBytes();
+
+ // We cancel the delegation token when the job finishes, so that it cannot be used elsewhere
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
+ }
+ else {
+ env.put("HADOOP_USER_NAME", UserGroupInformation.getCurrentUser().getUserName());
}
Map acls = spec.getAcls().getYarnAcls();
@@ -452,43 +444,6 @@ private ApplicationId submitApplicationInner(YarnClient yarnClient,
return appId;
}
- private ByteBuffer collectTokens(YarnClient yarnClient, FileSystem fs,
- Model.ApplicationSpec spec) throws IOException, YarnException {
- // Collect security tokens as needed
- LOG.debug("Collecting filesystem delegation tokens");
- Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
- TokenCache.obtainTokensForNamenodes(
- credentials,
- ObjectArrays.concat(
- new Path(fs.getUri()),
- spec.getFileSystems().toArray(new Path[0])),
- conf);
-
- boolean hasRMToken = false;
- for (Token> token: credentials.getAllTokens()) {
- if (token.getKind().equals(RMDelegationTokenIdentifier.KIND_NAME)) {
- LOG.debug("RM delegation token already acquired");
- hasRMToken = true;
- break;
- }
- }
- if (!hasRMToken) {
- LOG.debug("Adding RM delegation token");
- Text rmDelegationTokenService = ClientRMProxy.getRMDelegationTokenService(conf);
- String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
- org.apache.hadoop.yarn.api.records.Token rmDelegationToken =
- yarnClient.getRMDelegationToken(new Text(tokenRenewer));
- Token rmToken = ConverterUtils.convertFromYarn(
- rmDelegationToken, rmDelegationTokenService
- );
- credentials.addToken(rmDelegationTokenService, rmToken);
- }
-
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- }
-
private LocalResource finalizeSecurityFile(
FileSystem fs, Map uploadCache, Path appDir,
LocalResource file, ByteString bytes, String filename)
@@ -1155,10 +1110,14 @@ public void submit(Msg.ApplicationSpec req,
try {
appId = submitApplication(spec);
} catch (Exception exc) {
+ StringWriter sw = new StringWriter();
+ exc.printStackTrace(new PrintWriter(sw));
+
resp.onError(Status.INTERNAL
.withDescription("Failed to submit application, "
+ "exception:\n"
- + exc.getMessage())
+ + exc.getMessage()
+ + sw.toString())
.asRuntimeException());
return;
}
diff --git a/java/src/main/java/com/anaconda/skein/Model.java b/java/src/main/java/com/anaconda/skein/Model.java
index 7ec879f4..68a0fb5d 100644
--- a/java/src/main/java/com/anaconda/skein/Model.java
+++ b/java/src/main/java/com/anaconda/skein/Model.java
@@ -143,7 +143,24 @@ public void validate() throws IllegalArgumentException {
}
}
- public static class Acls {
+  /** Named provider ("hive", "hcat", ...) plus its connection configuration. **/
+  public static class DelegationTokenProvider {
+    private String name;
+    private Map<String, String> config;
+
+    public DelegationTokenProvider(String name, Map<String, String> config) {
+      this.name = name;
+      this.config = config;
+    }
+
+    public String getName() { return name; }
+    public void setName(String name) { this.name = name; }
+
+    public Map<String, String> getConfig() { return config; }
+    public void setConfig(Map<String, String> config) { this.config = config; }
+  }
+
+
+ public static class Acls {
private boolean enable;
private List viewUsers;
private List viewGroups;
@@ -292,6 +309,8 @@ public void validate() throws IllegalArgumentException {
}
}
+
+
public static class ApplicationSpec {
private String name;
private String queue;
@@ -300,6 +319,7 @@ public static class ApplicationSpec {
private int maxAttempts;
private Set tags;
private List fileSystems;
+ private DelegationTokenManager delegationTokenManager;
private Acls acls;
private Master master;
private Map services;
@@ -308,8 +328,8 @@ public ApplicationSpec() {}
public ApplicationSpec(String name, String queue, String user,
String nodeLabel, int maxAttempts, Set tags,
- List fileSystems, Acls acls, Master master,
- Map services) {
+ List fileSystems, DelegationTokenManager delegationTokenManager,
+ Acls acls, Master master, Map services) {
this.name = name;
this.queue = queue;
this.user = user;
@@ -317,6 +337,7 @@ public ApplicationSpec(String name, String queue, String user,
this.maxAttempts = maxAttempts;
this.tags = tags;
this.fileSystems = fileSystems;
+ this.delegationTokenManager = delegationTokenManager;
this.acls = acls;
this.master = master;
this.services = services;
@@ -330,6 +351,7 @@ public String toString() {
+ "maxAttempts: " + maxAttempts + ", "
+ "tags: " + tags + ", "
+ "fileSystems" + fileSystems + ", "
+ + "delegationTokenManager" + delegationTokenManager + ", "
+ "services: " + services + ">");
}
@@ -356,6 +378,11 @@ public void setFileSystems(List fileSystems) {
}
public List getFileSystems() { return this.fileSystems; }
+ public void setDelegationTokenManager(DelegationTokenManager delegationTokenManager) {
+ this.delegationTokenManager = delegationTokenManager;
+ }
+ public DelegationTokenManager getDelegationTokenManager() { return this.delegationTokenManager; }
+
public void setAcls(Acls acls) { this.acls = acls; }
public Acls getAcls() { return this.acls; }
diff --git a/java/src/main/java/com/anaconda/skein/MsgUtils.java b/java/src/main/java/com/anaconda/skein/MsgUtils.java
index 6c8f2dd6..dc6dafa3 100644
--- a/java/src/main/java/com/anaconda/skein/MsgUtils.java
+++ b/java/src/main/java/com/anaconda/skein/MsgUtils.java
@@ -23,11 +23,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class MsgUtils {
public static final Msg.Empty EMPTY = Msg.Empty.newBuilder().build();
@@ -605,8 +601,13 @@ public static Model.ApplicationSpec readApplicationSpec(Msg.ApplicationSpec spec
}
final List fileSystems = new ArrayList();
- for (int i = 0; i < spec.getFileSystemsCount(); i++) {
- fileSystems.add(new Path(spec.getFileSystems(i)));
+ for (String f: spec.getFileSystemsList()) {
+ fileSystems.add(new Path(f));
+ }
+
+ final DelegationTokenManager delegationTokenManager = new DelegationTokenManager();
+ for(Msg.DelegationTokenProviderSpec s :spec.getDelegationTokenProvidersList()) {
+ delegationTokenManager.addTokenProvider(new Model.DelegationTokenProvider(s.getName(), s.getConfigMap()));
}
return new Model.ApplicationSpec(spec.getName(),
@@ -616,6 +617,7 @@ public static Model.ApplicationSpec readApplicationSpec(Msg.ApplicationSpec spec
spec.getMaxAttempts(),
new HashSet(spec.getTagsList()),
fileSystems,
+ delegationTokenManager,
readAcls(spec.getAcls()),
readMaster(spec.getMaster()),
services);
diff --git a/java/src/main/proto/skein.proto b/java/src/main/proto/skein.proto
index 678edb65..183f927a 100644
--- a/java/src/main/proto/skein.proto
+++ b/java/src/main/proto/skein.proto
@@ -140,6 +140,12 @@ message Master {
}
+message DelegationTokenProviderSpec {
+  string name = 1;
+  map<string, string> config = 2;
+}
+
+
message ApplicationSpec {
string name = 1;
string queue = 2;
@@ -148,9 +154,10 @@ message ApplicationSpec {
int32 max_attempts = 5;
repeated string tags = 6;
repeated string file_systems = 7;
-  Acls acls = 8;
-  Master master = 9;
-  map<string, Service> services = 10;
+  repeated DelegationTokenProviderSpec delegation_token_providers = 8;
+  Acls acls = 9;
+  Master master = 10;
+  map<string, Service> services = 11;
}
diff --git a/skein/__init__.py b/skein/__init__.py
index d1afb0b5..2a4d972c 100644
--- a/skein/__init__.py
+++ b/skein/__init__.py
@@ -3,7 +3,7 @@
from .exceptions import (SkeinError, ConnectionError, DriverNotRunningError,
ApplicationNotRunningError, DriverError,
ApplicationError)
-from .model import (Security, ApplicationSpec, Service, File, Resources,
+from .model import (Security, ApplicationSpec, Service, File, Resources, DelegationTokenProvider,
FileType, FileVisibility, ACLs, Master, LogLevel)
from ._version import get_versions
diff --git a/skein/model.py b/skein/model.py
index 74b304d7..16a2adfc 100644
--- a/skein/model.py
+++ b/skein/model.py
@@ -16,10 +16,10 @@
xor, lock_file)
__all__ = ('ApplicationSpec', 'Service', 'Resources', 'File', 'FileType',
- 'FileVisibility', 'ACLs', 'Master', 'Security', 'ApplicationState',
- 'FinalStatus', 'ResourceUsageReport', 'ApplicationReport',
- 'ContainerState', 'Container', 'LogLevel', 'NodeState', 'NodeReport',
- 'QueueState', 'Queue', 'ApplicationLogs')
+ 'FileVisibility', 'ACLs', 'Master', 'DelegationTokenProvider', 'Security',
+ 'ApplicationState', 'FinalStatus', 'ResourceUsageReport',
+ 'ApplicationReport', 'ContainerState', 'Container', 'LogLevel', 'NodeState',
+ 'NodeReport', 'QueueState', 'Queue', 'ApplicationLogs')
def _check_is_filename(target):
@@ -105,16 +105,16 @@ def parse_memory(s):
_byte_sizes = {
- 'kb': 10**3,
- 'mb': 10**6,
- 'gb': 10**9,
- 'tb': 10**12,
- 'pb': 10**15,
- 'kib': 2**10,
- 'mib': 2**20,
- 'gib': 2**30,
- 'tib': 2**40,
- 'pib': 2**50,
+ 'kb': 10 ** 3,
+ 'mb': 10 ** 6,
+ 'gb': 10 ** 9,
+ 'tb': 10 ** 12,
+ 'pb': 10 ** 15,
+ 'kib': 2 ** 10,
+ 'mib': 2 ** 20,
+ 'gib': 2 ** 30,
+ 'tib': 2 ** 40,
+ 'pib': 2 ** 50,
'b': 1,
'': 2 ** 20
}
@@ -443,13 +443,13 @@ def new_credentials(cls):
[x509.NameAttribute(NameOID.COMMON_NAME, u'skein-internal')])
now = datetime.utcnow()
cert = (x509.CertificateBuilder()
- .subject_name(subject)
- .issuer_name(issuer)
- .public_key(key.public_key())
- .serial_number(x509.random_serial_number())
- .not_valid_before(now)
- .not_valid_after(now + timedelta(days=365))
- .sign(key, hashes.SHA256(), default_backend()))
+ .subject_name(subject)
+ .issuer_name(issuer)
+ .public_key(key.public_key())
+ .serial_number(x509.random_serial_number())
+ .not_valid_before(now)
+ .not_valid_after(now + timedelta(days=365))
+ .sign(key, hashes.SHA256(), default_backend()))
cert_bytes = cert.public_bytes(serialization.Encoding.PEM)
@@ -1152,6 +1152,55 @@ def from_protobuf(cls, obj):
security=security)
+class DelegationTokenProvider(Specification):
+    """Configuration for the Delegation Token Provider.
+
+    Parameters
+    ----------
+    name : str
+        Describes the name system for which to get the delegation token. Ex: 'hive'
+    config : dict
+        A mapping that contains the configuration to connect to external systems
+        to get the delegation token
+    """
+    __slots__ = ('name', 'config')
+    _params = ('name', 'config')
+    _protobuf_cls = _proto.DelegationTokenProviderSpec
+
+    def __init__(self, name='', config=None):
+        self.name = name
+        # config may arrive as a protobuf ScalarMapContainer; normalize to a plain dict
+        self.config = {} if config is None else dict(config)
+
+        self._validate()
+
+    def _validate(self):
+        self._check_is_type('name', str)
+        self._check_is_dict_of('config', str, str)
+
+    def __repr__(self):
+        return 'DelegationTokenProvider<...>'
+
+    @classmethod
+    @implements(Specification.from_dict)
+    def from_dict(cls, obj, **kwargs):
+        cls._check_keys(obj)
+
+        obj = obj.copy()
+        name = obj.pop('name', None)
+        config = obj.pop('config', None)
+
+        return cls(name=name,
+                   config=config,
+                   **obj)
+
+    @classmethod
+    @implements(Specification.from_protobuf)
+    def from_protobuf(cls, obj):
+        return cls(name=obj.name,
+                   config=obj.config)
+
+
+
class ApplicationSpec(Specification):
"""A complete description of an application.
@@ -1181,6 +1230,10 @@ class ApplicationSpec(Specification):
file_systems : list, optional
A list of Hadoop file systems to acquire delegation tokens for.
A token is always acquired for the ``defaultFS``.
+ delegation_token_providers : list, optional
+ A list of mappings.
+ Each mapping is for configuring a connection with an external system
+ and get a delegation token.
acls : ACLs, optional
Allows restricting users/groups to subsets of application access. See
``skein.ACLs`` for more information.
@@ -1190,12 +1243,12 @@ class ApplicationSpec(Specification):
application master during startup. Default is 1.
"""
__slots__ = ('services', 'master', 'name', 'queue', 'user', 'node_label',
- 'tags', 'file_systems', 'acls', 'max_attempts')
+ 'tags', 'file_systems', 'delegation_token_providers', 'acls', 'max_attempts')
_protobuf_cls = _proto.ApplicationSpec
def __init__(self, services=None, master=None, name='skein',
queue='default', user='', node_label='', tags=None,
- file_systems=None, acls=None, max_attempts=1):
+ file_systems=None, delegation_token_providers=None, acls=None, max_attempts=1):
self.services = {} if services is None else services
self.master = Master() if master is None else master
self.name = name
@@ -1204,6 +1257,8 @@ def __init__(self, services=None, master=None, name='skein',
self.node_label = node_label
self.tags = set() if tags is None else set(tags)
self.file_systems = [] if file_systems is None else file_systems
+ self.delegation_token_providers = \
+ [] if delegation_token_providers is None else delegation_token_providers
self.acls = ACLs() if acls is None else acls
self.max_attempts = max_attempts
self._validate()
@@ -1219,6 +1274,8 @@ def _validate(self):
self._check_is_type('node_label', str)
self._check_is_set_of('tags', str)
self._check_is_list_of('file_systems', str)
+ #self._check_is_list_of('delegation_token_providers', DelegationTokenProvider)
+ self._check_is_type('delegation_token_providers', list)
self._check_is_bounded_int('max_attempts', min=1)
self._check_is_type('acls', ACLs)
self.acls._validate()
@@ -1282,12 +1339,16 @@ def from_dict(cls, obj, **kwargs):
def from_protobuf(cls, obj):
services = {k: Service.from_protobuf(v)
for k, v in obj.services.items()}
+ delegation_token_providers = [DelegationTokenProvider.from_protobuf(p)
+ for p in obj.delegation_token_providers]
+
return cls(name=obj.name,
queue=obj.queue,
user=obj.user,
node_label=obj.node_label,
tags=set(obj.tags),
file_systems=list(obj.file_systems),
+ delegation_token_providers=delegation_token_providers,
max_attempts=min(1, obj.max_attempts),
acls=ACLs.from_protobuf(obj.acls),
master=Master.from_protobuf(obj.master),
diff --git a/skein/proto/__init__.py b/skein/proto/__init__.py
index 0732d53a..2cdc159a 100644
--- a/skein/proto/__init__.py
+++ b/skein/proto/__init__.py
@@ -1,9 +1,9 @@
from __future__ import absolute_import
from .skein_pb2 import (Empty, FinalStatus, ApplicationState, Resources, File,
- Service, Acls, Log, Master, Security, ApplicationSpec,
- ResourceUsageReport, ApplicationReport, Application,
- ApplicationsRequest, Url, ContainersRequest, Container,
+ Service, Acls, Log, Master, DelegationTokenProviderSpec,
+ Security, ApplicationSpec, ResourceUsageReport, ApplicationReport,
+ Application, ApplicationsRequest, Url, ContainersRequest, Container,
ContainerInstance, ScaleRequest, AddContainerRequest,
ShutdownRequest, KillRequest, SetProgressRequest,
NodeState, NodeReport, NodesRequest, Queue,
diff --git a/skein/test/test_kv.py b/skein/test/test_kv.py
index 1c2025bc..617e914e 100644
--- a/skein/test/test_kv.py
+++ b/skein/test/test_kv.py
@@ -378,7 +378,6 @@ def test_key_value_mutablemapping(kv_test_app):
with pytest.raises(TypeError):
kv_test_app.kv.update({'a': 1}, {'b': 2})
-
def test_key_value_count(kv_test_app):
kv_test_app.kv.update(kv_test_data)
assert kv_test_app.kv.count() == 7
diff --git a/skein/test/test_model.py b/skein/test/test_model.py
index 2d110892..90af37ad 100644
--- a/skein/test/test_model.py
+++ b/skein/test/test_model.py
@@ -8,9 +8,9 @@
from skein.model import (ApplicationSpec, Service, Resources, File,
ApplicationState, FinalStatus, FileType, ACLs, Master,
- Container, ApplicationReport, ResourceUsageReport,
- NodeReport, LogLevel, parse_memory, Security, Queue,
- ApplicationLogs)
+ DelegationTokenProvider, Container, ApplicationReport,
+ ResourceUsageReport, NodeReport, LogLevel, parse_memory,
+ Security, Queue, ApplicationLogs)
def indent(s, n):
@@ -326,6 +326,15 @@ def test_master_invariants():
Master(files={'./bar': '/source.zip'})
+def test_delegation_token_provider_spec():
+    p1 = DelegationTokenProvider(name='hive',
+                                 config={
+                                     'hive.jdbc.url': 'hive2://127.0.0.1:10000/myDatabase',
+                                     'hive.jdbc.principal': 'hive/my.hadoop.mycompany.com@HADOOP.MYCOMPANY.COM'})
+    p3 = DelegationTokenProvider()  # default-constructed: empty name and config
+    check_specification_methods(p1, p3)
+
+
def test_service():
r = Resources(memory=1024, vcores=1)
s1 = Service(resources=r,