diff --git a/docs/templates/api/data_source_api.md b/docs/templates/api/data_source_api.md new file mode 100644 index 0000000000..e6eeff24d1 --- /dev/null +++ b/docs/templates/api/data_source_api.md @@ -0,0 +1,13 @@ +# Data Source + +## Retrieval + +{{ds_get}} + +## Properties + +{{data_source_properties}} + +## Methods + +{{data_source_methods}} diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java index 3e8654cef9..a5112e3e57 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java @@ -68,7 +68,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.onlineTopicName = onlineTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; - this.storageConnector = storageConnector; + this.dataSource.setStorageConnector(storageConnector); this.dataSource.setPath(path); } diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java index 1eeb9e1d59..b91616a22f 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java @@ -75,7 +75,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; - this.storageConnector = storageConnector; + this.dataSource.setStorageConnector(storageConnector); this.dataSource.setPath(path); } diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/DataSource.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/DataSource.java index 4dadc3ec81..56b7aa5436 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/DataSource.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/DataSource.java @@ -53,4 +53,8 @@ public class DataSource extends RestDto { @Setter private String path = ""; + @Getter + @Setter + private StorageConnector storageConnector = null; + } \ No newline at end of file diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java index 9f1c259521..3945267035 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java @@ -135,10 +135,6 @@ public abstract class FeatureGroupBase { @Setter protected OnlineConfig onlineConfig; - @Getter - @Setter - protected StorageConnector storageConnector; - @Getter @Setter protected DataSource dataSource; diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java index dbdbf33ee4..20333d5370 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java @@ -205,6 +205,34 @@ public StorageConnector getStorageConnector(String name) throws FeatureStoreExce return storageConnectorApi.getByName(this, name, StorageConnector.class); } + /** + * Get a previously created data source from the feature store. + * + *

data sources encapsulate all information needed for the execution engine to read and write to a specific + * storage. + * + *

If you want to connect to the online feature store, see the getOnlineDataSource` method to get the + * JDBC connector for the Online Feature Store. + * + *

+   * {@code
+   *        // get feature store handle
+   *        FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
+   *        DataSource ds = fs.getDataSource("ds_name");
+   * }
+   * 
+ * + * @param name Name of the data source to retrieve. + * @return DataSource Data source object. + * @throws FeatureStoreException If unable to retrieve DataSource from the feature store. + * @throws IOException Generic IO exception. + */ + public DataSource getDataSource(String name) throws FeatureStoreException, IOException { + DataSource dataSource = new DataSource(); + dataSource.setStorageConnector(getStorageConnector(name)); + return dataSource; + } + /** * Get a previously created HopsFs compliant storage connector from the feature store. * diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureViewBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureViewBase.java index cefa2388e9..1f6ef4e21f 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureViewBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureViewBase.java @@ -186,6 +186,7 @@ public Integer createTrainingData( * @throws IOException Generic IO exception. * @throws ParseException In case it's unable to parse provided `startTime`/`endTime` strings to date types. */ + @Deprecated public Integer createTrainingData(String startTime, String endTime, String description, DataFormat dataFormat, Boolean coalesce, StorageConnector storageConnector, String location, Long seed, StatisticsConfig statisticsConfig, @@ -211,6 +212,80 @@ public Integer createTrainingData(String startTime, String endTime, String descr return trainingDataset.getVersion(); } + /** + * Create the metadata for a training dataset and save the corresponding training data into `location`. The training + * data can be retrieved by calling `featureView.getTrainingData()`. + * + *
+   * {@code
+   *        // get feature store handle
+   *        FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
+   *        // get feature view handle
+   *        FeatureView fv = fs.getFeatureView("fv_name", 1);
+   *        // create training dataset
+   *        String startTime = "20220101000000";
+   *        String endTime = "20220606235959";
+   *        String description = "demo training dataset";
+   *        DataSource dataSource = fs.getDataSource("my_datasource");
+   *        dataSource.setPath("test/path");
+   *        StatisticsConfig statisticsConfig = new StatisticsConfig(true, true, true, true);
+   *        fv.createTrainingData(startTime, endTime, description, DataFormat.CSV, true, dataSource,
+   *        null, statisticsConfig, null, null, null);
+   * }
+   * 
+ * + * @param startTime Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param endTime Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param description A string describing the contents of the training dataset to improve discoverability for + * Data Scientists. + * @param dataFormat The data format used to save the training dataset. + * @param coalesce If true the training dataset data will be coalesced into a single partition before writing. + * The resulting training dataset will be a single file per split. + * @param dataSource Data source defining the sink location for the training dataset. If `null` is + * provided and materializes training dataset on HopsFS. + * @param seed Define a seed to create the random splits with, in order to guarantee reproducability, + * @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for + * this feature group, `"correlations`" to turn on feature correlation computation, + * `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute + * uniqueness, distinctness and entropy. The values should be booleans indicating the + * setting. To fully turn off statistics computation pass `statisticsConfig=null`. + * @param writeOptions Additional write options as key-value pairs. + * @param extraFilterLogic Additional filters (set of Filter objects) to be attached to the training dataset. + * The filters will be also applied in `getBatchData`. + * @param extraFilter Additional filter to be attached to the training dataset. The filter will be also applied + * in `getBatchData`. + * @return Integer Training dataset version. + * @throws FeatureStoreException If Client is not connected to Hopsworks and/or unable to identify format of the + * provided `startTime`/`endTime` date formats. + * @throws IOException Generic IO exception. + * @throws ParseException In case it's unable to parse provided `startTime`/`endTime` strings to date types. + */ + public Integer createTrainingData(String startTime, String endTime, String description, DataFormat dataFormat, + Boolean coalesce, DataSource dataSource, Long seed, + StatisticsConfig statisticsConfig, + Map writeOptions, FilterLogic extraFilterLogic, Filter extraFilter) + throws IOException, FeatureStoreException, ParseException { + TrainingDatasetBase trainingDataset = + TrainingDatasetBase.builder() + .featureStore(featureStore) + .eventStartTime(startTime) + .eventEndTime(endTime) + .description(description) + .dataFormat(dataFormat) + .coalesce(coalesce) + .dataSource(dataSource) + .seed(seed) + .statisticsConfig(statisticsConfig) + .extraFilterLogic(extraFilterLogic) + .extraFilter(extraFilter) + .build(); + trainingDataset = featureViewApi.createTrainingData(name, version, trainingDataset, TrainingDatasetBase.class); + featureViewApi.computeTrainingData(featureStore, this, trainingDataset); + return trainingDataset.getVersion(); + } + /** * Create the metadata for a training dataset and save the corresponding training data into `location`. The training * data is split into train and test set at random or according to time ranges. The training data can be retrieved by @@ -363,6 +438,7 @@ public Integer createTrainTestSplit( * @throws IOException Generic IO exception. 
* @throws ParseException In case it's unable to parse provided date strings to date types. */ + @Deprecated public Integer createTrainTestSplit( Float testSize, String trainStart, String trainEnd, String testStart, String testEnd, String description, DataFormat dataFormat, Boolean coalesce, @@ -397,6 +473,120 @@ public Integer createTrainTestSplit( return trainingDataset.getVersion(); } + /** + * Create the metadata for a training dataset and save the corresponding training data into `location`. The training + * data is split into train and test set at random or according to time ranges. The training data can be retrieved by + * calling `featureView.getTrainTestSplit` method. + * + *
+   * {@code
+   *        // get feature store handle
+   *        FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
+   *        // get feature view handle
+   *        FeatureView fv = fs.getFeatureView("fv_name", 1);
+   *        // create training dataset based on time split
+   *        String trainStart = "20220101000000";
+   *        String trainEnd = "20220630235959";
+   *        String testStart = "20220701000000";
+   *        String testEnd = "20220830235959";
+   *        String description = "demo training dataset";
+   *        DataSource dataSource = fs.getDataSource("my_datasource");
+   *        dataSource.setPath("test/path");
+   *        Long seed = 1234L;
+   *        Boolean coalesce = true;
+   *        StatisticsConfig statisticsConfig = new StatisticsConfig(true, true, true, true);
+   *        Map writeOptions = new HashMap() {{
+   *                           put("header", "true");
+   *                           put("delimiter", ",");
+   *                           }};
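+   *        // these options are forwarded to the writer when the CSV training data is materialized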
+   *        // define extra filters
+   *        Filter leftFtFilter = new Filter();
+   *        leftFtFilter.setFeature(new Feature("left_ft_name"));
+   *        leftFtFilter.setValue("400");
+   *        leftFtFilter.setCondition(SqlFilterCondition.EQUALS);
+   *        Filter rightFtFilter = new Filter();
+   *        rightFtFilter.setFeature(new Feature("right_ft_name"));
+   *        rightFtFilter.setValue("50");
+   *        rightFtFilter.setCondition(SqlFilterCondition.EQUALS);
+   *        FilterLogic extraFilterLogic = new FilterLogic(SqlFilterLogic.AND, leftFtFilter, rightFtFilter);
+   *        Filter extraFilter = new Filter();
+   *        extraFilter.setFeature(new Feature("ft_name"));
+   *        extraFilter.setValue("100");
+   *        extraFilter.setCondition(SqlFilterCondition.GREATER_THAN);
+   *
+   *        // create training data
+   *        fv.createTrainTestSplit(null, trainStart, trainEnd, testStart,
+   *        testEnd, description, DataFormat.CSV, coalesce, dataSource, seed, statisticsConfig,
+   *        writeOptions, extraFilterLogic, extraFilter);
+   *
+   *        // or based on random split
+   *        fv.createTrainTestSplit(0.2f, null, null, null, null, description, DataFormat.CSV, coalesce,
+   *        dataSource, seed, statisticsConfig, writeOptions, extraFilterLogic, extraFilter);
+   *
+   * }
+   * 
+ * + * @param testSize Size of test set. + * @param trainStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param trainEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param testStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param testEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param description A string describing the contents of the training dataset to improve discoverability for + * Data Scientists. + * @param dataFormat The data format used to save the training dataset. + * @param coalesce If true the training dataset data will be coalesced into a single partition before writing. + * The resulting training dataset will be a single file per split. + * @param dataSource Data source defining the sink location for the training dataset. If `null` is + * provided and materializes training dataset on HopsFS. + * @param seed Define a seed to create the random splits with, in order to guarantee reproducability, + * @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for + * this feature group, `"correlations`" to turn on feature correlation computation, + * `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute + * uniqueness, distinctness and entropy. The values should be booleans indicating the + * setting. To fully turn off statistics computation pass `statisticsConfig=null`. + * @param writeOptions Additional write options as key-value pairs. + * @param extraFilterLogic Additional filters (set of Filter objects) to be attached to the training dataset. + * The filters will be also applied in `getBatchData`. + * @param extraFilter Additional filter to be attached to the training dataset. The filter will be also applied + * in `getBatchData`. + * @return Integer Training dataset version. + * @throws FeatureStoreException If Client is not connected to Hopsworks and/or unable to identify format of the + * provided date strings to date formats. + * @throws IOException Generic IO exception. + * @throws ParseException In case it's unable to parse provided date strings to date types. 
+ */ + public Integer createTrainTestSplit( + Float testSize, String trainStart, String trainEnd, String testStart, String testEnd, + String description, DataFormat dataFormat, Boolean coalesce, DataSource dataSource, + Long seed, StatisticsConfig statisticsConfig, Map writeOptions, + FilterLogic extraFilterLogic, Filter extraFilter + ) throws IOException, FeatureStoreException, ParseException { + validateTrainTestSplit(testSize, trainEnd, testStart); + TrainingDatasetBase trainingDataset = + TrainingDatasetBase.builder() + .featureStore(featureStore) + .testSize(testSize) + .trainStart(trainStart) + .trainEnd(trainEnd) + .testStart(testStart) + .testEnd(testEnd) + .description(description) + .dataFormat(dataFormat) + .coalesce(coalesce) + .dataSource(dataSource) + .trainSplit(Split.TRAIN) + .seed(seed) + .timeSplitSize(2) + .statisticsConfig(statisticsConfig) + .extraFilterLogic(extraFilterLogic) + .extraFilter(extraFilter) + .build(); + + trainingDataset = featureViewApi.createTrainingData(name, version, trainingDataset, TrainingDatasetBase.class); + featureViewApi.computeTrainingData(featureStore, this, trainingDataset); + return trainingDataset.getVersion(); + } + /** * Create the metadata for a training dataset and save the corresponding training data into `location`. The training * data is split into train, validation, and test set at random or according to time range. The training data can be @@ -569,6 +759,7 @@ public Integer createTrainValidationTestSplit( * @throws IOException Generic IO exception. * @throws ParseException In case it's unable to parse provided date strings to date types. */ + @Deprecated public Integer createTrainValidationTestSplit( Float validationSize, Float testSize, String trainStart, String trainEnd, String validationStart, String validationEnd, String testStart, String testEnd, String description, DataFormat dataFormat, @@ -606,6 +797,133 @@ public Integer createTrainValidationTestSplit( return trainingDataset.getVersion(); } + /** + * Create the metadata for a training dataset and save the corresponding training data into `location`. The training + * data is split into train, validation, and test set at random or according to time range. The training data can be + * retrieved by calling `feature_view.getTrainValidationTestSplit`. + * + *
+   * {@code
+   *        // get feature store handle
+   *        FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
+   *        // get feature view handle
+   *        FeatureView fv = fs.getFeatureView("fv_name", 1);
+   *        // create training dataset based on time split
+   *        String trainStart = "20220101000000";
+   *        String trainEnd = "20220630235959";
+   *        String validationStart = "20220701000000";
+   *        String validationEnd = "20220830235959";
+   *        String testStart = "20220901000000";
+   *        String testEnd = "20220931235959";
+   *        String description = "demo training dataset";
+   *        DataSource dataSource = fs.getDataSource("my_datasource");
+   *        dataSource.setPath("test/path");
+   *        Long seed = 1234L;
+   *        Boolean coalesce = true;
+   *        StatisticsConfig statisticsConfig = new StatisticsConfig(true, true, true, true);
+   *        Map writeOptions = new HashMap() {{
+   *                           put("header", "true");
+   *                           put("delimiter", ",");
+   *                           }};
+   *        // define extra filters
+   *        Filter leftFtFilter = new Filter();
+   *        leftFtFilter.setFeature(new Feature("left_ft_name"));
+   *        leftFtFilter.setValue("400");
+   *        leftFtFilter.setCondition(SqlFilterCondition.EQUALS);
+   *        Filter rightFtFilter = new Filter();
+   *        rightFtFilter.setFeature(new Feature("right_ft_name"));
+   *        rightFtFilter.setValue("50");
+   *        rightFtFilter.setCondition(SqlFilterCondition.EQUALS);
+   *        FilterLogic extraFilterLogic = new FilterLogic(SqlFilterLogic.AND, leftFtFilter, rightFtFilter);
+   *        Filter extraFilter = new Filter();
+   *        extraFilter.setFeature(new Feature("ft_name"));
+   *        extraFilter.setValue("100");
+   *        extraFilter.setCondition(SqlFilterCondition.GREATER_THAN);
+   *        // create training data
+   *        fv.createTrainValidationTestSplit(null, null, trainStart, trainEnd, validationStart, validationEnd, testStart,
+   *        testEnd,  description, DataFormat.CSV, coalesce, dataSource, seed, statisticsConfig,
+   *        writeOptions, extraFilterLogic, extraFilter);
+   *
+   *        // or based on random split
+   *        fv.createTrainValidationTestSplit(0.2f, 0.1f, null, null, null, null, null, null, description, DataFormat.CSV, coalesce,
+   *        dataSource, seed, statisticsConfig, writeOptions, extraFilterLogic, extraFilter);
+   * }
+   * 
+ * + * @param validationSize Size of validation set. + * @param testSize Size of test set. + * @param trainStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param trainEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param validationStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param validationEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param testStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param testEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param description A string describing the contents of the training dataset to improve discoverability for + * Data Scientists. + * @param dataFormat The data format used to save the training dataset. + * @param coalesce If true the training dataset data will be coalesced into a single partition before writing. + * The resulting training dataset will be a single file per split. + * @param dataSource Data source defining the sink location for the training dataset. If `null` is + * provided and materializes training dataset on HopsFS. + * @param seed Define a seed to create the random splits with, in order to guarantee reproducability, + * @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for + * this feature group, `"correlations`" to turn on feature correlation computation, + * `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute + * uniqueness, distinctness and entropy. The values should be booleans indicating the + * setting. To fully turn off statistics computation pass `statisticsConfig=null`. + * @param writeOptions Additional write options as key-value pairs. + * @param extraFilterLogic Additional filters (set of Filter objects) to be attached to the training dataset. + * The filters will be also applied in `getBatchData`. + * @param extraFilter Additional filter to be attached to the training dataset. The filter will be also applied + * in `getBatchData`. + * @return Integer Training dataset version. + * @throws FeatureStoreException If Client is not connected to Hopsworks and/or unable to identify format of the + * provided date strings to date formats. + * @throws IOException Generic IO exception. + * @throws ParseException In case it's unable to parse provided date strings to date types. 
+ */ + public Integer createTrainValidationTestSplit( + Float validationSize, Float testSize, String trainStart, String trainEnd, String validationStart, + String validationEnd, String testStart, String testEnd, String description, DataFormat dataFormat, + Boolean coalesce, DataSource dataSource, + Long seed, StatisticsConfig statisticsConfig, Map writeOptions, + FilterLogic extraFilterLogic, Filter extraFilter + ) throws IOException, FeatureStoreException, ParseException { + validateTrainValidationTestSplit(validationSize, testSize, trainEnd, validationStart, validationEnd, testStart); + TrainingDatasetBase trainingDataset = + TrainingDatasetBase.builder() + .featureStore(featureStore) + .validationSize(validationSize) + .testSize(testSize) + .trainStart(trainStart) + .trainEnd(trainEnd) + .validationStart(validationStart) + .validationEnd(validationEnd) + .testStart(testStart) + .testEnd(testEnd) + .description(description) + .dataFormat(dataFormat) + .coalesce(coalesce) + .dataSource(dataSource) + .trainSplit(Split.TRAIN) + .timeSplitSize(3) + .seed(seed) + .statisticsConfig(statisticsConfig) + .extraFilterLogic(extraFilterLogic) + .extraFilter(extraFilter) + .build(); + + trainingDataset = featureViewApi.createTrainingData(name, version, trainingDataset, TrainingDatasetBase.class); + featureViewApi.computeTrainingData(featureStore, this, trainingDataset); + return trainingDataset.getVersion(); + } + protected void validateTrainTestSplit(Float testSize, String trainEnd, String testStart) throws FeatureStoreException { if (!((testSize != null && testSize > 0 && testSize < 1) diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StreamFeatureGroup.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StreamFeatureGroup.java index c0ad32bb9b..ee081f4ab9 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StreamFeatureGroup.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StreamFeatureGroup.java @@ -60,8 +60,8 @@ public StreamFeatureGroup(FeatureStoreBase featureStore, @NonNull String name, I this.onlineTopicName = onlineTopicName; this.eventTime = eventTime; this.timeTravelFormat = timeTravelFormat; - this.storageConnector = storageConnector; this.onlineConfig = onlineConfig; + this.dataSource.setStorageConnector(storageConnector); this.dataSource.setPath(path); } diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/TrainingDatasetBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/TrainingDatasetBase.java index 625d59977b..6493b04e90 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/TrainingDatasetBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/TrainingDatasetBase.java @@ -115,7 +115,7 @@ public class TrainingDatasetBase { @Getter @Setter - protected StorageConnector storageConnector; + protected DataSource dataSource; @Getter @Setter @@ -138,14 +138,19 @@ public TrainingDatasetBase(Integer version, String description, DataFormat dataF TrainingDatasetType trainingDatasetType, Float validationSize, Float testSize, String trainStart, String trainEnd, String validationStart, String validationEnd, String testStart, String testEnd, Integer timeSplitSize, - FilterLogic extraFilterLogic, Filter extraFilter) + FilterLogic extraFilterLogic, Filter extraFilter, DataSource dataSource) throws FeatureStoreException, ParseException { this.version = version; this.description = description; this.dataFormat = dataFormat != null ? dataFormat : DataFormat.PARQUET; this.coalesce = coalesce != null ? 
coalesce : false; - this.location = location; - this.storageConnector = storageConnector; + if (dataSource == null) { + this.dataSource = new DataSource(); + this.dataSource.setStorageConnector(storageConnector); + this.dataSource.setPath(location); + } else { + this.dataSource = dataSource; + } this.trainSplit = trainSplit; this.splits = splits == null ? Lists.newArrayList() : splits; this.seed = seed; @@ -155,7 +160,7 @@ public TrainingDatasetBase(Integer version, String description, DataFormat dataF this.eventStartTime = eventStartTime != null ? FeatureGroupUtils.getDateFromDateString(eventStartTime) : null; this.eventEndTime = eventEndTime != null ? FeatureGroupUtils.getDateFromDateString(eventEndTime) : null; this.trainingDatasetType = trainingDatasetType != null ? trainingDatasetType : - getTrainingDatasetType(storageConnector); + getTrainingDatasetType(dataSource); setValTestSplit(validationSize, testSize); setTimeSeriesSplits(timeSplitSize, trainStart, trainEnd, validationStart, validationEnd, testStart, testEnd); if (extraFilter != null) { @@ -225,10 +230,10 @@ public void setLabel(List label) { this.label = label.stream().map(String::toLowerCase).collect(Collectors.toList()); } - public TrainingDatasetType getTrainingDatasetType(StorageConnector storageConnector) { - if (storageConnector == null) { + public TrainingDatasetType getTrainingDatasetType(DataSource dataSource) { + if (dataSource == null || dataSource.getStorageConnector() == null) { return TrainingDatasetType.HOPSFS_TRAINING_DATASET; - } else if (storageConnector.getStorageConnectorType() == StorageConnectorType.HOPSFS) { + } else if (dataSource.getStorageConnector().getStorageConnectorType() == StorageConnectorType.HOPSFS) { return TrainingDatasetType.HOPSFS_TRAINING_DATASET; } else { return TrainingDatasetType.EXTERNAL_TRAINING_DATASET; diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java index 77b3767e51..9221827585 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java @@ -88,7 +88,6 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int this.description = description; this.primaryKeys = primaryKeys != null ? primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null; - this.storageConnector = storageConnector; this.features = features; this.statisticsConfig = statisticsConfig != null ? 
statisticsConfig : new StatisticsConfig(); this.eventTime = eventTime; @@ -97,6 +96,7 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int this.topicName = topicName; this.notificationTopicName = notificationTopicName; this.onlineConfig = onlineConfig; + this.dataSource.setStorageConnector(storageConnector); this.dataSource.setPath(path); this.dataSource.setQuery(query); } diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java index 8cf925f0af..cc779357c2 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java @@ -88,7 +88,7 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; - this.storageConnector = storageConnector; + this.dataSource.setStorageConnector(storageConnector); this.dataSource.setPath(path); } diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureView.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureView.java index 8b33966803..02e8ac70b4 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureView.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureView.java @@ -23,6 +23,7 @@ import com.logicalclocks.hsfs.spark.constructor.Query; import com.logicalclocks.hsfs.spark.engine.FeatureViewEngine; import com.logicalclocks.hsfs.DataFormat; +import com.logicalclocks.hsfs.DataSource; import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.FeatureViewBase; import com.logicalclocks.hsfs.Split; @@ -418,6 +419,8 @@ public Integer createTrainingData( * @throws IOException Generic IO exception. * @throws ParseException In case it's unable to parse provided `startTime`/`endTime` strings to date types. */ + @Deprecated + @Override public Integer createTrainingData(String startTime, String endTime, String description, DataFormat dataFormat, Boolean coalesce, StorageConnector storageConnector, String location, Long seed, StatisticsConfig statisticsConfig, @@ -441,6 +444,79 @@ public Integer createTrainingData(String startTime, String endTime, String descr return featureViewEngine.createTrainingDataset(this, trainingDataset, writeOptions).getVersion(); } + /** + * Create the metadata for a training dataset and save the corresponding training data into `location`. The training + * data can be retrieved by calling `featureView.getTrainingData()`. + * + *
+   * {@code
+   *        // get feature store handle
+   *        FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
+   *        // get feature view handle
+   *        FeatureView fv = fs.getFeatureView("fv_name", 1);
+   *        // create training dataset
+   *        String startTime = "20220101000000";
+   *        String endTime = "20220606235959";
+   *        String description = "demo training dataset";
+   *        DataSource dataSource = fs.getDataSource("my_datasource");
+   *        dataSource.setPath("test/path");
+   *        StatisticsConfig statisticsConfig = new StatisticsConfig(true, true, true, true);
+   *        fv.createTrainingData(startTime, endTime, description, DataFormat.CSV, true, dataSource,
+   *        null, statisticsConfig, null, null, null);
+   * }
+   * 
+ * + * @param startTime Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param endTime Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param description A string describing the contents of the training dataset to improve discoverability for + * Data Scientists. + * @param dataFormat The data format used to save the training dataset. + * @param coalesce If true the training dataset data will be coalesced into a single partition before writing. + * The resulting training dataset will be a single file per split. + * @param dataSource Data source defining the sink location for the training dataset. If `null` is + * provided and materializes training dataset on HopsFS. + * @param seed Define a seed to create the random splits with, in order to guarantee reproducability, + * @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for + * this feature group, `"correlations`" to turn on feature correlation computation, + * `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute + * uniqueness, distinctness and entropy. The values should be booleans indicating the + * setting. To fully turn off statistics computation pass `statisticsConfig=null`. + * @param writeOptions Additional write options as key-value pairs. + * @param extraFilterLogic Additional filters (set of Filter objects) to be attached to the training dataset. + * The filters will be also applied in `getBatchData`. + * @param extraFilter Additional filter to be attached to the training dataset. The filter will be also applied + * in `getBatchData`. + * @return Integer Training dataset version. + * @throws FeatureStoreException If Client is not connected to Hopsworks and/or unable to identify format of the + * provided `startTime`/`endTime` date formats. + * @throws IOException Generic IO exception. + * @throws ParseException In case it's unable to parse provided `startTime`/`endTime` strings to date types. + */ + @Override + public Integer createTrainingData(String startTime, String endTime, String description, DataFormat dataFormat, + Boolean coalesce, DataSource dataSource, Long seed, + StatisticsConfig statisticsConfig, + Map writeOptions, FilterLogic extraFilterLogic, Filter extraFilter) + throws IOException, FeatureStoreException, ParseException { + TrainingDataset trainingDataset = + this.featureStore + .createTrainingDataset() + .eventStartTime(startTime) + .eventEndTime(endTime) + .description(description) + .dataFormat(dataFormat) + .coalesce(coalesce) + .dataSource(dataSource) + .seed(seed) + .statisticsConfig(statisticsConfig) + .extraFilterLogic(extraFilterLogic) + .extraFilter(extraFilter) + .build(); + return featureViewEngine.createTrainingDataset(this, trainingDataset, writeOptions).getVersion(); + } + /** * Create the metadata for a training dataset and save the corresponding training data into `location`. The training * data is split into train and test set at random or according to time ranges. The training data can be retrieved by @@ -590,6 +666,8 @@ public Integer createTrainTestSplit( * @throws IOException Generic IO exception. * @throws ParseException In case it's unable to parse provided date strings to date types. 
*/ + @Deprecated + @Override public Integer createTrainTestSplit( Float testSize, String trainStart, String trainEnd, String testStart, String testEnd, String description, DataFormat dataFormat, Boolean coalesce, @@ -621,6 +699,118 @@ public Integer createTrainTestSplit( return featureViewEngine.createTrainingDataset(this, trainingDataset, writeOptions).getVersion(); } + /** + * Create the metadata for a training dataset and save the corresponding training data into `location`. The training + * data is split into train and test set at random or according to time ranges. The training data can be retrieved by + * calling `featureView.getTrainTestSplit` method. + * + *
+   * {@code
+   *        // get feature store handle
+   *        FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
+   *        // get feature view handle
+   *        FeatureView fv = fs.getFeatureView("fv_name", 1);
+   *        // create training dataset based on time split
+   *        String trainStart = "20220101000000";
+   *        String trainEnd = "20220630235959";
+   *        String testStart = "20220701000000";
+   *        String testEnd = "20220830235959";
+   *        String description = "demo training dataset";
+   *        DataSource dataSource = fs.getDataSource("my_datasource");
+   *        dataSource.setPath("test/path");
+   *        Long seed = 1234L;
+   *        Boolean coalesce = true;
+   *        StatisticsConfig statisticsConfig = new StatisticsConfig(true, true, true, true);
+   *        Map writeOptions = new HashMap() {{
+   *                           put("header", "true");
+   *                           put("delimiter", ",");
+   *                           }};
+   *        // define extra filters
+   *        Filter leftFtFilter = new Filter();
+   *        leftFtFilter.setFeature(new Feature("left_ft_name"));
+   *        leftFtFilter.setValue("400");
+   *        leftFtFilter.setCondition(SqlFilterCondition.EQUALS);
+   *        Filter rightFtFilter = new Filter();
+   *        rightFtFilter.setFeature(new Feature("right_ft_name"));
+   *        rightFtFilter.setValue("50");
+   *        rightFtFilter.setCondition(SqlFilterCondition.EQUALS);
+   *        FilterLogic extraFilterLogic = new FilterLogic(SqlFilterLogic.AND, leftFtFilter, rightFtFilter);
+   *        Filter extraFilter = new Filter();
+   *        extraFilter.setFeature(new Feature("ft_name"));
+   *        extraFilter.setValue("100");
+   *        extraFilter.setCondition(SqlFilterCondition.GREATER_THAN);
+   *
+   *        // create training data
+   *        fv.createTrainTestSplit(null, trainStart, trainEnd, testStart,
+   *        testEnd, description, DataFormat.CSV, coalesce, dataSource, seed, statisticsConfig,
+   *        writeOptions, extraFilterLogic, extraFilter);
+   *
+   *        // or based on random split
+   *        fv.createTrainTestSplit(0.2f, null, null, null, null, description, DataFormat.CSV, coalesce,
+   *        dataSource, seed, statisticsConfig, writeOptions, extraFilterLogic, extraFilter);
+   *
+   * }
+   * 
+ * + * @param testSize Size of test set. + * @param trainStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param trainEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param testStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param testEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param description A string describing the contents of the training dataset to improve discoverability for + * Data Scientists. + * @param dataFormat The data format used to save the training dataset. + * @param coalesce If true the training dataset data will be coalesced into a single partition before writing. + * The resulting training dataset will be a single file per split. + * @param dataSource Data source defining the sink location for the training dataset. If `null` is + * provided and materializes training dataset on HopsFS. + * @param seed Define a seed to create the random splits with, in order to guarantee reproducability, + * @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for + * this feature group, `"correlations`" to turn on feature correlation computation, + * `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute + * uniqueness, distinctness and entropy. The values should be booleans indicating the + * setting. To fully turn off statistics computation pass `statisticsConfig=null`. + * @param writeOptions Additional write options as key-value pairs. + * @param extraFilterLogic Additional filters (set of Filter objects) to be attached to the training dataset. + * The filters will be also applied in `getBatchData`. + * @param extraFilter Additional filter to be attached to the training dataset. The filter will be also applied + * in `getBatchData`. + * @return Integer Training dataset version. + * @throws FeatureStoreException If Client is not connected to Hopsworks and/or unable to identify format of the + * provided date strings to date formats. + * @throws IOException Generic IO exception. + * @throws ParseException In case it's unable to parse provided date strings to date types. 
+ */ + @Override + public Integer createTrainTestSplit( + Float testSize, String trainStart, String trainEnd, String testStart, String testEnd, + String description, DataFormat dataFormat, Boolean coalesce, DataSource dataSource, + Long seed, StatisticsConfig statisticsConfig, Map writeOptions, + FilterLogic extraFilterLogic, Filter extraFilter + ) throws IOException, FeatureStoreException, ParseException { + validateTrainTestSplit(testSize, trainEnd, testStart); + TrainingDataset trainingDataset = + this.featureStore + .createTrainingDataset() + .testSize(testSize) + .trainStart(trainStart) + .trainEnd(trainEnd) + .testStart(testStart) + .testEnd(testEnd) + .description(description) + .dataFormat(dataFormat) + .coalesce(coalesce) + .dataSource(dataSource) + .trainSplit(Split.TRAIN) + .seed(seed) + .timeSplitSize(2) + .statisticsConfig(statisticsConfig) + .extraFilterLogic(extraFilterLogic) + .extraFilter(extraFilter) + .build(); + return featureViewEngine.createTrainingDataset(this, trainingDataset, writeOptions).getVersion(); + } + /** * Create the metadata for a training dataset and save the corresponding training data into `location`. The training * data is split into train, validation, and test set at random or according to time range. The training data can be @@ -790,6 +980,8 @@ public Integer createTrainValidationTestSplit( * @throws IOException Generic IO exception. * @throws ParseException In case it's unable to parse provided date strings to date types. */ + @Deprecated + @Override public Integer createTrainValidationTestSplit( Float validationSize, Float testSize, String trainStart, String trainEnd, String validationStart, String validationEnd, String testStart, String testEnd, String description, DataFormat dataFormat, @@ -824,6 +1016,131 @@ public Integer createTrainValidationTestSplit( return featureViewEngine.createTrainingDataset(this, trainingDataset, writeOptions).getVersion(); } + /** + * Create the metadata for a training dataset and save the corresponding training data into `location`. The training + * data is split into train, validation, and test set at random or according to time range. The training data can be + * retrieved by calling `feature_view.getTrainValidationTestSplit`. + * + *
+   * {@code
+   *        // get feature store handle
+   *        FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
+   *        // get feature view handle
+   *        FeatureView fv = fs.getFeatureView("fv_name", 1);
+   *        // create training dataset based on time split
+   *        String trainStart = "20220101000000";
+   *        String trainEnd = "20220630235959";
+   *        String validationStart = "20220701000000";
+   *        String validationEnd = "20220830235959";
+   *        String testStart = "20220901000000";
+   *        String testEnd = "20220931235959";
+   *        String description = "demo training dataset";
+   *        DataSource dataSource = fs.getDataSource("my_datasource");
+   *        dataSource.setPath("test/path");
+   *        Long seed = 1234L;
+   *        Boolean coalesce = true;
+   *        StatisticsConfig statisticsConfig = new StatisticsConfig(true, true, true, true);
+   *        Map writeOptions = new HashMap() {{
+   *                           put("header", "true");
+   *                           put("delimiter", ",");
+   *                           }};
+   *        // define extra filters
+   *        Filter leftFtFilter = new Filter();
+   *        leftFtFilter.setFeature(new Feature("left_ft_name"));
+   *        leftFtFilter.setValue("400");
+   *        leftFtFilter.setCondition(SqlFilterCondition.EQUALS);
+   *        Filter rightFtFilter = new Filter();
+   *        rightFtFilter.setFeature(new Feature("right_ft_name"));
+   *        rightFtFilter.setValue("50");
+   *        rightFtFilter.setCondition(SqlFilterCondition.EQUALS);
+   *        FilterLogic extraFilterLogic = new FilterLogic(SqlFilterLogic.AND, leftFtFilter, rightFtFilter);
+   *        Filter extraFilter = new Filter();
+   *        extraFilter.setFeature(new Feature("ft_name"));
+   *        extraFilter.setValue("100");
+   *        extraFilter.setCondition(SqlFilterCondition.GREATER_THAN);
+   *        // create training data
+   *        fv.createTrainValidationTestSplit(null, null, trainStart, trainEnd, validationStart, validationEnd, testStart,
+   *        testEnd,  description, DataFormat.CSV, coalesce, dataSource, seed, statisticsConfig,
+   *        writeOptions, extraFilterLogic, extraFilter);
+   *
+   *        // or based on random split
+   *        fv.createTrainValidationTestSplit(0.2f, 0.1f, null, null, null, null, null, null, description, DataFormat.CSV, coalesce,
+   *        dataSource, seed, statisticsConfig, writeOptions, extraFilterLogic, extraFilter);
+   * }
+   * 
+ * + * @param validationSize Size of validation set. + * @param testSize Size of test set. + * @param trainStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param trainEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param validationStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param validationEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param testStart Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param testEnd Datetime string. The String should be formatted in one of the following formats `yyyyMMdd`, + * `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`. + * @param description A string describing the contents of the training dataset to improve discoverability for + * Data Scientists. + * @param dataFormat The data format used to save the training dataset. + * @param coalesce If true the training dataset data will be coalesced into a single partition before writing. + * The resulting training dataset will be a single file per split. + * @param dataSource Data source defining the sink location for the training dataset. If `null` is + * provided and materializes training dataset on HopsFS. + * @param seed Define a seed to create the random splits with, in order to guarantee reproducability, + * @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for + * this feature group, `"correlations`" to turn on feature correlation computation, + * `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute + * uniqueness, distinctness and entropy. The values should be booleans indicating the + * setting. To fully turn off statistics computation pass `statisticsConfig=null`. + * @param writeOptions Additional write options as key-value pairs. + * @param extraFilterLogic Additional filters (set of Filter objects) to be attached to the training dataset. + * The filters will be also applied in `getBatchData`. + * @param extraFilter Additional filter to be attached to the training dataset. The filter will be also applied + * in `getBatchData`. + * @return Integer Training dataset version. + * @throws FeatureStoreException If Client is not connected to Hopsworks and/or unable to identify format of the + * provided date strings to date formats. + * @throws IOException Generic IO exception. + * @throws ParseException In case it's unable to parse provided date strings to date types. 
+ */ + @Override + public Integer createTrainValidationTestSplit( + Float validationSize, Float testSize, String trainStart, String trainEnd, String validationStart, + String validationEnd, String testStart, String testEnd, String description, DataFormat dataFormat, + Boolean coalesce, DataSource dataSource, + Long seed, StatisticsConfig statisticsConfig, Map writeOptions, + FilterLogic extraFilterLogic, Filter extraFilter + ) throws IOException, FeatureStoreException, ParseException { + validateTrainValidationTestSplit(validationSize, testSize, trainEnd, validationStart, validationEnd, testStart); + TrainingDataset trainingDataset = + this.featureStore + .createTrainingDataset() + .validationSize(validationSize) + .testSize(testSize) + .trainStart(trainStart) + .trainEnd(trainEnd) + .validationStart(validationStart) + .validationEnd(validationEnd) + .testStart(testStart) + .testEnd(testEnd) + .description(description) + .dataFormat(dataFormat) + .coalesce(coalesce) + .dataSource(dataSource) + .trainSplit(Split.TRAIN) + .timeSplitSize(3) + .seed(seed) + .statisticsConfig(statisticsConfig) + .extraFilterLogic(extraFilterLogic) + .extraFilter(extraFilter) + .build(); + return featureViewEngine.createTrainingDataset(this, trainingDataset, writeOptions).getVersion(); + } + private List> getDataset(TrainingDatasetBundle trainingDatasetBundle, List splits) { List> features = Lists.newArrayList(); List> labels = Lists.newArrayList(); diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java index 16abec7293..30c7f8f21c 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java @@ -85,7 +85,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; - this.storageConnector = storageConnector; + this.dataSource.setStorageConnector(storageConnector); this.dataSource.setPath(path); } diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/TrainingDataset.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/TrainingDataset.java index 00eb26e75f..299aea2e68 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/TrainingDataset.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/TrainingDataset.java @@ -20,6 +20,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.logicalclocks.hsfs.DataFormat; +import com.logicalclocks.hsfs.DataSource; import com.logicalclocks.hsfs.FeatureStoreBase; import com.logicalclocks.hsfs.Split; import com.logicalclocks.hsfs.StatisticsConfig; @@ -60,14 +61,19 @@ public TrainingDataset(Integer version, String description, DataFormat dataForma TrainingDatasetType trainingDatasetType, Float validationSize, Float testSize, String trainStart, String trainEnd, String validationStart, String validationEnd, String testStart, String testEnd, Integer timeSplitSize, - FilterLogic extraFilterLogic, Filter extraFilter) + FilterLogic extraFilterLogic, Filter extraFilter, DataSource dataSource) throws FeatureStoreException, ParseException { this.version = version; this.description = description; this.dataFormat = dataFormat != null ? dataFormat : DataFormat.PARQUET; this.coalesce = coalesce != null ? 
coalesce : false; - this.location = location; - this.storageConnector = storageConnector; + if (dataSource == null) { + this.dataSource = new DataSource(); + this.dataSource.setStorageConnector(storageConnector); + this.dataSource.setPath(location); + } else { + this.dataSource = dataSource; + } this.trainSplit = trainSplit; this.splits = splits == null ? Lists.newArrayList() : splits; this.seed = seed; @@ -77,7 +83,7 @@ public TrainingDataset(Integer version, String description, DataFormat dataForma this.eventStartTime = eventStartTime != null ? FeatureGroupUtils.getDateFromDateString(eventStartTime) : null; this.eventEndTime = eventEndTime != null ? FeatureGroupUtils.getDateFromDateString(eventEndTime) : null; this.trainingDatasetType = trainingDatasetType != null ? trainingDatasetType : - getTrainingDatasetType(storageConnector); + getTrainingDatasetType(dataSource); setValTestSplit(validationSize, testSize); setTimeSeriesSplits(timeSplitSize, trainStart, trainEnd, validationStart, validationEnd, testStart, testEnd); if (extraFilter != null) { diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java index 369966df76..594d8bc251 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java @@ -215,10 +215,10 @@ public Dataset sql(String query) { public Dataset registerOnDemandTemporaryTable(ExternalFeatureGroup onDemandFeatureGroup, String alias) throws FeatureStoreException, IOException { DataSource dataSource = onDemandFeatureGroup.getDataSource(); - dataSource.setPath(onDemandFeatureGroup.getStorageConnector().getPath( - onDemandFeatureGroup.getDataSource().getPath())); + dataSource.setPath(dataSource.getStorageConnector().getPath( + dataSource.getPath())); - Dataset dataset = storageConnectorUtils.read(onDemandFeatureGroup.getStorageConnector(), + Dataset dataset = storageConnectorUtils.read(dataSource.getStorageConnector(), dataSource, onDemandFeatureGroup.getDataFormat() != null ? 
onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup)); @@ -280,7 +280,7 @@ public void registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias, Map< public Dataset[] write(TrainingDataset trainingDataset, Query query, Map queryReadOptions, Map writeOptions, SaveMode saveMode) throws FeatureStoreException, IOException { - setupConnectorHadoopConf(trainingDataset.getStorageConnector()); + setupConnectorHadoopConf(trainingDataset.getDataSource().getStorageConnector()); if (trainingDataset.getSplits() == null || trainingDataset.getSplits().isEmpty()) { // Write a single dataset diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/TrainingDatasetEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/TrainingDatasetEngine.java index f2bd1fd3b3..7aa34a61d3 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/TrainingDatasetEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/TrainingDatasetEngine.java @@ -69,7 +69,7 @@ public TrainingDataset save(TrainingDataset trainingDataset, Query query, trainingDataset.setLocation(apiTD.getLocation()); trainingDataset.setVersion(apiTD.getVersion()); trainingDataset.setId(apiTD.getId()); - trainingDataset.setStorageConnector(apiTD.getStorageConnector()); + trainingDataset.setDataSource(apiTD.getDataSource()); // Build write options map Map writeOptions = @@ -93,7 +93,7 @@ public Dataset read(TrainingDataset trainingDataset, String split, Map DataSource: + cls, json_dict: Dict[str, Any], storage_connector: Optional[sc.StorageConnector] = None + ) -> "DataSource": + """ + Create a DataSource object (or list of objects) from a JSON response. + + Args: + json_dict (Dict[str, Any]): The JSON dictionary from the API response. + storage_connector (Optional[sc.StorageConnector]): The storage connector object. + + Returns: + DataSource or List[DataSource] or None: The created object(s), or None if input is None. + """ if json_dict is None: return None json_decamelized: dict = humps.decamelize(json_dict) if "items" not in json_decamelized: - return cls(**json_decamelized) + data_source = cls(**json_decamelized) + if storage_connector is not None: + data_source.storage_connector = storage_connector + return data_source else: return [ - cls(**item) + DataSource.from_response_json(item, storage_connector) for item in json_decamelized["items"] ] def to_dict(self): - return { + """ + Convert the DataSource object to a dictionary. + + Returns: + dict: Dictionary representation of the object. + """ + ds_meta_dict = { "query": self._query, "database": self._database, "group": self._group, "table": self._table, "path": self._path } + if self._storage_connector: + ds_meta_dict["storageConnector"] = self._storage_connector.to_dict() + return ds_meta_dict def json(self): + """ + Serialize the DataSource object to a JSON string. + + Returns: + str: JSON string representation of the object. + """ return json.dumps(self, cls=util.Encoder) @property def query(self) -> Optional[str]: + """ + Get or set the SQL query string for the data source. + + Returns: + Optional[str]: The SQL query string. + """ return self._query @query.setter @@ -85,6 +156,12 @@ def query(self, query: str) -> None: @property def database(self) -> Optional[str]: + """ + Get or set the database name for the data source. + + Returns: + Optional[str]: The database name. 
+ """ return self._database @database.setter @@ -93,6 +170,12 @@ def database(self, database: str) -> None: @property def group(self) -> Optional[str]: + """ + Get or set the group/schema name for the data source. + + Returns: + Optional[str]: The group or schema name. + """ return self._group @group.setter @@ -101,6 +184,12 @@ def group(self, group: str) -> None: @property def table(self) -> Optional[str]: + """ + Get or set the table name for the data source. + + Returns: + Optional[str]: The table name. + """ return self._table @table.setter @@ -109,8 +198,158 @@ def table(self, table: str) -> None: @property def path(self) -> Optional[str]: + """ + Get or set the file system path for the data source. + + Returns: + Optional[str]: The file system path. + """ return self._path @path.setter def path(self, path: str) -> None: self._path = path + + @property + def storage_connector(self) -> Optional[sc.StorageConnector]: + """ + Get or set the storage connector for the data source. + + Returns: + Optional[StorageConnector]: The storage connector object. + """ + return self._storage_connector + + @storage_connector.setter + def storage_connector(self, storage_connector: sc.StorageConnector) -> None: + self._storage_connector = storage_connector + + def get_databases(self) -> list[str]: + """ + Retrieve the list of available databases. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + data_source = fs.get_data_source("test_data_source") + + databases = data_source.get_databases() + ``` + + Returns: + list[str]: A list of database names available in the data source. + """ + return self._storage_connector.get_databases() + + def get_tables(self, database: str = None) -> list[DataSource]: + """ + Retrieve the list of tables from the specified database. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + data_source = fs.get_data_source("test_data_source") + + tables = data_source.get_tables() + ``` + + Args: + database (str, optional): The name of the database to list tables from. + If not provided, the default database is used. + + Returns: + list[DataSource]: A list of DataSource objects representing the tables. + """ + return self._storage_connector.get_tables(database) + + def get_data(self) -> dsd.DataSourceData: + """ + Retrieve the data from the data source. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + table = fs.get_data_source("test_data_source").get_tables()[0] + + data = table.get_data() + ``` + + Returns: + DataSourceData: An object containing the data retrieved from the data source. + """ + return self._storage_connector.get_data(self) + + def get_metadata(self) -> dict: + """ + Retrieve metadata information about the data source. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + table = fs.get_data_source("test_data_source").get_tables()[0] + + metadata = table.get_metadata() + ``` + + Returns: + dict: A dictionary containing metadata about the data source. + """ + return self._storage_connector.get_metadata(self) + + def get_feature_groups_provenance(self): + """Get the generated feature groups using this data source, based on explicit + provenance. These feature groups can be accessible or inaccessible. Explicit + provenance does not track deleted generated feature group links, so deleted + will always be empty. + For inaccessible feature groups, only a minimal information is returned. 
+ + # Returns + `Links`: the feature groups generated using this data source or `None` if none were created + + # Raises + `hopsworks.client.exceptions.RestAPIError`: In case the backend encounters an issue + """ + return self._storage_connector.get_feature_groups_provenance() + + def get_feature_groups(self): + """Get the feature groups using this data source, based on explicit + provenance. Only the accessible feature groups are returned. + For more items use the base method - get_feature_groups_provenance + + # Returns + `List[FeatureGroup]`: List of feature groups. + """ + return self._storage_connector.get_feature_groups() + + def get_training_datasets_provenance(self): + """Get the generated training datasets using this data source, based on explicit + provenance. These training datasets can be accessible or inaccessible. Explicit + provenance does not track deleted generated training dataset links, so deleted + will always be empty. + For inaccessible training datasets, only a minimal information is returned. + + # Returns + `Links`: the training datasets generated using this data source or `None` if none were created + + # Raises + `hopsworks.client.exceptions.RestAPIError`: In case the backend encounters an issue + """ + return self._storage_connector.get_training_datasets_provenance() + + def get_training_datasets(self): + """Get the training datasets using this data source, based on explicit + provenance. Only the accessible training datasets are returned. + For more items use the base method - get_training_datasets_provenance + + # Returns + `List[TrainingDataset]`: List of training datasets. + """ + return self._storage_connector.get_training_datasets() diff --git a/python/hsfs/core/data_source_api.py b/python/hsfs/core/data_source_api.py index e8472694ad..c654f71c32 100644 --- a/python/hsfs/core/data_source_api.py +++ b/python/hsfs/core/data_source_api.py @@ -16,36 +16,37 @@ from __future__ import annotations from hopsworks_common import client +from hsfs import storage_connector as sc from hsfs.core import data_source as ds from hsfs.core import data_source_data as dsd class DataSourceApi: - def get_databases(self, feature_store_id: int, name: str) -> list[str]: + def get_databases(self, storage_connector: sc.StorageConnector) -> list[str]: _client = client.get_instance() path_params = [ "project", _client._project_id, "featurestores", - feature_store_id, + storage_connector._featurestore_id, "storageconnectors", - name, + storage_connector._name, "data_source", "databases", ] return _client._send_request("GET", path_params) - def get_tables(self, feature_store_id: int, name: str, database: str) -> list[ds.DataSource]: + def get_tables(self, storage_connector: sc.StorageConnector, database: str) -> list[ds.DataSource]: _client = client.get_instance() path_params = [ "project", _client._project_id, "featurestores", - feature_store_id, + storage_connector._featurestore_id, "storageconnectors", - name, + storage_connector._name, "data_source", "tables", ] @@ -53,18 +54,18 @@ def get_tables(self, feature_store_id: int, name: str, database: str) -> list[ds query_params = {"database": database} return ds.DataSource.from_response_json( - _client._send_request("GET", path_params, query_params) + _client._send_request("GET", path_params, query_params), storage_connector=storage_connector ) - def get_data(self, feature_store_id: int, name: str, data_source: ds.DataSource) -> dsd.DataSourceData: + def get_data(self, data_source: ds.DataSource) -> dsd.DataSourceData: _client = 
client.get_instance() path_params = [ "project", _client._project_id, "featurestores", - feature_store_id, + data_source._storage_connector._featurestore_id, "storageconnectors", - name, + data_source._storage_connector._name, "data_source", "data", ] @@ -76,15 +77,15 @@ def get_data(self, feature_store_id: int, name: str, data_source: ds.DataSource) ) - def get_metadata(self, feature_store_id: int, name: str, data_source: ds.DataSource) -> dict: + def get_metadata(self, data_source: ds.DataSource) -> dict: _client = client.get_instance() path_params = [ "project", _client._project_id, "featurestores", - feature_store_id, + data_source._storage_connector._featurestore_id, "storageconnectors", - name, + data_source._storage_connector._name, "data_source", "metadata", ] diff --git a/python/hsfs/core/explicit_provenance.py b/python/hsfs/core/explicit_provenance.py index f7343f65a1..1db88e5020 100644 --- a/python/hsfs/core/explicit_provenance.py +++ b/python/hsfs/core/explicit_provenance.py @@ -187,6 +187,7 @@ class Type(Enum): FEATURE_VIEW = 2 MODEL = 3 STORAGE_CONNECTOR = 4 + TRAINING_DATASET = 5 def __str__(self, indent=None): return json.dumps(self, cls=ProvenanceEncoder, indent=indent) @@ -265,6 +266,27 @@ def __parse_feature_views(links_json: dict, artifacts: Set[str]): ) return links + @staticmethod + def __parse_training_datasets(links_json: dict, artifacts: Set[str]): + links = Links() + for link_json in links_json: + if link_json["node"]["artifact_type"] in artifacts: + if link_json["node"].get("exception_cause") is not None: + links._faulty.append(Artifact.from_response_json(link_json["node"])) + elif bool(link_json["node"]["accessible"]): + links.accessible.append( + training_dataset.TrainingDataset.from_response_json( + link_json["node"]["artifact"] + ) + ) + elif bool(link_json["node"]["deleted"]): + links.deleted.append(Artifact.from_response_json(link_json["node"])) + else: + links.inaccessible.append( + Artifact.from_response_json(link_json["node"]) + ) + return links + @staticmethod def __parse_models( links_json: dict, training_dataset_version: Optional[int] = None @@ -393,6 +415,10 @@ def from_response_json( return Links.__parse_feature_views( links_json["downstream"], {"FEATURE_VIEW"} ) + elif artifact == Links.Type.TRAINING_DATASET: + return Links.__parse_training_datasets( + links_json["downstream"], {"TRAINING_DATASET"} + ) else: return Links() diff --git a/python/hsfs/core/external_feature_group_engine.py b/python/hsfs/core/external_feature_group_engine.py index a66975f283..fb8485c4ce 100644 --- a/python/hsfs/core/external_feature_group_engine.py +++ b/python/hsfs/core/external_feature_group_engine.py @@ -18,29 +18,44 @@ DataValidationException, FeatureStoreException, ) -from hsfs import engine, util +from hsfs import engine, feature, util from hsfs import feature_group as fg from hsfs.core import feature_group_base_engine class ExternalFeatureGroupEngine(feature_group_base_engine.FeatureGroupBaseEngine): def save(self, feature_group): - if feature_group.features is None or len(feature_group.features) == 0: - # If the user didn't specify the schema, parse it from the query - external_dataset = engine.get_instance().register_external_temporary_table( - feature_group, "read_ondmd" + if not feature_group.data_source: + raise FeatureStoreException( + "A data source needs to be provided when creating an external feature group." 
) - # if python engine user should pass features as we do not parse it in this case - if external_dataset is None: - raise FeatureStoreException( - "Features (schema) need to be set for creation of external feature groups with engine " - + engine.get_type() - + ". Alternatively use Spark kernel." - ) - feature_group._features = engine.get_instance().parse_schema_feature_group( - external_dataset - ) + if feature_group.features is None or len(feature_group.features) == 0: + if ( + feature_group.data_source.database and + feature_group.data_source.group and + feature_group.data_source.table) or feature_group.data_source.query: + # If the user provided a data source, we can use it to infer the schema + feature_group._features = [ + feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat + for feat in (feature_group.data_source.get_data().features or []) + ] + else: + # If the user didn't specify the schema, parse it from the query + external_dataset = engine.get_instance().register_external_temporary_table( + feature_group, "read_ondmd" + ) + # if python engine user should pass features as we do not parse it in this case + if external_dataset is None: + raise FeatureStoreException( + "Features (schema) need to be set for creation of external feature groups with engine " + + engine.get_type() + + ". Alternatively use Spark kernel." + ) + + feature_group._features = engine.get_instance().parse_schema_feature_group( + external_dataset + ) # set primary, foreign and partition key columns # we should move this to the backend diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index d4b7259c6b..dddfcbc89d 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -683,7 +683,7 @@ def _read_dir_from_storage_connector( dataframe_type, ): try: - df = training_data_obj.storage_connector.read( + df = training_data_obj.data_source.storage_connector.read( # always read from materialized dataset, not query object query=None, data_format=training_data_obj.data_format, diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 6aafdd22b5..ff5c38431d 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -172,7 +172,7 @@ def _setup_hudi_write_opts(self, operation, write_options): ) # dont enable hive sync when using managed FG - hive_sync = self._feature_group.storage_connector is None + hive_sync = self._feature_group.data_source.storage_connector is None hudi_options = { self.HUDI_KEY_GENERATOR_OPT_KEY: self.HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL, diff --git a/python/hsfs/core/storage_connector_api.py b/python/hsfs/core/storage_connector_api.py index 81797f67d4..dac0daf2e3 100644 --- a/python/hsfs/core/storage_connector_api.py +++ b/python/hsfs/core/storage_connector_api.py @@ -15,12 +15,16 @@ # from __future__ import annotations -from typing import Any, Dict +from typing import TYPE_CHECKING, Any, Dict from hopsworks_common import client from hsfs import decorators, storage_connector +if TYPE_CHECKING: + from hsfs.core.explicit_provenance import Links + + class StorageConnectorApi: @decorators.catch_not_found( "hsfs.storage_connector.StorageConnector", fallback_return=None @@ -108,7 +112,7 @@ def get_kafka_connector( _client._send_request("GET", path_params, query_params=query_params) ) - def get_feature_groups_provenance(self, storage_connector_instance): + def get_feature_groups_provenance(self, storage_connector_instance) -> "Links": """Get the 
generated feature groups using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted @@ -139,10 +143,49 @@ def get_feature_groups_provenance(self, storage_connector_instance): "downstreamLvls": 1, } links_json = _client._send_request("GET", path_params, query_params) - from hsfs.core import explicit_provenance + from hsfs.core import explicit_provenance return explicit_provenance.Links.from_response_json( links_json, explicit_provenance.Links.Direction.DOWNSTREAM, explicit_provenance.Links.Type.FEATURE_GROUP, ) + + def get_training_datasets_provenance(self, storage_connector_instance) -> "Links": + """Get the generated training datasets using this storage connector, based on explicit + provenance. These training datasets can be accessible or inaccessible. Explicit + provenance does not track deleted generated training dataset links, so deleted + will always be empty. + For inaccessible training datasets, only a minimal information is returned. + + # Arguments + storage_connector_instance: Metadata object of storage connector. + + # Returns + `ExplicitProvenance.Links`: the training datasets generated using this + storage connector + """ + _client = client.get_instance() + path_params = [ + "project", + _client._project_id, + "featurestores", + storage_connector_instance._featurestore_id, + "storageconnectors", + storage_connector_instance.name, + "provenance", + "links", + ] + query_params = { + "expand": "provenance_artifacts", + "upstreamLvls": 0, + "downstreamLvls": 1, + } + links_json = _client._send_request("GET", path_params, query_params) + + from hsfs.core import explicit_provenance + return explicit_provenance.Links.from_response_json( + links_json, + explicit_provenance.Links.Direction.DOWNSTREAM, + explicit_provenance.Links.Type.TRAINING_DATASET, + ) diff --git a/python/hsfs/core/training_dataset_engine.py b/python/hsfs/core/training_dataset_engine.py index 34907ce3ca..d15783dff2 100644 --- a/python/hsfs/core/training_dataset_engine.py +++ b/python/hsfs/core/training_dataset_engine.py @@ -89,7 +89,7 @@ def read(self, training_dataset, split, user_read_options): else: path = training_dataset.location + "/" + training_dataset.name - return training_dataset.storage_connector.read( + return training_dataset.data_source.storage_connector.read( # always read from materialized dataset, not query object query=None, data_format=training_dataset.data_format, diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index d9e0339383..1b29ba2d7c 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -193,11 +193,11 @@ def set_job_group(self, group_id, description): def register_external_temporary_table(self, external_fg, alias): if not isinstance(external_fg, fg_mod.SpineGroup): - external_dataset = external_fg.storage_connector.read( + external_dataset = external_fg.data_source.storage_connector.read( external_fg.data_source.query, external_fg.data_format, external_fg.options, - external_fg.storage_connector._get_path( + external_fg.data_source.storage_connector._get_path( external_fg.data_source.path ), # cant rely on location since this method can be used before FG is saved ) @@ -795,7 +795,7 @@ def write_training_dataset( return self._write_training_dataset_single( feature_view_obj.transformation_functions, dataset, - training_dataset.storage_connector, + training_dataset.data_source.storage_connector, 
training_dataset.data_format, write_options, save_mode, @@ -1012,7 +1012,7 @@ def _write_training_dataset_splits( feature_dataframes[split_name] = self._write_training_dataset_single( transformation_functions, feature_dataframe, - training_dataset.storage_connector, + training_dataset.data_source.storage_connector, training_dataset.data_format, write_options, save_mode, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index b741b6bde3..8316b3e34b 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -149,7 +149,6 @@ def __init__( Dict[str, Any], ] ] = None, - storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None, ttl: Optional[Union[int, float, timedelta]] = None, ttl_enabled: Optional[bool] = None, online_disk: Optional[bool] = None, @@ -173,7 +172,6 @@ def __init__( deprecated: Whether this feature group is deprecated online_config: Configuration for online serving data_source: Data source configuration - storage_connector: Storage connector configuration ttl: Time-to-live (TTL) configuration for this feature group ttl_enabled: Whether to enable time-to-live (TTL) for this feature group. Defaults to True if ttl is set. online_disk: Whether to enable online disk storage for this feature group. Overrides online_config.table_space. @@ -198,12 +196,6 @@ def __init__( self.ttl = ttl self._ttl_enabled = ttl_enabled if ttl_enabled is not None else ttl is not None - if storage_connector is not None and isinstance(storage_connector, dict): - self._storage_connector = sc.StorageConnector.from_response_json( - storage_connector - ) - else: - self._storage_connector: "sc.StorageConnector" = storage_connector self._online_config = ( OnlineConfig.from_response_json(online_config) if isinstance(online_config, dict) @@ -757,9 +749,27 @@ def get_storage_connector_provenance(self) -> Optional[explicit_provenance.Links For deleted and inaccessible storage connector, only minimal information is returned. + !!! warning "Deprecated" + `get_storage_connector_provenance` method is deprecated. Use `get_data_source_provenance` instead. + # Returns `Links`: the storage connector used to generate this feature group or `None` if it does not exist. + # Raises + `hopsworks.client.exceptions.RestAPIError`: If the backend encounters an error when handling the request + """ + return self.get_data_source_provenance() + + def get_data_source_provenance(self) -> Optional[explicit_provenance.Links]: + """Get the parents of this feature group, based on explicit provenance. + Parents are data sources. These data sources can be accessible, + deleted or inaccessible. + For deleted and inaccessible data sources, only minimal information is + returned. + + # Returns + `Links`: the data source used to generate this feature group or `None` if it does not exist. + # Raises `hopsworks.client.exceptions.RestAPIError`: If the backend encounters an error when handling the request """ @@ -770,24 +780,40 @@ def get_storage_connector(self) -> Optional["sc.StorageConnector"]: provenance. Only the accessible storage connector is returned. For more items use the base method - get_storage_connector_provenance + !!! warning "Deprecated" + `get_storage_connector` method is deprecated. Use `get_data_source` instead. + # Returns `StorageConnector`: Storage connector or `None` if it does not exist. 
# Raises `hopsworks.client.exceptions.RestAPIError`: If the backend encounters an error when handling the request """ - storage_connector_provenance = self.get_storage_connector_provenance() + return self.get_data_source() - if storage_connector_provenance and ( - storage_connector_provenance.inaccessible - or storage_connector_provenance.deleted + def get_data_source(self) -> Optional["ds.DataSource"]: + """Get the data source using this feature group, based on explicit + provenance. Only the accessible data source is returned. + For more items use the base method - get_data_source_provenance + + # Returns + `DataSource`: Data source or `None` if it does not exist. + + # Raises + `hopsworks.client.exceptions.RestAPIError`: If the backend encounters an error when handling the request + """ + data_source_provenance = self.get_storage_connector_provenance() + + if data_source_provenance and ( + data_source_provenance.inaccessible + or data_source_provenance.deleted ): _logger.info( - "The parent storage connector is deleted or inaccessible. For more details access `get_storage_connector_provenance`" + "The parent data source is deleted or inaccessible. For more details access `get_data_source_provenance`" ) - if storage_connector_provenance and storage_connector_provenance.accessible: - return storage_connector_provenance.accessible[0] + if data_source_provenance and data_source_provenance.accessible: + return data_source_provenance.accessible[0] else: return None @@ -2217,6 +2243,7 @@ def event_time(self, feature_name: Optional[str]) -> None: @property def location(self) -> Optional[str]: + """Storage specific location. Including data source path if specified.""" return self._location @property @@ -2280,12 +2307,21 @@ def online_enabled(self, online_enabled: bool) -> None: @property def storage_connector(self) -> "sc.StorageConnector": - return self._storage_connector + """" + !!! warning "Deprecated" + `storage_connector` method is deprecated. Use + `data_source` instead. 
+ """ + return self._data_source.storage_connector + + @property + def data_source(self) -> "ds.DataSource": + return self._data_source def prepare_spark_location(self) -> str: location = self.location - if self.storage_connector is not None: - location = self.storage_connector.prepare_spark(location) + if self.data_source is not None and self.data_source.storage_connector: + location = self.data_source.storage_connector.prepare_spark(location) return location @property @@ -2315,10 +2351,6 @@ def deprecated(self) -> bool: def deprecated(self, deprecated: bool) -> None: self._deprecated = deprecated - @property - def data_source(self) -> Optional[ds.DataSource]: - return self._data_source - @property def subject(self) -> Dict[str, Any]: """Subject of the feature group.""" @@ -2601,7 +2633,6 @@ def __init__( ] ] = None, offline_backfill_every_hr: Optional[Union[str, int]] = None, - storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None, data_source: Optional[ Union[ ds.DataSource, @@ -2628,7 +2659,6 @@ def __init__( notification_topic_name=notification_topic_name, deprecated=deprecated, online_config=online_config, - storage_connector=storage_connector, data_source=data_source, ttl=ttl, ttl_enabled=ttl_enabled, @@ -4015,8 +4045,6 @@ def to_dict(self) -> Dict[str, Any]: fg_meta_dict["embeddingIndex"] = self.embedding_index.to_dict() if self._stream: fg_meta_dict["deltaStreamerJobConf"] = self._deltastreamer_jobconf - if self._storage_connector: - fg_meta_dict["storageConnector"] = self._storage_connector.to_dict() return fg_meta_dict def _get_table_name(self) -> str: @@ -4200,7 +4228,6 @@ class ExternalFeatureGroup(FeatureGroupBase): def __init__( self, - storage_connector: Union[sc.StorageConnector, Dict[str, Any]], data_format: Optional[str] = None, options: Optional[Dict[str, Any]] = None, name: Optional[str] = None, @@ -4264,7 +4291,6 @@ def __init__( notification_topic_name=notification_topic_name, deprecated=deprecated, online_config=online_config, - storage_connector=storage_connector, data_source=data_source, ttl=ttl, ttl_enabled=ttl_enabled, @@ -4327,7 +4353,7 @@ def save(self) -> None: version=1, description="Physical shop sales features", query=query, - storage_connector=connector, + data_source=ds, primary_key=['ss_store_sk'], event_time='sale_date' ) @@ -4676,7 +4702,6 @@ def to_dict(self) -> Dict[str, Any]: "options": [{"name": k, "value": v} for k, v in self._options.items()] if self._options else None, - "storageConnector": self._storage_connector.to_dict(), "type": "onDemandFeaturegroupDTO", "statisticsConfig": self._statistics_config, "eventTime": self._event_time, @@ -4738,9 +4763,6 @@ class SpineGroup(FeatureGroupBase): def __init__( self, - storage_connector: Optional[ - Union["sc.StorageConnector", Dict[str, Any]] - ] = None, query: Optional[str] = None, data_format: Optional[str] = None, options: Dict[str, Any] = None, diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 3cf1c8f5f8..bdbc341fd8 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -436,6 +436,9 @@ def get_storage_connector(self, name: str) -> storage_connector.StorageConnector `get_online_storage_connector` method to get the JDBC connector for the Online Feature Store. + !!! warning "Deprecated" + `get_storage_connector` method is deprecated. Use `get_data_source` instead. + !!! 
example ```python # connect to the Feature Store @@ -450,7 +453,34 @@ def get_storage_connector(self, name: str) -> storage_connector.StorageConnector # Returns `StorageConnector`. Storage connector object. """ - return self._storage_connector_api.get(self._id, name) + return self.get_data_source(name).storage_connector + + @usage.method_logger + def get_data_source(self, name: str) -> ds.DataSource: + """Get a data source from the feature store. + + Data sources encapsulate all information needed for the execution engine + to read and write to specific storage. + + If you want to connect to the online feature store, see the + `get_online_data_source` method to get the JDBC connector for the Online + Feature Store. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + data_source = fs.get_data_source("test_data_source") + ``` + + # Arguments + name: Name of the data source to retrieve. + + # Returns + `DataSource`. Data source object. + """ + return ds.DataSource(storage_connector=self._storage_connector_api.get(self._id, name)) def sql( self, @@ -501,6 +531,9 @@ def get_online_storage_connector(self) -> storage_connector.StorageConnector: The returned storage connector depends on the project that you are connected to. + !!! warning "Deprecated" + `get_online_storage_connector` method is deprecated. Use `get_online_data_source` instead. + !!! example ```python # connect to the Feature Store @@ -512,7 +545,27 @@ def get_online_storage_connector(self) -> storage_connector.StorageConnector: # Returns `StorageConnector`. JDBC storage connector to the Online Feature Store. """ - return self._storage_connector_api.get_online_connector(self._id) + return self.get_online_data_source().storage_connector + + @usage.method_logger + def get_online_data_source(self) -> ds.DataSource: + """Get the data source for the Online Feature Store of the respective + project's feature store. + + The returned data source depends on the project that you are connected to. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + online_data_source = fs.get_online_data_source() + ``` + + # Returns + `DataSource`. JDBC data source to the Online Feature Store. + """ + return ds.DataSource(storage_connector=self._storage_connector_api.get_online_connector(self._id)) @usage.method_logger def create_feature_group( @@ -672,10 +725,10 @@ def plus_two(value): or a string representing a cron expression. Set the value to None to avoid scheduling the materialization job. Defaults to None (i.e no scheduling). storage_connector: the storage connector used to establish connectivity - with the data source. + with the data source. **[DEPRECATED: Use `data_source` instead.]** path: The location within the scope of the storage connector, from where to read - the data for the external feature group - data_source: The data source specifying the location of the data. Overrides the path and query arguments when specified. + the data for the external feature group. **[DEPRECATED: Use `data_source` instead.]** + data_source: The data source specifying the location of the data. Overrides the storage_connector and path arguments when specified. ttl: Optional time-to-live duration for features in this group. Can be specified as: - An integer or float representing seconds @@ -690,7 +743,7 @@ def plus_two(value): `FeatureGroup`. The feature group metadata object. 
""" if not data_source: - data_source = ds.DataSource(path=path) + data_source = ds.DataSource(storage_connector=storage_connector, path=path) feature_group_object = feature_group.FeatureGroup( name=name, version=version, @@ -715,7 +768,6 @@ def plus_two(value): transformation_functions=transformation_functions, online_config=online_config, offline_backfill_every_hr=offline_backfill_every_hr, - storage_connector=storage_connector, data_source=data_source, ttl=ttl, ttl_enabled=ttl_enabled, @@ -867,10 +919,10 @@ def get_or_create_feature_group( or a string representing a cron expression. Set the value to None to avoid scheduling the materialization job. Defaults to None (i.e no automatic scheduling). Applies only on Feature Group creation. storage_connector: the storage connector used to establish connectivity - with the data source. + with the data source. **[DEPRECATED: Use `data_source` instead.]** path: The location within the scope of the storage connector, from where to read - the data for the external feature group - data_source: The data source specifying the location of the data. Overrides the path and query arguments when specified. + the data for the external feature group. **[DEPRECATED: Use `data_source` instead.]** + data_source: The data source specifying the location of the data. Overrides the storage_connector and path arguments when specified. ttl: Optional time-to-live duration for features in this group. Can be specified as: - An integer or float representing seconds - A timedelta object @@ -886,7 +938,7 @@ def get_or_create_feature_group( feature_group_object = self._feature_group_api.get(self.id, name, version) if not feature_group_object: if not data_source: - data_source = ds.DataSource(path=path) + data_source = ds.DataSource(storage_connector=storage_connector, path=path) feature_group_object = feature_group.FeatureGroup( name=name, version=version, @@ -911,7 +963,6 @@ def get_or_create_feature_group( transformation_functions=transformation_functions, online_config=online_config, offline_backfill_every_hr=offline_backfill_every_hr, - storage_connector=storage_connector, data_source=data_source, ttl=ttl, ttl_enabled=ttl_enabled, @@ -924,7 +975,7 @@ def get_or_create_feature_group( def create_on_demand_feature_group( self, name: str, - storage_connector: storage_connector.StorageConnector, + storage_connector: Optional[storage_connector.StorageConnector] = None, query: Optional[str] = None, data_format: Optional[str] = None, path: Optional[str] = "", @@ -967,14 +1018,14 @@ def create_on_demand_feature_group( # Arguments name: Name of the external feature group to create. storage_connector: the storage connector used to establish connectivity - with the data source. + with the data source. **[DEPRECATED: Use `data_source` instead.]** query: A string containing a SQL query valid for the target data source. the query will be used to pull data from the data sources when the - feature group is used. + feature group is used. **[DEPRECATED: Use `data_source` instead.]** data_format: If the external feature groups refers to a directory with data, the data format to use when reading it path: The location within the scope of the storage connector, from where to read - the data for the external feature group + the data for the external feature group. **[DEPRECATED: Use `data_source` instead.]** options: Additional options to be used by the engine when reading data from the specified storage connector. 
For example, `{"header": True}` when reading CSV files with column names in the first row. @@ -1015,7 +1066,7 @@ def create_on_demand_feature_group( expectation_suite: Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to `None`. - data_source: The data source specifying the location of the data. Overrides the path and query arguments when specified. + data_source: The data source specifying the location of the data. Overrides the storage_connector, path and query arguments when specified. online_enabled: Define whether it should be possible to sync the feature group to the online feature store for low latency access, defaults to `False`. ttl: Optional time-to-live duration for features in this group. Can be specified as: @@ -1029,12 +1080,13 @@ def create_on_demand_feature_group( `ExternalFeatureGroup`. The external feature group metadata object. """ if not data_source: - data_source = ds.DataSource(query=query, path=path) + if not storage_connector: + raise ValueError("Data source must be provided to create an external feature group.") + data_source = ds.DataSource(storage_connector=storage_connector, query=query, path=path) feature_group_object = feature_group.ExternalFeatureGroup( name=name, data_format=data_format, options=options or {}, - storage_connector=storage_connector, version=version, description=description, primary_key=primary_key or [], @@ -1059,7 +1111,7 @@ def create_on_demand_feature_group( def create_external_feature_group( self, name: str, - storage_connector: storage_connector.StorageConnector, + storage_connector: Optional[storage_connector.StorageConnector] = None, query: Optional[str] = None, data_format: Optional[str] = None, path: Optional[str] = "", @@ -1108,8 +1160,7 @@ def create_external_feature_group( name="sales", version=1, description="Physical shop sales features", - query=query, - storage_connector=connector, + data_source=data_source, primary_key=['ss_store_sk'], event_time='sale_date', ttl=timedelta(days=30), @@ -1129,8 +1180,7 @@ def create_external_feature_group( name="sales", version=1, description="Physical shop sales features", - query=query, - storage_connector=connector, + data_source=data_source, primary_key=['ss_store_sk'], event_time='sale_date', online_enabled=True, @@ -1150,14 +1200,14 @@ def create_external_feature_group( # Arguments name: Name of the external feature group to create. storage_connector: the storage connector used to establish connectivity - with the data source. + with the data source. **[DEPRECATED: Use `data_source` instead.]** query: A string containing a SQL query valid for the target data source. the query will be used to pull data from the data sources when the - feature group is used. + feature group is used. **[DEPRECATED: Use `data_source` instead.]** data_format: If the external feature groups refers to a directory with data, the data format to use when reading it path: The location within the scope of the storage connector, from where to read - the data for the external feature group + the data for the external feature group. **[DEPRECATED: Use `data_source` instead.]** options: Additional options to be used by the engine when reading data from the specified storage connector. For example, `{"header": True}` when reading CSV files with column names in the first row. 
@@ -1203,7 +1253,7 @@ def create_external_feature_group( notification_topic_name: Optionally, define the name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If left undefined no notifications are sent. online_config: Optionally, define configuration which is used to configure online table. - data_source: The data source specifying the location of the data. Overrides the path and query arguments when specified. + data_source: The data source specifying the location of the data. Overrides the storage_connector, path and query arguments when specified. ttl: Optional time-to-live duration for features in this group. Can be specified as: - An integer or float representing seconds - A timedelta object @@ -1218,12 +1268,13 @@ def create_external_feature_group( `ExternalFeatureGroup`. The external feature group metadata object. """ if not data_source: - data_source = ds.DataSource(query=query, path=path) + if not storage_connector: + raise ValueError("Data source must be provided to create an external feature group.") + data_source = ds.DataSource(storage_connector=storage_connector, query=query, path=path) feature_group_object = feature_group.ExternalFeatureGroup( name=name, data_format=data_format, options=options or {}, - storage_connector=storage_connector, version=version, description=description, primary_key=primary_key or [], @@ -1405,6 +1456,12 @@ def create_training_dataset( label: Optional[List[str]] = None, transformation_functions: Optional[Dict[str, TransformationFunction]] = None, train_split: str = None, + data_source: Optional[ + Union[ + ds.DataSource, + Dict[str, Any], + ] + ] = None, ) -> "training_dataset.TrainingDataset": """Create a training dataset metadata object. @@ -1446,7 +1503,7 @@ def create_training_dataset( will be a single file per split. Default False. storage_connector: Storage connector defining the sink location for the training dataset, defaults to `None`, and materializes training dataset - on HopsFS. + on HopsFS. **[DEPRECATED: Use `data_source` instead.]** splits: A dictionary defining training dataset splits to be created. Keys in the dictionary define the name of the split as `str`, values represent percentage of samples in the split as `float`. Currently, only random @@ -1456,7 +1513,7 @@ def create_training_dataset( storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to `""`, saving the training dataset at the root defined by the - storage connector. + storage connector. **[DEPRECATED: Use `data_source` instead.]** seed: Optionally, define a seed to create the random splits with, in order to guarantee reproducability, defaults to `None`. statistics_config: A configuration object, or a dictionary with keys @@ -1477,17 +1534,19 @@ def create_training_dataset( train_split: If `splits` is set, provide the name of the split that is going to be used for training. The statistics of this split will be used for transformation functions if necessary. Defaults to `None`. + data_source: The data source specifying the location of the data. Overrides the storage_connector and location arguments when specified. # Returns: `TrainingDataset`: The training dataset metadata object. 
""" + if not data_source: + data_source = ds.DataSource(storage_connector=storage_connector, path=location) return training_dataset.TrainingDataset( name=name, version=version, description=description, data_format=data_format, - storage_connector=storage_connector, - location=location, + data_source=data_source, featurestore_id=self._id, splits=splits or {}, seed=seed, diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 3ffe3af5f5..ac7bf881fe 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -49,6 +49,7 @@ from hsfs import serving_key as skm from hsfs.constructor import filter, query from hsfs.constructor.filter import Filter, Logic +from hsfs.core import data_source as ds from hsfs.core import ( explicit_provenance, feature_monitoring_config_engine, @@ -1313,6 +1314,12 @@ def create_training_data( write_options: Optional[Dict[Any, Any]] = None, spine: Optional[SplineDataFrameTypes] = None, transformation_context: Dict[str, Any] = None, + data_source: Optional[ + Union[ + ds.DataSource, + Dict[str, Any], + ] + ] = None, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -1394,13 +1401,13 @@ def create_training_data( feature_view = fs.get_feature_view(...) # get storage connector instance - external_storage_connector = fs.get_storage_connector("storage_connector_name") + data_source = fs.get_data_source("test_data_source") # create a train-test split dataset version, job = feature_view.create_training_data( start_time=..., end_time=..., - storage_connector = external_storage_connector, + data_source=data_source, description=..., # you can have different data formats such as csv, tsv, tfrecord, parquet and others data_format=... @@ -1434,12 +1441,12 @@ def create_training_data( or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e Unix Epoch should be in seconds. storage_connector: Storage connector defining the sink location for the training dataset, defaults to `None`, and materializes training dataset - on HopsFS. + on HopsFS. **[DEPRECATED: Use `data_source` instead.]** location: Path to complement the sink storage connector with, e.g if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to `""`, saving the training dataset at the root defined by the - storage connector. + storage connector. **[DEPRECATED: Use `data_source` instead.]** description: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string `""`. @@ -1480,6 +1487,7 @@ def create_training_data( be available in the spine group. transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. + data_source: The data source specifying the location of the data. Overrides the storage_connector and location arguments when specified. # Returns (td_version, `Job`): Tuple of training dataset version and job. 
When using the `python` engine, it returns the Hopsworks Job @@ -1488,6 +1496,8 @@ def create_training_data( # Raises `hopsworks.client.exceptions.RestAPIError`: If the backend encounters an error when handling the request """ + if not data_source: + data_source = ds.DataSource(storage_connector=storage_connector, path=location) td = training_dataset.TrainingDataset( name=self.name, version=None, @@ -1495,8 +1505,7 @@ def create_training_data( event_end_time=end_time, description=description, data_format=data_format, - storage_connector=storage_connector, - location=location, + data_source=data_source, featurestore_id=self._featurestore_id, splits={}, seed=seed, @@ -1540,6 +1549,12 @@ def create_train_test_split( write_options: Optional[Dict[Any, Any]] = None, spine: Optional[SplineDataFrameTypes] = None, transformation_context: Dict[str, Any] = None, + data_source: Optional[ + Union[ + ds.DataSource, + Dict[str, Any], + ] + ] = None, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -1627,7 +1642,7 @@ def create_train_test_split( feature_view = fs.get_feature_view(...) # get storage connector instance - external_storage_connector = fs.get_storage_connector("storage_connector_name") + data_source = fs.get_data_source("test_data_source") # create a train-test split dataset version, job = feature_view.create_train_test_split( @@ -1635,7 +1650,7 @@ def create_train_test_split( train_end=..., test_start=..., test_end=..., - storage_connector = external_storage_connector, + data_source=data_source, description=..., # you can have different data formats such as csv, tsv, tfrecord, parquet and others data_format=... @@ -1707,12 +1722,12 @@ def create_train_test_split( or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e Unix Epoch should be in seconds. storage_connector: Storage connector defining the sink location for the training dataset, defaults to `None`, and materializes training dataset - on HopsFS. + on HopsFS. **[DEPRECATED: Use `data_source` instead.]** location: Path to complement the sink storage connector with, e.g if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to `""`, saving the training dataset at the root defined by the - storage connector. + storage connector. **[DEPRECATED: Use `data_source` instead.]** description: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string `""`. @@ -1753,6 +1768,7 @@ def create_train_test_split( be available in the spine group. transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. + data_source: The data source specifying the location of the data. Overrides the storage_connector and location arguments when specified. # Returns (td_version, `Job`): Tuple of training dataset version and job. 
When using the `python` engine, it returns the Hopsworks Job @@ -1764,6 +1780,8 @@ def create_train_test_split( self._validate_train_test_split( test_size=test_size, train_end=train_end, test_start=test_start ) + if not data_source: + data_source = ds.DataSource(storage_connector=storage_connector, path=location) td = training_dataset.TrainingDataset( name=self.name, version=None, @@ -1775,8 +1793,7 @@ def create_train_test_split( test_end=test_end, description=description, data_format=data_format, - storage_connector=storage_connector, - location=location, + data_source=data_source, featurestore_id=self._featurestore_id, splits={}, seed=seed, @@ -1822,6 +1839,12 @@ def create_train_validation_test_split( write_options: Optional[Dict[Any, Any]] = None, spine: Optional[SplineDataFrameTypes] = None, transformation_context: Dict[str, Any] = None, + data_source: Optional[ + Union[ + ds.DataSource, + Dict[str, Any], + ] + ] = None, **kwargs, ) -> Tuple[int, job.Job]: """Create the metadata for a training dataset and save the corresponding training data into `location`. @@ -1917,7 +1940,7 @@ def create_train_validation_test_split( feature_view = fs.get_feature_view(...) # get storage connector instance - external_storage_connector = fs.get_storage_connector("storage_connector_name") + data_source = fs.get_data_source("test_data_source") # create a train-validation-test split dataset version, job = feature_view.create_train_validation_test_split( @@ -1928,7 +1951,7 @@ def create_train_validation_test_split( test_start=..., test_end=..., description=..., - storage_connector = external_storage_connector, + data_source=data_source, # you can have different data formats such as csv, tsv, tfrecord, parquet and others data_format=... ) @@ -1975,12 +1998,12 @@ def create_train_validation_test_split( or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e Unix Epoch should be in seconds. storage_connector: Storage connector defining the sink location for the training dataset, defaults to `None`, and materializes training dataset - on HopsFS. + on HopsFS. **[DEPRECATED: Use `data_source` instead.]** location: Path to complement the sink storage connector with, e.g if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to `""`, saving the training dataset at the root defined by the - storage connector. + storage connector. **[DEPRECATED: Use `data_source` instead.]** description: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string `""`. @@ -2021,6 +2044,7 @@ def create_train_validation_test_split( be available in the spine group. transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`. + data_source: The data source specifying the location of the data. Overrides the storage_connector and location arguments when specified. # Returns (td_version, `Job`): Tuple of training dataset version and job. 
When using the `python` engine, it returns the Hopsworks Job @@ -2038,6 +2062,8 @@ def create_train_validation_test_split( validation_end=validation_end, test_start=test_start, ) + if not data_source: + data_source = ds.DataSource(storage_connector=storage_connector, path=location) td = training_dataset.TrainingDataset( name=self.name, version=None, @@ -2052,8 +2078,7 @@ def create_train_validation_test_split( test_end=test_end, description=description, data_format=data_format, - storage_connector=storage_connector, - location=location, + data_source=data_source, featurestore_id=self._featurestore_id, splits={}, seed=seed, @@ -2284,10 +2309,9 @@ def training_data( event_start_time=start_time, event_end_time=end_time, description=description, - storage_connector=None, + data_source=None, featurestore_id=self._featurestore_id, data_format="tsv", - location="", statistics_config=statistics_config, training_dataset_type=training_dataset.TrainingDataset.IN_MEMORY, extra_filter=extra_filter, @@ -2459,10 +2483,9 @@ def train_test_split( test_end=test_end, time_split_size=2, description=description, - storage_connector=None, + data_source=None, featurestore_id=self._featurestore_id, data_format="tsv", - location="", statistics_config=statistics_config, training_dataset_type=training_dataset.TrainingDataset.IN_MEMORY, extra_filter=extra_filter, @@ -2675,10 +2698,9 @@ def train_validation_test_split( test_start=test_start, test_end=test_end, description=description, - storage_connector=None, + data_source=None, featurestore_id=self._featurestore_id, data_format="tsv", - location="", statistics_config=statistics_config, training_dataset_type=training_dataset.TrainingDataset.IN_MEMORY, extra_filter=extra_filter, diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index a013fffc77..3246fd39d5 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -31,6 +31,7 @@ from hsfs import engine from hsfs.core import data_source as ds from hsfs.core import data_source_api, storage_connector_api +from hsfs.core import data_source_data as dsd if HAS_NUMPY: @@ -241,10 +242,49 @@ def get_feature_groups(self): else: return [] - def get_databases(self): - return self._data_source_api.get_databases(self._featurestore_id, self._name) + def get_training_datasets_provenance(self): + """Get the generated training datasets using this storage connector, based on explicit + provenance. These training datasets can be accessible or inaccessible. Explicit + provenance does not track deleted generated training dataset links, so deleted + will always be empty. + For inaccessible training datasets, only a minimal information is returned. + + # Returns + `Links`: the training datasets generated using this storage connector or `None` if none were created + + # Raises + `hopsworks.client.exceptions.RestAPIError`: In case the backend encounters an issue + """ + links = self._storage_connector_api.get_training_datasets_provenance(self) + if not links.is_empty(): + return links + + def get_training_datasets(self): + """Get the training datasets using this storage connector, based on explicit + provenance. Only the accessible training datasets are returned. + For more items use the base method - get_training_datasets_provenance + + # Returns + `List[TrainingDataset]`: List of training datasets. 
+ """ + training_datasets_provenance = self.get_training_datasets_provenance() + + if training_datasets_provenance and ( + training_datasets_provenance.inaccessible or training_datasets_provenance.deleted + ): + _logger.info( + "There are deleted or inaccessible training datasets. For more details access `get_training_datasets_provenance`" + ) + + if training_datasets_provenance and training_datasets_provenance.accessible: + return training_datasets_provenance.accessible + else: + return [] + + def get_databases(self) -> list[str]: + return self._data_source_api.get_databases(self) - def get_tables(self, database: str): + def get_tables(self, database: str = None) -> list[ds.DataSource]: if not database: if self.type == StorageConnector.REDSHIFT: database = self.database_name @@ -259,13 +299,13 @@ def get_tables(self, database: str): "Database name is required for this connector type. " "Please provide a database name." ) - return self._data_source_api.get_tables(self._featurestore_id, self._name, database) + return self._data_source_api.get_tables(self, database) - def get_data(self, data_source: ds.DataSource): - return self._data_source_api.get_data(self._featurestore_id, self._name, data_source) + def get_data(self, data_source: ds.DataSource) -> dsd.DataSourceData: + return self._data_source_api.get_data(data_source) - def get_metadata(self, data_source: ds.DataSource): - return self._data_source_api.get_metadata(self._featurestore_id, self._name, data_source) + def get_metadata(self, data_source: ds.DataSource) -> dict: + return self._data_source_api.get_metadata(data_source) class HopsFSConnector(StorageConnector): diff --git a/python/hsfs/training_dataset.py b/python/hsfs/training_dataset.py index 0b3420d3d4..6d35bcf40b 100644 --- a/python/hsfs/training_dataset.py +++ b/python/hsfs/training_dataset.py @@ -25,6 +25,7 @@ from hopsworks_common.core.constants import HAS_NUMPY from hsfs import engine, training_dataset_feature, util from hsfs.constructor import filter, query +from hsfs.core import data_source as ds from hsfs.core import ( statistics_engine, training_dataset_api, @@ -60,7 +61,6 @@ def __init__( event_end_time=None, coalesce=False, description=None, - storage_connector=None, splits=None, validation_size=None, test_size=None, @@ -80,6 +80,7 @@ def __init__( train_split=None, time_split_size=None, extra_filter=None, + data_source=None, **kwargs, ): self._name = name @@ -103,13 +104,15 @@ def __init__( self.training_dataset_type = training_dataset_type else: self._training_dataset_type = None + + self.data_source = data_source + # set up depending on user initialized or coming from backend response if created is None: self._start_time = util.convert_event_time_to_timestamp(event_start_time) self._end_time = util.convert_event_time_to_timestamp(event_end_time) # no type -> user init self._features = features - self.storage_connector = storage_connector self.splits = splits self.statistics_config = statistics_config self._label = label @@ -140,10 +143,6 @@ def __init__( self._start_time = event_start_time self._end_time = event_end_time # type available -> init from backend response - # make rest call to get all connector information, description etc. 
- self._storage_connector = StorageConnector.from_response_json( - storage_connector - ) if features is None: features = [] @@ -240,14 +239,12 @@ def _infer_training_dataset_type(self, connector_type): ) def to_dict(self): - return { + td_meta_dict = { "name": self._name, "version": self._version, "description": self._description, "dataFormat": self._data_format, "coalesce": self._coalesce, - "storageConnector": self._storage_connector, - "location": self._location, "trainingDatasetType": self._training_dataset_type, "splits": self._splits, "seed": self._seed, @@ -257,6 +254,9 @@ def to_dict(self): "eventEndTime": self._end_time, "extraFilter": self._extra_filter, } + if self._data_source: + td_meta_dict["dataSource"] = self._data_source.to_dict() + return td_meta_dict @property def name(self) -> str: @@ -306,17 +306,39 @@ def coalesce(self, coalesce: bool): self._coalesce = coalesce @property - def storage_connector(self): - """Storage connector.""" - return self._storage_connector + def data_source(self) -> "ds.DataSource": + return self._data_source + + @data_source.setter + def data_source(self, data_source): + self._data_source = ( + ds.DataSource.from_response_json(data_source) + if isinstance(data_source, dict) + else data_source + ) + if self._data_source is None: + self._data_source = ds.DataSource() + self.storage_connector = self._data_source.storage_connector + + @property + def storage_connector(self) -> StorageConnector: + """" + !!! warning "Deprecated" + `storage_connector` method is deprecated. Use + `data_source` instead. + """ + return self._data_source.storage_connector @storage_connector.setter def storage_connector(self, storage_connector): + if self._data_source is None: + self._data_source = ds.DataSource() + if isinstance(storage_connector, StorageConnector): - self._storage_connector = storage_connector + self._data_source.storage_connector = storage_connector elif storage_connector is None: # init empty connector, otherwise will have to handle it at serialization time - self._storage_connector = HopsFSConnector( + self._data_source.storage_connector = HopsFSConnector( None, None, None, None, None, None ) else: @@ -327,7 +349,7 @@ def storage_connector(self, storage_connector): ) if self.training_dataset_type != self.IN_MEMORY: self._training_dataset_type = self._infer_training_dataset_type( - self._storage_connector.type + self._data_source.storage_connector.type ) @property @@ -357,13 +379,9 @@ def splits(self, splits: Optional[Dict[str, float]]): @property def location(self) -> str: - """Path to the training dataset location. Can be an empty string if e.g. the training dataset is in-memory.""" + """Storage specific location. Including data source path if specified. Can be an empty string if e.g. 
the training dataset is in-memory.""" return self._location - @location.setter - def location(self, location: str): - self._location = location - @property def seed(self) -> Optional[int]: """Seed used to perform random split, ensure reproducibility of the random split at a later date.""" @@ -520,7 +538,6 @@ def __init__( event_end_time=None, coalesce=False, description=None, - storage_connector=None, splits=None, validation_size=None, test_size=None, @@ -545,6 +562,7 @@ def __init__( train_split=None, time_split_size=None, extra_filter=None, + data_source=None, **kwargs, ): super().__init__( @@ -556,7 +574,6 @@ def __init__( event_end_time=event_end_time, coalesce=coalesce, description=description, - storage_connector=storage_connector, splits=splits, validation_size=validation_size, test_size=test_size, @@ -576,6 +593,7 @@ def __init__( train_split=train_split, time_split_size=time_split_size, extra_filter=extra_filter, + data_source=data_source, ) self._id = id @@ -644,7 +662,7 @@ def save( training_dataset, td_job = self._training_dataset_engine.save( self, features, write_options or {} ) - self.storage_connector = training_dataset.storage_connector + self.data_source = training_dataset.data_source # currently we do not save the training dataset statistics config for training datasets self.statistics_config = user_stats_config if self.statistics_config.enabled and engine.get_type().startswith("spark"): @@ -872,14 +890,16 @@ def from_response_json(cls, json_dict): return [] tds = [] for td in json_decamelized["items"]: - td.pop("type") - td.pop("href") + td.pop("type", None) + td.pop("href", None) cls._rewrite_location(td) tds.append(cls(**td)) return tds + elif isinstance(json_decamelized, dict): + return cls(**json_decamelized) else: # backwards compatibility for td in json_decamelized: - _ = td.pop("type") + _ = td.pop("type", None) cls._rewrite_location(td) return [cls(**td) for td in json_decamelized] @@ -916,14 +936,12 @@ def json(self): return json.dumps(self, cls=util.Encoder) def to_dict(self): - return { + td_meta_dict = { "name": self._name, "version": self._version, "description": self._description, "dataFormat": self._data_format, "coalesce": self._coalesce, - "storageConnector": self._storage_connector, - "location": self._location, "trainingDatasetType": self._training_dataset_type, "features": self._features, "splits": self._splits, @@ -936,6 +954,9 @@ def to_dict(self): "extraFilter": self._extra_filter, "type": "trainingDatasetDTO", } + if self._data_source: + td_meta_dict["dataSource"] = self._data_source.to_dict() + return td_meta_dict @property def id(self): diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index 53ba0a8250..28c0d43400 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -21,6 +21,7 @@ from hsfs import feature_group, feature_view, storage_connector, training_dataset from hsfs.constructor import fs_query from hsfs.core import arrow_flight_client +from hsfs.core import data_source as ds from hsfs.engine import python from hsfs.feature import Feature from hsfs.feature_store import FeatureStore @@ -97,7 +98,7 @@ def _arrange_dataset_reads(self, mocker, backend_fixtures, data_format): json_td = backend_fixtures["training_dataset"]["get_basic_info"]["response"] td_hopsfs = training_dataset.TrainingDataset.from_response_json(json_td)[0] td_hopsfs.training_dataset_type = "HOPSFS_TRAINING_DATASET" - td_hopsfs.storage_connector = 
HopsFSConnector(0, "", "") + td_hopsfs.data_source.storage_connector = HopsFSConnector(0, "", "") td_hopsfs.data_format = data_format mocker.patch( "hsfs.core.feature_view_engine.FeatureViewEngine._get_training_dataset_metadata", @@ -391,7 +392,7 @@ def test_construct_query_object_snowflake(self, mocker, backend_fixtures): json1 = backend_fixtures["feature_group"]["get_external_snowflake"]["response"] test_fg1 = feature_group.ExternalFeatureGroup.from_response_json(json1) - test_fg1._storage_connector = sc + test_fg1._data_source._storage_connector = sc mocker.patch("hsfs.constructor.query.Query.to_string", return_value="") mocker.patch("hsfs.constructor.query.Query._to_string", return_value="") @@ -471,7 +472,7 @@ def test_supports(self): # Arrange connector = storage_connector.BigQueryConnector(0, "BigQueryConnector", 99) external_feature_group = feature_group.ExternalFeatureGroup( - storage_connector=connector, primary_key=[""] + primary_key=[""], data_source=ds.DataSource(storage_connector=connector) ) # Act @@ -490,7 +491,7 @@ def spark_options(self): def test_supports_unsupported(self): # Arrange external_feature_group = feature_group.ExternalFeatureGroup( - storage_connector=self.FakeConnector(), primary_key=[""] + primary_key=[""], data_source=ds.DataSource(storage_connector=self.FakeConnector()) ) # Act @@ -503,7 +504,7 @@ def test_supports_mixed_featuregroups(self): # Arrange connector = storage_connector.BigQueryConnector(0, "BigQueryConnector", 99) external_feature_group = feature_group.ExternalFeatureGroup( - storage_connector=connector, primary_key=[""] + primary_key=[""], data_source=ds.DataSource(storage_connector=connector) ) mock_feature_group = MagicMock(spec=feature_group.FeatureGroup) @@ -518,7 +519,7 @@ def test_supports_mixed_featuregroups(self): def test_supports_mixed_featuregroups_unsupported(self): # Arrange external_feature_group = feature_group.ExternalFeatureGroup( - storage_connector=self.FakeConnector(), primary_key=[""] + primary_key=[""], data_source=ds.DataSource(storage_connector=self.FakeConnector()) ) mock_feature_group = MagicMock(spec=feature_group.FeatureGroup) diff --git a/python/tests/core/test_external_feature_group_engine.py b/python/tests/core/test_external_feature_group_engine.py index c7c29cb7bf..d3aaaee39e 100644 --- a/python/tests/core/test_external_feature_group_engine.py +++ b/python/tests/core/test_external_feature_group_engine.py @@ -16,6 +16,8 @@ import pytest from hsfs import feature, feature_group, storage_connector from hsfs.client import exceptions +from hsfs.core import data_source as ds +from hsfs.core import data_source_data as dsd from hsfs.core import external_feature_group_engine from hsfs.engine import python @@ -41,7 +43,6 @@ def test_save(self, mocker): featurestore_id=feature_store_id, primary_key=[], id=10, - storage_connector=mocker.patch("hsfs.storage_connector.JdbcConnector"), ) mock_engine_get_instance.return_value.parse_schema_feature_group.return_value = [ @@ -56,6 +57,72 @@ def test_save(self, mocker): assert len(mock_fg_api.return_value.save.call_args[0][0].features) == 1 assert not mock_fg_api.return_value.save.call_args[0][0].features[0].primary + def test_save_arrowflight(self, mocker): + # Arrange + feature_store_id = 99 + + mocker.patch("hsfs.engine.get_type") + mock_get_data = mocker.patch("hsfs.core.data_source.DataSource.get_data") + mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi") + + external_fg_engine = external_feature_group_engine.ExternalFeatureGroupEngine( + 
feature_store_id=feature_store_id + ) + + f = feature.Feature(name="f", type="str") + + fg = feature_group.ExternalFeatureGroup( + name="test", + version=1, + featurestore_id=feature_store_id, + primary_key=[], + id=10, + data_source=ds.DataSource(database="test", group="test", table="test") + ) + + mock_get_data.return_value = dsd.DataSourceData(features=[f]) + + # Act + external_fg_engine.save(feature_group=fg) + + # Assert + assert mock_fg_api.return_value.save.call_count == 1 + assert len(mock_fg_api.return_value.save.call_args[0][0].features) == 1 + assert not mock_fg_api.return_value.save.call_args[0][0].features[0].primary + + def test_save_arrowflight_query(self, mocker): + # Arrange + feature_store_id = 99 + + mocker.patch("hsfs.engine.get_type") + mock_get_data = mocker.patch("hsfs.core.data_source.DataSource.get_data") + mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi") + + external_fg_engine = external_feature_group_engine.ExternalFeatureGroupEngine( + feature_store_id=feature_store_id + ) + + f = feature.Feature(name="f", type="str") + + fg = feature_group.ExternalFeatureGroup( + name="test", + version=1, + featurestore_id=feature_store_id, + primary_key=[], + id=10, + data_source=ds.DataSource(query="test") + ) + + mock_get_data.return_value = dsd.DataSourceData(features=[f]) + + # Act + external_fg_engine.save(feature_group=fg) + + # Assert + assert mock_fg_api.return_value.save.call_count == 1 + assert len(mock_fg_api.return_value.save.call_args[0][0].features) == 1 + assert not mock_fg_api.return_value.save.call_args[0][0].features[0].primary + def test_save_primary_key(self, mocker): # Arrange feature_store_id = 99 @@ -145,7 +212,7 @@ def test_update_features_metadata(self, mocker): features = [f] external_fg = feature_group.ExternalFeatureGroup( - storage_connector=jdbc_connector, id=10 + id=10, data_source=ds.DataSource(storage_connector=jdbc_connector) ) # Act @@ -158,8 +225,8 @@ def test_update_features_metadata(self, mocker): assert ( mock_fg_api.return_value.update_metadata.call_args[0][ 1 - ].storage_connector.id - == external_fg.storage_connector.id + ].data_source.storage_connector.id + == external_fg.data_source.storage_connector.id ) assert ( mock_fg_api.return_value.update_metadata.call_args[0][1].id @@ -381,8 +448,8 @@ def test_save_python_engine_features(self, mocker): assert mock_fg_api.return_value.save.call_count == 1 assert len(mock_fg_api.return_value.save.call_args[0][0].features) == 2 assert ( - mock_fg_api.return_value.save.call_args[0][0].storage_connector - == fg.storage_connector + mock_fg_api.return_value.save.call_args[0][0].data_source.storage_connector + == fg.data_source.storage_connector ) assert mock_fg_api.return_value.save.call_args[0][0].features == features assert mock_fg_api.return_value.save.call_args[0][0].id == fg.id diff --git a/python/tests/core/test_feature_view_engine.py b/python/tests/core/test_feature_view_engine.py index cca4d31afe..2b23bc5b79 100644 --- a/python/tests/core/test_feature_view_engine.py +++ b/python/tests/core/test_feature_view_engine.py @@ -30,6 +30,7 @@ from hsfs.constructor import fs_query from hsfs.constructor.query import Query from hsfs.core import arrow_flight_client, feature_view_engine +from hsfs.core import data_source as ds from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics from hsfs.hopsworks_udf import udf from hsfs.storage_connector import BigQueryConnector, StorageConnector @@ -1576,6 +1577,7 @@ def test_read_dir_from_storage_connector(self, 
mocker): feature_store_id = 99 mocker.patch("hsfs.core.feature_view_api.FeatureViewApi") + mock_drop_helper_columns = mocker.patch("hsfs.core.feature_view_engine.FeatureViewEngine._drop_helper_columns") mock_sc_read = mocker.patch("hsfs.storage_connector.StorageConnector.read") fv_engine = feature_view_engine.FeatureViewEngine( @@ -1608,6 +1610,7 @@ def test_read_dir_from_storage_connector(self, mocker): # Assert assert mock_sc_read.call_count == 1 + assert mock_drop_helper_columns.call_count == 3 def test_read_dir_from_storage_connector_file_not_found(self, mocker): # Arrange @@ -2577,7 +2580,7 @@ def test_check_feature_group_accessibility_arrow_flight(self, mocker): mock_constructor_query = mocker.patch("hsfs.constructor.query.Query") connector = BigQueryConnector(0, "BigQueryConnector", 99) mock_external_feature_group = feature_group.ExternalFeatureGroup( - storage_connector=connector, primary_key="" + primary_key="", data_source=ds.DataSource(storage_connector=connector) ) mock_feature_group = MagicMock(spec=feature_group.FeatureGroup) mock_constructor_query.featuregroups = [ @@ -2626,7 +2629,7 @@ def spark_options(self): connector = FakeConnector() mock_external_feature_group = feature_group.ExternalFeatureGroup( - storage_connector=connector, primary_key="" + primary_key="", data_source=ds.DataSource(storage_connector=connector) ) mock_feature_group = MagicMock(spec=feature_group.FeatureGroup) mock_constructor_query.featuregroups = [ diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 52e38d83a4..0fde102f28 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -37,6 +37,7 @@ ) from hsfs.client import exceptions from hsfs.constructor import hudi_feature_group_alias, query +from hsfs.core import data_source as ds from hsfs.core import online_ingestion, training_dataset_engine from hsfs.core.constants import HAS_GREAT_EXPECTATIONS from hsfs.engine import spark @@ -197,7 +198,7 @@ def test_register_external_temporary_table(self, mocker): ) external_fg = feature_group.ExternalFeatureGroup( - storage_connector=jdbc_connector, id=10, location="test_location" + id=10, location="test_location", data_source=ds.DataSource(storage_connector=jdbc_connector) ) # Act diff --git a/python/tests/fixtures/external_feature_group_alias_fixtures.json b/python/tests/fixtures/external_feature_group_alias_fixtures.json index 719e257128..9aa2311f1a 100644 --- a/python/tests/fixtures/external_feature_group_alias_fixtures.json +++ b/python/tests/fixtures/external_feature_group_alias_fixtures.json @@ -4,12 +4,18 @@ "on_demand_feature_group": { "type": "onDemandFeaturegroupDTO", "id": 15, - "storageConnector": { - "type": "featurestoreJdbcConnectorDTO", - "featurestoreId": 67, - "id": 2, - "name": "test_project_featurestore", - "storageConnectorType": "JDBC" + "dataSource": { + "query": "select * from Customer", + "database": "test_database", + "group": "test_schema", + "path": "", + "storageConnector": { + "type": "featurestoreJdbcConnectorDTO", + "featurestoreId": 67, + "id": 2, + "name": "test_project_featurestore", + "storageConnectorType": "JDBC" + } }, "spine": false }, diff --git a/python/tests/fixtures/external_feature_group_fixtures.json b/python/tests/fixtures/external_feature_group_fixtures.json index 496d05cd1d..6ff91cf695 100644 --- a/python/tests/fixtures/external_feature_group_fixtures.json +++ b/python/tests/fixtures/external_feature_group_fixtures.json @@ -69,32 +69,32 @@ "runValidation": true }, "eventTime": "datet", - 
"storageConnector": { - "type": "featurestoreJdbcConnectorDTO", - "description": "JDBC connector for the Offline Feature Store", - "featurestoreId": 67, - "id": 2, - "name": "test_project_featurestore", - "storageConnectorType": "JDBC", - "arguments": [ - { - "name": "sslTrustStore" - }, - { - "name": "trustStorePassword" - }, - { - "name": "sslKeyStore" - }, - { - "name": "keyStorePassword" - } - ], - "connectionString": "jdbc:hopshive://10.0.2.15:9085/test_project_featurestore;auth=noSasl;ssl=true;twoWay=true;" - }, "dataSource": { "query": "Select * from ", - "path": "test_path" + "path": "test_path", + "storageConnector": { + "type": "featurestoreJdbcConnectorDTO", + "description": "JDBC connector for the Offline Feature Store", + "featurestoreId": 67, + "id": 2, + "name": "test_project_featurestore", + "storageConnectorType": "JDBC", + "arguments": [ + { + "name": "sslTrustStore" + }, + { + "name": "trustStorePassword" + }, + { + "name": "sslKeyStore" + }, + { + "name": "keyStorePassword" + } + ], + "connectionString": "jdbc:hopshive://10.0.2.15:9085/test_project_featurestore;auth=noSasl;ssl=true;twoWay=true;" + } }, "dataFormat": "HUDI", "options": [{"name": "test_name", "value": "test_value"}] @@ -184,32 +184,32 @@ "runValidation": true }, "eventTime": "datet", - "storageConnector": { - "type": "featurestoreJdbcConnectorDTO", - "description": "JDBC connector for the Offline Feature Store", - "featurestoreId": 67, - "id": 2, - "name": "test_project_featurestore", - "storageConnectorType": "JDBC", - "arguments": [ - { - "name": "sslTrustStore" - }, - { - "name": "trustStorePassword" - }, - { - "name": "sslKeyStore" - }, - { - "name": "keyStorePassword" - } - ], - "connectionString": "jdbc:hopshive://10.0.2.15:9085/test_project_featurestore;auth=noSasl;ssl=true;twoWay=true;" - }, "dataSource": { "query": "Select * from ", - "path": "test_path" + "path": "test_path", + "storageConnector": { + "type": "featurestoreJdbcConnectorDTO", + "description": "JDBC connector for the Offline Feature Store", + "featurestoreId": 67, + "id": 2, + "name": "test_project_featurestore", + "storageConnectorType": "JDBC", + "arguments": [ + { + "name": "sslTrustStore" + }, + { + "name": "trustStorePassword" + }, + { + "name": "sslKeyStore" + }, + { + "name": "keyStorePassword" + } + ], + "connectionString": "jdbc:hopshive://10.0.2.15:9085/test_project_featurestore;auth=noSasl;ssl=true;twoWay=true;" + } }, "dataFormat": "HUDI", "options": [{"name": "test_name", "value": "test_value"}] @@ -233,12 +233,14 @@ "response": { "type": "onDemandFeaturegroupDTO", "id": 15, - "storageConnector": { - "type": "featurestoreJdbcConnectorDTO", - "featurestoreId": 67, - "id": 2, - "name": "test_project_featurestore", - "storageConnectorType": "JDBC" + "dataSource": { + "storageConnector": { + "type": "featurestoreJdbcConnectorDTO", + "featurestoreId": 67, + "id": 2, + "name": "test_project_featurestore", + "storageConnectorType": "JDBC" + } } }, "method": "GET", diff --git a/python/tests/fixtures/feature_group_fixtures.json b/python/tests/fixtures/feature_group_fixtures.json index 54b07269d7..8487809eac 100644 --- a/python/tests/fixtures/feature_group_fixtures.json +++ b/python/tests/fixtures/feature_group_fixtures.json @@ -603,14 +603,16 @@ "query": "select * from Customer", "database": "test_database", "group": "test_schema", - "path": "" + "path": "", + "storageConnector": { + "id": 4, + "name": "snowflake", + "featurestoreId": 67, + "storageConnectorType": "SNOWFLAKE" + } }, "dataFormat": null, "options": null, - 
"storageConnector": {"id": 4, - "name": "snowflake", - "featurestoreId": 67, - "storageConnectorType": "SNOWFLAKE"}, "type": "onDemandFeaturegroupDTO", "statisticsConfig": {"enabled": true, "correlations": false, diff --git a/python/tests/fixtures/fs_query_fixtures.json b/python/tests/fixtures/fs_query_fixtures.json index 1e27156508..42460a07f7 100644 --- a/python/tests/fixtures/fs_query_fixtures.json +++ b/python/tests/fixtures/fs_query_fixtures.json @@ -7,12 +7,16 @@ "on_demand_feature_group": { "type": "onDemandFeaturegroupDTO", "id": 15, - "storageConnector": { - "type": "featurestoreJdbcConnectorDTO", - "featurestoreId": 67, - "id": 2, - "name": "test_project_featurestore", - "storageConnectorType": "JDBC" + "dataSource": { + "query": "Select * from ", + "path": "test_path", + "storageConnector": { + "type": "featurestoreJdbcConnectorDTO", + "featurestoreId": 67, + "id": 2, + "name": "test_project_featurestore", + "storageConnectorType": "JDBC" + } }, "spine": false }, diff --git a/python/tests/fixtures/query_fixtures.json b/python/tests/fixtures/query_fixtures.json index 8b33aa69c6..9846a69830 100644 --- a/python/tests/fixtures/query_fixtures.json +++ b/python/tests/fixtures/query_fixtures.json @@ -226,32 +226,32 @@ "runValidation": true }, "eventTime": "datet", - "storageConnector": { - "type": "featurestoreJdbcConnectorDTO", - "description": "JDBC connector for the Offline Feature Store", - "featurestoreId": 67, - "id": 2, - "name": "test_project_featurestore", - "storageConnectorType": "JDBC", - "arguments": [ - { - "name": "sslTrustStore" - }, - { - "name": "trustStorePassword" - }, - { - "name": "sslKeyStore" - }, - { - "name": "keyStorePassword" - } - ], - "connectionString": "jdbc:hopshive://10.0.2.15:9085/test_project_featurestore;auth=noSasl;ssl=true;twoWay=true;" - }, "dataSource": { "query": "Select * from ", - "path": "test_path" + "path": "test_path", + "storageConnector": { + "type": "featurestoreJdbcConnectorDTO", + "description": "JDBC connector for the Offline Feature Store", + "featurestoreId": 67, + "id": 2, + "name": "test_project_featurestore", + "storageConnectorType": "JDBC", + "arguments": [ + { + "name": "sslTrustStore" + }, + { + "name": "trustStorePassword" + }, + { + "name": "sslKeyStore" + }, + { + "name": "keyStorePassword" + } + ], + "connectionString": "jdbc:hopshive://10.0.2.15:9085/test_project_featurestore;auth=noSasl;ssl=true;twoWay=true;" + } }, "dataFormat": "HUDI", "options": [{ "name": "test_name", "value": "test_value" }] diff --git a/python/tests/fixtures/training_dataset_fixtures.json b/python/tests/fixtures/training_dataset_fixtures.json index 6db5d08325..a9b8969758 100644 --- a/python/tests/fixtures/training_dataset_fixtures.json +++ b/python/tests/fixtures/training_dataset_fixtures.json @@ -11,28 +11,161 @@ "event_end_time": 1646697600000, "coalesce": true, "description": "test_description", - "storage_connector": { - "type": "featurestoreJdbcConnectorDTO", - "description": "JDBC connector description", - "featurestoreId": 67, - "id": 1, - "name": "test_jdbc", - "storageConnectorType": "JDBC", - "arguments": [ - { - "name": "sslTrustStore" - }, - { - "name": "trustStorePassword" - }, - { - "name": "sslKeyStore" + "dataSource": { + "query": "", + "database": "", + "group": "", + "path": "test_path", + "storage_connector": { + "type": "featurestoreHOPSFSConnectorDTO", + "description": "HOPSFS connector description", + "featurestoreId": 67, + "id": 1, + "name": "test_HOPSFS", + "storageConnectorType": "HOPSFS" + } + }, + "splits": [ + { + 
"name": "test_name", + "split_type": "test_split_type", + "percentage": "test_percentage", + "start_time": "test_start_time", + "end_time": "test_end_time" + } + ], + "validation_size": 0.0, + "test_size": 0.5, + "train_start": 4, + "train_end": 5, + "validation_start": 6, + "validation_end": 7, + "test_start": 8, + "test_end": 9, + "seed": 123, + "created": "test_created", + "creator": "test_creator", + "features": [ + { + "name": "test_name", + "type": "test_type", + "index": "test_index", + "featuregroup": { + "type": "cachedFeaturegroupDTO", + "validation_type": "test_validation_type", + "created": "2022-08-01T11:07:55Z", + "creator": { + "email": "admin@hopsworks.ai", + "firstName": "Admin", + "lastName": "Admin", + "maxNumProjects": 0, + "numActiveProjects": 0, + "numRemainingProjects": 0, + "status": 0, + "testUser": false, + "tos": false, + "toursState": 0, + "twoFactor": false + }, + "description": "test_description", + "featurestoreId": 67, + "featurestoreName": "test_featurestore", + "id": 15, + "location": "hopsfs://10.0.2.15:8020/apps/hive/warehouse/test_featurestore.db/fg_test_1", + "name": "fg_test", + "statisticsConfig": { + "columns": [], + "correlations": false, + "enabled": true, + "exactUniqueness": false, + "histograms": false + }, + "version": 1, + "features": [ + { + "defaultValue": null, + "featureGroupId": 15, + "hudiPrecombineKey": true, + "name": "intt", + "onlineType": "int", + "partition": false, + "primary": true, + "type": "int" + }, + { + "defaultValue": null, + "featureGroupId": 15, + "hudiPrecombineKey": false, + "name": "stringt", + "onlineType": "varchar(1000)", + "partition": false, + "primary": false, + "type": "string" + } + ], + "onlineTopicName": "119_15_fg_test_1_onlinefs", + "onlineEnabled": true, + "timeTravelFormat": "HUDI" }, - { - "name": "keyStorePassword" + "feature_group_feature_name": "test_feature_group_feature_name", + "label": { + "count": 1, + "items": [ + { + "featurestore_id": 11, + "version": 1, + "name": "test_name", + "href": "test_href" + } + ] } - ], - "connectionString": "test_conn_string" + } + ], + "statistics_config": { + "enabled": true, + "correlations": true, + "histograms": true, + "exact_uniqueness": true, + "columns": [] + }, + "featurestore_name": "test_featurestore_name", + "id": 11, + "inode_id": 64, + "training_dataset_type": "HOPSFS_TRAINING_DATASET", + "from_query": "test_from_query", + "querydto": "test_querydto", + "label": "test_label", + "train_split": "test_train_split", + "time_split_size": "test_time_split_size", + "type": "trainingDatasetDTO" + } + ] + }, + "get_external": { + "response": [ + { + "name": "test_name", + "version": 1, + "data_format": "hudi", + "featurestore_id": 22, + "location": "test_location", + "event_start_time": 1646438400000, + "event_end_time": 1646697600000, + "coalesce": true, + "description": "test_description", + "dataSource": { + "query": "", + "database": "", + "group": "", + "path": "test_path", + "storage_connector": { + "type": "featurestoreS3ConnectorDTO", + "description": "S3 connector description", + "featurestoreId": 67, + "id": 1, + "name": "test_s3", + "storageConnectorType": "S3" + } }, "splits": [ { @@ -159,28 +292,11 @@ "featurestore_id": 22, "type": "trainingDatasetDTO", "created": "test_created", - "storage_connector": { - "type": "featurestoreJdbcConnectorDTO", - "description": "JDBC connector description", - "featurestoreId": 67, - "id": 1, - "name": "test_jdbc", - "storageConnectorType": "JDBC", - "arguments": [ - { - "name": "sslTrustStore" - }, - { - 
"name": "trustStorePassword" - }, - { - "name": "sslKeyStore" - }, - { - "name": "keyStorePassword" - } - ], - "connectionString": "test_conn_string" + "dataSource": { + "query": "", + "database": "", + "group": "", + "path": "test_path" }, "splits": [], "statistics_config": {} diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index afab4b6938..ab506638fb 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -826,7 +826,7 @@ def test_from_response_json(self, backend_fixtures): fg = feature_group.ExternalFeatureGroup.from_response_json(json) # Assert - assert isinstance(fg.storage_connector, storage_connector.StorageConnector) + assert isinstance(fg.data_source.storage_connector, storage_connector.StorageConnector) assert fg.data_source.query == "Select * from " assert fg.data_format == "HUDI" assert fg.data_source.path == "test_path" @@ -860,7 +860,7 @@ def test_from_response_json_list(self, backend_fixtures): # Assert assert len(fg_list) == 1 fg = fg_list[0] - assert isinstance(fg.storage_connector, storage_connector.StorageConnector) + assert isinstance(fg.data_source.storage_connector, storage_connector.StorageConnector) assert fg.data_source.query == "Select * from " assert fg.data_format == "HUDI" assert fg.data_source.path == "test_path" @@ -892,7 +892,7 @@ def test_from_response_json_basic_info(self, backend_fixtures): fg = feature_group.ExternalFeatureGroup.from_response_json(json) # Assert - assert isinstance(fg.storage_connector, storage_connector.StorageConnector) + assert isinstance(fg.data_source.storage_connector, storage_connector.StorageConnector) assert fg.data_source.query is None assert fg.data_format is None assert fg.data_source.path is None @@ -1088,7 +1088,7 @@ def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures json = backend_fixtures["feature_group"]["get_basic_info"]["response"] fg = feature_group.FeatureGroup.from_response_json(json) fg._location = f"{fg.name}_{fg.version}" - fg._storage_connector = storage_connector.S3Connector( + fg._data_source._storage_connector = storage_connector.S3Connector( id=1, name="s3_conn", featurestore_id=fg.feature_store_id ) @@ -1110,7 +1110,7 @@ def test_prepare_spark_location_with_s3_connector_python( json = backend_fixtures["feature_group"]["get_basic_info"]["response"] fg = feature_group.FeatureGroup.from_response_json(json) fg._location = f"{fg.name}_{fg.version}" - fg._storage_connector = storage_connector.S3Connector( + fg._data_source._storage_connector = storage_connector.S3Connector( id=1, name="s3_conn", featurestore_id=fg.feature_store_id ) diff --git a/python/tests/test_training_dataset.py b/python/tests/test_training_dataset.py index 8dc6ba1ca6..9f2dad4b6b 100644 --- a/python/tests/test_training_dataset.py +++ b/python/tests/test_training_dataset.py @@ -59,7 +59,51 @@ def test_from_response_json(self, mocker, backend_fixtures): assert td.feature_store_id == 22 assert td.train_split == "test_train_split" assert td.training_dataset_type == "HOPSFS_TRAINING_DATASET" - assert isinstance(td.storage_connector, storage_connector.JdbcConnector) + assert isinstance(td.data_source.storage_connector, storage_connector.HopsFSConnector) + assert len(td._features) == 1 + assert isinstance( + td._features[0], training_dataset_feature.TrainingDatasetFeature + ) + assert len(td.splits) == 1 + assert isinstance(td.splits[0], training_dataset_split.TrainingDatasetSplit) + assert isinstance(td.statistics_config, 
statistics_config.StatisticsConfig) + assert td.label == ["test_name"] + + def test_from_response_json_external(self, mocker, backend_fixtures): + # Arrange + mocker.patch("hopsworks_common.client.get_instance") + json = backend_fixtures["training_dataset"]["get_external"]["response"] + + # Act + td_list = training_dataset.TrainingDataset.from_response_json(json) + + # Assert + assert len(td_list) == 1 + td = td_list[0] + assert td.id == 11 + assert td.name == "test_name" + assert td.version == 1 + assert td.description == "test_description" + assert td.data_format == "hudi" + assert td._start_time == 1646438400000 + assert td._end_time == 1646697600000 + assert td.validation_size == 0.0 + assert td.test_size == 0.5 + assert td.train_start == 4 + assert td.train_end == 5 + assert td.validation_start == 6 + assert td.validation_end == 7 + assert td.test_start == 8 + assert td.test_end == 9 + assert td.coalesce is True + assert td.seed == 123 + assert td.location == "test_location" + assert td._from_query == "test_from_query" + assert td._querydto == "test_querydto" + assert td.feature_store_id == 22 + assert td.train_split == "test_train_split" + assert td.training_dataset_type == "EXTERNAL_TRAINING_DATASET" + assert isinstance(td.data_source.storage_connector, storage_connector.S3Connector) assert len(td._features) == 1 assert isinstance( td._features[0], training_dataset_feature.TrainingDatasetFeature @@ -102,8 +146,8 @@ def test_from_response_json_basic_info(self, mocker, backend_fixtures): assert td._querydto is None assert td.feature_store_id == 22 assert td.train_split is None - assert td.training_dataset_type is None - assert isinstance(td.storage_connector, storage_connector.JdbcConnector) + assert td.training_dataset_type == "HOPSFS_TRAINING_DATASET" + assert isinstance(td.data_source.storage_connector, storage_connector.HopsFSConnector) assert len(td._features) == 0 assert len(td.splits) == 0 assert isinstance(td.statistics_config, statistics_config.StatisticsConfig)
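
Taken together, the hunks above replace the `storage_connector` constructor argument and attribute with a `DataSource` wrapper across feature groups and training datasets. The sketch below is not part of this patch; it only illustrates the migrated call pattern that the updated tests exercise, assuming an initialized hsfs client. The connector constructor arguments, names, ids, database, group, and table are placeholder values.

```python
from hsfs import feature_group, storage_connector
from hsfs.core import data_source as ds

# Placeholder connector, mirroring the one built in the arrow flight tests.
connector = storage_connector.BigQueryConnector(0, "BigQueryConnector", 99)

# Previously: ExternalFeatureGroup(storage_connector=connector, ...).
# Now the connector and the database/group/table (or query) travel inside a DataSource.
external_fg = feature_group.ExternalFeatureGroup(
    name="transactions",       # hypothetical name
    version=1,
    featurestore_id=99,        # hypothetical feature store id
    primary_key=["id"],
    data_source=ds.DataSource(
        storage_connector=connector,
        database="analytics",  # hypothetical database
        group="public",        # hypothetical schema/group
        table="transactions",  # hypothetical table
    ),
)

# The connector is now reached through the data source.
assert external_fg.data_source.storage_connector is not None
```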
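
The browsing helpers added to `StorageConnector` (`get_databases`, `get_tables`, `get_data`, `get_metadata`) return `DataSource` and `DataSourceData` objects rather than raw names. A minimal sketch of how they compose, assuming a live Hopsworks connection; `hopsworks.login()`, `get_feature_store()`, `get_storage_connector()`, and the connector name "snowflake_conn" are assumptions outside this patch.

```python
import hopsworks

# Assumed client bootstrap, not part of this patch.
project = hopsworks.login()
fs = project.get_feature_store()

connector = fs.get_storage_connector("snowflake_conn")  # hypothetical connector name

for database in connector.get_databases():          # -> list[str]
    for source in connector.get_tables(database):   # -> list[DataSource]
        metadata = connector.get_metadata(source)   # -> dict of storage-specific details
        data = connector.get_data(source)           # -> DataSourceData, carries the feature schema
        print(database, metadata)
```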