diff --git a/Bundle.ecl b/Bundle.ecl index 34f34a4..9eb2d7d 100644 --- a/Bundle.ecl +++ b/Bundle.ecl @@ -7,8 +7,8 @@ EXPORT Bundle := MODULE(Std.BundleBase) EXPORT Description := 'Generalized Neural Network Bundle'; EXPORT Authors := ['HPCCSystems']; EXPORT License := 'See LICENSE.TXT'; - EXPORT Copyright := 'Copyright (C) 2019 HPCC Systems®'; + EXPORT Copyright := 'Copyright (C) 2020 HPCC Systems®'; EXPORT DependsOn := ['ML_Core']; - EXPORT Version := '1.0.0'; + EXPORT Version := '1.1'; EXPORT PlatformVersion := '7.4.0'; END; diff --git a/GNNI.ecl b/GNNI.ecl index 026327d..38adde3 100644 --- a/GNNI.ecl +++ b/GNNI.ecl @@ -19,12 +19,19 @@ t_Tensor := Tensor.R4.t_Tensor; TensData := Tensor.R4.TensData; nNodes := Thorlib.nodes(); nodeId := Thorlib.node(); +FuncLayerDef := GNN.Types.FuncLayerDef; /** * Generalized Neural Network Interface * - *
Provides a generalized ECL interface to Keras over Tensorflow. It currently only supports - * the Keras Sequential Model. - *
Provides a generalized ECL interface to Keras over Tensorflow. It supports the Keras Functional + * API as well as the Sequential API. + *
The Functional style allows models to be defined as a Directed Acyclic Graph (DAG), allowing branching + * and merging among the layers. It is required for models that have multiple inputs or outputs. + * For an annotated example using the Functional model with multiple inputs and outputs, see + * Test/FuncModelTest.ecl + *
The Sequential stle is a bit simpler, but requires a strict sequence of layers, each one feeding + * into the next. For an annotated example using the Sequential model, see Test/ClassicTest.ecl + *
THEORY OF OPERATION *
A Keras / TF model is built on each HPCC node and training data is distributed among the nodes. * Distributed Synchronous Batch Gradient Descent is performed across nodes, synchronizing weights * periodically based on the 'batchSize' parameter. Each function performs its work in a @@ -33,8 +40,11 @@ nodeId := Thorlib.node(); *
The flow of a program using this interface is as follows:
Tensors can be used to convey record-oriented information such as training data as well as block * oriented data like weights. Both can be N-dimensional. For record-oriented data, the first shape * component is 0 (unspecified) indicating that it can hold an arbitrary set of records. + *
For models with multiple inputs or outputs, Tensor Lists are used (see Tensor.ecl for details), + * with one Tensor per input or output. *
USE OF NumericField *
GNNI also provides a set of interfaces which take in and emit data as 2-dimensional NumericField * datasets (see ML_Core.Types.NumericField). This is purely for convenience for applications that @@ -70,6 +82,11 @@ nodeId := Thorlib.node(); *
MULTIPLE MODEL SUPPORT + *
GNNI supports multiple Keras models within the same work unit. Multiple models are created + * by using multiple calls to DefineModel() using the same sessionId. The returned modelIds are + * used in subsequent calls to discriminate between the models. See Test/MultiModel.ecl for an + * example. */ EXPORT GNNI := MODULE /** @@ -78,7 +95,15 @@ EXPORT GNNI := MODULE * breaking the dependency chain. */ SHARED UNSIGNED4 getToken(UNSIGNED4 lastToken) := EMBED(Python) - return lastToken + 1; + return lastToken + 1 + ENDEMBED; + /** + * Obscure a value (from the ECL compiler) by returning it through a + * python function so that the compiler can't determine that it is + * a constant. + */ + SHARED UNSIGNED4 obscure(UNSIGNED4 value) := EMBED(C++:action) + return value; ENDEMBED; /** * Each node returns status as a kString. Returns an error message @@ -92,6 +117,10 @@ EXPORT GNNI := MODULE '''Didn't recieve reply from all nodes: ''' + COUNT(results), rr1); return rr; END; + + // Number by which to multiply the Keras model id in order to generate + // GNNI model ids. + SHARED kerasIdFactor := 100000; /** * Initialize Keras on all nodes and return a "session" token to be used on the * next call to GNNI. @@ -99,12 +128,12 @@ EXPORT GNNI := MODULE * * @return A session token (UNSIGNED4) to identify this session. */ - EXPORT UNSIGNED4 GetSession() := FUNCTION + EXPORT UNSIGNED4 GetSession(INTEGER numPhysicalComps = -1) := FUNCTION initDat := DATASET(1, TRANSFORM(initParms, SELF.nodeId := nodeId, SELF.nNodes := nNodes, SELF.maxSliceSize := Tensor.MAX_SLICE), LOCAL); - kstatus := ASSERT(Keras.Init(initDat), LENGTH(text) = 0, 'GetSession Exception: ' + text, FAIL); + kstatus := ASSERT(Keras.Init(initDat, numPhysicalComps), LENGTH(text) = 0, 'GetSession Exception: ' + text, FAIL); status := reduceResults(kstatus); model := IF(LENGTH(status) = 0, getToken(0), 0); RETURN model; @@ -121,7 +150,7 @@ EXPORT GNNI := MODULE * @param sess The session token from a previous call to GetSesion(). * @param ldef A set of python strings as would be passed to Keras * model.add(). Each string defines one layer of the model. - * @param cdef A python string as would be passed to Keras model.compile(...). + * @param cdef (optional) A python string as would be passed to Keras model.compile(...). * This line should begin with "compile". Model is implicit here. * @return A model token to be used in subsequent GNNI calls. */ @@ -135,10 +164,60 @@ EXPORT GNNI := MODULE mdefRepl := PROJECT(NOCOMBINE(mdefRepl0), TRANSFORM(RECORDOF(LEFT), SELF.nodeId := nodeId, SELF := LEFT), LOCAL); kstatus := ASSERT(Keras.DefineModel(mdefRepl, sess), LENGTH(text) = 0, 'DefineModel Exception: ' + text); status := reduceResults(kstatus); - model := IF(LENGTH(status) = 0, getToken(sess), 0); + // Extract the Keras modelId from the id field of the returned status. Each node should have the + // same model id since they are kept in sync. So we just use the one from our own node. + modelId := kstatus(nodeId = nodeId)[1].id; + // We need to generate an GNNI modelId that encompasses both the sequence, and encodes + // the Keras model id. + // We multiply the modelId by kerasIdFactor and use that as the basis for our returned modelId token. + // This allows up to kerasIdFactor operations on each model, which should be enough. + modelBase := modelId * kerasIdFactor; + model := IF(LENGTH(status) = 0, getToken(sess + modelBase), 0); + RETURN model; + END; + /** + * DefineFuncModel(...) -- Construct a Keras Functional Model. This allows construction of + * complex models that cannot be expressed as a sequential set of layers. These include models with multiple + * inputs or outputs, or models that use divergence or convergence among different layers. + *
Layers are connected together using the layerName and predecessor fields of the FuncLayerDef. + * The inputs of a layer are connected to the predecessor layers in the order specified by the + * set of names in the predecessor field. + *
The inputs and outputs parameter specifies the names of the layers that form the input and + * output of the model. + *
This is similar to the Keras Functional API, except that the entire model is defined in one + * call rather than assembled piecemeal as in the Functional API. The same rules apply here as + * for the Keras Functional API, and this should be a simple translation of any program using the + * Functional API. + *
For models with multiple inputs, input is specified as a list of tensors (see Tensor.ecl). + *
For models with multiple outputs, output will be a list of tensors. + * @param sess The session token from a previous call to GetSesion(). + * @param lDefs A series of layer definitions using the Types.FuncLayerDef format. + * @param inputs A list of the names of the layers that represent the inputs to the model. + * @param outputs A list of the names of the layers that represent the outputs of the model. + * @param cdef (optional) A python string as would be passed to Keras model.compile(...). + * This line should begin with "compile". Model is implicit here. + * @see Types.FuncLayerDef + */ + EXPORT UNSIGNED4 DefineFuncModel(UNSIGNED sess, + DATASET(FuncLayerDef) lDefs, + SET OF STRING inputs, + SET OF STRING outputs, + STRING cdef = '') := FUNCTION + // Distribute the lDefs to all nodes to make sure that the model is defined on each node + lDefsRepl := DISTRIBUTE(lDefs, ALL); + kstatus := ASSERT(Keras.DefineFuncModel(lDefsRepl, sess, inputs, outputs, cdef), LENGTH(text) = 0, 'DefineFuncModel Exception: ' + text); + status := reduceResults(kstatus); + // Extract the Keras modelId from the id field of the returned status. Each node should have the + // same model id since they are kept in sync. So we just use the one from our own node. + modelId := kstatus(nodeId = nodeId)[1].id; + // We need to generate an GNNI modelId that encompasses both the sequence, and encodes + // the Keras model id. + // We multiply the modelId by kerasIdFactor and use that as the basis for our returned modelId token. + // This allows up to kerasIdFactor operations on each model, which should be enough. + modelBase := modelId * kerasIdFactor; + model := IF(LENGTH(status) = 0, getToken(sess + modelBase), 0); RETURN model; END; - /** * Return a JSON representation of the Keras model. * @@ -147,7 +226,8 @@ EXPORT GNNI := MODULE * @return A JSON string representing the model definition. */ EXPORT STRING ToJSON(UNSIGNED4 mod) := FUNCTION - results := Keras.ToJSON(DATASET([], kString), mod); + kModelId := mod DIV kerasIdFactor; + results := Keras.ToJSON(DATASET([], kString), mod, kModelId); result := results[1].text; RETURN result; END; @@ -171,7 +251,15 @@ EXPORT GNNI := MODULE SELF.text := json), LOCAL); kstatus := ASSERT(Keras.FromJSON(mdefRepl, sess), LENGTH(text) = 0, 'FromJSON Exception: ' + text, FAIL); status := reduceResults(kstatus); - model := IF(LENGTH(status) = 0, getToken(sess), 0); + // Extract the Keras modelId from the id field of the returned status. Each node should have the + // same model id since they are kept in sync. So we just use the one from our own node. + modelId := kstatus(nodeId = nodeId)[1].id; + // We need to generate an GNNI modelId that encompasses both the sequence, and encodes + // the Keras model id. + // We multiply the modelId by kerasIdFactor and use that as the basis for our returned modelId token. + // This allows up to kerasIdFactor operations on each model, which should be enough. + modelBase := modelId * kerasIdFactor; + model := IF(LENGTH(status) = 0, getToken(sess + modelBase), 0); RETURN model; END; /** @@ -190,7 +278,7 @@ EXPORT GNNI := MODULE *
It is convenient to use the triple single quote(''') syntax as * it allows strings to cross line boundaries, and allows - * special characters such as single or double quotes without + * special characters such as siepochNumngle or double quotes without * escaping. *
There is no need to make this call if the compileDef was provided * in the DefineModel(...) call. @@ -209,7 +297,8 @@ EXPORT GNNI := MODULE SELF.id :=1, SELF.typ := kStrType.compile, SELF.text := compileStr), LOCAL); - kstatus := ASSERT(Keras.CompileMod(mdefRepl, model), LENGTH(text) = 0, 'CompileMod Exception: ' + text, FAIL); + kModelId := model DIV kerasIdFactor; + kstatus := ASSERT(Keras.CompileMod(mdefRepl, model, kModelId), LENGTH(text) = 0, 'CompileMod Exception: ' + text, FAIL); status := reduceResults(kstatus); RETURN getToken(model); END; @@ -234,8 +323,9 @@ EXPORT GNNI := MODULE // Get the weights from a single node. Note that weights should // be the same on all nodes since they are automatically // synchronized between nodes. + kModelId := model DIV kerasIdFactor; dummy := DATASET(1, TRANSFORM(kString, SELF.id := 1, SELF.typ := kStrType.None, SELF.text := ''), LOCAL); - weights := Keras.GetWeights(dummy, model); + weights := Keras.GetWeights(dummy, model, kModelId); RETURN weights(nodeId=0); END; @@ -254,7 +344,9 @@ EXPORT GNNI := MODULE * @return A new model token to be used in subsequent calls. */ EXPORT UNSIGNED4 SetWeights(UNSIGNED4 model, DATASET(t_Tensor) weights) := FUNCTION - kstatus := ASSERT(Keras.SetWeights(weights, model), LENGTH(text) = 0, 'SetWeights Exception: ' + text, FAIL); + kModelId := model DIV kerasIdFactor; + weightsD := Tensor.R4.replicate(weights); + kstatus := ASSERT(Keras.SetWeights(weightsD, model, kModelId), LENGTH(text) = 0, 'SetWeights Exception: ' + text, FAIL); status := reduceResults(kstatus); mod := IF(LENGTH(status) = 0, getToken(model), 0); RETURN mod; @@ -266,8 +358,9 @@ EXPORT GNNI := MODULE * @return The average loss. */ EXPORT REAL GetLoss(UNSIGNED4 model) := FUNCTION + kModelId := model DIV kerasIdFactor; dummy := DATASET(1, TRANSFORM(kString, SELF.id := 1, SELF.typ := kStrType.None, SELF.text := ''), LOCAL); - trainLosses := Keras.GetLoss(dummy, model); + trainLosses := Keras.GetLoss(dummy, model, kModelId); // Each node provides the average loss across samples in the epoch. // We return the average of those averages. trainLoss := AVE(trainLosses, loss); @@ -331,18 +424,25 @@ EXPORT GNNI := MODULE EXPORT UNSIGNED4 Fit(UNSIGNED4 model, DATASET(t_Tensor) x, DATASET(t_Tensor) y, - UNSIGNED4 batchSize = 100, - UNSIGNED4 numEpochs = 1) := FUNCTION + UNSIGNED4 batchSize = 100, //RK - now this should be the weight aggregate interval size. + UNSIGNED4 numEpochs = 1, + UNSIGNED4 miniBatch = 32 //RK - mini batch is the keras batch size. GNN batch should be larger than actual batch for performance reasons + ) := FUNCTION + kModelId := model DIV kerasIdFactor; // Get the initial weights to use initWts0 := GetWeights(model); // We get the weights from the first node and then copy them to all nodes // so that everybody starts with the same weights initWts := Tensor.R4.Replicate(initWts0); - // Align the X and Y tensors so that we will get the corresponding records on the same nodes - y1 := PROJECT(y, TRANSFORM(RECORDOF(LEFT), SELF.wi := 2, SELF := LEFT), LOCAL); - aligned := Tensor.R4.AlignTensorPair(x + y1); - xAl := aligned(wi = 1); - yAl := PROJECT(aligned(wi = 2), TRANSFORM(RECORDOF(LEFT), SELF.wi := 1, SELF := LEFT), LOCAL); + // Align the X and Y tensor lists so that we will get the corresponding records on the same nodes + // for each input and output tensor. + maxInputWi := MAX(x, wi); + // Change the wi's for outputs (y) so that they are after the input wi's + y1 := PROJECT(y, TRANSFORM(RECORDOF(LEFT), SELF.wi := LEFT.wi + maxInputWi, SELF := LEFT), LOCAL); + aligned := Tensor.R4.AlignTensors(x + y1); + // Now change the Y's wi back to the original numbers + xAl := aligned(wi <= maxInputWi); + yAl := PROJECT(aligned(wi > maxInputWi), TRANSFORM(RECORDOF(LEFT), SELF.wi := LEFT.wi - maxInputWi, SELF := LEFT), LOCAL); totalRecords := Tensor.R4.GetRecordCount(yAl); batchesPerEpoch := ROUNDUP(totalRecords / nNodes / batchSize); DATASET(t_Tensor) doEpoch(DATASET(t_Tensor) wts1, UNSIGNED epochNum) := FUNCTION @@ -351,7 +451,9 @@ EXPORT GNNI := MODULE batchPos := (batchNum-1) * batchSize + 1; xBatch := int.TensExtract(xAl, batchPos, batchSize); yBatch := int.TensExtract(yAl, batchPos, batchSize); - wtChanges0 := IF(EXISTS(yBatch), Keras.FitBatch(wts2, xBatch, yBatch, model, epochNum), DATASET([], t_Tensor)); + wtChanges0 := IF(EXISTS(yBatch), Keras.FitBatch(wts2, xBatch, yBatch, obscure(model), epochNum, kModelId, miniBatch), DATASET([], t_Tensor)); + rkLog := Syslog.addWorkunitInformation('RKLOG - Keras.FitBatch iteration: ' + batchNum + ', Epoch: ' + epochNum + ', OldBatchSize(): ' + (string)batchSize); + //rkLog := OUTPUT(xBatch); // Move all the changes for a given wi and slice to the same node. Each // node has a set of wi/sliceIds to roll up. Note that the original // weights are already replicated to all nodes. @@ -359,21 +461,88 @@ EXPORT GNNI := MODULE // Sum up the original weights (de-replicated) and all changes for each wi and slice newWts := rollUpdates(wts2((wi + sliceId) % nNodes = nodeId), wtChanges); // Note: newWts have been replicated to all nodes by rollUpdates. - // We use epochNum + batchNum to generate a unique model token for - // use with GetLoss. This ensures proper sequencing of the - // operation. - batchLoss := IF(EXISTS(newWts), GetLoss(model + epochNum + batchNum), 1.0); - logProgress2 := Syslog.addWorkunitInformation('Training Status (2): Epoch = ' + epochNum + ', Batch = ' + batchNum + ', Loss = ' + batchLoss); - RETURN newWts; + // We use obscure to prevent the ECL compiler from treating GetLoss as a + // constant + batchLoss := IF(EXISTS(newWts), GetLoss(model + (batchesPerEpoch * (epochNum-1)) + batchNum), 1.0); + logProgress2 := Syslog.addWorkunitInformation('Training Status (2): ModelId = ' + + kModelId + ', Epoch = ' + epochNum + ', Batch = ' + batchNum + ', Loss = ' + batchLoss); + RETURN WHEN(newWts, rkLog); END; epochWts := LOOP(wts1, batchesPerEpoch, doBatch(ROWS(LEFT), COUNTER)); - epochLoss := IF(EXISTS(epochWts), GetLoss(model + epochNum), 1.0); - logProgress := Syslog.addWorkunitInformation('Training Status: Epoch = ' + epochNum + ', Loss = ' + epochLoss); + epochLoss := IF(EXISTS(epochWts), GetLoss(model + (batchesPerEpoch * (epochNum-1))), 1.0); + //epochLoss := IF(EXISTS(epochWts), GetLoss(obscure(model)), 1.0); + logProgress := Syslog.addWorkunitInformation('Training Status: ModelId = ' + + kModelId + ', Epoch = ' + epochNum + ', Loss = ' + epochLoss); RETURN WHEN(epochWts, logProgress); END; finalWts := LOOP(initWts, numEpochs, doEpoch(ROWS(LEFT), COUNTER)); - RETURN IF(EXISTS(finalWts), getToken(model), 0); + RETURN IF(EXISTS(finalWts), getToken(model + numEpochs * numEpochs), 0); END; // Fit + + //RK - this NCCL Fit function is a modification of the FIT above. It is to be used with the coresponding GNN.keras changes too + // this will allow for data parallelism accross mutliple GPUs and use NCCL (direct GPU communication) to aggregate weights + // Only applicable when with an NVIDIA GPU count >=2 + // Use ECL to aggregate weights every epoch and NCCL to aggregate weights more often between the two. Don't need to agregate weights + // via ECL if only using GPU nodes on one machine, the weight updates via NCCL will acomplish the same goal + // Thus, use batch size such that th + EXPORT UNSIGNED4 NCCLFit(UNSIGNED4 model, + DATASET(t_Tensor) x, + DATASET(t_Tensor) y, + UNSIGNED4 batchSize = 100, //RK - now this should be the weight aggregate interval size. + UNSIGNED4 numEpochs = 1, + UNSIGNED4 miniBatch = 32 //RK - mini batch is the keras batch size. GNN batch should be larger than actual batch for performance reasons + ) := FUNCTION + kModelId := model DIV kerasIdFactor; + // Get the initial weights to use + initWts0 := GetWeights(model); + // We get the weights from the first node and then copy them to all nodes + // so that everybody starts with the same weights + initWts := Tensor.R4.Replicate(initWts0); + // Align the X and Y tensor lists so that we will get the corresponding records on the same nodes + // for each input and output tensor. + maxInputWi := MAX(x, wi); + // Change the wi's for outputs (y) so that they are after the input wi's + y1 := PROJECT(y, TRANSFORM(RECORDOF(LEFT), SELF.wi := LEFT.wi + maxInputWi, SELF := LEFT), LOCAL); + aligned := Tensor.R4.AlignTensors(x + y1); + // Now change the Y's wi back to the original numbers + xAl := aligned(wi <= maxInputWi); + yAl := PROJECT(aligned(wi > maxInputWi), TRANSFORM(RECORDOF(LEFT), SELF.wi := LEFT.wi - maxInputWi, SELF := LEFT), LOCAL); + totalRecords := Tensor.R4.GetRecordCount(yAl); + batchesPerEpoch := ROUNDUP(totalRecords / nNodes / batchSize); + DATASET(t_Tensor) doEpoch(DATASET(t_Tensor) wts1, UNSIGNED epochNum) := FUNCTION + DATASET(t_Tensor) doBatch(DATASET(t_Tensor) wts2, UNSIGNED batchNum) := FUNCTION + // Train the model and Get the weight changes from each node + batchPos := (batchNum-1) * batchSize + 1; + xBatch := int.TensExtract(xAl, batchPos, batchSize); + yBatch := int.TensExtract(yAl, batchPos, batchSize); + wtChanges0 := IF(EXISTS(yBatch), Keras.FitBatchNCCL(wts2, xBatch, yBatch, obscure(model), epochNum, kModelId, miniBatch), DATASET([], t_Tensor)); + rkLog := Syslog.addWorkunitInformation('RKLOG - Keras.FitBatch iteration: ' + batchNum + ', Epoch: ' + epochNum + ', OldBatchSize(Batch Size per Computational Node): ' + batchSize); + // Move all the changes for a given wi and slice to the same node. Each + // node has a set of wi/sliceIds to roll up. Note that the original + // weights are already replicated to all nodes. + wtChanges := DISTRIBUTE(wtChanges0, wi + sliceId); + // Sum up the original weights (de-replicated) and all changes for each wi and slice + newWts := rollUpdates(wts2((wi + sliceId) % nNodes = nodeId), wtChanges); + // Note: newWts have been replicated to all nodes by rollUpdates. + // We use obscure to prevent the ECL compiler from treating GetLoss as a + // constant + batchLoss := IF(EXISTS(newWts), GetLoss(model + (batchesPerEpoch * (epochNum-1)) + batchNum), 1.0); + logProgress2 := Syslog.addWorkunitInformation('Training Status (2): ModelId = ' + + kModelId + ', Epoch = ' + epochNum + ', Batch = ' + batchNum + ', Loss = ' + batchLoss); + RETURN WHEN(newWts, rkLog); + END; + epochWts := LOOP(wts1, batchesPerEpoch, doBatch(ROWS(LEFT), COUNTER)); + epochLoss := IF(EXISTS(epochWts), GetLoss(model + (batchesPerEpoch * (epochNum-1))), 1.0); + //epochLoss := IF(EXISTS(epochWts), GetLoss(obscure(model)), 1.0); + logProgress := Syslog.addWorkunitInformation('Training Status: ModelId = ' + + kModelId + ', Epoch = ' + epochNum + ', Loss = ' + epochLoss); + RETURN WHEN(epochWts, logProgress); + END; + finalWts := LOOP(initWts, numEpochs, doEpoch(ROWS(LEFT), COUNTER)); + RETURN IF(EXISTS(finalWts), getToken(model + numEpochs * numEpochs), 0); + END; // NCCL FIT + + /** * Determine the loss and other metrics in order to evaluate * the model. @@ -396,12 +565,45 @@ EXPORT GNNI := MODULE EXPORT DATASET(Types.metrics) EvaluateMod(UNSIGNED4 model, DATASET(t_Tensor) x, DATASET(t_Tensor) y) := FUNCTION - // Align the X and Y tensors so that we will get the corresponding records on the same nodes - y1 := PROJECT(y, TRANSFORM(RECORDOF(LEFT), SELF.wi := 2, SELF := LEFT), LOCAL); - aligned := Tensor.R4.AlignTensorPair(x + y1); - xAl := aligned(wi = 1); - yAl := PROJECT(aligned(wi = 2), TRANSFORM(RECORDOF(LEFT), SELF.wi := 1, SELF := LEFT), LOCAL); - m0 := Keras.Evaluate(xAl, yAl, model); + kModelId := model DIV kerasIdFactor; + // Align the X and Y tensor lists so that we will get the corresponding records on the same nodes + // for each input and output tensor. + maxInputWi := MAX(x, wi); + // Change the wi's for outputs (y) so that they are after the input wi's + y1 := PROJECT(y, TRANSFORM(RECORDOF(LEFT), SELF.wi := LEFT.wi + maxInputWi, SELF := LEFT), LOCAL); + aligned := Tensor.R4.AlignTensors(x + y1); + // Now change the Y's wi back to the original number + xAl := aligned(wi <= maxInputWi); + yAl := PROJECT(aligned(wi > maxInputWi), TRANSFORM(RECORDOF(LEFT), SELF.wi := LEFT.wi - maxInputWi, SELF := LEFT), LOCAL); + m0 := Keras.Evaluate(xAl, yAl, model, kModelId); + m1 := DISTRIBUTE(m0, metricId); + m2 := TABLE(m1, + {metricId, metricName, avgVal := AVE(GROUP, value)}, + metricId, metricName, LOCAL); + metrics := PROJECT(m2, TRANSFORM(Types.metrics, + SELF.value := LEFT.avgVal, + SELF := LEFT), LOCAL); + RETURN metrics; + END; + + /** + * This produces an ROC/AUC score for measuring NN performance. + */ + + EXPORT DATASET(Types.metrics) RocAucScore(UNSIGNED4 model, + DATASET(t_Tensor) x, + DATASET(t_Tensor) y) := FUNCTION + kModelId := model DIV kerasIdFactor; + // Align the X and Y tensor lists so that we will get the corresponding records on the same nodes + // for each input and output tensor. + maxInputWi := MAX(x, wi); + // Change the wi's for outputs (y) so that they are after the input wi's + y1 := PROJECT(y, TRANSFORM(RECORDOF(LEFT), SELF.wi := LEFT.wi + maxInputWi, SELF := LEFT), LOCAL); + aligned := Tensor.R4.AlignTensors(x + y1); + // Now change the Y's wi back to the original number + xAl := aligned(wi <= maxInputWi); + yAl := PROJECT(aligned(wi > maxInputWi), TRANSFORM(RECORDOF(LEFT), SELF.wi := LEFT.wi - maxInputWi, SELF := LEFT), LOCAL); + m0 := Keras.RocAucScore(xAl, yAl, model, kModelId); m1 := DISTRIBUTE(m0, metricId); m2 := TABLE(m1, {metricId, metricName, avgVal := AVE(GROUP, value)}, @@ -411,11 +613,14 @@ EXPORT GNNI := MODULE SELF := LEFT), LOCAL); RETURN metrics; END; + + /** * Predict the results using the trained model. *
The X tensor represents the independent (input) data * for the neural network and the output is returned as - * a tensor. + * a tensor. Input and output will be Tensor Lists if + * there is more than one input or output tensor for the NN. *
The X tensor * should be a record-oriented tensor, indicated by a first shape * component of zero. It must also be distributed (not replicated) @@ -427,7 +632,12 @@ EXPORT GNNI := MODULE * tensor. */ EXPORT DATASET(t_Tensor) Predict(UNSIGNED4 model, DATASET(t_Tensor) x) := FUNCTION - pred := Keras.Predict(x, model); + kModelId := model DIV kerasIdFactor; + // Align all of the X tensors (in case of multi tensor inputs) + maxInputWi := MAX(x, wi); // The number of tensors in the input + aligned := Tensor.R4.AlignTensors(x); + xAl := IF(maxInputWi > 1, aligned, x); // Only align if multiple tensors in input + pred := Keras.Predict(xAl, model, kModelId); return pred; END; /** diff --git a/Internal/Keras.ecl b/Internal/Keras.ecl index 0f68205..b5caf27 100644 --- a/Internal/Keras.ecl +++ b/Internal/Keras.ecl @@ -13,6 +13,7 @@ t_Tensor := Tensor.R4.t_Tensor; kString := iTypes.kString; losses := iTypes.losses; metrics := Types.metrics; +FuncLayerDef := Types.FuncLayerDef; nNodes := Thorlib.nodes(); node := Thorlib.node(); /** @@ -23,6 +24,9 @@ node := Thorlib.node(); *
The Init(...) function initializes the global environment and defines * a frequently used set of functions so that they don't need to be replicated * and created on each individual embed. + *
Note that most functions receive a "seqId" parameter that is ignored by + * the function. This is used to control the order of execution of ECL statements + * that call these functions. */ EXPORT Keras := MODULE SHARED globalScope := 'keras_' + node + '.ecl'; @@ -35,49 +39,64 @@ EXPORT Keras := MODULE * use of STREAMED DATASETS in and out, ensure that this function will * be executed on every Thor slave node. */ - EXPORT STREAMED DATASET(kString) Init(STREAMED DATASET(initParms) initdata) := + EXPORT STREAMED DATASET(kString) Init(STREAMED DATASET(initParms) initdata, INTEGER numPhysicalComps) := EMBED(Python: globalscope(globalScope), persist('query'), activity) # Function to initialize all the global variables and functions. This should # only be called once. def initGlobals(): - import tensorflow as tf + import tensorflow.compat.v1 as tf + tf.disable_v2_behavior() from tensorflow.keras import layers import numpy as np global nodeId, nNodes, maxSliceLen - # Extract the initialization parameters from initdata + # Initialize global variables + # Extract the initialization parameters from initdata for rec in initdata: nodeId, nNodes, maxSliceLen = rec # Should only be one record - # Initialize global variables + # Model cache indexed by model id. + import os + #this "CUDA VISIBLE DEVICES" will set which GPU a given Thor node will have access to + #without this, each single Thor will try and allocate memory on all GPUs, which will make it crash + if numPhysicalComps >= 0: os.environ["CUDA_VISIBLE_DEVICES"]=str(math.floor(int(nodeId)/numPhysicalComps)) + else: os.environ["CUDA_VISIBLE_DEVICES"]="-1" + import tensorflow as tf + tf.reset_default_graph() global modcache - global tfHistory - tfHistory = None + modcache = {} + # Session cache indexed by model id. + global sesscache + sesscache = {} + # The next model id to allocate + global nextModId + nextModId = 0 + # The following 3 variables are for record keeping on each model. + # They are stored as a dictionary keyed by the model id. global currEpoch - currEpoch = 0 + currEpoch = {} global batchCount - batchCount = 0 + batchCount = {} global cumLoss - cumLoss = 0 + cumLoss = {} global kStrTypeDict - # kStrTypeDict needs to be kept in sync with Internal/Types.kStrType - # The kString type is used for several different purposes, and the type - # field indicates the meaning of a given string. + # kStrTypeDict needs to be kept in sync with Internal/Types.kStrType + # The kString type is used for several different purposes, and the type + # field indicates the meaning of a given string. kStrTypeDict = {'layer':1, 'compile':2, 'json':3, 'status':4} global dTypeDict, dTypeDictR, DTypeSizeDict - # dTypeDict is used to convey the data type of a tensor. It must be - # kept in sync with the Tensor data types in Tensor.ecl + # dTypeDict is used to convey the data type of a tensor. It must be + # kept in sync with the Tensor data types in Tensor.ecl dTypeDict = {1:np.float32, 2:np.float64, 3:np.int32, 4:np.int64} dTypeDictR = {'float32':1, 'float64':2, 'int32':3, 'int64':4} - # Store the element size for each tensor data type. + # Store the element size for each tensor data type. dTypeSizeDict = {1:4, 2:8, 3:4, 4:8} - modcache = {} # Define some common functions global format_exc # format_exc is used to format an exception as a string so that we # can return it. It indicates where and why an error occurred. def _format_exc(func=''): import traceback as tb - exc = tb.format_exc() - if len(exc) < 10000: + exc = tb.format_exc(limit=2) + if len(exc) < 100000: return func + ': ' + exc else: return func + ': ' + exc[:200] + ' ... ' + exc[-200:] @@ -85,13 +104,19 @@ EXPORT Keras := MODULE # Convert an ECL Tensor dataset into a single numpy ndarray. global Tens2Np - def _Tens2Np(tens): + def _Tens2Np(tens, recordOriented = False): def addData(a, dat, pos, is_fixed_size): if is_fixed_size: a[pos:pos+len(dat)] = dat else: a = np.append(a, dat) return a + def verifyShape(shape): + if recordOriented and shape[0] != 0: + raise Exception('Record Oriented tensors ' + \ + 'as data input to Fit or Predict must have a zero first shape component. Shape = ' + str(shape) + '.') + #assert not recordOriented or (recordOriented and shape[0] == 0), 'Keras.ecl: Tens2Np: Record Oriented tensors ' + \ + # 'as data input to Fit or Predict must have a zero first shape component. Shape = ' + str(shape) + '.' try: a = None tshape = [] @@ -104,6 +129,7 @@ EXPORT Keras := MODULE isFixedSize = False for rec in tens: node, wi, sliceId, shape, dataType, maxSliceSize, sliceSize, densedat, sparsedat = rec + verifyShape(shape) dtype = dTypeDict[dataType] tshape = shape if a is None: @@ -135,6 +161,7 @@ EXPORT Keras := MODULE assert 1 == 0, format_exc('Tens2Np') return None Tens2Np = _Tens2Np + # Convert a numpy ndarray into a Tensor dataset. Yield is used to # return one dataset record at a time. global Np2Tens @@ -145,7 +172,7 @@ EXPORT Keras := MODULE origShape = list(a.shape) flatA = a.reshape(-1) flatSize = flatA.shape[0] - sliceId = 1 + currSlice = 1 indx = 0 datType = dTypeDictR[str(a.dtype)] elemSize = dTypeSizeDict[datType] @@ -165,34 +192,36 @@ EXPORT Keras := MODULE for i in range(len(dat)): if abs(dat[i]) > epsilon: elemCount += 1 - if elemCount > 0 or sliceId == 1: + if elemCount > 0 or currSlice == 1: if elemCount * (elemSize + 4) < len(dat): # Sparse encoding sparse = [] for i in range(len(dat)): if abs(dat[i]) > epsilon: sparse.append((i, dat[i])) - yield (nodeId, wi, sliceId, origShape, datType, maxSliceSize, sliceSize, [], sparse) + yield (nodeId, wi, currSlice, origShape, datType, maxSliceSize, sliceSize, [], sparse) else: # Dense encoding - yield (nodeId, wi, sliceId, origShape, datType, maxSliceSize, sliceSize, dat, []) - sliceId += 1 + yield (nodeId, wi, currSlice, origShape, datType, maxSliceSize, sliceSize, dat, []) + currSlice += 1 indx += sliceSize except: assert 1 == 0, format_exc('NP2Tens') Np2Tens = _Np2Tens global NpList2Tens - # Convert an ECL tensor list dataset into a list of numpy ndarrays. - # The wi field is used to distinguish the tensors in the list. + + # Convert a list of numpy ndarrays into an ECL tensor dataset. Uses wi's to + # distinguish the multiple tensors in the same dataset. def _NpList2Tens(alist): for i in range(len(alist)): for rec in Np2Tens(alist[i], i+1): yield rec NpList2Tens = _NpList2Tens global Tens2NpList - # Convert a list of numpy ndarrays into an ECL tensor dataset. Uses wi's to - # distinguish the multiple tensors in the same dataset. - def _Tens2NpList(tens): + + # Convert an ECL tensor list dataset into a list of numpy ndarrays. + # The wi field is used to distinguish the tensors in the list. + def _Tens2NpList(tens, recordOriented = False): npList = [] slices = [] currWi = 1 @@ -200,27 +229,38 @@ EXPORT Keras := MODULE node = slice[0] wi = slice[1] if wi != currWi: - npList.append(Tens2Np(slices)) + npList.append(Tens2Np(slices, recordOriented=recordOriented)) currWi = wi slices = [] slices.append(slice) if slices: - npList.append(Tens2Np(slices)) + npList.append(Tens2Np(slices, recordOriented=recordOriented)) return npList Tens2NpList = _Tens2NpList # END OF InitGlobals # Only define the globals once, no matter how many times Init gets called. # Use the model cache (modcache) as a flag to determine if we've already # initialized. + import threading + global threadlock + # Make sure we only intialize once. Avoid reentrancy if called on multiple threads. + threadlock = threading.Lock() + threadlock.acquire() try: mc = modcache except: # modcache doesn't exist. Do the initialization. try: + # Initialize globals and define commonly used functions so that + # they don't need to be repeated for each embed. + # Global references to each function are stored in the global namespace. initGlobals() except: # We had an exception. Format and return it. return [(nodeId, 1,4,tb.format_exc('Init'))] + finally: + # Always release the threadlock, success or fail. + threadlock.release() # Success. Return blank status. return [(nodeId, 1, kStrTypeDict['status'], '')] ENDEMBED; // Init() @@ -230,18 +270,30 @@ EXPORT Keras := MODULE * the kString record contains an error message. * DefineModel gets called on each node of the cluster. */ - EXPORT STREAMED DATASET(kString) DefineModel(STREAMED DATASET(kString) mdef, UNSIGNED4 sess) := - EMBED(Python: globalscope(globalScope), persist('query'), activity) + EXPORT STREAMED DATASET(kString) DefineModel(STREAMED DATASET(kString) mdef, UNSIGNED4 seqId) + := EMBED(Python: globalscope(globalScope), persist('query'), activity) import traceback as tb - import tensorflow as tf + import tensorflow.compat.v1 as tf + tf.disable_v2_behavior() from tensorflow.keras import layers - global tfSession + global nextModId try: - # Restore the keras / tensorflow context. It sometimes gets lost between calls, + # Allocate a new modelId + # Make sure we do it atomically to avoid conflict with + # another model running on another thread + threadlock.acquire() + modId = nextModId + nextModId += 1 + threadlock.release() + # Create a new keras / tensorflow context. It sometimes gets lost between calls, # so we explicitly restore it before each call that uses it. - tfSession = tf.keras.backend.get_session() - with tfSession.as_default(): - with tfSession.graph.as_default(): + # Note that for each model, we create a new session and new graph under the hood. + # The graph is stored within the session, so only the session and model are stored, + # both by model id. + graph = tf.Graph() + with graph.as_default(): + tfSession = tf.Session() + with tfSession.as_default(): mod = tf.keras.Sequential() for rec in mdef: if rec[0] != nodeId: @@ -256,27 +308,121 @@ EXPORT Keras := MODULE # compile string should be supplied. elif rectype == kStrTypeDict['compile']: exec('mod.' + rec[3]) - modcache['default'] = mod # For some reason we need to do a get_weights / set_weights here, or set_weights # fails later??? w = mod.get_weights() mod.set_weights(w) - # We succeeded. Return a blank status to indicate success. - return [(nodeId, 1, kStrTypeDict['status'], '')] + # Add this model to the model cache + modcache[modId] = mod + # And the session to the session cache + sesscache[modId] = tfSession + # We succeeded. Return a blank status to indicate success. + return [(nodeId, modId, kStrTypeDict['status'], '')] except: # We had an error. Format the exception and return it in the kString return [(nodeId, 1, kStrTypeDict['status'], format_exc('DefineMod'))] - ENDEMBED; + ENDEMBED; // DefineModel + /** Function to Define a Functional (i.e. Non-Sequential) model and (optionally) + * compile the model. + * Returns a kString dataset. An empty string indicates success. Otherwise + * the kString record contains an error message. + * DefineFuncModel gets called on each node of the cluster. + */ + EXPORT STREAMED DATASET(kString) DefineFuncModel(STREAMED DATASET(FuncLayerDef) ldefs, + UNSIGNED4 seqId, + SET OF STRING inputs, + SET OF STRING outputs, + STRING cdef) + := EMBED(Python: globalscope(globalScope), persist('query'), activity) + import traceback as tb + import tensorflow.compat.v1 as tf + tf.disable_v2_behavior() + from tensorflow.keras import layers + global nextModId + try: + # Allocate a new modelId + # Make sure we do it atomically to avoid conflict with + # another model running on another thread + threadlock.acquire() + modId = nextModId + nextModId += 1 + threadlock.release() + layerDict = {} # Temporary dictionary for keeping track of layers. + predDict = {} # Temporary dict for keeping track of predecessors. + # Create a new keras / tensorflow context. It sometimes gets lost between calls, + # so we explicitly restore it before each call that uses it. + # Note that for each model, we create a new session and new graph under the hood. + # The graph is stored within the session, so only the session and model are stored, + # both by model id. + graph = tf.Graph() + with graph.as_default(): + tfSession = tf.Session() + with tfSession.as_default(): + # Do two passes through the ldefs so that order of layers won't matter + for rec in ldefs: + lName, ldef, preds = rec + newLayer = eval(ldef) + layerDict[lName] = newLayer + predDict[lName] = preds + # Second pass to resolve the predecessors + for name in layerDict.keys(): + layer = layerDict[name] + predNames = predDict[name] + lpreds = [] + for predName in predNames: + pred = layerDict[predName] + lpreds.append(pred) + # Call the layer object's call method with the list of predecessors + # to set the preds for that layer. + if lpreds: + if len(lpreds) == 1: + layer = layer(lpreds[0]) + else: + layer = layer(lpreds) + layerDict[name] = layer + # Now create the model using inputs and outputs + inps = [] + outps = [] + for inpName in inputs: + l = layerDict[inpName] + inps.append(l) + for outName in outputs: + l = layerDict[outName] + outps.append(l) + mod = tf.keras.models.Model(inputs=inps, outputs=outps) + # If there's a compile string, use it to compile the model. + if cdef: + exec('mod.' + cdef) + # For some reason we need to do a get_weights / set_weights here, or set_weights + # fails later??? + w = mod.get_weights() + mod.set_weights(w) + # Add this model to the model cache + modcache[modId] = mod + # And the session to the session cache + sesscache[modId] = tfSession + # We succeeded. Return a blank status to indicate success. + return [(nodeId, modId, kStrTypeDict['status'], '')] + except: + # We had an error. Format the exception and return it in the kString. + return [(nodeId, 1, kStrTypeDict['status'], format_exc('DefineFuncMod'))] + ENDEMBED; // DefineFuncModel /** * Return a JSON string representing the layers of the model. Does not return any * compile information or trained weights. */ - EXPORT STREAMED DATASET(kString) ToJSON(STREAMED DATASET(kString) dummy, UNSIGNED4 model) := + EXPORT STREAMED DATASET(kString) ToJSON(STREAMED DATASET(kString) dummy, UNSIGNED4 seqId, + UNSIGNED modelid = 0) := EMBED(Python: globalscope(globalScope), persist('query'), activity) try: - mod = modcache['default'] + # Restore the keras / tensorflow context for this model. + mod = modcache[modelid] + tfSession = sesscache[modelid] + with tfSession.as_default(): + with tfSession.graph.as_default(): + js = mod.to_json() # Succeeded. Return a blank status. - return [(nodeId, 1, kStrTypeDict['status'], mod.to_json())] + return [(nodeId, 1, kStrTypeDict['status'], js)] except: # Failed. Forat an exception and send it. return [(nodeId, 1, 4, format_exc('ToJSON'))] @@ -284,77 +430,102 @@ EXPORT Keras := MODULE /** * Construct a Keras model from the JSON string passed in. */ - EXPORT STREAMED DATASET(kString) FromJSON(STREAMED DATASET(kString) ksjson, UNSIGNED4 session) := - EMBED(Python: globalscope(globalScope), persist('query'), activity) - import tensorflow as tf + EXPORT STREAMED DATASET(kString) FromJSON(STREAMED DATASET(kString) ksjson, UNSIGNED4 seqId) + := EMBED(Python: globalscope(globalScope), persist('query'), activity) + import tensorflow.compat.v1 as tf + tf.disable_v2_behavior() + global nextModId # Should be only one record on each node try: json = 'EMPTY' for rec in ksjson: # Should only be one json kString record. json = rec[2] - mod = tf.keras.models.model_from_json(json) - modcache['default'] = mod + # Restore the keras / tensorflow context for this model. + graph = tf.Graph() + with graph.as_default(): + tfSession = tf.Session() + with tfSession.as_default(): + mod = tf.keras.models.model_from_json(json) + modId = nextModId + nextModId += 1 + modcache[modId] = mod + sesscache[modId] = tfSession except: # Error. Return an exception string. return [(nodeId, 1, kStrTypeDict['status'], format_exc('FromJSON'))] # Success. Return an empty string. - return [(nodeId, 1, kStrTypeDict['status'], '')] + return [(nodeId, modId, kStrTypeDict['status'], '')] ENDEMBED; /** * Compile a previously defined model. */ - EXPORT STREAMED DATASET(kString) CompileMod(STREAMED DATASET(kString) compilestr, UNSIGNED4 model) := - EMBED(Python: globalscope(globalScope), persist('query'), activity) - import tensorflow as tf - tf.keras.backend.set_session(tfSession) - mod = modcache['default'] - # Should only have one compilestr record per node - try: - cstr = 'EMPTY' - for rec in compilestr: - cstr = rec[2] - exec('mod.' + cstr) - except: - return [(nodeId, 1, kStrTypeDict['status'], format_exc('CompileMod'))] - return [(nodeId, 1, kStrTypeDict['status'], '')] + EXPORT STREAMED DATASET(kString) CompileMod(STREAMED DATASET(kString) compilestr, UNSIGNED4 seqId, + UNSIGNED modelid = 0) := EMBED(Python: globalscope(globalScope), persist('query'), activity) + import tensorflow.compat.v1 as tf + tf.disable_v2_behavior() + # Restore the keras / tensorflow context for this model. + tfSession = sesscache[modelid] + mod = modcache[modelid] + with tfSession.as_default(): + with tfSession.graph.as_default(): + # Should only have one compilestr record per node + try: + cstr = 'EMPTY' + for rec in compilestr: + cstr = rec[2] + exec('mod.' + cstr) + except: + return [(nodeId, 1, kStrTypeDict['status'], format_exc('CompileMod'))] + return [(nodeId, 1, kStrTypeDict['status'], '')] ENDEMBED; /** * Get the weights from the Keras / Tensorflow model. * Weights are returned as a Tensor List. */ EXPORT STREAMED DATASET(t_Tensor) GetWeights( - STREAMED DATASET(kString) dummy, UNSIGNED4 model) := + STREAMED DATASET(kString) dummy, UNSIGNED4 seqId, UNSIGNED modelid = 0) := EMBED(Python: globalscope(globalScope), persist('query'), activity) - import tensorflow as tf - # Restore the Keras / TF context. - tf.keras.backend.set_session(tfSession) + import tensorflow.compat.v1 as tf + tf.disable_v2_behavior() + threadlock.acquire() try: - mod = modcache['default'] - w = mod.get_weights() + # Restore the keras / tensorflow context for this model. + tfSession = sesscache[modelid] + mod = modcache[modelid] + with tfSession.as_default(): + with tfSession.graph.as_default(): + w = mod.get_weights() return NpList2Tens(w) except: # IF there was an error, return an empty dataset. + assert 1 == 0, format_exc('GetWeights modelId = ' + str(modelid)) return [] + finally: + threadlock.release() ENDEMBED; /** * Set the weights into the Keras / TF model. The weights are sent as * a Tensor List (Tensor dataset), one Tensor per layer. */ - EXPORT STREAMED DATASET(kString) SetWeights(STREAMED DATASET(t_Tensor) tens, UNSIGNED4 model) := - EMBED(Python: globalscope(globalScope), persist('query'), activity) - import tensorflow as tf + EXPORT STREAMED DATASET(kString) SetWeights(STREAMED DATASET(t_Tensor) tens, UNSIGNED4 seqId, + UNSIGNED modelid = 0) := EMBED(Python: globalscope(globalScope), persist('query'), activity) + import tensorflow.compat.v1 as tf + tf.disable_v2_behavior() import traceback as tb - # Restore the Keras / TF context. - tf.keras.backend.set_session(tfSession) + # Restore the keras / tensorflow context for this model. + tfSession = sesscache[modelid] + mod = modcache[modelid] try: + # Restore the Keras / TF context. + tfSession = sesscache[modelid] + mod = modcache[modelid] w = Tens2NpList(tens) - mod = modcache['default'] - #w2 = mod.get_weights() - #mod.set_weights(w) - outStr = '' + with tfSession.as_default(): + with tfSession.graph.as_default(): + mod.set_weights(w) # Success. Return an empty status string. - return [(nodeId, 1, 1, outStr)] + return [(nodeId, 1, 1, '')] except: # An error occurred. Return a formatted exception string. return [(nodeId, 1,1,tb.format_exc('SetWeights')[-500:])] @@ -369,48 +540,55 @@ EXPORT Keras := MODULE STREAMED DATASET(t_Tensor) weights, STREAMED DATASET(t_Tensor) x, STREAMED DATASET(t_Tensor) y, - UNSIGNED4 model, - UNSIGNED4 epoch) := - EMBED(Python: globalscope(globalScope), persist('query'), activity) + UNSIGNED4 seqId, + UNSIGNED4 epoch, + UNSIGNED modelid = 0) := + EMBED(Python: globalscope(globalScope), persist('query'), activity) import traceback as tb - import tensorflow as tf + import tensorflow.compat.v1 as tf + tf.disable_v2_behavior() import numpy as np + import uuid + import sys + #stdfileName = '/home/hpcc/pythonlogs/fitLogging' + str(uuid.uuid4()) + '.txt' + #myfile = open(stdfileName, 'w') global currEpoch, batchCount, cumLoss try: # Accumulate the loss for each epoch. - if epoch != currEpoch: - batchCount = 0 - cumLoss = 0.0 - currEpoch = epoch + if epoch != currEpoch.get(modelid, 0): + batchCount[modelid] = 0 + cumLoss[modelid] = 0.0 + currEpoch[modelid] = epoch # Process this batch. - batchCount += 1 + batchCount[modelid] += 1 wA_changes = [] # Restore Keras / TF context - tf.keras.backend.set_session(tfSession) - mod = modcache['default'] + mod = modcache[modelid] # Convert the incoming weights to a list of numpy arrays wA = Tens2NpList(weights) # Convert the X tensor to a numpy array - xAL = Tens2NpList(x) + xAL = Tens2NpList(x, recordOriented = True) # Convert the Y tensor to a numpy array - yAL = Tens2NpList(y) - # Do some error checking. + yAL = Tens2NpList(y, recordOriented = True) if xAL and yAL and xAL[0].size > 0 and yAL[0].size > 0: - xA = xAL[0] - yA = yAL[0] - if xA.size == 0 or yA.size == 0 or xA.shape[0] != yA.shape[0]: - assert 1 == 0, 'Fit: X and Y sizes do not match or are zero: xShape = ' + str(xA.shape) + ', yShape = ' + str(yA.shape) - # More Keras TF context restoration + # We've got some data + # Do some error checking. + for i in range(len(xAL)): + xA = xAL[i] + yA = yAL[i] + if xA.size == 0 or yA.size == 0 or xA.shape[0] != yA.shape[0]: + assert 1 == 0, 'Fit: X and Y sizes do not match or are zero: xShape = ' + str(xA.shape) + ', yShape = ' + str(yA.shape) + # Restore the keras / tensorflow context for this model. + tfSession = sesscache[modelid] with tfSession.as_default(): with tfSession.graph.as_default(): - global tfHistory # Set the starting weights mod.set_weights(wA) # Run one batch to fit the model - tfHistory = mod.fit(xA, yA, epochs=epoch, batch_size=32, initial_epoch=epoch-1, shuffle=False, steps_per_epoch = 1) + tfHistory = mod.fit(xAL, yAL, epochs=epoch, batch_size=miniBatch, initial_epoch=epoch-1, shuffle=True) # Update the cumulative (epoch) loss currLoss = tfHistory.history['loss'][-1] - cumLoss += currLoss + cumLoss[modelid] += currLoss # Get the new weights from Keras model. wA_out = mod.get_weights() # For each layer, subtract the new weights from the starting weights to compute @@ -426,14 +604,97 @@ EXPORT Keras := MODULE except: # Error occurred, but no string returned. So we do an assert to convey the error. assert 1 == 0, format_exc('FitBatch') - ENDEMBED; + ENDEMBED; // FitBatch + + EXPORT STREAMED DATASET(t_Tensor) FitBatchNCCL( //experimental + STREAMED DATASET(t_Tensor) weights, + STREAMED DATASET(t_Tensor) x, + STREAMED DATASET(t_Tensor) y, + UNSIGNED4 seqId, + UNSIGNED4 epoch, + UNSIGNED modelid = 0, + UNSIGNED4 miniBatch = 0) := + EMBED(Python: globalscope(globalScope), persist('query'), activity) + import traceback as tb + import tensorflow as tf + import numpy as np + import uuid + import sys + #stdfileName = '/home/hpcc/pythonlogs/fitncclSTDOUT_' + str(uuid.uuid4()) + '.txt' + #sys.stdout = open(stdfileName, 'w') + #print('test') + global currEpoch, batchCount, cumLoss + try: + # Accumulate the loss for each epoch. + if epoch != currEpoch.get(modelid, 0): + batchCount[modelid] = 0 + cumLoss[modelid] = 0.0 + currEpoch[modelid] = epoch + # Process this batch. + batchCount[modelid] += 1 + wA_changes = [] + # Restore Keras / TF context + #mod = modcache[modelid] + # Convert the incoming weights to a list of numpy arrays + wA = Tens2NpList(weights) + # Convert the X tensor to a numpy array + xAL = Tens2NpList(x, recordOriented = True) + # Convert the Y tensor to a numpy array + yAL = Tens2NpList(y, recordOriented = True) + if xAL and yAL and xAL[0].size > 0 and yAL[0].size > 0: + # We've got some data + # Do some error checking. + for i in range(len(xAL)): + xA = xAL[i] + yA = yAL[i] + if xA.size == 0 or yA.size == 0 or xA.shape[0] != yA.shape[0]: + assert 1 == 0, 'FitNCCL: X and Y sizes do not match or are zero: xShape = ' + str(xA.shape) + ', yShape = ' + str(yA.shape) + # Restore the keras / tensorflow context for this model. + tfSession = sesscache[modelid] + with tfSession.as_default(): + with tfSession.graph.as_default(): + mirrored_strategy = tf.distribute.MirroredStrategy() + with mirrored_strategy.scope(): + # Set the starting weights + #mod = modcache[modelid] + mod = tf.keras.Sequential([tf.keras.Input(shape=(28,28,1)),tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),tf.keras.layers.Conv2D(128, kernel_size=(3, 3), activation="relu"),tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),tf.keras.layers.Conv2D(2560, kernel_size=(3, 3), activation="relu"),tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),tf.keras.layers.Dropout(0.25),tf.keras.layers.Flatten(),tf.keras.layers.Dense(10240, activation='relu'),tf.keras.layers.Dense(3850, activation='relu'),tf.keras.layers.Dropout(0.5),tf.keras.layers.Dense(10, activation="softmax"),]) + mod.compile(loss='categorical_crossentropy', optimizer='adam', metrics=["accuracy"]) + #mod.set_weights(wA) + # Run one batch to fit the model + tfHistory = mod.fit(xAL, yAL, epochs=epoch, batch_size=miniBatch, initial_epoch=epoch-1) + # Update the cumulative (epoch) loss + #currLoss = tfHistory.history['loss'][-1] + #cumLoss[modelid] += currLoss + # Get the new weights from Keras model. + #wA_out = mod.get_weights() + wA_out = wA + # For each layer, subtract the new weights from the starting weights to compute + # the weight updates. + for i in range(len(wA)): + wA_changes.append(wA_out[i] - wA[i]) + else: + # No X / Y data received. Send null changes + for i in range(len(wA)): + wA_changes.append(np.zeros_like(wA[i])) + # Return the weight changes as a Tensor List. + return NpList2Tens(wA_changes) + except: + # Error occurred, but no string returned. So we do an assert to convey the error. + assert 1 == 0, format_exc('FitBatchNCCL') + ENDEMBED; // FitBatchNCCL /** * Get the current epoch's accumulated average loss up to this point. */ - EXPORT STREAMED DATASET(losses) GetLoss(STREAMED DATASET(kString) dummy, UNSIGNED4 model):= + EXPORT STREAMED DATASET(losses) GetLoss(STREAMED DATASET(kString) dummy, UNSIGNED4 seqId, + UNSIGNED modelid = 0):= EMBED(Python: globalscope(globalScope), persist('query'), activity) - assert batchCount > 0, 'Keras.GetLoss: batchCount = 0' + ', currEpoch = ' + str(currEpoch) - loss = cumLoss / batchCount + global batchCount, cumLoss + try: + assert batchCount[modelid] > 0, 'Keras.GetLoss: batchCount = 0' + ', currEpoch = ' + str(currEpoch[modelid]) + loss = cumLoss[modelid] / batchCount[modelid] + except: + assert False, format_exc('GetLoss -- modelId = ' + str(modelid) + ', batchCount = ' + str(batchCount)) + return [(0.0,)] return [(loss,)] ENDEMBED; /** @@ -444,73 +705,178 @@ EXPORT Keras := MODULE EXPORT STREAMED DATASET(metrics) Evaluate( STREAMED DATASET(t_Tensor) x, STREAMED DATASET(t_Tensor) y, - UNSIGNED4 model) := + UNSIGNED4 seqId, + UNSIGNED modelid = 0) := EMBED(Python: globalscope(globalScope), persist('query'), activity) - mod = modcache['default'] - # Convert x data to a numpy array - xA = Tens2NpList(x) - # Convert y data to a numpy array - yA = Tens2NpList(y) - outRecs = [] - # Restore Keras / TF context - with tfSession.as_default(): - with tfSession.graph.as_default(): - # Evaluate the Keras model - metrics = mod.evaluate(xA, yA) - # Get the name for each metric - mNames = mod.metrics_names - for i in range(len(metrics)): - # Return the name and value for each defined metric. - rec = (i, mNames[i], float(metrics[i])) - outRecs.append(rec) - return outRecs + try: + mod = modcache[modelid] + # Convert x data to a numpy array + xA = Tens2NpList(x, recordOriented = True) + # Convert y data to a numpy array + yA = Tens2NpList(y, recordOriented = True) + outRecs = [] + # Restore the keras / tensorflow context for this model. + tfSession = sesscache[modelid] + with tfSession.as_default(): + with tfSession.graph.as_default(): + # Evaluate the Keras model + metrics = mod.evaluate(xA, yA) + # Get the name for each metric + mNames = mod.metrics_names + for i in range(len(metrics)): + # Return the name and value for each defined metric. + rec = (i, mNames[i], float(metrics[i])) + outRecs.append(rec) + return outRecs + except: + # Error occurred, but no string returned. So we do an assert to convey the error. + assert 1 == 0, format_exc('Evaluate') ENDEMBED; - /** - * Use the Keras model to predict the output for a set - * of independent (x) data. - */ - EXPORT STREAMED DATASET(t_Tensor) Predict( - STREAMED DATASET(t_Tensor) xDat, - UNSIGNED4 model) := + + /** + *Evaluate the ROC/AUC of the model. + */ + EXPORT STREAMED DATASET(metrics) RocAucScore( + STREAMED DATASET(t_Tensor) x, + STREAMED DATASET(t_Tensor) y, + UNSIGNED4 seqId, + UNSIGNED modelid = 0) := EMBED(Python: globalscope(globalScope), persist('query'), activity) - import numpy as np - import traceback as tb - mod = modcache['default'] try: - def predGen(): - # We need to process the data one slice at a time, so that we can emit - # slices with the proper wi and sliceId so that the record indexes line up - # between the supplied x and the returned predictions. - outSlices = [] - for slice in xDat: - # Convert one slice to numpy array format. - xA = Tens2Np([slice]) - node, wi, sliceId, shape, dataType, maxSliceSize, slice_size, \ - densedat, sparsedat = slice - # Restore keras / tf context - with tfSession.as_default(): - with tfSession.graph.as_default(): - predA = mod.predict(xA) - preds = [] - # We need to derive the max slice size from the ratio of record sizes - xSize = np.prod(shape[1:]) - ySize = np.prod(predA.shape[1:]) - newMaxSize = int(maxSliceSize * ySize / xSize) - # Results should be a single slice, but is returned as a list from Np2Tens(...). - for s in Np2Tens(predA, maxSliceOverride=newMaxSize): - preds.append(s) - pred = preds[0] - # Apply the wi and sliceId of the original x data to the predictions - y = (node, wi, sliceId, pred[3], pred[4], pred[5], pred[6], pred[7], pred[8]) - # Yield the output slice. - yield y - return - return predGen() + from sklearn.metrics import roc_auc_score + mod = modcache[modelid] + # Convert x data to a numpy array + xA = Tens2NpList(x, recordOriented = True) + # Convert y data to a numpy array + yA = Tens2NpList(y, recordOriented = True) + outRecs = [] + # Restore the keras / tensorflow context for this model. + tfSession = sesscache[modelid] + with tfSession.as_default(): + with tfSession.graph.as_default(): + # Evaluate the Keras model + yPredictions = mod.predict(xA) + auc = roc_auc_score(yA[0], yPredictions) #returns a float type + outRecs.append((0,'AUC_ROC', float(auc))) + #outRecs.append((0,str(yPredictions.shape), float(1))) + #outRecs.append((1,str(xA[0].shape), float(1))) + #outRecs.append((2,str(yA[0].shape), float(1))) + #outRecs.append((4,str(auc), float(1))) + return outRecs except: - # An error occurred during Predict. - assert 0 == 1, 'Keras Predict error: ' + format_exc('Predict') - return [] + # Error occurred, but no string returned. So we do an assert to convey the error. + assert 1 == 0, format_exc('RocAucScore') ENDEMBED; + + + /** + * Use the Keras model to predict the output for a set + * of independent (x) data. + */ + EXPORT DATASET(t_Tensor) Predict( + DATASET(t_Tensor) xDat, + UNSIGNED4 seqId, + UNSIGNED modelId) := FUNCTION + STREAMED DATASET(t_Tensor) Predict2( + STREAMED DATASET(t_Tensor) x_dat, + UNSIGNED4 seq_id, + UNSIGNED model_id) := + EMBED(Python: globalscope(globalScope), persist('query'), activity) + import numpy as np + import traceback as tb + # Generator function for producing the predictions + def predGen(mod, tfSession): + try: + # We need to process the data one slice at a time, so that we can emit + # slices with the proper wi and sliceId so that the record indexes line up + # between the supplied x and the returned predictions. + xAL = [] # X array list accumulates wi's for each sliceId + currSlice = 0 + maxRecs = 0 # Should be the same for each wi across all slices + # process all the wi's for each sliceId. Inputs should be aligned at this point. + # and sorted by sliceId, wi. + for slice in x_dat: + node, wi, sliceId, shape, dataType, maxSliceSize, slice_size, \ + densedat, sparsedat = slice + # Calculate the maximum number of records per slice. This only needs to be + # done for the first slice and wi, since it should be consistent for aligned + # tensors + if maxRecs == 0: + recSize = np.prod(shape[1:]) + maxRecs = int(maxSliceSize / recSize) + if sliceId != currSlice: + # Got the next slice. Now we should have all the wi's for the previous slice. + # Process the full slice. + if xAL: + # We have a slice accumulated. + # Process it. + # Restore the keras / tensorflow context for this model. + with tfSession.as_default(): + with tfSession.graph.as_default(): + predA = mod.predict(xAL, steps=1) + for i in range(len(predA)): + sliceA = predA[i] + recSize = int(np.prod(sliceA.shape[1:])) + newMaxSize = maxRecs * recSize + # Np2Tens will set the sliceId to 1 since it's the only slice. + # so we need to set the sliceId back to the original + for s in Np2Tens(sliceA, wi=i+1, maxSliceOverride=newMaxSize): + # Should only be one slice since we forced the maxSliceOverride + # but it's a generator, so we need to do for loop. + s = s[:2] + (currSlice,) + s[3:] + yield s + # Now clear the accumultor + xAL = [] + currSlice = sliceId + # Convert the slice to a numpy array and add it to the accumulator + # Force the sliceId to 1 to handle this slice as standalone + sliceAdj = [node, wi, 1, shape, dataType, maxSliceSize, slice_size, densedat, sparsedat] + xA = Tens2Np([sliceAdj], recordOriented = True) + xAL.append(xA) + # END for slice in x_dat + # Process the last sliceId + if xAL: + # Restore keras / tf context + with tfSession.as_default(): + with tfSession.graph.as_default(): + predA = mod.predict(xAL, steps=1) + if type(predA) != type([]): + predA = [predA] + for i in range(len(predA)): + sliceA = predA[i] + recSize = int(np.prod(sliceA.shape[1:])) + newMaxSize = maxRecs * recSize + # Np2Tens will set the sliceId to 1 since it's the only slice. + # so we need to set the sliceId back to the original + for s in Np2Tens(sliceA, wi=i+1, maxSliceOverride=newMaxSize): + # Should only be one slice since we forced the maxSliceOverride + # but it's a generator, so we need to do for loop. + s = s[:2] + (currSlice,) + s[3:] + yield s + return + except: + # An error occured during predGen() + assert 0 == 1, format_exc('Predict2 -- predGen') + # END predGen() + try: + # Get the model + mod = modcache[model_id] + sess = sesscache[model_id] + # Return the generator that will produce the output Tensor list + return predGen(mod, sess) + except: + # An error occurred during Predict. + assert 0 == 1, format_exc('Predict2') + return [] + ENDEMBED; // Predict2 + // Sort Xdat by sliceId and then by wi so that we can present the model with all + // inputs (i.e. wi's) at once. + xDatS := SORT(xDat, sliceId, wi, LOCAL); + preds := predict2(xDatS, seqId, modelId); + // Now re-sort into the cannonical order + predsS := SORT(preds, wi, sliceId, LOCAL); + RETURN predsS; + END; // Predict /** * Shutdown the Keras Interface and free up all global memory fields. * This leaves behind, at most, a small memory footprint that should @@ -522,12 +888,12 @@ EXPORT Keras := MODULE */ EXPORT STREAMED DATASET(kString) Shutdown( STREAMED DATASET(kString) temp, - UNSIGNED4 model) := + UNSIGNED4 seqId) := EMBED(Python: globalscope(globalScope), persist('query'), activity) import traceback as tb global nodeId, nNodes, maxSliceLen global modcache - global tfHistory + global sesscache global currEpoch global batchCount global cumLoss @@ -540,7 +906,6 @@ EXPORT Keras := MODULE nodeId = None nNodes = None maxSliceLen = None - tfHistory = None currEpoch = None batchCount = None cumLoss = None @@ -553,10 +918,11 @@ EXPORT Keras := MODULE Tens2NpList = None NpList2Tens = None format_exc = None + del(modcache) + del(sesscache) del(nodeId) del(nNodes) del(maxSliceLen) - del(tfHistory) del(currEpoch) del(batchCount) del(cumLoss) diff --git a/Internal/TensExtract.ecl b/Internal/TensExtract.ecl index 3901de3..f9e9a8b 100644 --- a/Internal/TensExtract.ecl +++ b/Internal/TensExtract.ecl @@ -8,145 +8,177 @@ nNodes := Thorlib.nodes(); t_Tensor := Tensor.R4.t_Tensor; -MAX_SLICE := Tensor.MAX_SLICE; +//MAX_SLICE := Tensor.MAX_SLICE; +MAX_SLICE := POWER(2, 24); /** * This function is used by GNNI to pull local samples from the X and Y tensors. * The result is a new tensor with samples from each local slice of the tensor. * Note that this will extract datcount samples from EACH node. The pos parameter * indicates how far into the local tensor slices to start extracting. + * If there are multiple tensors in the tensor dataset, then extract datcount + * samples from each one. If there are multiple tensors in the dataset, then + * it is essential to align them before calling this function + * @see Tensor.AlignTensors */ EXPORT DATASET(t_Tensor) TensExtract(DATASET(t_Tensor) tens, UNSIGNED pos, UNSIGNED datcount) := FUNCTION // Python embed function to do most of the heavy lifting. STREAMED DATASET(t_Tensor) extract(STREAMED DATASET(t_Tensor) tens, - UNSIGNED pos, UNSIGNED datcount, nodeid, maxslice) := EMBED(Python: activity) + UNSIGNED pos, UNSIGNED datcount, nodeid, nNodes, maxslice) := EMBED(Python: activity) import numpy as np import traceback as tb - try: - maxSliceLen = maxslice - dTypeDict = {1:np.float32, 2:np.float64, 3:np.int32, 4:np.int64} - dTypeDictR = {'float32':1, 'float64':2, 'int32':3, 'int64':4} - dTypeSizeDict = {1:4, 2:8, 3:4, 4:8} - outArray = None - tshape = [] - sliceNum = 0 - lastSlice = 0 - fullSize = 0 - rowSize = 0 - outSize = 0 - startSlice = 0 - startPos = 0 - endSlice = 0 - endPos = 0 - outPos = 0 - # If the first shape component is non-zero, then this is a fixed size Tensor - # and exact positions are important. If not fixed sized, then we take the - # records sequentially and don't fill gaps. We determine size by the actual - # records received. - isFixedSize = False - wi = 0 - for rec in tens: - node, wi, sliceId, shape, dataType, maxSliceSize, slice_size, densedat, sparsedat = rec - dtype = dTypeDict[dataType] - tshape = shape - if outArray is None: - # Initialize important information on the first slice. - # Full size of the tensor - fullSize = np.prod(shape) - # Is fixed size if the first component of the shape is 0. - isFixedSize = fullSize != 0 - # Row size is the size of the 2nd - last shape component. - rowSize = np.prod(shape[1:]) - # Calculate the size to be returned - outSize = rowSize * datcount - # Create an array of zeros to hold the output. - outArray = np.zeros((outSize,), dtype) - # Figure out which slice and position the desired data starts - # and ends on - startSlice, startPos = divmod(pos * rowSize, maxSliceSize) - endSlice, endPos = divmod((pos + datcount) * rowSize, maxSliceSize) - if sliceNum < startSlice: - # The data is found in a later slice. Skip this one. - sliceNum += 1 - continue - if not densedat: - # Sparse decoding - dat = np.zeros((slice_size,), dtype) - for offset, val in sparsedat: - assert offset < slice_size, 'TensExtract: sparsedat has higher index the sliceSize = ' + str(offset) - dat[offset] = dtype(val) - densedat = dat - if sliceNum == startSlice and sliceNum == endSlice: - # Data starts and ends on this slice. - densedat = densedat[startPos:endPos] - elif sliceNum == startSlice: - # Data starts on this slice, but ends on a further one. - densedat = densedat[startPos:] - elif sliceNum == endSlice: - # Data ends on this slice but started previously. - densedat = densedat[:endPos] - # Add any data from this slice - outArray[outPos:outPos + len(densedat)] = densedat - outPos += len(densedat) - sliceNum += 1 - # If this is the end slice, we're done. - if sliceNum >= endSlice: - break - if sliceNum == 0: - # No data in the slice. Return an empty Tensor. - return [] - if outPos < datcount * rowSize: - # Fewer than requested records available. - outArray.resize((outPos,)) - if tshape[0] == 0: - tshape[0] = -1 - # If this is a variable size tensor, reflect that in the numpy array. - outArray = np.reshape(outArray, tshape) - # Function to convert a numpy array to a tensor. - def Np2Tens(a, wi=1): - epsilon = .000000001 - origShape = list(a.shape) - flatA = a.reshape(-1) - flatSize = flatA.shape[0] - sliceId = 1 - indx = 0 - maxSliceSize = 0 - datType = dTypeDictR[str(a.dtype)] - elemSize = dTypeSizeDict[datType] - max_slice = divmod(maxSliceLen, elemSize)[0] - while indx < flatSize: - remaining = flatSize - indx - if remaining >= max_slice: - sliceSize = max_slice + maxSliceLen = maxslice + dTypeDict = {1:np.float32, 2:np.float64, 3:np.int32, 4:np.int64} + dTypeDictR = {'float32':1, 'float64':2, 'int32':3, 'int64':4} + dTypeSizeDict = {1:4, 2:8, 3:4, 4:8} + # Generator Function to convert a numpy array to a tensor. + def Np2Tens(a, wi=1): + epsilon = .000000001 + origShape = list(a.shape) + # For final shape, the first component should be zero to indicate a record-oriented + # tensor. + finalShape = [0] + origShape[1:] + flatA = a.reshape(-1) + flatSize = flatA.shape[0] + sliceId = nodeid + 1 + indx = 0 + maxSliceSize = 0 + datType = dTypeDictR[str(a.dtype)] + elemSize = dTypeSizeDict[datType] + max_slice = divmod(maxSliceLen, elemSize)[0] + while indx < flatSize: + remaining = flatSize - indx + if remaining >= max_slice: + sliceSize = max_slice + else: + sliceSize = remaining + #if sliceId == 1: + # maxSliceSize = sliceSize + maxSliceSize = sliceSize + dat = list(flatA[indx:indx + sliceSize]) + dat = [float(d) for d in dat] + elemCount = 0 + for i in range(len(dat)): + if abs(dat[i]) > epsilon: + elemCount += 1 + if elemCount > 0 or sliceId == 1: + if elemCount * (elemSize + 4) < len(dat): + # Sparse encoding + sparse = [] + for i in range(len(dat)): + if abs(dat[i]) > epsilon: + sparse.append((i, dat[i])) + yield (nodeid, wi, sliceId, finalShape, datType, maxSliceSize, sliceSize, [], sparse) else: - sliceSize = remaining - if sliceId == 1: - maxSliceSize = sliceSize - dat = list(flatA[indx:indx + sliceSize]) - dat = [float(d) for d in dat] - elemCount = 0 - for i in range(len(dat)): - if abs(dat[i]) > epsilon: - elemCount += 1 - if elemCount > 0 or sliceId == 1: - if elemCount * (elemSize + 4) < len(dat): - # Sparse encoding - sparse = [] - for i in range(len(dat)): - if abs(dat[i]) > epsilon: - sparse.append((i, dat[i])) - yield (nodeid, wi, sliceId, origShape, datType, maxSliceSize, sliceSize, [], sparse) + # Dense encoding + yield (nodeid, wi, sliceId, finalShape, datType, maxSliceSize, sliceSize, dat, []) + sliceId += 1 + indx += sliceSize + # END OF NP2Tens + # Generator function to return the extract from a list of tensors + def getResults(): + try: + outArray = None + tshape = [] + sliceNum = 0 + fullSize = 0 + rowSize = 0 + outSize = 0 + startSlice = 0 + startPos = 0 + endSlice = 0 + endPos = 0 + outPos = 0 + currWi = 0 + # If the first shape component is non-zero, then this is a fixed size Tensor + # and exact positions are important. If not fixed sized, then we take the + # records sequentially and don't fill gaps. We determine size by the actual + # records received. + isFixedSize = False + for rec in tens: + node, wi, sliceId, shape, dataType, maxSliceSize, slice_size, densedat, sparsedat = rec + dtype = dTypeDict[dataType] + if wi != currWi: + if outArray is not None: + # New wi. Output the previous one. + if outPos < datcount * rowSize: + # Fewer than requested records available. + outArray.resize((outPos,)) + # If this is a variable size tensor, reflect that in the numpy array. + outArray = np.reshape(outArray, tshape) + # Yield the previous wi's tensor and reset for the new wi. + yield from Np2Tens(outArray, currWi) + outArray = None + tshape = [] + currWi = wi + if outArray is None: + # Initialize important information on the first slice. + # The output shape (tshape). Note: Only supports record oriented tensors. + if shape[0] == 0: + tshape = [-1] + shape[1:] # Make first term -1 for numpy tensor else: - # Dense encoding - yield (nodeid, wi, sliceId, origShape, datType, maxSliceSize, sliceSize, dat, []) - sliceId += 1 - indx += sliceSize - - return Np2Tens(outArray, wi) - except: - # Error during extraction. - assert 0 == 1, 'TensExtract: ' + tb.format_exc() - ENDEMBED; - RETURN extract(tens, pos-1, datcount, nodeId, MAX_SLICE); + raise Exception('Extract requires record-oriented tensors ' + \ + 'that must have a zero first shape component. Shape = ' + str(shape) + '.') + # Full size of the tensor + fullSize = np.prod(shape) + # Is fixed size if the first component of the shape is 0. + isFixedSize = fullSize != 0 + # Row size is the size of the 2nd - last shape component. + rowSize = np.prod(shape[1:]) + # Calculate the size to be returned + outSize = rowSize * datcount + # Create an array of zeros to hold the output. + outArray = np.zeros((outSize,), dtype) + # Figure out which slice and position the desired data starts + # and ends on + startSlice, startPos = divmod(pos * rowSize, maxSliceSize) + endSlice, endPos = divmod((pos + datcount) * rowSize, maxSliceSize) + # Slice number + sliceNum = 0 + outPos = 0 + if sliceNum < startSlice or sliceNum > endSlice: + # The data is found in a later slice or we're already past the end of + # the data. We have to keep iterating in the latter case because there + # might be more wi's. Skip this record. + sliceNum += 1 + continue + if not densedat: + # Sparse decoding + dat = np.zeros((slice_size,), dtype) + for offset, val in sparsedat: + assert offset < slice_size, 'TensExtract: sparsedat has higher index than the sliceSize = ' + str(offset) + dat[offset] = dtype(val) + densedat = dat + if sliceNum == startSlice and sliceNum == endSlice: + # Data starts and ends on this slice. + densedat = densedat[startPos:endPos] + elif sliceNum == startSlice: + # Data starts on this slice, but ends on a further one. + densedat = densedat[startPos:] + elif sliceNum == endSlice: + # Data ends on this slice but started previously. + densedat = densedat[:endPos] + # Add any data from this slice + outArray[outPos:outPos + len(densedat)] = densedat + outPos += len(densedat) + sliceNum += 1 + # END for + if sliceNum == 0: + # No data in the slice. Return an empty Tensor. + return [] + if outPos < datcount * rowSize: + # Fewer than requested records available. + outArray.resize((outPos,)) + # If this is a variable size tensor, reflect that in the numpy array. + outArray = np.reshape(outArray, tshape) + # Yield the final wi's tensor + yield from Np2Tens(outArray, wi) + except: + # Error during extraction. + assert 0 == 1, 'TensExtract: ' + str(tshape) + ',' + 'currWi = ' + str(currWi) + ', ' + tb.format_exc() + # END OF getResults() + return getResults() + ENDEMBED; // Extract + RETURN SORT(extract(tens, pos-1, datcount, nodeId, nNodes, MAX_SLICE), wi, sliceId, LOCAL); END; \ No newline at end of file diff --git a/README.md b/README.md index 50122d0..1548686 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,12 @@ It provides Keras / Tensorflow operations parallelized over an HPCC cluster. Models are created on each HPCC node, and training, evaluation and predictions are done in a distributed fashion across the HPCC cluster. -The Module GNNI defines the ECL interface to Keras. It currently only supports -the Keras Sequential model. +The Module GNNI defines the ECL interface to Keras. It supports any Keras +model (Functional or Sequential), and allows models with multiple inputs +and outputs. GNN is designed to handle any type of Neural Network model that can be built -using the Keras Sequential model. This includes Classical Neural Networks as +using Keras. This includes Classical Neural Networks as well as Convolutional Networks and Recursive Networks such as LSTM, or any combination of the above. @@ -28,7 +29,14 @@ that Python3 and Tensorflow are correctly installed on each Thor node. ## EXAMPLES The files Test/ClassicTest.ecl and ClassificationTest.ecl show annotated examples -of using GNN to create a simple Classical Neural Networks. The folder Test/HARTests +of using GNN to create a simple Classical Neural Networks using the Keras Sequential +model. + +The file Test/FuncModelTest.ecl shows an example of building a classical regression / +classification network with multiple inputs and outputs using the Keras Functional +model. + +The folder Test/HARTests contains tests that show how to create more sophisticated Convolutional and Recurrent networks. diff --git a/Tensor.ecl b/Tensor.ecl index d47b797..8691862 100644 --- a/Tensor.ecl +++ b/Tensor.ecl @@ -586,25 +586,30 @@ EXPORT Tensor slicesPerNode := nSlices / nNodes; relNode := (sliceId - 1) div slicesPerNode; // Offset per wi so that differnt work items use staggered node numbering - nodeId := (wi - 1) + relNode % nNodes; + //nodeId := (wi - 1) + relNode % nNodes; + nodeId := relNode % nNodes; // Temporarily disable spreading by wi return nodeId; END; /** * Calculate the sliceId in which a given Tensor cell will reside. */ - SHARED UNSIGNED4 calcSliceId(t_Indexes indexes, t_Indexes indxSizes, UNSIGNED4 sliceSize) := EMBED(Python) - import traceback as tb - try: - # Indexes are 1 based - # Calculate the position (zero based) given a 1-based index - pos = 0 - for i in range(len(indxSizes)): - pos += (indexes[i]-1) * indxSizes[i] - # Return the sliceId (1-based) - return int(pos / sliceSize) + 1 - except: - assert 0 == 1, 'Tensor.calcSliceId: ' + tb.format_exc() - ENDEMBED; + SHARED UNSIGNED4 calcSliceId(UNSIGNED recNum, UNSIGNED4 recSize, UNSIGNED4 sliceSize) := FUNCTION + pos := (recNum - 1) * recSize; + RETURN (pos DIV sliceSize) + 1; + END; + /** + * Optimized version of calcNodeId(wi, calcSliceId(...), nSlices) + * Because of the number of times this is called, we need to optimize as much as possible, + * even at the cost of clarity. + */ + SHARED UNSIGNED4 calcNodeId2(UNSIGNED4 wi, UNSIGNED4 nSlices, UNSIGNED4 sliceSize, UNSIGNED recNum, UNSIGNED4 recSize) := FUNCTION + sliceIdZ :=((recNum -1) * recSize) DIV sliceSize; // Zero based + relNode := sliceIdZ DIV (nSlices / nNodes); + //nodeId := (wi - 1) + relNode % nNodes; + nodeId := relNode % nNodes; // Temporarily disable spreading by wi + RETURN nodeId; + END; + /** * Make a Tensor from a set of TensorData and * some meta-data. @@ -645,7 +650,8 @@ EXPORT Tensor sliceElems := sliceSize / elemSize; nSlices := ROUNDUP(totalSize / sliceSize); indxSizes := calcIndexSizes(shape); - contentsD := DISTRIBUTE(contents, calcNodeId(wi, calcSliceId(indexes, indxSizes, sliceElems), nSlices)); + recSize := indxSizes[1]; + contentsD := DISTRIBUTE(contents, calcNodeId2(wi, nSlices, sliceElems, indexes[1], recSize)); contentsDS := SORT(NOCOMBINE(contentsD), indexes[1], LOCAL); slices0 := makeSlices(contentsDS, wi, shape, adjShape, t_TensType.R4, elemSize, sliceElems); // If not replicated, slices are already correctly distributed (i.e. by wi and sliceId) @@ -660,10 +666,13 @@ EXPORT Tensor */ SHARED DATASET(t_Tensor) deReplicate(DATASET(t_Tensor) tens) := FUNCTION nSlices := COUNT(tens); + maxSlice := MAX(tens, sliceId); slicesPerNode := nSlices / nNodes; wi := tens[1].wi; derep := tens(nodeId = calcNodeId(wi, sliceId, nSlices)); - return derep; + // Only de-rep if it is a replicated tensor, otherwise bad things can happen. + outTens := IF(nSlices > maxSlice, derep, tens); + return outTens; END; /** * Extract the data from a tensor and return it in sparse TensData format. @@ -887,5 +896,59 @@ EXPORT Tensor IF(ArecsPerSlice > BrecsPerSlice, alignedA + tB0, tens)); RETURN reAligned; END; // AlignTensorPair + /** + * Aligns a list of Tensors (seperated by wi) so that all of the tensors' + * corresponding records are stored on the same node. + * This prevents different sized + * records from being distributed differently among the nodes. + *
In most cases, the inputs and outputs to a neural network during training, + * and the inputs during prediction should be aligned so that + * various aspects of the same observation are presented together. + * + * @param tens A Tensor List with at least two tensors identified by + * sequential work item ids from 1-N. + * @return A new Tensor List with the same number of tensors as the input + * list, with all of the tensors being aligned. + **/ + EXPORT DATASET(t_Tensor) AlignTensors(DATASET(t_Tensor) tensList) := FUNCTION + // This can be optimized if necessary by custom restructuring + // versus use of GetData(...) and MakeTensor(...) + elemSize := 4; // REAL4 + UNSIGNED recSize(t_Indexes shape) := EMBED(Python) + import numpy as np + recSz = np.prod(shape[1:]) + return int(recSz) + ENDEMBED; + itemInfo0 := TABLE(tensList, {wi, shape, maxSliceSize, UNSIGNED recsPerSlice := 0, UNSIGNED recSize := 0}, wi, shape, maxSliceSize); + itemInfo1 := PROJECT(itemInfo0, TRANSFORM(RECORDOF(LEFT), + SELF.recSize := recSize(LEFT.shape), + SELF.recsPerSlice := LEFT.maxSliceSize / SELF.recSize, + SELF := LEFT), LOCAL); + itemInfo := SORT(itemInfo1, recsPerSlice, -recSize); + largestRecItem := itemInfo[1]; + newRecSize := largestRecItem.recSize; + newRecsPerSlice := largestRecItem.recsPerSlice; + largestRecWI := largestRecItem.wi; + numTensors := COUNT(itemInfo); + DATASET(t_Tensor) adjustTensors(DATASET(t_Tensor) tl, UNSIGNED ctr) := FUNCTION + // Do one tensor for each loop + thisTens := tl(wi = ctr); + thisTensDat := GetData(thisTens); + thisTensShape := thisTens[1].shape; + thisMaxSliceSize := newRecsPerSlice * recSize(thisTensShape) * elemSize; + // We want all the tensors to be aligned the same, so we create the slices with + // wi of the largestRecItem, and then project to the correct wi. This is because MakeTensor spreads + // the wi's across nodes. By Making all the tensors with the wi of the largestRecItem, we + // save the need to re-create that largest of the tensors. + adjTens0 := MakeTensor(thisTensShape, thisTensDat, wi := largestRecWI, forceMaxSliceSize := thisMaxSliceSize); + adjTens := PROJECT(adjTens0, TRANSFORM(RECORDOF(LEFT), SELF.wi := ctr, SELF := LEFT), LOCAL); + // If this is the tensor with the largest rec size, don't need to adjust. Otherwise adjust. + newTens := IF(ctr = largestRecWI, thisTens, adjTens); + outTens := tl(wi != ctr) + newTens; + return outTens; + END; + reAligned := LOOP(tensList, numTensors, LEFT.wi >= COUNTER, adjustTensors(ROWS(LEFT), COUNTER)); + RETURN SORT(reAligned, sliceId, LOCAL); + END; // AlignTensors END; // R4 END; // t_Tensor diff --git a/Test/ClassicTest.ecl b/Test/ClassicTest.ecl index 023c53d..dc780ca 100644 --- a/Test/ClassicTest.ecl +++ b/Test/ClassicTest.ecl @@ -24,6 +24,8 @@ RAND_MAX := POWER(2,32) -1; trainCount := 1000; testCount := 1000; featureCount := 5; +batchSize := 128; +numEpochs := 5; // END Test parameters // Prepare training data. @@ -138,7 +140,7 @@ OUTPUT(wts, NAMED('InitWeights')); // Fit trains the models, given training X and Y data. BatchSize is not the Keras batchSize, // but defines how many records are processed on each node before synchronizing the weights -mod2 := GNNI.Fit(mod, trainX, trainY, batchSize := 128, numEpochs := 5); +mod2 := GNNI.Fit(mod, trainX, trainY, batchSize := batchSize, numEpochs := numEpochs); OUTPUT(mod2, NAMED('mod2')); diff --git a/Test/ClassificationTest.ecl b/Test/ClassificationTest.ecl index f583120..f90bf69 100644 --- a/Test/ClassificationTest.ecl +++ b/Test/ClassificationTest.ecl @@ -38,6 +38,8 @@ trainCount := 1000; testCount := 100; featureCount := 5; classCount := 3; +numEpochs := 5; +batchSize := 128; // End of Test Parameters // Prepare training data. @@ -65,11 +67,11 @@ END; // Build the training data train0 := DATASET(trainCount, TRANSFORM(trainRec, SELF.id := COUNTER, - SELF.x := [(RANDOM() % RAND_MAX) / RAND_MAX -.5, - (RANDOM() % RAND_MAX) / RAND_MAX -.5, - (RANDOM() % RAND_MAX) / RAND_MAX -.5, - (RANDOM() % RAND_MAX) / RAND_MAX -.5, - (RANDOM() % RAND_MAX) / RAND_MAX -.5], + SELF.x := [(RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1], SELF.y := []) ); // Be sure to compute Y in a second step. Otherewise, the RANDOM() will be executed twice and the Y will be based @@ -97,7 +99,7 @@ trainX := NORMALIZE(train, featureCount, TRANSFORM(NumericField, SELF.id := LEFT.id, SELF.number := COUNTER, SELF.value := LEFT.x[COUNTER])); -trainY := NORMALIZE(train, 3, TRANSFORM(NumericField, +trainY := NORMALIZE(train, classCount, TRANSFORM(NumericField, SELF.wi := 1, SELF.id := LEFT.id, SELF.number := COUNTER, @@ -111,7 +113,7 @@ testX := NORMALIZE(test, featureCount, TRANSFORM(NumericField, SELF.id := LEFT.id, SELF.number := COUNTER, SELF.value := LEFT.x[COUNTER])); -testY := NORMALIZE(test, 3, TRANSFORM(NumericField, +testY := NORMALIZE(test, classCount, TRANSFORM(NumericField, SELF.wi := 1, SELF.id := LEFT.id, SELF.number := COUNTER, @@ -129,8 +131,8 @@ ldef := ['''layers.Dense(16, activation='tanh', input_shape=(5,))''', // compileDef defines the compile line to use for compiling the defined model. // Note that 'model.' is implied, and should not be included in the compile line. compileDef := '''compile(optimizer=tf.keras.optimizers.SGD(.05), - loss=tf.keras.losses.mean_squared_error, - metrics=[tf.keras.metrics.mean_squared_error]) + loss=tf.keras.losses.categorical_crossentropy, + metrics=['accuracy']) '''; // Note that the order of the GNNI functions is maintained by passing tokens returned from one call @@ -155,7 +157,7 @@ OUTPUT(wts, NAMED('InitWeights')); // Fit trains the models, given training X and Y data. BatchSize is not the Keras batchSize, // but defines how many records are processed on each node before synchronizing the weights // Note that we use the NF form of Fit since we are using NumericField for I / o. -mod2 := GNNI.FitNF(mod, trainX, trainY, batchSize := 128, numEpochs := 20); +mod2 := GNNI.FitNF(mod, trainX, trainY, batchSize := batchSize, numEpochs := numEpochs); OUTPUT(mod2, NAMED('mod2')); diff --git a/Test/Datasets/MNIST/README.MD b/Test/Datasets/MNIST/README.MD new file mode 100644 index 0000000..9fe34ef --- /dev/null +++ b/Test/Datasets/MNIST/README.MD @@ -0,0 +1,3 @@ +# MNIST FOR GNN + +Spray the two files as fixed 785 length. One is training set with 60k instances, and the other has 10k instances for a test set. diff --git a/Test/Datasets/MNIST/mnist_test_noheader b/Test/Datasets/MNIST/mnist_test_noheader new file mode 100644 index 0000000..e79f2e5 Binary files /dev/null and b/Test/Datasets/MNIST/mnist_test_noheader differ diff --git a/Test/Datasets/MNIST/mnist_train_noheader b/Test/Datasets/MNIST/mnist_train_noheader new file mode 100644 index 0000000..b54fc8c Binary files /dev/null and b/Test/Datasets/MNIST/mnist_train_noheader differ diff --git a/Test/ExtractTest.ecl b/Test/ExtractTest.ecl index 542acff..532d3d3 100644 --- a/Test/ExtractTest.ecl +++ b/Test/ExtractTest.ecl @@ -1,6 +1,9 @@ /*############################################################################## ## HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems. All rights reserved. -############################################################################## */ +##############################################################################*/ +/** + * Test the TensorExtract module + */ IMPORT Python; IMPORT $.^ AS GNN; IMPORT GNN.Tensor; @@ -45,13 +48,17 @@ OUTPUT(train0, NAMED('trainData')); trainX0 := NORMALIZE(train0, featureCount, TRANSFORM(TensData, SELF.indexes := [LEFT.id, COUNTER], SELF.value := LEFT.x[COUNTER])); -trainY0 := NORMALIZE(train0, 1, TRANSFORM(TensData, - SELF.indexes := [LEFT.id, COUNTER], - SELF.value := LEFT.y)); -X := Tensor.R4.MakeTensor([0, featureCount], trainX0); -Y:= Tensor.R4.MakeTensor([0, 1], trainY0); + +// Test with 2 tensors in a Tensor List. It's okay that they're both the same contents. +// We manually align the tensors since they would normally be distributed differently +// because of the different wi's. +X1 := Tensor.R4.MakeTensor([0, featureCount], trainX0, wi := 1); +X2 := PROJECT(Tensor.R4.MakeTensor([0, featureCount], trainX0, wi := 1), + TRANSFORM(RECORDOF(LEFT), + SELF.wi := 2, + SELF := LEFT)); +X := X1 + X2; OUTPUT(X, NAMED('X')); -OUTPUT(Y, NAMED('Y')); eX1 := int.TensExtract(X, 1, 10); eX2 := int.TensExtract(X, 11, 10); diff --git a/Test/FuncModelTest.ecl b/Test/FuncModelTest.ecl new file mode 100644 index 0000000..c465690 --- /dev/null +++ b/Test/FuncModelTest.ecl @@ -0,0 +1,315 @@ +/*############################################################################## +## HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems. All rights reserved. +############################################################################## */ +/** + * Test GNNI Functional (i.e. non-sequential) Model Capability + * Create a single model that does a Regression and a Classification in a single + * functional model. This model has two sets of inputs (one for Regression and + * one for Classification), and two corresponding outputs. + */ +IMPORT Python3 AS Python; +IMPORT $.^ AS GNN; +IMPORT GNN.Tensor; +IMPORT GNN.Internal.Types AS iTypes; +IMPORT GNN.Types; +IMPORT GNN.GNNI; +IMPORT GNN.Internal AS Int; +IMPORT Std.System.Thorlib; +IMPORT GNN.Utils; +t_Tensor := Tensor.R4.t_Tensor; +TensData := Tensor.R4.TensData; +FuncLayerDef := Types.FuncLayerDef; + +RAND_MAX := POWER(2,32) -1; + +// Test parameters +numEpochs := 5; +batchSize := 128; +trainCount := 1000; +testCount := 1000; +featureCount := 5; +// END Test parameters + +// Prepare Regression training and test data. +// We use 5 inputs (X) and a single output (Y) +trainRecR := RECORD + UNSIGNED8 id; + SET OF REAL x; + REAL4 y; +END; + +// The Regression target function maps a set of X features into a Y value, which is a polynomial function of X. +REAL4 targetFuncR(REAL4 x1, REAL4 x2, REAL4 x3, REAL4 x4, REAL4 x5) := FUNCTION + rslt := .5 * POWER(x1, 4) - .4 * POWER(x2, 3) + .3 * POWER(x3,2) - .2 * x4 + .1 * x5; + RETURN rslt; +END; + +// Build the regression training data. Pick random data for X values, and use a polynomial +// function of X to compute Y. +train0R := DATASET(trainCount, TRANSFORM(trainRecR, + SELF.id := COUNTER, + SELF.x := [(RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1], + SELF.y := 0) + ); +// Be sure to compute Y in a second step. Otherwise, the RANDOM() will be executed twice and the Y will be based +// on different values than those assigned to X. This is an ECL quirk that is not easy to fix. +trainR := PROJECT(train0R, TRANSFORM(RECORDOF(LEFT), SELF.y := targetFuncR(LEFT.x[1], LEFT.x[2], LEFT.x[3], LEFT.x[4], LEFT.x[5]), SELF := LEFT)); +OUTPUT(trainR, NAMED('trainDataR')); + +// Build the test data. Same process as the training data. +test0R := DATASET(testCount, TRANSFORM(trainRecR, + SELF.id := COUNTER, + SELF.x := [(RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1], + SELF.y := 0) + ); + +testR := PROJECT(test0R, TRANSFORM(RECORDOF(LEFT), SELF.y := targetFuncR(LEFT.x[1], LEFT.x[2], LEFT.x[3], LEFT.x[4], LEFT.x[5]), SELF := LEFT)); + +// Break the training and test data into X (independent) and Y (dependent) data sets. Format as Tensor Data. +trainX0R := NORMALIZE(trainR, featureCount, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.x[COUNTER])); +trainY0R := NORMALIZE(trainR, 1, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.y)); + +// Form a Tensor from the tensor data. This packs the data into 'slices' that can contain dense +// or sparse portions of the Tensor. If the tensor is small, it will fit into a single slice. +// Huge tensors may require many slices. The slices also contain tensor metadata such as the shape. +// For record oriented data, the first component of the shape should be 0, indicating that it is an +// arbitrary length set of records. +trainXR := Tensor.R4.MakeTensor([0, featureCount], trainX0R); +trainYR:= Tensor.R4.MakeTensor([0, 1], trainY0R); + +OUTPUT(trainXR, NAMED('trainXR')); +OUTPUT(trainYR, NAMED('trainYR')); + +testX0R := NORMALIZE(testR, featureCount, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.x[COUNTER])); +testY0R := NORMALIZE(testR, 1, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.y)); +testXR := Tensor.R4.MakeTensor([0, featureCount], testX0R); +testYR:= Tensor.R4.MakeTensor([0, 1], testY0R); + +// Now Prepare Classification training and test data. +// We use 5 inputs (X) and a one hot encoded output (Y) with 3 classes +// (i.e. 3 outputs). +trainRecC := RECORD + UNSIGNED8 id; + SET OF REAL4 x; + REAL4 y; +END; + +// The target function maps a set of X features into a Y value, +// which is a threshold on a polynomial function of X. + +// Returns 1 of 3 classes, 0, 1, or 2. +REAL4 targetFuncC(REAL4 x1, REAL4 x2, REAL4 x3, REAL4 x4, REAL4 x5) := FUNCTION + rslt0 := TANH(POWER(x1, 4) - 2 * POWER(x2, 3) + 3 * POWER(x3,2) - 4 * x4 + 5 * x5); + rslt := MAP(rslt0 < -.33 => 0, rslt0 < .33 => 1, 2); + RETURN rslt; +END; + +// Build the training data +train0C := DATASET(trainCount, TRANSFORM(trainRecC, + SELF.id := COUNTER, + SELF.x := [(RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1], + SELF.y := []) + ); +// Be sure to compute Y in a second step. Otherewise, the RANDOM() will be executed twice and the Y will be based +// on different values than those assigned to X. This is an ECL quirk that is not easy to fix. +trainC := PROJECT(train0C, TRANSFORM(RECORDOF(LEFT), SELF.y := targetFuncC(LEFT.x[1], LEFT.x[2], LEFT.x[3], LEFT.x[4], LEFT.x[5]), SELF := LEFT)); +OUTPUT(trainC, NAMED('trainDataC')); + +// Build the test data. Same process as the training data. +test0C := DATASET(testCount, TRANSFORM(trainRecC, + SELF.id := COUNTER, + SELF.x := [(RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1, + (RANDOM() % RAND_MAX) / (RAND_MAX / 2) - 1], + SELF.y := []) + ); + +testC := PROJECT(test0C, TRANSFORM(RECORDOF(LEFT), SELF.y := targetFuncC(LEFT.x[1], LEFT.x[2], LEFT.x[3], LEFT.x[4], LEFT.x[5]), SELF := LEFT)); + +// Break the training and test data into X (independent) and Y (dependent) data sets. +// Format as NumericField data. +trainX0C := NORMALIZE(trainC, featureCount, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.x[COUNTER])); +trainY0C := NORMALIZE(trainC, 1, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.Y)); +// We need to one-hot enncode the Y value since we are doing classification using +// softmax. +trainY1C := Utils.ToOneHot(trainY0C, 3); // Three classes + +trainXC := Tensor.R4.MakeTensor([0, featureCount], trainX0C, wi:=2); +trainYC:= Tensor.R4.MakeTensor([0, 3], trainY1C, wi:=2); +OUTPUT(trainXC, NAMED('trainXC')); +OUTPUT(trainYC, NAMED('trainYC')); + +testX0C := NORMALIZE(testC, featureCount, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.x[COUNTER])); +testY0C := NORMALIZE(testC, 3, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.Y)); +// We need to one-hot enncode the Y value since we are doing classification using +// softmax. +testY1C := Utils.ToOneHot(testY0C, 3); // Three classes + +testXC := Tensor.R4.MakeTensor([0, featureCount], testX0C, wi:=2); +testYC:= Tensor.R4.MakeTensor([0, 3], testY1C, wi:=2); + +// New we can create the combined Train and Test data by adding (i.e. concatenating) +// the Regression and Classification Tensors. Note that we use wi = 1 (default) for the first +// (i.e. Regression tensor), and wi = 2 for the second (i.e. Classification) tensors +// Classification Tensors. Thus we have a Tensor List with 2 tensors for each training +// and test input. +trainX := trainXR + trainXC; +trainY := trainYR + trainYC; +OUTPUT(trainX, NAMED('trainX')); +OUTPUT(trainY, NAMED('trainY')); +testX := testXR + testXC; +testY := testYR + testYC; +OUTPUT(testX, NAMED('testX')); +OUTPUT(testY, NAMED('testY')); +// Create a functional model with two inputs, and two outputs. This is a combination +// of a regression model and a classification model. The two models are combined into +// a single functional model. The attributes ldef1 and ldef2 illustrate what the +// individual models would look like using a sequential model definition. +// This allows the reader to see how to map from sequential models to +// a functional model. +// Note that the two models are not connected internally. This is not the normal +// way to use functional models, but is simple and does allow us to test multiple +// inputs and outputs as well as the functional definition process. + +// ldef1 is the sequential model definition for the Regression model (for reference) +ldef1 := ['''layers.Dense(256, activation='tanh', input_shape=(5,))''', + '''layers.Dense(256, activation='relu')''', + '''layers.Dense(1, activation=None)''']; +// ldef2 is the sequential model definition for the Classification model (for reference) +ldef2 := ['''layers.Dense(16, activation='tanh', input_shape=(5,))''', + '''layers.Dense(16, activation='relu')''', + '''layers.Dense(3, activation='softmax')''']; + +// fldef is the Functional model definition that combines the two above models +// The first field is the name of that layer. The second is the definition of the layer. +// The third field is a list of predecessor layer names for this layer. The +// Functional model defines a Directed Acyclic Graph (DAG), which is stitched together +// using the predecessor list. If there were Concatenation layers, they would +// list multiple predecessors. Note that Input layers have no predecessors. +fldef := DATASET([{'input1', '''layers.Input(shape=(5,))''', []}, // Regression Input + {'d1', '''layers.Dense(256, activation='tanh')''', ['input1']}, // Regression Hidden 1 + {'d2', '''layers.Dense(256, activation='relu')''', ['d1']}, // Regression Hidden 2 + {'output1', '''layers.Dense(1, activation=None)''', ['d2']}, // Regression Output + {'input2', '''layers.Input(shape=(5,))''', []}, // Classification Input + {'d3', '''layers.Dense(16, activation='tanh', input_shape=(5,))''',['input2']}, // Classification Hidden 1 + {'d4', '''layers.Dense(16, activation='relu')''',['d3']}, // Classification Hidden 2 + {'output2', '''layers.Dense(3, activation='softmax')''', ['d4']}], // Classification Output + FuncLayerDef); + +// compileDef defines the compile line to use for compiling the defined model. +// Note that 'model.' is implied, and should not be included in the compile line. +// We define two losses, one for each output. lossWeights allows us to weight one +// loss stronger than the other if desired. +compileDef := '''compile(optimizer=tf.keras.optimizers.SGD(.05), + loss=[tf.keras.losses.mean_squared_error, tf.keras.losses.categorical_crossentropy], + metrics=[]) + '''; + +// Note that the order of the GNNI functions is maintained by passing tokens returned from +// one call into the next call that is dependent on it. +// For example, s is returned from GetSession(). It is used as the input to +// DefineModels(...) so +// that DefineModels() cannot execute until GetSession() has completed. +// Likewise, mod, the output from GetSession() is provided as input to Fit(). Fit in turn +// returns a token that is used by GetLoss(), EvaluateMod(), and Predict(), +// which are only dependent on Fit() having completed, and are not order +// dependent on one another. + +// GetSession must be called before any other functions +s := GNNI.GetSession(); +// DefineModel is dependent on the Session +// fldef defines the functional model +// inputs lists the input layer names +// outputs lists the output layer names +// compileDef contains the Keras compile statement. +mod := GNNI.DefineFuncModel(s, fldef, ['input1', 'input2'], ['output1', 'output2'], compileDef); +// GetWeights returns the initialized weights that have been synchronized across all nodes. +wts := GNNI.GetWeights(mod); + +OUTPUT(wts, NAMED('InitWeights')); + +// Fit trains the models, given training X and Y data. BatchSize is not the Keras batchSize, +// but defines how many records are processed on each node before synchronizing the weights +mod2 := GNNI.Fit(mod, trainX, trainY, batchSize := batchSize, numEpochs := numEpochs); + +OUTPUT(mod2, NAMED('mod2')); + +// GetLoss returns the average loss for the final training epoch +losses := GNNI.GetLoss(mod2); + +OUTPUT(losses, NAMED('Losses')); + +// EvaluateMod computes the loss, as well as any other metrics that were defined in the Keras +// compile line. +metrics := GNNI.EvaluateMod(mod2, testX, testY); + +OUTPUT(metrics, NAMED('metrics')); + +// Predict computes the neural network output given a set of inputs. +preds := GNNI.Predict(mod2, testX); + +// Note that the Tensor is a packed structure of Tensor slices. GetData() +// extracts the data into a sparse cell-based form -- each record represents +// one Tensor cell. See Tensor.R4.TensData. +testYDatR := Tensor.R4.GetData(testY(wi=1)); +testYDatC0 := Tensor.R4.GetData(testY(wi=2)); +predDatR := Tensor.R4.GetData(preds(wi=1)); +predDatC0 := Tensor.R4.GetData(preds(wi=2)); +testYDatC := Utils.FromOneHot(testYDatC0); +predDatC := Utils.FromOneHot(predDatC0); +OUTPUT(SORT(testYDatR, indexes), ALL, NAMED('testDatR')); +OUTPUT(SORT(testYDatC, indexes), ALL, NAMED('testDatC')); +OUTPUT(preds, NAMED('predictions')); +OUTPUT(SORT(predDatR, indexes), ALL, NAMED('predDatR')); +OUTPUT(SORT(predDatC, indexes), ALL, NAMED('predDatC')); + +// Here we are comparing the expected values (testYDat) with the predicted values (predDat) +// and calculating the error and squared error to validate the process +cmpR := JOIN(testYDatR, predDatR, LEFT.indexes[1] = RIGHT.indexes[1], TRANSFORM({SET OF REAL indexes, + REAL pred, REAL actual, REAL error, REAL sqError}, + SELF.indexes := LEFT.indexes, SELF.pred := RIGHT.value, SELF.actual := LEFT.value, + SELF.error := ABS(SELF.actual - SELF.pred), + SELF.sqError := SELF.error * SELF.error), LEFT OUTER, LOCAL); + +cmpC := JOIN(testYDatC, predDatC, LEFT.indexes[1] = RIGHT.indexes[1], TRANSFORM({SET OF REAL indexes, + REAL pred, REAL actual, BOOLEAN correct}, + SELF.indexes := LEFT.indexes, SELF.pred := RIGHT.value, SELF.actual := LEFT.value, + SELF.correct := SELF.pred = SELF.actual), LEFT OUTER, LOCAL); + +OUTPUT(SORT(cmpR, indexes), ALL, NAMED('predcompareR')); +OUTPUT(SORT(cmpC, indexes), ALL, NAMED('predcompareC')); + +accuracyR := AVE(cmpR, sqError); +accuracyC := COUNT(cmpC(correct=TRUE)) / COUNT(cmpC); +OUTPUT(accuracyR, NAMED('accuracyR')); +OUTPUT(accuracyC, NAMED('accuracyC')); \ No newline at end of file diff --git a/Test/MNISTTests/minst_wGNN.ecl b/Test/MNISTTests/minst_wGNN.ecl new file mode 100644 index 0000000..d204000 --- /dev/null +++ b/Test/MNISTTests/minst_wGNN.ecl @@ -0,0 +1,141 @@ +/* +This is will train a small Convolutional Neural Network on the MNIST dataset. It demonstrates +how to use image data with GNN (see mnist_as_real.ecl on how to properly prepare the data for a tensor) +as well as demnostrates how to use the GNN that is GPU enabled. + + +Trains a 34k parameter CNN on 60k MNIST images and tests on an additional, unseen, 8k images for +measuring accuracy. Trains for 10 epochs and a mini-batch size of 128. + + +Expected Accuracy: You can expect over 96% accuracy accross 2 GPUs acrros 2 physcal computer +Expect Training Time: You can expect the whole Workunit to complete in under 6 minutes using 2 + physcal computers, 2 NVIDIA K80 GPUs, and a 2 Thor cluster. + +*/ + +#option('outputLimit',2000); +#option('hthorMemoryLimit',10000); +IMPORT Python3 AS Python; +IMPORT $.^.^ AS GNN; +IMPORT GNN.Tensor; +IMPORT GNN.Internal.Types AS iTypes; +IMPORT GNN.Types; +IMPORT GNN.GNNI; +IMPORT GNN.Utils; +IMPORT GNN.Internal AS Int; +IMPORT Std.System.Thorlib; + + + +numOfGPUs := 2; //16 for total system GPU count, just used for workunit naming +aggregateInterval := 1; //how many times do you want the weights to be aggregated per epoch +numOfPhysicalMachines := 2; //-1 makes TF to use CPU across the board, set the number of physical computer, assumes number of GPUs are equal accross machines and assumes number of Thor nodes is equal to total number of GPUs in system + + +IMPORT mnist_as_real_big as mnist; +trainCount := 60000; //This is all of them +testCount := 8196; //Most of the test, fits easier accross multiple GPUs +numEpochs := 10; + +#WORKUNIT('name', 'MNIST Test with '+numOfGPUs+' GPUs - TrainCount: '+trainCount + ' - aggs per epoch: ' + aggregateInterval + ' - epochs:' + numEpochs); + +miniBatch := 128; //32 is Stock-GNN +oldBatchSize := 128; + +featureCount := 784; +weightAggregateInterval := trainCount / (numOfGPUs * aggregateInterval); //old "batchSize" + + +trainData := CHOOSEN(mnist.train,trainCount); +testData := CHOOSEN(mnist.test,testCount); + +myXTensData := NORMALIZE(trainData, featureCount, + TRANSFORM(Tensor.R4.TensData, + SELF.indexes := [LEFT.id, (COUNTER-1) DIV 28 + 1, (COUNTER-1) % 28 +1, 1], + SELF.value := LEFT.pixel[COUNTER])); + +myXTensDataTest := NORMALIZE(testData, featureCount, + TRANSFORM(Tensor.R4.TensData, + SELF.indexes := [LEFT.id, (COUNTER-1) DIV 28 + 1, (COUNTER-1) % 28 +1, 1], + SELF.value := LEFT.pixel[COUNTER])); +//OUTPUT(COUNT(myXTensData)); + +// Each source record becomes 10 Y (in this case "label" tensor cells (one per class value) +// using One-Hot encoding. +// But only the record associated with the class (i.e. value of Y) +// will be 1. The others will be zero. Since the TensData format +// is sparse, we just skip the zero cells. +myYTensData := NORMALIZE(trainData, 10, + TRANSFORM(Tensor.R4.TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := IF(LEFT.label != COUNTER - 1, SKIP, 1))); + +myYTensDataTest := NORMALIZE(testData, 10, + TRANSFORM(Tensor.R4.TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := IF(LEFT.label != COUNTER - 1, SKIP, 1))); + +// Now we convert the Tensor Data to a Tensor dataset by calling MakeTensor() +myXTensor := Tensor.R4.MakeTensor([0,28,28,1], myXTensData); +myYTensor := Tensor.R4.MakeTensor([0,10], myYTensData); + +myXTensorTest := Tensor.R4.MakeTensor([0,28,28,1], myXTensDataTest); +myYTensorTest := Tensor.R4.MakeTensor([0,10], myYTensDataTest); + + +instancesPerSlice := COUNT(myXTensor[1].densedata)/featureCount; +//OUTPUT(myXTensor, NAMED('SeeingTheSlices')); +//OUTPUT(instancesPerSlice, NAMED('InstancesPerSlice')); + +//OUTPUT(myXTensor, NAMED('x1')); +//OUTPUT(myYTensor, NAMED('y1')); + +ldef := [ '''layers.Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=(28,28,1))''', + '''layers.MaxPooling2D(pool_size=(2, 2))''', + '''layers.Conv2D(64, kernel_size=(3, 3), activation="relu")''', + '''layers.MaxPooling2D(pool_size=(2, 2))''', + '''layers.Flatten()''', + '''layers.Dropout(0.5)''', + '''layers.Dense(10, activation="softmax")''']; // 34,826 trainable parameters + + +compileDef := '''compile(loss="categorical_crossentropy", optimizer=tf.keras.optimizers.SGD(learning_rate=0.0005, momentum=0.1), metrics=["accuracy"])'''; + + +// GetSession must be called before any other functions +s := GNNI.GetSession(numOfPhysicalMachines); +// DefineModel is dependent on the Session +// ldef contains the Python definition for each Keras layer +// compileDef contains the Keras compile statement. + +mod := GNNI.DefineModel(s, ldef, compileDef); + +// GetWeights returns the initialized weights that have been synchronized across all nodes. +wts := GNNI.GetWeights(mod); +OUTPUT(wts, NAMED('InitWeights')); + +mod2 := GNNI.Fit(mod, myXTensor, myYTensor, batchSize := weightAggregateInterval, miniBatch := miniBatch, numEpochs := numEpochs); + +//OUTPUT(mod2, NAMED('mod2_FIT')); + +// GetLoss returns the average loss for the final training epoch +losses := GNNI.GetLoss(mod2); +OUTPUT(losses, NAMED('Losses')); + +// EvaluateMod computes the loss, as well as any other metrics that were defined in the Keras +// compile line. +metrics := GNNI.EvaluateMod(mod2, myXTensorTest, myYTensorTest); + +OUTPUT(metrics, NAMED('metrics')); + + + + + + + + + + + diff --git a/Test/MNISTTests/mnist_as_real.ecl b/Test/MNISTTests/mnist_as_real.ecl new file mode 100644 index 0000000..61b4264 --- /dev/null +++ b/Test/MNISTTests/mnist_as_real.ecl @@ -0,0 +1,76 @@ +/* + Helper file to load the MNIST dataset for use in GNN. + + To get training data: mnist_as_real.train; + To get testing data: mnist_as_real.test; +*/ +ImageType := DATA784; + +mnist_dt := RECORD + INTEGER1 label; + ImageType image; +END; + +mnist_dt_set := RECORD + UNSIGNED8 id; + UNSIGNED1 label; + SET OF UNSIGNED1 pixel; +END; + + +GetSet(ImageType I) := FUNCTION + PixRec := {UNSIGNED1 Pixels}; + PixDS := DATASET(SIZEOF(I), + TRANSFORM(PixRec, + SELF.Pixels := (>UNSIGNED1<)I[COUNTER])); + RETURN SET(PixDS,Pixels); +END; + +GetSetREAL(ImageType I) := FUNCTION + PixRec := {REAL4 Pixels}; + PixDS := DATASET(SIZEOF(I), + TRANSFORM(PixRec, + SELF.Pixels := (>UNSIGNED1<)I[COUNTER])); + RETURN SET(PixDS,Pixels); +END; + + +train0 := DATASET('~mnist::train', mnist_dt, THOR); + + +test0 := DATASET('~mnist::test', mnist_dt,THOR); + +trainDat := PROJECT(train0, + TRANSFORM(mnist_dt_set, + SELF.id := COUNTER, + SELF.label := (UNSIGNED1)LEFT.label, + SELF.pixel := GetSet(LEFT.image))); + +testDat:= PROJECT(test0, + TRANSFORM(mnist_dt_set, + SELF.id := COUNTER, + SELF.label := (UNSIGNED1)LEFT.label, + SELF.pixel := GetSet(LEFT.image))); + + + +trainDatREAL := PROJECT(train0, + TRANSFORM(mnist_dt_set, + SELF.id := COUNTER, + SELF.label := (REAL4)LEFT.label, + SELF.pixel := GetSetREAL(LEFT.image))); +testDatREAL := PROJECT(test0, + TRANSFORM(mnist_dt_set, + SELF.id := COUNTER, + SELF.label := (REAL4)LEFT.label, + SELF.pixel := GetSetREAL(LEFT.image))); + +//OUTPUT(trainDatREAL); +//OUTPUT(testDatREAL); + +EXPORT mnist_as_real := MODULE + EXPORT train := trainDatREAL;//trainDat; + EXPORT test := testDatREAL;//testDat; +END; + + diff --git a/Test/MultiModel.ecl b/Test/MultiModel.ecl new file mode 100644 index 0000000..fd3fcd1 --- /dev/null +++ b/Test/MultiModel.ecl @@ -0,0 +1,316 @@ +/*############################################################################## +## HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems. All rights reserved. +############################################################################## */ +/** + * Multiple Model Test + * Tests the use of multiple Keras models at the same time. + * This is a simplified combination of ClassicTest.ecl (Regression) + * and ClassificationTest.ecl (Classification). + * This test uses synthetic data generated internally. + * It shows how GNN can be used to produce and consume multiple + * models in the same work-unit. + */ +IMPORT Python3 AS Python; +IMPORT $.^ AS GNN; +IMPORT GNN.Tensor; +IMPORT GNN.Internal.Types AS iTypes; +IMPORT GNN.Types; +IMPORT GNN.GNNI; +IMPORT GNN.Internal AS Int; +IMPORT Std.System.Thorlib; +IMPORT ML_Core AS mlc; + +NumericField := mlc.Types.NumericField; +t_Tensor := Tensor.R4.t_Tensor; +TensData := Tensor.R4.TensData; + +RAND_MAX := POWER(2,32) -1; + +// Test parameters +trainCount := 1000; +testCount := 1000; +featureCount := 5; +numEpochs := 10; +batchSize := 128; +// END Test parameters + +// Prepare training data. +// We use 5 inputs (X) and a single output (Y) +trainRecR := RECORD + UNSIGNED8 id; + SET OF REAL x; + REAL4 y; +END; + +// The target function maps a set of X features into a Y value, which is a polynomial function of X. +REAL4 targetFuncR(REAL4 x1, REAL4 x2, REAL4 x3, REAL4 x4, REAL4 x5) := FUNCTION + rslt := .5 * POWER(x1, 4) - .4 * POWER(x2, 3) + .3 * POWER(x3,2) - .2 * x4 + .1 * x5; + RETURN rslt; +END; + +// Build the training data. Pick random data for X values, and use a polynomial +// function of X to compute Y. +trainR0 := DATASET(trainCount, TRANSFORM(trainRecR, + SELF.id := COUNTER, + SELF.x := [(RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5], + SELF.y := 0) + ); +// Be sure to compute Y in a second step. Otherwise, the RANDOM() will be executed twice and the Y will be based +// on different values than those assigned to X. This is an ECL quirk that is not easy to fix. +trainR := PROJECT(trainR0, TRANSFORM(RECORDOF(LEFT), SELF.y := targetFuncR(LEFT.x[1], LEFT.x[2], LEFT.x[3], LEFT.x[4], LEFT.x[5]), SELF := LEFT)); +OUTPUT(trainR, NAMED('trainDataR')); + +// Build the test data. Same process as the training data. +testR0 := DATASET(testCount, TRANSFORM(trainRecR, + SELF.id := COUNTER, + SELF.x := [(RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5], + SELF.y := 0) + ); + +testR := PROJECT(testR0, TRANSFORM(RECORDOF(LEFT), SELF.y := targetFuncR(LEFT.x[1], LEFT.x[2], LEFT.x[3], LEFT.x[4], LEFT.x[5]), SELF := LEFT)); + +// Break the training and test data into X (independent) and Y (dependent) data sets. Format as Tensor Data. +trainRX0 := NORMALIZE(trainR, featureCount, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.x[COUNTER])); +trainRY0 := NORMALIZE(trainR, 1, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.y)); + +// Form a Tensor from the tensor data. This packs the data into 'slices' that can contain dense +// or sparse portions of the Tensor. If the tensor is small, it will fit into a single slice. +// Huge tensors may require many slices. The slices also contain tensor metadata such as the shape. +// For record oriented data, the first component of the shape should be 0, indicating that it is an +// arbitrary length set of records. +trainRX := Tensor.R4.MakeTensor([0, featureCount], trainRX0); +trainRY:= Tensor.R4.MakeTensor([0, 1], trainRY0); + +OUTPUT(trainRX, NAMED('X1')); +OUTPUT(trainRY, NAMED('y1')); + +testRX0 := NORMALIZE(testR, featureCount, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.x[COUNTER])); +testRY0 := NORMALIZE(testR, 1, TRANSFORM(TensData, + SELF.indexes := [LEFT.id, COUNTER], + SELF.value := LEFT.y)); +testRX := Tensor.R4.MakeTensor([0, featureCount], testRX0); +testRY:= Tensor.R4.MakeTensor([0, 1], testRY0); + + +// ldef provides the set of Keras layers that form the neural network. These are +// provided as strings representing the Python layer definitions as would be provided +// to Keras. Note that the symbol 'tf' is available for use (import tensorflow as tf), +// as is the symbol 'layers' (from tensorflow.keras import layers). +// Recall that in Keras, the input_shape must be present in the first layer. +// Note that this shape is the shape of a single observation. +ldefR := ['''layers.Dense(256, activation='tanh', input_shape=(5,))''', + '''layers.Dense(256, activation='relu')''', + '''layers.Dense(1, activation=None)''']; + +// compileDef defines the compile line to use for compiling the defined model. +// Note that 'model.' is implied, and should not be included in the compile line. +compileDefR := '''compile(optimizer=tf.keras.optimizers.SGD(.05), + loss=tf.keras.losses.mean_squared_error, + metrics=[tf.keras.metrics.mean_squared_error]) + '''; + +// Note that the order of the GNNI functions is maintained by passing tokens returned from +// one call into the next call that is dependent on it. +// For example, s is returned from GetSession(). It is used as the input to +// DefineModels(...) so +// that DefineModels() cannot execute until GetSession() has completed. +// Likewise, mod, the output from GetSession() is provided as input to Fit(). Fit in turn +// returns a token that is used by GetLoss(), EvaluateMod(), and Predict(), +// which are only dependent on Fit() having completed, and are not order +// dependent on one another. + +// GetSession must be called before any other functions +s := GNNI.GetSession(); +// DefineModel is dependent on the Session +// ldef contains the Python definition for each Keras layer +// compileDef contains the Keras compile statement. +modR := GNNI.DefineModel(s, ldefR, compileDefR); +OUTPUT(modR, NAMED('modR')); +// GetWeights returns the initialized weights that have been synchronized across all nodes. +wtsR := GNNI.GetWeights(modR); + +OUTPUT(wtsR, NAMED('InitWeightsR')); + +// Fit trains the models, given training X and Y data. BatchSize is not the Keras batchSize, +// but defines how many records are processed on each node before synchronizing the weights +modR2 := GNNI.Fit(modR, trainRX, trainRY, batchSize := batchSize, numEpochs := numEpochs); + +OUTPUT(modR2, NAMED('modR2')); + +// GetLoss returns the average loss for the final training epoch +lossesR := GNNI.GetLoss(modR2); + +// EvaluateMod computes the loss, as well as any other metrics that were defined in the Keras +// compile line. +metricsR := GNNI.EvaluateMod(modR2, testRX, testRY); + +OUTPUT(metricsR, NAMED('metricsR')); + +// Predict computes the neural network output given a set of inputs. +predsR := GNNI.Predict(modR2, testRX); + +// Note that the Tensor is a packed structure of Tensor slices. GetData() +// extracts the data into a sparse cell-based form -- each record represents +// one Tensor cell. See Tensor.R4.TensData. +testRYDat := Tensor.R4.GetData(testRY); +predDatR := Tensor.R4.GetData(predsR); + +OUTPUT(SORT(testRYDat, indexes), ALL, NAMED('testDatR')); +OUTPUT(predsR, NAMED('predictionsR')); +OUTPUT(SORT(predDatR, indexes), ALL, NAMED('predDatR')); + +//********************************************************************************** +// Classification Model +//********************************************************************************** +// Prepare training data. +// We use 5 inputs (X) and a one hot encoded output (Y) with 3 classes +// (i.e. 3 outputs). +trainRecC := RECORD + UNSIGNED8 id; + SET OF REAL4 x; + SET OF REAL4 y; +END; + +// The target function maps a set of X features into a Y value, +// which is a threshold on a polynomial function of X. +// Note that we are effectively doing a One Hot encoding here, since we +// return a set of Y values, one for each class, with only one value +// being one and the rest zero. +// If we were working with tensors here, we could have used a class +// label and then called Utils.ToOneHot to encode it. +SET OF REAL4 targetFuncC(REAL4 x1, REAL4 x2, REAL4 x3, REAL4 x4, REAL4 x5) := FUNCTION + rslt0 := TANH(.5 * POWER(x1, 4) - .4 * POWER(x2, 3) + .3 * POWER(x3,2) - .2 * x4 + .1 * x5); + rslt := MAP(rslt0 > -.25 => [1,0,0], rslt0 < .25 => [0,1,0], [0,0,1]); + RETURN rslt; +END; + +// Build the training data +trainC0 := DATASET(trainCount, TRANSFORM(trainRecC, + SELF.id := COUNTER, + SELF.x := [(RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5], + SELF.y := []) + ); +// Be sure to compute Y in a second step. Otherewise, the RANDOM() will be executed twice and the Y will be based +// on different values than those assigned to X. This is an ECL quirk that is not easy to fix. +trainC := PROJECT(trainC0, TRANSFORM(RECORDOF(LEFT), SELF.y := targetFuncC(LEFT.x[1], LEFT.x[2], LEFT.x[3], LEFT.x[4], LEFT.x[5]), SELF := LEFT)); +OUTPUT(trainC, NAMED('trainData')); + +// Build the test data. Same process as the training data. +testC0 := DATASET(testCount, TRANSFORM(trainRecC, + SELF.id := COUNTER, + SELF.x := [(RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5, + (RANDOM() % RAND_MAX) / RAND_MAX -.5], + SELF.y := []) + ); + +testC := PROJECT(testC0, TRANSFORM(RECORDOF(LEFT), SELF.y := targetFuncC(LEFT.x[1], LEFT.x[2], LEFT.x[3], LEFT.x[4], LEFT.x[5]), SELF := LEFT)); + +// Break the training and test data into X (independent) and Y (dependent) data sets. +// Format as NumericField data. +trainCX := NORMALIZE(trainC, featureCount, TRANSFORM(NumericField, + SELF.wi := 1, + SELF.id := LEFT.id, + SELF.number := COUNTER, + SELF.value := LEFT.x[COUNTER])); +trainCY := NORMALIZE(trainC, 3, TRANSFORM(NumericField, + SELF.wi := 1, + SELF.id := LEFT.id, + SELF.number := COUNTER, + SELF.value := LEFT.y[COUNTER])); + +OUTPUT(trainCX, NAMED('TrainCX')); +OUTPUT(trainCY, NAMED('TrainCY')); + +testCX := NORMALIZE(testC, featureCount, TRANSFORM(NumericField, + SELF.wi := 1, + SELF.id := LEFT.id, + SELF.number := COUNTER, + SELF.value := LEFT.x[COUNTER])); +testCY := NORMALIZE(testC, 3, TRANSFORM(NumericField, + SELF.wi := 1, + SELF.id := LEFT.id, + SELF.number := COUNTER, + SELF.value := LEFT.y[COUNTER])); + + +// ldef provides the set of Keras layers that form the neural network. These are +// provided as strings representing the Python layer definitions as would be provided +// to Keras. Note that the symbol 'tf' is available for use (import tensorflow as tf), as is +// the symbol 'layers' (from tensorflow.keras import layers). +ldefC := ['''layers.Dense(16, activation='tanh', input_shape=(5,))''', + '''layers.Dense(16, activation='relu')''', + '''layers.Dense(3, activation='softmax')''']; + +// compileDef defines the compile line to use for compiling the defined model. +// Note that 'model.' is implied, and should not be included in the compile line. +compileDefC := '''compile(optimizer=tf.keras.optimizers.SGD(.05), + loss=tf.keras.losses.mean_squared_error, + metrics=[tf.keras.metrics.mean_squared_error]) + '''; + +// Note that the order of the GNNI functions is maintained by passing tokens returned from one call +// into the next call that is dependent on it. +// For example, s is returned from GetSession(). It is used as the input to DefineModels(...) so +// that DefineModels() cannot execute until GetSession() has completed. +// Likewise, mod, the output from GetSession() is provided as input to Fit(). Fit in turn returns +// a token that is used by GetLoss(), EvaluateMod(), and Predict(), which are only dependent on Fit() +// having completed, and are not order dependent on one another. + +// Use the same session ID as the Regression Network +// Define model is dependent on the Session +// ldef contains the Python definition for each Keras layer +// compileDef contains the Keras compile statement. +modC := GNNI.DefineModel(s, ldefC, compileDefC); +OUTPUT(modC, NAMED('ModC')); +// GetWeights returns the initialized weights that have been synchronized across all nodes. +wtsC := GNNI.GetWeights(modC); + +OUTPUT(wtsC, NAMED('InitWeightsC')); + +// Fit trains the models, given training X and Y data. BatchSize is not the Keras batchSize, +// but defines how many records are processed on each node before synchronizing the weights +// Note that we use the NF form of Fit since we are using NumericField for I / o. +modC2 := GNNI.FitNF(modC, trainCX, trainCY, batchSize := batchSize, numEpochs := numEpochs); + +OUTPUT(modC2, NAMED('modC2')); + +// GetLoss returns the average loss for the final training epoch +lossesC := GNNI.GetLoss(modC2); + +// EvaluateNF computes the loss, as well as any other metrics that were defined in the Keras +// compile line. This is the NumericField form of EvaluateMod. +metricsC := GNNI.EvaluateNF(modC2, testCX, testCY); + +OUTPUT(metricsC, NAMED('metricsC')); + +// PredictNF computes the neural network output given a set of inputs. +// This is the NumericField form of Predict. Note that these predictions are +// effectively the probabilities for each class (as output from softmax in the +// final NN layer). If we had used Tensors rather than NumericField, we +// could convert to a class label by using Utils.FromOneHot, or +// Utils.Probabilities2Class. +predsC := GNNI.PredictNF(modC2, testCX); + +OUTPUT(testCY, ALL, NAMED('testDatC')); +OUTPUT(predsC, NAMED('predictionsC')); diff --git a/Test/TensorAlignTest.ecl b/Test/TensorAlignTest.ecl new file mode 100644 index 0000000..64c7f8f --- /dev/null +++ b/Test/TensorAlignTest.ecl @@ -0,0 +1,45 @@ +/*############################################################################## +## HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems. All rights reserved. +############################################################################## */ +/** + * Unit tests for the Tensor module Alignment mechanism + */ +IMPORT Python3 AS Python; +IMPORT $.^ AS GNN; +IMPORT GNN.Tensor; +IMPORT GNN.Internal AS int; +IMPORT GNN.Internal.Types AS iTypes; +IMPORT Std.System.Thorlib; + +t_Tensor := Tensor.R4.t_Tensor; +TensData := Tensor.R4.TensData; + +DATASET(t_Tensor) MakeTensor(UNSIGNED nRecs, UNSIGNED nRows, nCols, UNSIGNED wi) := FUNCTION + DATASET(TensData) MakeData(UNSIGNED nRecs, UNSIGNED nRows, UNSIGNED nCols) := EMBED(Python) + outrecs = [] + for i in range(nRecs): + for row in range(nRows): + for col in range(nCols): + indx = [i+1,row+1, col+1] + rec = (indx, 1.0) + outrecs.append(rec) + return outrecs + ENDEMBED; + tdat := MakeData(nRecs, nRows, nCols); + tshape := [0, nRows, nCols]; // Zero first time implies record oriented tensor + tens := Tensor.R4.MakeTensor(tshape, tdat, wi := wi); + return tens; +END; + +// Make 3 different sized tensors and then align them. +t1 := MakeTensor(1000,5,5, 1); +t2 := MakeTensor(1000,15, 15, 2); +t3 := MakeTensor(1000, 2, 3, 3); + +combined := SORT(t1 + t2 + t3, wi, sliceId); + +OUTPUT(combined, NAMED('Original')); + +aligned := SORT(Tensor.R4.AlignTensors(combined), wi, sliceId); + +OUTPUT(aligned, NAMED('Aligned')); diff --git a/Types.ecl b/Types.ecl index ba9f3d0..29bf266 100644 --- a/Types.ecl +++ b/Types.ecl @@ -17,4 +17,13 @@ EXPORT Types := MODULE STRING metricName; REAL8 value; END; + /** + * Record to use for defining complex (i.e. Non-Sequential, Functional) models + * using the DefineFuncModel() GNNI method. + */ + EXPORT FuncLayerDef := RECORD + STRING layerName; + STRING layerDef; + SET OF STRING predecessors; + END; END;