Skip to content

Commit 5a7f9f4

Browse files
committed
Add SendGrid Sink Plugin
1 parent 05cd742 commit 5a7f9f4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2094
-444
lines changed

checkstyle.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
-->
1414

1515
<!DOCTYPE module PUBLIC
16-
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
17-
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
16+
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
17+
"https://checkstyle.org/dtds/configuration_1_3.dtd">
1818

1919
<!-- This is a checkstyle configuration file. For descriptions of
2020
what the following rules do, please see the checkstyle configuration

docs/SendGrid-batchsink.md

Whitespace-only changes.

icons/SendGrid-batchsink.png

442 Bytes
Loading

pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,17 @@
3434
<common.logging.version>1.2</common.logging.version>
3535
</properties>
3636

37+
<scm>
  <connection>scm:git:https://github.com/data-integrations/sendgrid.git</connection>
  <!-- Maven SCM URLs have the form scm:<provider>:<provider-specific part>; the "git" provider
       segment must precede the SSH address. -->
  <developerConnection>scm:git:git@github.com:data-integrations/sendgrid.git</developerConnection>
  <url>https://github.com/data-integrations/sendgrid</url>
  <tag>HEAD</tag>
</scm>
43+
44+
<issueManagement>
45+
<url>https://issues.cask.co/browse/CDAP</url>
46+
</issueManagement>
47+
3748
<repositories>
3849
<repository>
3950
<id>sonatype</id>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package io.cdap.plugin.sendgrid.batch.sink;
17+
18+
import io.cdap.plugin.sendgrid.common.objects.mail.SendGridMail;
19+
import org.apache.hadoop.io.NullWritable;
20+
import org.apache.hadoop.mapreduce.JobContext;
21+
import org.apache.hadoop.mapreduce.OutputCommitter;
22+
import org.apache.hadoop.mapreduce.OutputFormat;
23+
import org.apache.hadoop.mapreduce.RecordWriter;
24+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
25+
26+
import java.io.IOException;
27+
28+
/**
29+
* An OutputFormat that sends the output of a Hadoop job to the SendGrid record writer, also
30+
* it defines the output committer.
31+
*/
32+
public class SendGridOutputFormat extends OutputFormat<NullWritable, SendGridMail> {
33+
@Override
34+
public RecordWriter<NullWritable, SendGridMail> getRecordWriter(TaskAttemptContext taskAttemptContext) {
35+
return new SendGridRecordWriter(taskAttemptContext);
36+
}
37+
38+
@Override
39+
public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
40+
// no-op
41+
}
42+
43+
@Override
44+
public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) {
45+
return new OutputCommitter() {
46+
@Override
47+
public void setupJob(JobContext jobContext) throws IOException {
48+
49+
}
50+
51+
@Override
52+
public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
53+
54+
}
55+
56+
@Override
57+
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
58+
return false;
59+
}
60+
61+
@Override
62+
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
63+
64+
}
65+
66+
@Override
67+
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
68+
69+
}
70+
};
71+
}
72+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package io.cdap.plugin.sendgrid.batch.sink;
17+
18+
import com.google.common.collect.ImmutableMap;
19+
import com.google.gson.Gson;
20+
import com.google.gson.GsonBuilder;
21+
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
22+
23+
import java.util.Map;
24+
25+
/**
26+
* Provides {@link SendGridOutputFormat} class name and configuration
27+
*/
28+
public class SendGridOutputFormatProvider implements OutputFormatProvider {
29+
public static final String PROPERTY_CONFIG_JSON = "cdap.sendgrid.config";
30+
private static final Gson gson = new GsonBuilder().create();
31+
private final Map<String, String> conf;
32+
33+
SendGridOutputFormatProvider(SendGridSinkConfig config) {
34+
this.conf = new ImmutableMap.Builder<String, String>()
35+
.put(PROPERTY_CONFIG_JSON, gson.toJson(config))
36+
.build();
37+
}
38+
39+
@Override
40+
public String getOutputFormatClassName() {
41+
return SendGridOutputFormat.class.getName();
42+
}
43+
44+
@Override
45+
public Map<String, String> getOutputFormatConfiguration() {
46+
return conf;
47+
}
48+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package io.cdap.plugin.sendgrid.batch.sink;
17+
18+
import com.google.gson.Gson;
19+
import com.google.gson.GsonBuilder;
20+
import io.cdap.plugin.sendgrid.common.SendGridClient;
21+
import io.cdap.plugin.sendgrid.common.objects.mail.SendGridMail;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.io.NullWritable;
24+
import org.apache.hadoop.mapreduce.RecordWriter;
25+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
26+
27+
28+
import java.io.IOException;
29+
30+
/**
31+
* Writes {@link SendGridMail} into batches and submit them to SendGrid send API
32+
*/
33+
public class SendGridRecordWriter extends RecordWriter<NullWritable, SendGridMail> {
34+
private static final Gson gson = new GsonBuilder().create();
35+
private SendGridClient client;
36+
37+
public SendGridRecordWriter(TaskAttemptContext taskAttemptContext) {
38+
Configuration conf = taskAttemptContext.getConfiguration();
39+
String serializedConfig = conf.get(SendGridOutputFormatProvider.PROPERTY_CONFIG_JSON);
40+
SendGridSinkConfig sgConfig = gson.fromJson(serializedConfig, SendGridSinkConfig.class);
41+
42+
client = new SendGridClient(sgConfig);
43+
}
44+
45+
@Override
46+
public void write(NullWritable nullWritable, SendGridMail sendGridMail) throws IOException {
47+
client.sendMail(sendGridMail);
48+
}
49+
50+
@Override
51+
public void close(TaskAttemptContext taskAttemptContext) {
52+
// no-op
53+
}
54+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package io.cdap.plugin.sendgrid.batch.sink;
17+
18+
import io.cdap.cdap.api.annotation.Description;
19+
import io.cdap.cdap.api.annotation.Name;
20+
import io.cdap.cdap.api.annotation.Plugin;
21+
import io.cdap.cdap.api.data.batch.Output;
22+
import io.cdap.cdap.api.data.format.StructuredRecord;
23+
import io.cdap.cdap.api.data.schema.Schema;
24+
import io.cdap.cdap.api.dataset.lib.KeyValue;
25+
import io.cdap.cdap.etl.api.Emitter;
26+
import io.cdap.cdap.etl.api.FailureCollector;
27+
import io.cdap.cdap.etl.api.PipelineConfigurer;
28+
import io.cdap.cdap.etl.api.batch.BatchSink;
29+
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
30+
import io.cdap.plugin.common.IdUtils;
31+
import io.cdap.plugin.common.LineageRecorder;
32+
import io.cdap.plugin.sendgrid.common.config.BaseConfig;
33+
import io.cdap.plugin.sendgrid.common.objects.mail.SendGridMail;
34+
import org.apache.hadoop.io.NullWritable;
35+
36+
import java.util.stream.Collectors;
37+
38+
/**
39+
* Batch Sink Plugin
40+
*/
41+
@Plugin(type = BatchSink.PLUGIN_TYPE)
42+
@Name(BaseConfig.PLUGIN_NAME)
43+
@Description("Sends mails via SendGrid")
44+
public class SendGridSink extends BatchSink<StructuredRecord, NullWritable, SendGridMail> {
45+
46+
private final SendGridSinkConfig config;
47+
48+
public SendGridSink(SendGridSinkConfig config) {
49+
this.config = config;
50+
}
51+
52+
@Override
53+
@SuppressWarnings("ThrowableNotThrown")
54+
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
55+
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
56+
57+
IdUtils.validateReferenceName(config.referenceName, failureCollector);
58+
59+
config.validate(failureCollector);
60+
config.validate(pipelineConfigurer.getStageConfigurer().getInputSchema());
61+
62+
failureCollector.getOrThrowException();
63+
}
64+
65+
@Override
66+
public void prepareRun(BatchSinkContext batchSinkContext) {
67+
Schema inputSchema = batchSinkContext.getInputSchema();
68+
config.validate(inputSchema);
69+
70+
batchSinkContext.addOutput(Output.of(config.referenceName, new SendGridOutputFormatProvider(config)));
71+
72+
LineageRecorder lineageRecorder = new LineageRecorder(batchSinkContext, config.referenceName);
73+
lineageRecorder.createExternalDataset(inputSchema);
74+
75+
if (inputSchema.getFields() != null && !inputSchema.getFields().isEmpty()) {
76+
String operationDescription = String.format("Wrote to SendGrid %s", config.getFrom());
77+
lineageRecorder.recordWrite("Write", operationDescription,
78+
inputSchema.getFields().stream()
79+
.map(Schema.Field::getName)
80+
.collect(Collectors.toList()));
81+
}
82+
}
83+
84+
@Override
85+
public void transform(StructuredRecord record, Emitter<KeyValue<NullWritable, SendGridMail>> emitter) {
86+
SendGridMail sendGridMail = SendGridSinkTransformer.transform(config, record);
87+
emitter.emit(new KeyValue<>(null, sendGridMail));
88+
}
89+
}

0 commit comments

Comments
 (0)