Skip to content

Commit 32b7329

Browse files
[FSTORE-743][append][java] Add support for insert to external feature groups online (#983)
1 parent df30819 commit 32b7329

File tree

3 files changed

+100
-18
lines changed

3 files changed

+100
-18
lines changed

java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int
8888
ExternalDataFormat dataFormat, String path, Map<String, String> options,
8989
@NonNull StorageConnector storageConnector, String description,
9090
List<String> primaryKeys, List<Feature> features, StatisticsConfig statisticsConfig,
91-
String eventTime) {
91+
String eventTime, boolean onlineEnabled, String onlineTopicName) {
9292
this();
9393
this.timeTravelFormat = null;
9494
this.featureStore = featureStore;
@@ -108,6 +108,8 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int
108108
this.features = features;
109109
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
110110
this.eventTime = eventTime;
111+
this.onlineEnabled = onlineEnabled;
112+
this.onlineTopicName = onlineTopicName;
111113
}
112114

113115
public ExternalFeatureGroup() {
@@ -135,17 +137,17 @@ public Dataset<Row> read() throws FeatureStoreException, IOException {
135137

136138
@Override
137139
public Dataset<Row> read(boolean online) throws FeatureStoreException, IOException {
138-
return null;
140+
return selectAll().read(online);
139141
}
140142

141143
@Override
142144
public Dataset<Row> read(Map<String, String> readOptions) throws FeatureStoreException, IOException {
143-
return null;
145+
return selectAll().read(false, readOptions);
144146
}
145147

146148
@Override
147149
public Dataset<Row> read(boolean online, Map<String, String> readOptions) throws FeatureStoreException, IOException {
148-
return null;
150+
return selectAll().read(online, readOptions);
149151
}
150152

151153
@Override
@@ -176,18 +178,7 @@ public void show(int numRows) throws FeatureStoreException, IOException {
176178

177179
@Override
178180
public void show(int numRows, boolean online) throws FeatureStoreException, IOException {
179-
180-
}
181-
182-
@Override
183-
public void insert(Dataset<Row> featureData) throws IOException, FeatureStoreException, ParseException {
184-
185-
}
186-
187-
@Override
188-
public void insert(Dataset<Row> featureData, Map<String, String> writeOptions)
189-
throws FeatureStoreException, IOException, ParseException {
190-
181+
read(true).show(numRows);
191182
}
192183

193184
@Override
@@ -238,6 +229,77 @@ public void insert(Dataset<Row> featureData, boolean overwrite, Map<String, Stri
238229

239230
}
240231

232+
/**
233+
* Incrementally insert data to the online storage of an external feature group. The feature group has to be online
234+
* enabled to perform this operation.
235+
* The `features` dataframe can be a Spark DataFrame or RDD.
236+
* If statistics are enabled, statistics are recomputed for the entire feature group.
237+
* If the feature group doesn't exist, the insert method will create the necessary metadata the first time it is
238+
* invoked and write the specified `features` dataframe as feature group to the online feature store.
239+
*
240+
* <pre>
241+
* {@code
242+
* // get feature store handle
243+
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
244+
* // get feature group handle
245+
* ExternalFeatureGroup fg = fs.getExternalFeatureGroup("electricity_prices", 1);
246+
* // insert data
247+
* fg.insert(featureData, writeOptions);
248+
* }
249+
* </pre>
250+
*
251+
* @param featureData Spark DataFrame, RDD. Features to be saved.
252+
* @throws IOException Generic IO exception.
253+
* @throws FeatureStoreException If client is not connected to Hopsworks; cannot run read query on storage and/or
254+
* can't reconcile schema.
255+
*/
256+
@Override
257+
public void insert(Dataset<Row> featureData)
258+
throws FeatureStoreException, IOException {
259+
260+
featureGroupEngine.insert(this, featureData, null);
261+
262+
codeEngine.saveCode(this);
263+
computeStatistics();
264+
}
265+
266+
/**
267+
* Incrementally insert data to the online storage of an external feature group. The feature group has to be online
268+
* enabled to perform this operation.
269+
* The `features` dataframe can be a Spark DataFrame or RDD.
270+
* If statistics are enabled, statistics are recomputed for the entire feature group.
271+
* If the feature group doesn't exist, the insert method will create the necessary metadata the first time it is
272+
* invoked and write the specified `features` dataframe as feature group to the online feature store.
273+
*
274+
* <pre>
275+
* {@code
276+
* // get feature store handle
277+
* FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
278+
* // get feature group handle
279+
* ExternalFeatureGroup fg = fs.getExternalFeatureGroup("electricity_prices", 1);
280+
* // Define additional write options (for example for Spark)
281+
* Map<String, String> writeOptions = = new HashMap<String, String>();
282+
* // insert data
283+
* fg.insert(featureData, writeOptions);
284+
* }
285+
* </pre>
286+
*
287+
* @param featureData Spark DataFrame, RDD. Features to be saved.
288+
* @param writeOptions Additional write options as key-value pairs.
289+
* @throws IOException Generic IO exception.
290+
* @throws FeatureStoreException If client is not connected to Hopsworks; cannot run read query on storage and/or
291+
* can't reconcile schema.
292+
*/
293+
@Override
294+
public void insert(Dataset<Row> featureData, Map<String, String> writeOptions)
295+
throws FeatureStoreException, IOException, ParseException {
296+
297+
featureGroupEngine.insert(this, featureData, writeOptions);
298+
299+
codeEngine.saveCode(this);
300+
computeStatistics();
301+
}
302+
241303
@Override
242304
public void commitDeleteRecord(Dataset<Row> featureData) throws FeatureStoreException, IOException, ParseException {
243305

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,25 @@ public void insert(StreamFeatureGroup streamFeatureGroup, Dataset<Row> featureDa
157157
SparkEngine.getInstance().getKafkaConfig(streamFeatureGroup, writeOptions));
158158
}
159159

160+
public void insert(ExternalFeatureGroup externalFeatureGroup, Dataset<Row> featureData,
161+
Map<String, String> writeOptions)
162+
throws FeatureStoreException, IOException {
163+
164+
if (!externalFeatureGroup.getOnlineEnabled()) {
165+
throw new FeatureStoreException("Online storage is not enabled for this feature group. External feature groups "
166+
+ "can only store data in online storage. To create an offline only external feature group, use the `save` "
167+
+ "method.");
168+
}
169+
170+
if (externalFeatureGroup.getId() == null) {
171+
externalFeatureGroup = saveExternalFeatureGroup(externalFeatureGroup);
172+
}
173+
174+
SparkEngine.getInstance().writeOnlineDataframe(externalFeatureGroup, featureData,
175+
externalFeatureGroup.getOnlineTopicName(),
176+
SparkEngine.getInstance().getKafkaConfig(externalFeatureGroup, writeOptions));
177+
}
178+
160179
@Deprecated
161180
public StreamingQuery insertStream(FeatureGroup featureGroup, Dataset<Row> featureData, String queryName,
162181
String outputMode, boolean awaitTermination, Long timeout,
@@ -461,6 +480,7 @@ public ExternalFeatureGroup saveExternalFeatureGroup(ExternalFeatureGroup extern
461480

462481
ExternalFeatureGroup apiFg = saveExtennalFeatureGroupMetaData(externalFeatureGroup, ExternalFeatureGroup.class);
463482
externalFeatureGroup.setId(apiFg.getId());
483+
externalFeatureGroup.setOnlineTopicName(apiFg.getOnlineTopicName());
464484

465485
return externalFeatureGroup;
466486
}

java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestExternalFeatureGroup.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void testExternalFeatureGroupPrimaryKey() throws FeatureStoreException, I
5050

5151
ExternalFeatureGroup externalFeatureGroup = new ExternalFeatureGroup(featureStore, "fgName", 1,
5252
"SELECT *", ExternalDataFormat.HUDI, "path", null, storageConnector, "description",
53-
Collections.singletonList("primaryKey"), features, null, "featureA");
53+
Collections.singletonList("primaryKey"), features, null, "featureA", false, "topic");
5454

5555
Exception pkException = assertThrows(FeatureStoreException.class, () -> {
5656
externalFeatureGroupEngine.saveExternalFeatureGroup(externalFeatureGroup);
@@ -76,7 +76,7 @@ public void testFeatureGroupEventTimeFeature() throws FeatureStoreException, IOE
7676

7777
ExternalFeatureGroup externalFeatureGroup = new ExternalFeatureGroup(featureStore, "fgName", 1,
7878
"SELECT *", ExternalDataFormat.HUDI, "path", null, storageConnector, "description",
79-
Collections.singletonList("featureA"), features, null, "eventTime");
79+
Collections.singletonList("featureA"), features, null, "eventTime", false, "topic");
8080

8181
Exception pkException = assertThrows(FeatureStoreException.class, () -> {
8282
externalFeatureGroupEngine.saveExternalFeatureGroup(externalFeatureGroup);

0 commit comments

Comments
 (0)