Skip to content

Commit 6166e8d

Browse files
committed
feat: support inbound plugin jsonschema validator
1 parent adce057 commit 6166e8d

File tree

15 files changed

+673
-6
lines changed

15 files changed

+673
-6
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package jsonschema
2+
3+
import (
4+
"github.com/getkin/kin-openapi/openapi3"
5+
lru "github.com/hashicorp/golang-lru/v2"
6+
"github.com/webhookx-io/webhookx/pkg/openapi"
7+
"github.com/webhookx-io/webhookx/utils"
8+
)
9+
10+
type JSONSchema struct {
11+
schemaDef string
12+
hex string
13+
}
14+
15+
func New(schemaDef []byte) *JSONSchema {
16+
return &JSONSchema{
17+
schemaDef: string(schemaDef),
18+
hex: utils.Hash256(string(schemaDef)),
19+
}
20+
}
21+
22+
var cache, _ = lru.New[string, *openapi3.Schema](128)
23+
24+
func (s *JSONSchema) Validate(ctx *ValidatorContext) error {
25+
schema, ok := cache.Get(s.hex)
26+
if !ok {
27+
schema = &openapi3.Schema{}
28+
err := schema.UnmarshalJSON([]byte(s.schemaDef))
29+
if err != nil {
30+
return err
31+
}
32+
cache.Add(s.hex, schema)
33+
}
34+
35+
err := openapi.Validate(schema, ctx.HTTPRequest.Data)
36+
if err != nil {
37+
return err
38+
}
39+
return nil
40+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package jsonschema
2+
3+
import (
4+
"encoding/json"
5+
. "github.com/onsi/ginkgo/v2"
6+
. "github.com/onsi/gomega"
7+
"testing"
8+
)
9+
10+
func TestJSONSchema(t *testing.T) {
11+
RegisterFailHandler(Fail)
12+
RunSpecs(t, "Schema Validator Suite")
13+
}
14+
15+
var _ = Describe("Schema Validator Plugin", func() {
16+
17+
Context("JSONSchema Validator", func() {
18+
It("should validate valid JSON data against the schema", func() {
19+
schemaDef := `{
20+
"type": "object",
21+
"properties": {
22+
"name": { "type": "string" },
23+
"age": { "type": "integer", "minimum": 0 }
24+
},
25+
"required": ["name", "age"]
26+
}`
27+
28+
validator := New([]byte(schemaDef))
29+
30+
validData := map[string]interface{}{"name": "John Doe", "age": 30}
31+
ctx := &ValidatorContext{
32+
HTTPRequest: &HTTPRequest{
33+
Data: validData,
34+
},
35+
}
36+
37+
err := validator.Validate(ctx)
38+
Expect(err).To(BeNil())
39+
})
40+
41+
It("should return an error for invalid JSON data against the schema", func() {
42+
schemaDef := `{
43+
"type": "object",
44+
"properties": {
45+
"name": { "type": "string" },
46+
"age": { "type": "integer", "minimum": 0 }
47+
},
48+
"required": ["name", "age"]
49+
}`
50+
51+
validator := New([]byte(schemaDef))
52+
53+
invalidData := map[string]interface{}{"name": "John Doe", "age": -5}
54+
ctx := &ValidatorContext{
55+
HTTPRequest: &HTTPRequest{
56+
Data: invalidData,
57+
},
58+
}
59+
60+
err := validator.Validate(ctx)
61+
Expect(err).ToNot(BeNil())
62+
b, _ := json.Marshal(err)
63+
Expect(string(b)).To(Equal(`{"message":"request validation","fields":{"age":"number must be at least 0"}}`))
64+
})
65+
})
66+
})
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package jsonschema
2+
3+
import (
4+
"net/http"
5+
)
6+
7+
type Validator interface {
8+
Validate(ctx *ValidatorContext) error
9+
}
10+
11+
type ValidatorContext struct {
12+
HTTPRequest *HTTPRequest
13+
}
14+
15+
type HTTPRequest struct {
16+
R *http.Request
17+
Data map[string]any
18+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package jsonschema_validator
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"github.com/getkin/kin-openapi/openapi3"
9+
lru "github.com/hashicorp/golang-lru/v2"
10+
"github.com/webhookx-io/webhookx/pkg/errs"
11+
"github.com/webhookx-io/webhookx/pkg/plugin"
12+
"github.com/webhookx-io/webhookx/plugins/jsonschema_validator/jsonschema"
13+
"github.com/webhookx-io/webhookx/utils"
14+
"io"
15+
"net/http"
16+
"os"
17+
"time"
18+
)
19+
20+
type Config struct {
21+
Schemas map[string]*SchemaResource `json:"schemas" validate:"dive,required"`
22+
}
23+
24+
type EventTypeSchema struct {
25+
EventType string `json:"event_type" validate:"required,max=100"`
26+
JSONSchema string `json:"jsonschema" validate:"required,jsonschema,max=1048576"`
27+
}
28+
29+
type SchemaResource struct {
30+
JSONString string `json:"json" validate:"omitempty,json,max=1048576"`
31+
File string `json:"file" validate:"omitempty,file"`
32+
URL string `json:"url" validate:"omitempty,url"`
33+
}
34+
35+
var cache, _ = lru.New[string, []byte](128)
36+
37+
func (s *SchemaResource) Resource() ([]byte, string, error) {
38+
// priority: json > file > url
39+
if s.JSONString != "" {
40+
return []byte(s.JSONString), "json", nil
41+
}
42+
if s.File != "" {
43+
bytes, ok := cache.Get(s.File)
44+
if ok {
45+
return bytes, "file", nil
46+
}
47+
bytes, err := os.ReadFile(s.File)
48+
if err != nil {
49+
return nil, "file", fmt.Errorf("failed to read schema: %w", err)
50+
}
51+
cache.Add(s.File, bytes)
52+
return bytes, "file", nil
53+
}
54+
if s.URL != "" {
55+
bytes, ok := cache.Get(s.URL)
56+
if ok {
57+
return bytes, "url", nil
58+
}
59+
client := &http.Client{
60+
Timeout: time.Second * 2,
61+
}
62+
resp, err := client.Get(s.URL)
63+
if err != nil {
64+
return nil, "url", fmt.Errorf("failed to fetch schema: %w", err)
65+
}
66+
defer func() { _ = resp.Body.Close() }()
67+
body, err := io.ReadAll(resp.Body)
68+
if err != nil {
69+
return nil, "url", fmt.Errorf("failed to read schema from response: %w", err)
70+
}
71+
cache.Add(s.URL, body)
72+
return body, "url", nil
73+
}
74+
return nil, "json", errors.New("no schema defined")
75+
}
76+
77+
type SchemaValidatorPlugin struct {
78+
plugin.BasePlugin[Config]
79+
}
80+
81+
func New(config []byte) (plugin.Plugin, error) {
82+
p := &SchemaValidatorPlugin{}
83+
p.Name = "jsonschema-validator"
84+
85+
if config != nil {
86+
if err := p.UnmarshalConfig(config); err != nil {
87+
return nil, err
88+
}
89+
}
90+
return p, nil
91+
}
92+
93+
func (p *SchemaValidatorPlugin) ValidateConfig() error {
94+
err := utils.Validate(p.Config)
95+
if err != nil {
96+
return err
97+
}
98+
99+
e := errs.NewValidateError(errors.New("request validation"))
100+
for event, schema := range p.Config.Schemas {
101+
field := fmt.Sprintf("schemas[%s]", event)
102+
if schema == nil {
103+
e.Fields[field] = fmt.Errorf("schema is empty")
104+
return e
105+
}
106+
schemaBytes, invalidField, err := schema.Resource()
107+
if err != nil {
108+
e.Fields[field] = map[string]string{
109+
invalidField: err.Error(),
110+
}
111+
return e
112+
}
113+
openapiSchema := &openapi3.Schema{}
114+
err = openapiSchema.UnmarshalJSON(schemaBytes)
115+
if err != nil {
116+
e.Fields[field] = map[string]string{
117+
invalidField: "the content must be a valid json string",
118+
}
119+
return e
120+
}
121+
err = openapiSchema.Validate(context.Background(), openapi3.EnableSchemaFormatValidation())
122+
if err != nil {
123+
e.Fields[field] = map[string]string{
124+
invalidField: fmt.Sprintf("invalid jsonschema: %v", err),
125+
}
126+
return e
127+
}
128+
}
129+
return nil
130+
}
131+
132+
func (p *SchemaValidatorPlugin) ExecuteInbound(inbound *plugin.Inbound) (res plugin.InboundResult, err error) {
133+
// parse body to get event type
134+
var event map[string]any
135+
body := inbound.RawBody
136+
if err = json.Unmarshal(body, &event); err != nil {
137+
return
138+
}
139+
140+
eventType, ok := event["event_type"].(string)
141+
if !ok || eventType == "" {
142+
res.Payload = body
143+
return
144+
}
145+
146+
data := event["data"]
147+
if data == nil {
148+
res.Payload = body
149+
return
150+
}
151+
152+
schemaResource, ok := p.Config.Schemas[eventType]
153+
if !ok || schemaResource == nil {
154+
res.Payload = body
155+
return
156+
}
157+
158+
bytes, _, err := schemaResource.Resource()
159+
if err != nil {
160+
return
161+
}
162+
validator := jsonschema.New(bytes)
163+
err = validator.Validate(&jsonschema.ValidatorContext{
164+
HTTPRequest: &jsonschema.HTTPRequest{
165+
R: inbound.Request,
166+
Data: data.(map[string]any),
167+
},
168+
})
169+
if err != nil {
170+
return
171+
}
172+
res.Payload = body
173+
return
174+
}

plugins/plugins.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package plugins
33
import (
44
"github.com/webhookx-io/webhookx/pkg/plugin"
55
"github.com/webhookx-io/webhookx/plugins/function"
6+
"github.com/webhookx-io/webhookx/plugins/jsonschema_validator"
67
"github.com/webhookx-io/webhookx/plugins/wasm"
78
"github.com/webhookx-io/webhookx/plugins/webhookx_signature"
89
)
910

1011
func LoadPlugins() {
1112
plugin.RegisterPlugin(plugin.TypeInbound, "function", function.New)
13+
plugin.RegisterPlugin(plugin.TypeInbound, "jsonschema-validator", jsonschema_validator.New)
1214
plugin.RegisterPlugin(plugin.TypeOutbound, "wasm", wasm.New)
1315
plugin.RegisterPlugin(plugin.TypeOutbound, "webhookx-signature", webhookx_signature.New)
1416
}

proxy/gateway.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/webhookx-io/webhookx/dispatcher"
1515
"github.com/webhookx-io/webhookx/eventbus"
1616
"github.com/webhookx-io/webhookx/mcache"
17+
"github.com/webhookx-io/webhookx/pkg/errs"
1718
"github.com/webhookx-io/webhookx/pkg/http/response"
1819
"github.com/webhookx-io/webhookx/pkg/loglimiter"
1920
"github.com/webhookx-io/webhookx/pkg/metrics"
@@ -256,10 +257,19 @@ func (gw *Gateway) handle(w http.ResponseWriter, r *http.Request) bool {
256257
RawBody: body,
257258
})
258259
if err != nil {
260+
if validateErr, ok := err.(*errs.ValidateError); ok {
261+
response.JSON(w, 400, types.ErrorResponse{
262+
Message: "Request Validation",
263+
Error: validateErr,
264+
})
265+
return false
266+
}
267+
259268
gw.log.Errorf("failed to execute plugin: %v", err)
260269
response.JSON(w, 500, types.ErrorResponse{Message: "internal error"})
261270
return false
262271
}
272+
263273
if result.Terminated {
264274
return false
265275
}

test/cmd/admin_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,13 @@ var _ = Describe("admin", Ordered, func() {
9595

9696
plugins, err = db.Plugins.ListSourcePlugin(context.TODO(), source.ID)
9797
assert.NoError(GinkgoT(), err)
98-
assert.Equal(GinkgoT(), 1, len(plugins))
98+
assert.Equal(GinkgoT(), 2, len(plugins))
9999
assert.Equal(GinkgoT(), "function", plugins[0].Name)
100100
assert.Equal(GinkgoT(), true, plugins[0].Enabled)
101101
assert.Equal(GinkgoT(), `{"function": "function handle() {}"}`, string(plugins[0].Config))
102+
assert.Equal(GinkgoT(), `jsonschema-validator`, plugins[1].Name)
103+
assert.Equal(GinkgoT(), true, plugins[1].Enabled)
104+
assert.Equal(GinkgoT(), `{"schemas": {"charge.succeeded": {"url": "", "file": "", "json": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}}`, string(plugins[1].Config))
102105
})
103106

104107
It("entities not defined in the declarative configuration should be deleted", func() {

0 commit comments

Comments
 (0)