From 56a15da84dc75b98c8e0dfdb4c11b459d622b843 Mon Sep 17 00:00:00 2001 From: cchenggit <654031023@qq.com> Date: Mon, 15 Sep 2025 21:35:08 +0800 Subject: [PATCH 1/6] feat: support inbound plugin jsonschema validator --- .../jsonschema/jsonschema.go | 40 ++++ .../jsonschema/jsonschema_test.go | 66 +++++++ .../jsonschema/validator.go | 18 ++ plugins/jsonschema_validator/plugin.go | 174 ++++++++++++++++++ plugins/plugins.go | 2 + proxy/gateway.go | 10 + test/cmd/admin_test.go | 5 +- test/declarative/declarative_test.go | 96 ++++++++++ test/fixtures/jsonschema/charge.succeed.json | 22 +++ test/fixtures/jsonschema/invalid.json | 1 + test/fixtures/webhookx.yml | 20 +- test/plugins/jsonschema_validator_test.go | 167 +++++++++++++++++ utils/hash.go | 11 ++ utils/validate.go | 28 ++- webhookx.sample.yml | 19 +- 15 files changed, 673 insertions(+), 6 deletions(-) create mode 100644 plugins/jsonschema_validator/jsonschema/jsonschema.go create mode 100644 plugins/jsonschema_validator/jsonschema/jsonschema_test.go create mode 100644 plugins/jsonschema_validator/jsonschema/validator.go create mode 100644 plugins/jsonschema_validator/plugin.go create mode 100644 test/fixtures/jsonschema/charge.succeed.json create mode 100644 test/fixtures/jsonschema/invalid.json create mode 100644 test/plugins/jsonschema_validator_test.go create mode 100644 utils/hash.go diff --git a/plugins/jsonschema_validator/jsonschema/jsonschema.go b/plugins/jsonschema_validator/jsonschema/jsonschema.go new file mode 100644 index 0000000..06a9aa4 --- /dev/null +++ b/plugins/jsonschema_validator/jsonschema/jsonschema.go @@ -0,0 +1,40 @@ +package jsonschema + +import ( + "github.com/getkin/kin-openapi/openapi3" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/webhookx-io/webhookx/pkg/openapi" + "github.com/webhookx-io/webhookx/utils" +) + +type JSONSchema struct { + schemaDef string + hex string +} + +func New(schemaDef []byte) *JSONSchema { + return &JSONSchema{ + schemaDef: string(schemaDef), + hex: utils.Hash256(string(schemaDef)), + } +} + +var cache, _ = lru.New[string, *openapi3.Schema](128) + +func (s *JSONSchema) Validate(ctx *ValidatorContext) error { + schema, ok := cache.Get(s.hex) + if !ok { + schema = &openapi3.Schema{} + err := schema.UnmarshalJSON([]byte(s.schemaDef)) + if err != nil { + return err + } + cache.Add(s.hex, schema) + } + + err := openapi.Validate(schema, ctx.HTTPRequest.Data) + if err != nil { + return err + } + return nil +} diff --git a/plugins/jsonschema_validator/jsonschema/jsonschema_test.go b/plugins/jsonschema_validator/jsonschema/jsonschema_test.go new file mode 100644 index 0000000..c5968aa --- /dev/null +++ b/plugins/jsonschema_validator/jsonschema/jsonschema_test.go @@ -0,0 +1,66 @@ +package jsonschema + +import ( + "encoding/json" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "testing" +) + +func TestJSONSchema(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Schema Validator Suite") +} + +var _ = Describe("Schema Validator Plugin", func() { + + Context("JSONSchema Validator", func() { + It("should validate valid JSON data against the schema", func() { + schemaDef := `{ + "type": "object", + "properties": { + "name": { "type": "string" }, + "age": { "type": "integer", "minimum": 0 } + }, + "required": ["name", "age"] + }` + + validator := New([]byte(schemaDef)) + + validData := map[string]interface{}{"name": "John Doe", "age": 30} + ctx := &ValidatorContext{ + HTTPRequest: &HTTPRequest{ + Data: validData, + }, + } + + err := validator.Validate(ctx) + Expect(err).To(BeNil()) + }) + + It("should return an error for invalid JSON data against the schema", func() { + schemaDef := `{ + "type": "object", + "properties": { + "name": { "type": "string" }, + "age": { "type": "integer", "minimum": 0 } + }, + "required": ["name", "age"] + }` + + validator := New([]byte(schemaDef)) + + invalidData := map[string]interface{}{"name": "John Doe", "age": -5} + ctx := &ValidatorContext{ + HTTPRequest: &HTTPRequest{ + Data: invalidData, + }, + } + + err := validator.Validate(ctx) + Expect(err).ToNot(BeNil()) + b, _ := json.Marshal(err) + Expect(string(b)).To(Equal(`{"message":"request validation","fields":{"age":"number must be at least 0"}}`)) + }) + }) +}) diff --git a/plugins/jsonschema_validator/jsonschema/validator.go b/plugins/jsonschema_validator/jsonschema/validator.go new file mode 100644 index 0000000..7fa6b8f --- /dev/null +++ b/plugins/jsonschema_validator/jsonschema/validator.go @@ -0,0 +1,18 @@ +package jsonschema + +import ( + "net/http" +) + +type Validator interface { + Validate(ctx *ValidatorContext) error +} + +type ValidatorContext struct { + HTTPRequest *HTTPRequest +} + +type HTTPRequest struct { + R *http.Request + Data map[string]any +} diff --git a/plugins/jsonschema_validator/plugin.go b/plugins/jsonschema_validator/plugin.go new file mode 100644 index 0000000..92ddd75 --- /dev/null +++ b/plugins/jsonschema_validator/plugin.go @@ -0,0 +1,174 @@ +package jsonschema_validator + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/getkin/kin-openapi/openapi3" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/webhookx-io/webhookx/pkg/errs" + "github.com/webhookx-io/webhookx/pkg/plugin" + "github.com/webhookx-io/webhookx/plugins/jsonschema_validator/jsonschema" + "github.com/webhookx-io/webhookx/utils" + "io" + "net/http" + "os" + "time" +) + +type Config struct { + Schemas map[string]*SchemaResource `json:"schemas" validate:"dive,required"` +} + +type EventTypeSchema struct { + EventType string `json:"event_type" validate:"required,max=100"` + JSONSchema string `json:"jsonschema" validate:"required,jsonschema,max=1048576"` +} + +type SchemaResource struct { + JSONString string `json:"json" validate:"omitempty,json,max=1048576"` + File string `json:"file" validate:"omitempty,file"` + URL string `json:"url" validate:"omitempty,url"` +} + +var cache, _ = lru.New[string, []byte](128) + +func (s *SchemaResource) Resource() ([]byte, string, error) { + // priority: json > file > url + if s.JSONString != "" { + return []byte(s.JSONString), "json", nil + } + if s.File != "" { + bytes, ok := cache.Get(s.File) + if ok { + return bytes, "file", nil + } + bytes, err := os.ReadFile(s.File) + if err != nil { + return nil, "file", fmt.Errorf("failed to read schema: %w", err) + } + cache.Add(s.File, bytes) + return bytes, "file", nil + } + if s.URL != "" { + bytes, ok := cache.Get(s.URL) + if ok { + return bytes, "url", nil + } + client := &http.Client{ + Timeout: time.Second * 2, + } + resp, err := client.Get(s.URL) + if err != nil { + return nil, "url", fmt.Errorf("failed to fetch schema: %w", err) + } + defer func() { _ = resp.Body.Close() }() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, "url", fmt.Errorf("failed to read schema from response: %w", err) + } + cache.Add(s.URL, body) + return body, "url", nil + } + return nil, "json", errors.New("no schema defined") +} + +type SchemaValidatorPlugin struct { + plugin.BasePlugin[Config] +} + +func New(config []byte) (plugin.Plugin, error) { + p := &SchemaValidatorPlugin{} + p.Name = "jsonschema-validator" + + if config != nil { + if err := p.UnmarshalConfig(config); err != nil { + return nil, err + } + } + return p, nil +} + +func (p *SchemaValidatorPlugin) ValidateConfig() error { + err := utils.Validate(p.Config) + if err != nil { + return err + } + + e := errs.NewValidateError(errors.New("request validation")) + for event, schema := range p.Config.Schemas { + field := fmt.Sprintf("schemas[%s]", event) + if schema == nil { + e.Fields[field] = fmt.Errorf("schema is empty") + return e + } + schemaBytes, invalidField, err := schema.Resource() + if err != nil { + e.Fields[field] = map[string]string{ + invalidField: err.Error(), + } + return e + } + openapiSchema := &openapi3.Schema{} + err = openapiSchema.UnmarshalJSON(schemaBytes) + if err != nil { + e.Fields[field] = map[string]string{ + invalidField: "the content must be a valid json string", + } + return e + } + err = openapiSchema.Validate(context.Background(), openapi3.EnableSchemaFormatValidation()) + if err != nil { + e.Fields[field] = map[string]string{ + invalidField: fmt.Sprintf("invalid jsonschema: %v", err), + } + return e + } + } + return nil +} + +func (p *SchemaValidatorPlugin) ExecuteInbound(inbound *plugin.Inbound) (res plugin.InboundResult, err error) { + // parse body to get event type + var event map[string]any + body := inbound.RawBody + if err = json.Unmarshal(body, &event); err != nil { + return + } + + eventType, ok := event["event_type"].(string) + if !ok || eventType == "" { + res.Payload = body + return + } + + data := event["data"] + if data == nil { + res.Payload = body + return + } + + schemaResource, ok := p.Config.Schemas[eventType] + if !ok || schemaResource == nil { + res.Payload = body + return + } + + bytes, _, err := schemaResource.Resource() + if err != nil { + return + } + validator := jsonschema.New(bytes) + err = validator.Validate(&jsonschema.ValidatorContext{ + HTTPRequest: &jsonschema.HTTPRequest{ + R: inbound.Request, + Data: data.(map[string]any), + }, + }) + if err != nil { + return + } + res.Payload = body + return +} diff --git a/plugins/plugins.go b/plugins/plugins.go index df737e9..7bddbab 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -3,12 +3,14 @@ package plugins import ( "github.com/webhookx-io/webhookx/pkg/plugin" "github.com/webhookx-io/webhookx/plugins/function" + "github.com/webhookx-io/webhookx/plugins/jsonschema_validator" "github.com/webhookx-io/webhookx/plugins/wasm" "github.com/webhookx-io/webhookx/plugins/webhookx_signature" ) func LoadPlugins() { plugin.RegisterPlugin(plugin.TypeInbound, "function", function.New) + plugin.RegisterPlugin(plugin.TypeInbound, "jsonschema-validator", jsonschema_validator.New) plugin.RegisterPlugin(plugin.TypeOutbound, "wasm", wasm.New) plugin.RegisterPlugin(plugin.TypeOutbound, "webhookx-signature", webhookx_signature.New) } diff --git a/proxy/gateway.go b/proxy/gateway.go index aea0c8c..a7c0022 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -14,6 +14,7 @@ import ( "github.com/webhookx-io/webhookx/dispatcher" "github.com/webhookx-io/webhookx/eventbus" "github.com/webhookx-io/webhookx/mcache" + "github.com/webhookx-io/webhookx/pkg/errs" "github.com/webhookx-io/webhookx/pkg/http/response" "github.com/webhookx-io/webhookx/pkg/loglimiter" "github.com/webhookx-io/webhookx/pkg/metrics" @@ -256,10 +257,19 @@ func (gw *Gateway) handle(w http.ResponseWriter, r *http.Request) bool { RawBody: body, }) if err != nil { + if validateErr, ok := err.(*errs.ValidateError); ok { + response.JSON(w, 400, types.ErrorResponse{ + Message: "Request Validation", + Error: validateErr, + }) + return false + } + gw.log.Errorf("failed to execute plugin: %v", err) response.JSON(w, 500, types.ErrorResponse{Message: "internal error"}) return false } + if result.Terminated { return false } diff --git a/test/cmd/admin_test.go b/test/cmd/admin_test.go index f3c0670..5b529b6 100644 --- a/test/cmd/admin_test.go +++ b/test/cmd/admin_test.go @@ -95,10 +95,13 @@ var _ = Describe("admin", Ordered, func() { plugins, err = db.Plugins.ListSourcePlugin(context.TODO(), source.ID) assert.NoError(GinkgoT(), err) - assert.Equal(GinkgoT(), 1, len(plugins)) + assert.Equal(GinkgoT(), 2, len(plugins)) assert.Equal(GinkgoT(), "function", plugins[0].Name) assert.Equal(GinkgoT(), true, plugins[0].Enabled) assert.Equal(GinkgoT(), `{"function": "function handle() {}"}`, string(plugins[0].Config)) + assert.Equal(GinkgoT(), `jsonschema-validator`, plugins[1].Name) + assert.Equal(GinkgoT(), true, plugins[1].Enabled) + assert.Equal(GinkgoT(), `{"schemas": {"charge.succeeded": {"url": "", "file": "", "json": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}}`, string(plugins[1].Config)) }) It("entities not defined in the declarative configuration should be deleted", func() { diff --git a/test/declarative/declarative_test.go b/test/declarative/declarative_test.go index 5a24445..f883a52 100644 --- a/test/declarative/declarative_test.go +++ b/test/declarative/declarative_test.go @@ -1,6 +1,7 @@ package declarative import ( + "fmt" "github.com/go-resty/resty/v2" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -39,6 +40,44 @@ endpoints: plugins: - name: foo ` + + invalidSourcePluginJSONSchemaJSONYAML = ` +sources: + - name: default-source + path: / + methods: ["POST"] + plugins: + - name: "jsonschema-validator" + config: + schemas: + charge.succeed: + json: '%s' +` + + invalidSourcePluginJSONSchemaFileYAML = ` +sources: + - name: default-source + path: / + methods: ["POST"] + plugins: + - name: "jsonschema-validator" + config: + schemas: + charge.succeed: + file: "%s" +` + invalidSourcePluginJSONSchemaURLYAML = ` +sources: + - name: default-source + path: / + methods: ["POST"] + plugins: + - name: "jsonschema-validator" + config: + schemas: + charge.succeed: + url: "http://localhost/charge.succeed.json" +` ) var _ = Describe("Declarative", Ordered, func() { @@ -65,6 +104,7 @@ var _ = Describe("Declarative", Ordered, func() { resp, err := adminClient.R(). SetBody(string(yaml)). Post("/workspaces/default/config/sync") + fmt.Print(string(resp.Body())) assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 200, resp.StatusCode()) }) @@ -102,6 +142,62 @@ var _ = Describe("Declarative", Ordered, func() { assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), `{"message":"Request Validation","error":{"message":"request validation","fields":{"name":"unknown plugin name 'foo'"}}}`, string(resp.Body())) }) + + It("should return 400 for invalid jsonschema-validator plugin config json string", func() { + resp, err := adminClient.R(). + SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaJSONYAML, "invalid jsonstring")). + Post("/workspaces/default/config/sync") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), 400, resp.StatusCode()) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"json":"value must be a valid json string"}}}}}`, + string(resp.Body())) + }) + + It("should return 400 for invalid jsonschema-validator plugin config jsonschema", func() { + resp, err := adminClient.R(). + SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaJSONYAML, `{"type":"invalidObject"}`)). + Post("/workspaces/default/config/sync") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), 400, resp.StatusCode()) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"json":"invalid jsonschema: unsupported 'type' value \"invalidObject\""}}}}}`, + string(resp.Body())) + }) + + It("should return 400 for invalid jsonschema-validator plugin config file", func() { + resp, err := adminClient.R(). + SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaFileYAML, "./notexist.json")). + Post("/workspaces/default/config/sync") + + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), 400, resp.StatusCode()) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"file":"value must be a valid exist file"}}}}}`, + string(resp.Body())) + }) + + It("should return 400 for invalid jsonschema-validator config file content", func() { + resp, err := adminClient.R(). + SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaFileYAML, "../fixtures/jsonschema/invalid.json")). + Post("/workspaces/default/config/sync") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), 400, resp.StatusCode()) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"file":"the content must be a valid json string"}}}}}`, + string(resp.Body())) + }) + + It("should return 400 for invalid source plugin config url", func() { + resp, err := adminClient.R(). + SetBody(invalidSourcePluginJSONSchemaURLYAML). + Post("/workspaces/default/config/sync") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), 400, resp.StatusCode()) + assert.Equal(GinkgoT(), + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"url":"failed to fetch schema: Get \"http://localhost/charge.succeed.json\": dial tcp [::1]:80: connect: connection refused"}}}}}`, + string(resp.Body())) + }) }) }) }) diff --git a/test/fixtures/jsonschema/charge.succeed.json b/test/fixtures/jsonschema/charge.succeed.json new file mode 100644 index 0000000..1a0f5f6 --- /dev/null +++ b/test/fixtures/jsonschema/charge.succeed.json @@ -0,0 +1,22 @@ +{ + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "amount": { + "type": "integer", + "minimum": 1 + }, + "currency": { + "type": "string", + "minLength": 3, + "maxLength": 6 + } + }, + "required": [ + "id", + "amount", + "currency" + ] +} \ No newline at end of file diff --git a/test/fixtures/jsonschema/invalid.json b/test/fixtures/jsonschema/invalid.json new file mode 100644 index 0000000..9f9837e --- /dev/null +++ b/test/fixtures/jsonschema/invalid.json @@ -0,0 +1 @@ +"invalid jsonschema" \ No newline at end of file diff --git a/test/fixtures/webhookx.yml b/test/fixtures/webhookx.yml index 80bee5b..b572943 100644 --- a/test/fixtures/webhookx.yml +++ b/test/fixtures/webhookx.yml @@ -10,7 +10,7 @@ endpoints: strategy: fixed config: attempts: [0, 3600, 3600] - events: [ "charge.succeeded" ] + events: ["charge.succeeded"] metadata: key1: value1 key2: value2 @@ -28,7 +28,7 @@ endpoints: sources: - name: default-source path: / - methods: [ "POST" ] + methods: ["POST"] response: code: 200 content_type: application/json @@ -37,3 +37,19 @@ sources: - name: function config: function: "function handle() {}" + - name: "jsonschema-validator" + config: + schemas: + charge.succeeded: + #file: "../fixtures/jsonschema/charge.succeed.json" + #url: "https://raw.githubusercontent.com/cchenggit/webhookx/refs/heads/feat/plugin-jsonschema-validator/test/fixtures/jsonschema/charge.succeed.json" + json: | + { + "type": "object", + "properties": { + "id": { "type": "string" }, + "amount": { "type": "integer", "minimum": 1 }, + "currency": { "type": "string", "minLength": 3, "maxLength": 6 } + }, + "required": ["id", "amount", "currency"] + } diff --git a/test/plugins/jsonschema_validator_test.go b/test/plugins/jsonschema_validator_test.go new file mode 100644 index 0000000..3078769 --- /dev/null +++ b/test/plugins/jsonschema_validator_test.go @@ -0,0 +1,167 @@ +package plugins + +import ( + "context" + "fmt" + "github.com/go-resty/resty/v2" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/plugins/jsonschema_validator" + "github.com/webhookx-io/webhookx/test/helper" + "github.com/webhookx-io/webhookx/test/helper/factory" + "github.com/webhookx-io/webhookx/utils" + "time" +) + +var jsonString = `{ + "type": "object", + "required": ["id", "amount", "currency"], + "properties": { + "id": { + "type": "string" + }, + "amount": { + "type": "integer", + "minimum": 1 + }, + "currency": { + "type": "string", + "maxLength": 6, + "minLength": 3 + } + } +}` + +var _ = Describe("jsonschema-validator", Ordered, func() { + resources := map[string]*jsonschema_validator.SchemaResource{ + "json": { + JSONString: jsonString, + }, + "file": { + File: "../fixtures/jsonschema/charge.succeed.json", + }, + "url": { + URL: "https://raw.githubusercontent.com/cchenggit/webhookx/refs/heads/feat/plugin-jsonschema-validator/test/fixtures/jsonschema/charge.succeed.json", + }, + } + + for key, val := range resources { + Context(key, func() { + var proxyClient *resty.Client + + var app *app.Application + var db *db.DB + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{factory.EndpointP()}, + Sources: []*entities.Source{factory.SourceP()}, + } + entitiesConfig.Plugins = []*entities.Plugin{ + factory.PluginP( + factory.WithPluginSourceID(entitiesConfig.Sources[0].ID), + factory.WithPluginName("jsonschema-validator"), + factory.WithPluginConfig(jsonschema_validator.Config{ + Schemas: map[string]*jsonschema_validator.SchemaResource{ + "charge.succeeded": val, + }, + }), + ), + } + + BeforeAll(func() { + db = helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + + app = utils.Must(helper.Start(map[string]string{ + "WEBHOOKX_ADMIN_LISTEN": "0.0.0.0:8080", + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_WORKER_ENABLED": "true", + })) + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity", func() { + body := `{"event_type": "charge.succeeded","data": {"id": "ch_1234567890","amount": 1000,"currency": "usd"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(200)) + // get event from db + var event *entities.Event + assert.Eventually(GinkgoT(), func() bool { + list, err := db.Events.List(context.TODO(), &query.EventQuery{}) + if err != nil || len(list) != 1 { + return false + } + event = list[0] + return true + }, time.Second*5, time.Second) + assert.Equal(GinkgoT(), "charge.succeeded", event.EventType) + assert.JSONEq(GinkgoT(), `{"id": "ch_1234567890","amount": 1000,"currency": "usd"}`, string(event.Data)) + }) + + It("sanity if undeclared event type", func() { + body := `{"event_type": "unknown.event", "data":{"foo": "bar"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + Expect(err).To(BeNil()) + fmt.Print(string(resp.Body())) + Expect(resp.StatusCode()).To(Equal(200)) + + // get event from db + var event *entities.Event + assert.Eventually(GinkgoT(), func() bool { + list, err := db.Events.List(context.TODO(), &query.EventQuery{}) + if err != nil || len(list) == 0 { + return false + } + for _, item := range list { + if item.EventType == "unknown.event" { + event = item + return true + } + } + return false + }, time.Second*5, time.Second) + assert.Equal(GinkgoT(), "unknown.event", event.EventType) + assert.JSONEq(GinkgoT(), `{"foo": "bar"}`, string(event.Data)) + }) + + It("invalid event - missing required field", func() { + body := `{"event_type": "charge.succeeded","data": {"amount": 1000,"currency": "usd"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(400)) + Expect(string(resp.Body())).To(Equal(`{"message":"Request Validation","error":{"message":"request validation","fields":{"id":"required field missing"}}}`)) + }) + + It("invalid event - field type mismatch", func() { + body := `{"event_type": "charge.succeeded","data": {"id": "ch_1234567890","amount": "1000","currency": "usd"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(400)) + Expect(string(resp.Body())).To(Equal(`{"message":"Request Validation","error":{"message":"request validation","fields":{"amount":"value must be an integer"}}}`)) + }) + }) + + } + +}) diff --git a/utils/hash.go b/utils/hash.go new file mode 100644 index 0000000..d571c6f --- /dev/null +++ b/utils/hash.go @@ -0,0 +1,11 @@ +package utils + +import ( + "crypto/sha256" + "encoding/hex" +) + +func Hash256(s string) string { + h := sha256.Sum256([]byte(s)) + return hex.EncodeToString(h[:]) +} diff --git a/utils/validate.go b/utils/validate.go index d0477f7..5366ce0 100644 --- a/utils/validate.go +++ b/utils/validate.go @@ -5,6 +5,7 @@ import ( "github.com/go-playground/validator/v10" "github.com/webhookx-io/webhookx/pkg/errs" "reflect" + "regexp" "strings" "sync" ) @@ -49,6 +50,15 @@ func init() { RegisterFormatter("max", func(fe validator.FieldError) string { return fmt.Sprintf("length must be at most %s", fe.Param()) }) + RegisterFormatter("url", func(fe validator.FieldError) string { + return "value must be a valid url" + }) + RegisterFormatter("file", func(fe validator.FieldError) string { + return "value must be a valid exist file" + }) + RegisterFormatter("json", func(fe validator.FieldError) string { + return "value must be a valid json string" + }) } func RegisterValidation(tag string, fn validator.Func) { @@ -64,15 +74,31 @@ func RegisterFormatter(tag string, fn func(fe validator.FieldError) string) { formatters[tag] = fn } +const fieldPlacehoder = "#field%d" + func Validate(v interface{}) error { err := validate.Struct(v) if err != nil { validateErr := errs.NewValidateError(errs.ErrRequestValidation) for _, e := range err.(validator.ValidationErrors) { - fields := strings.Split(e.Namespace(), ".") + namespace := e.Namespace() + placeholders := make(map[string]string) + if strings.ContainsAny(namespace, "[]") { + re := regexp.MustCompile(`\w+\[[^\]]+\]`) + matches := re.FindAllString(namespace, -1) + for i, field := range matches { + idx := fmt.Sprintf(fieldPlacehoder, i) + placeholders[idx] = field + namespace = strings.Replace(namespace, field, idx, 1) + } + } + fields := strings.Split(namespace, ".") node := validateErr.Fields for i := 1; i < len(fields); i++ { fieldName := fields[i] + if actualField, ok := placeholders[fieldName]; ok { + fieldName = actualField + } if i < len(fields)-1 { if node[fieldName] == nil { node[fieldName] = make(map[string]interface{}) diff --git a/webhookx.sample.yml b/webhookx.sample.yml index 28efccd..8439a65 100644 --- a/webhookx.sample.yml +++ b/webhookx.sample.yml @@ -10,7 +10,7 @@ endpoints: strategy: fixed config: attempts: [0, 3600, 3600] - events: [ "charge.succeeded" ] + events: ["charge.succeeded"] plugins: - name: webhookx-signature config: @@ -25,8 +25,23 @@ endpoints: sources: - name: default-source path: / - methods: [ "POST" ] + methods: ["POST"] response: code: 200 content_type: application/json body: '{"message": "OK"}' + plugins: + - name: "jsonschema-validator" + config: + schemas: + charge.succeeded: + json: | + { + "type": "object", + "properties": { + "id": { "type": "string" }, + "amount": { "type": "integer", "minimum": 1 }, + "currency": { "type": "string", "minLength": 3, "maxLength": 6 } + }, + "required": ["id", "amount", "currency"] + } From b6f075d5ebb68f504aee7d194c2c6a389d30351b Mon Sep 17 00:00:00 2001 From: cchenggit <654031023@qq.com> Date: Thu, 25 Sep 2025 12:36:39 +0800 Subject: [PATCH 2/6] feat: remove schema resource and add default schema --- .../jsonschema/jsonschema.go | 2 +- plugins/jsonschema_validator/plugin.go | 153 +++++----- proxy/gateway.go | 9 - test/cmd/admin_test.go | 3 +- test/declarative/declarative_test.go | 77 ++--- test/fixtures/webhookx.yml | 13 +- test/plugins/jsonschema_validator_test.go | 268 ++++++++++-------- utils/hash.go | 2 +- webhookx.sample.yml | 2 +- 9 files changed, 249 insertions(+), 280 deletions(-) diff --git a/plugins/jsonschema_validator/jsonschema/jsonschema.go b/plugins/jsonschema_validator/jsonschema/jsonschema.go index 06a9aa4..e85d338 100644 --- a/plugins/jsonschema_validator/jsonschema/jsonschema.go +++ b/plugins/jsonschema_validator/jsonschema/jsonschema.go @@ -15,7 +15,7 @@ type JSONSchema struct { func New(schemaDef []byte) *JSONSchema { return &JSONSchema{ schemaDef: string(schemaDef), - hex: utils.Hash256(string(schemaDef)), + hex: utils.Sha256(string(schemaDef)), } } diff --git a/plugins/jsonschema_validator/plugin.go b/plugins/jsonschema_validator/plugin.go index 92ddd75..644575d 100644 --- a/plugins/jsonschema_validator/plugin.go +++ b/plugins/jsonschema_validator/plugin.go @@ -6,72 +6,22 @@ import ( "errors" "fmt" "github.com/getkin/kin-openapi/openapi3" - lru "github.com/hashicorp/golang-lru/v2" "github.com/webhookx-io/webhookx/pkg/errs" + "github.com/webhookx-io/webhookx/pkg/http/response" "github.com/webhookx-io/webhookx/pkg/plugin" + "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/plugins/jsonschema_validator/jsonschema" "github.com/webhookx-io/webhookx/utils" - "io" - "net/http" - "os" - "time" ) type Config struct { - Schemas map[string]*SchemaResource `json:"schemas" validate:"dive,required"` + DraftVersion int `json:"draft_version" validate:"required,oneof=6"` + DefaultSchema string `json:"default_schema" validate:"omitempty,json,max=1048576"` + Schemas map[string]*Schema `json:"schemas" validate:"dive"` } -type EventTypeSchema struct { - EventType string `json:"event_type" validate:"required,max=100"` - JSONSchema string `json:"jsonschema" validate:"required,jsonschema,max=1048576"` -} - -type SchemaResource struct { - JSONString string `json:"json" validate:"omitempty,json,max=1048576"` - File string `json:"file" validate:"omitempty,file"` - URL string `json:"url" validate:"omitempty,url"` -} - -var cache, _ = lru.New[string, []byte](128) - -func (s *SchemaResource) Resource() ([]byte, string, error) { - // priority: json > file > url - if s.JSONString != "" { - return []byte(s.JSONString), "json", nil - } - if s.File != "" { - bytes, ok := cache.Get(s.File) - if ok { - return bytes, "file", nil - } - bytes, err := os.ReadFile(s.File) - if err != nil { - return nil, "file", fmt.Errorf("failed to read schema: %w", err) - } - cache.Add(s.File, bytes) - return bytes, "file", nil - } - if s.URL != "" { - bytes, ok := cache.Get(s.URL) - if ok { - return bytes, "url", nil - } - client := &http.Client{ - Timeout: time.Second * 2, - } - resp, err := client.Get(s.URL) - if err != nil { - return nil, "url", fmt.Errorf("failed to fetch schema: %w", err) - } - defer func() { _ = resp.Body.Close() }() - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, "url", fmt.Errorf("failed to read schema from response: %w", err) - } - cache.Add(s.URL, body) - return body, "url", nil - } - return nil, "json", errors.New("no schema defined") +type Schema struct { + Schema string `json:"schema" validate:"omitempty,json,max=1048576"` } type SchemaValidatorPlugin struct { @@ -90,6 +40,19 @@ func New(config []byte) (plugin.Plugin, error) { return p, nil } +func unmarshalAndValidateSchema(schema string) (*openapi3.Schema, error) { + openapiSchema := &openapi3.Schema{} + err := openapiSchema.UnmarshalJSON([]byte(schema)) + if err != nil { + return nil, fmt.Errorf("value must be a valid jsonschema") + } + err = openapiSchema.Validate(context.Background(), openapi3.EnableSchemaFormatValidation()) + if err != nil { + return openapiSchema, err + } + return openapiSchema, nil +} + func (p *SchemaValidatorPlugin) ValidateConfig() error { err := utils.Validate(p.Config) if err != nil { @@ -97,40 +60,42 @@ func (p *SchemaValidatorPlugin) ValidateConfig() error { } e := errs.NewValidateError(errors.New("request validation")) - for event, schema := range p.Config.Schemas { - field := fmt.Sprintf("schemas[%s]", event) - if schema == nil { - e.Fields[field] = fmt.Errorf("schema is empty") - return e - } - schemaBytes, invalidField, err := schema.Resource() + + var defaultErr error + if p.Config.DefaultSchema != "" { + _, err := unmarshalAndValidateSchema(p.Config.DefaultSchema) if err != nil { - e.Fields[field] = map[string]string{ - invalidField: err.Error(), + defaultErr = err + e.Fields = map[string]interface{}{ + "default_schema": err.Error(), } - return e } - openapiSchema := &openapi3.Schema{} - err = openapiSchema.UnmarshalJSON(schemaBytes) - if err != nil { - e.Fields[field] = map[string]string{ - invalidField: "the content must be a valid json string", + } + + for event, schema := range p.Config.Schemas { + field := fmt.Sprintf("schemas[%s]", event) + if schema == nil || schema.Schema == "" { + if defaultErr != nil { + e.Fields[field] = map[string]string{ + "schema": "invalid due to reusing the default_schema definition", + } } - return e - } - err = openapiSchema.Validate(context.Background(), openapi3.EnableSchemaFormatValidation()) - if err != nil { - e.Fields[field] = map[string]string{ - invalidField: fmt.Sprintf("invalid jsonschema: %v", err), + } else { + _, err = unmarshalAndValidateSchema(schema.Schema) + if err != nil { + e.Fields[field] = map[string]string{ + "schema": err.Error(), + } } - return e } } + if len(e.Fields) > 0 { + return e + } return nil } func (p *SchemaValidatorPlugin) ExecuteInbound(inbound *plugin.Inbound) (res plugin.InboundResult, err error) { - // parse body to get event type var event map[string]any body := inbound.RawBody if err = json.Unmarshal(body, &event); err != nil { @@ -149,24 +114,34 @@ func (p *SchemaValidatorPlugin) ExecuteInbound(inbound *plugin.Inbound) (res plu return } - schemaResource, ok := p.Config.Schemas[eventType] - if !ok || schemaResource == nil { + schema, ok := p.Config.Schemas[eventType] + if !ok { res.Payload = body return } - - bytes, _, err := schemaResource.Resource() - if err != nil { - return + if schema == nil || schema.Schema == "" { + if p.Config.DefaultSchema == "" { + res.Payload = body + return + } + schema = &Schema{ + Schema: p.Config.DefaultSchema, + } } - validator := jsonschema.New(bytes) - err = validator.Validate(&jsonschema.ValidatorContext{ + + validator := jsonschema.New([]byte(schema.Schema)) + e := validator.Validate(&jsonschema.ValidatorContext{ HTTPRequest: &jsonschema.HTTPRequest{ R: inbound.Request, Data: data.(map[string]any), }, }) - if err != nil { + if e != nil { + response.JSON(inbound.Response, 400, types.ErrorResponse{ + Message: "Request Validation", + Error: e, + }) + res.Terminated = true return } res.Payload = body diff --git a/proxy/gateway.go b/proxy/gateway.go index a7c0022..691b06c 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -14,7 +14,6 @@ import ( "github.com/webhookx-io/webhookx/dispatcher" "github.com/webhookx-io/webhookx/eventbus" "github.com/webhookx-io/webhookx/mcache" - "github.com/webhookx-io/webhookx/pkg/errs" "github.com/webhookx-io/webhookx/pkg/http/response" "github.com/webhookx-io/webhookx/pkg/loglimiter" "github.com/webhookx-io/webhookx/pkg/metrics" @@ -257,14 +256,6 @@ func (gw *Gateway) handle(w http.ResponseWriter, r *http.Request) bool { RawBody: body, }) if err != nil { - if validateErr, ok := err.(*errs.ValidateError); ok { - response.JSON(w, 400, types.ErrorResponse{ - Message: "Request Validation", - Error: validateErr, - }) - return false - } - gw.log.Errorf("failed to execute plugin: %v", err) response.JSON(w, 500, types.ErrorResponse{Message: "internal error"}) return false diff --git a/test/cmd/admin_test.go b/test/cmd/admin_test.go index 5b529b6..5ea9405 100644 --- a/test/cmd/admin_test.go +++ b/test/cmd/admin_test.go @@ -101,7 +101,8 @@ var _ = Describe("admin", Ordered, func() { assert.Equal(GinkgoT(), `{"function": "function handle() {}"}`, string(plugins[0].Config)) assert.Equal(GinkgoT(), `jsonschema-validator`, plugins[1].Name) assert.Equal(GinkgoT(), true, plugins[1].Enabled) - assert.Equal(GinkgoT(), `{"schemas": {"charge.succeeded": {"url": "", "file": "", "json": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}}`, string(plugins[1].Config)) + + assert.Equal(GinkgoT(), `{"schemas": {"charge.succeeded": {"schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}, "draft_version": 6, "default_schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" }\n },\n \"required\": [\"id\"]\n}\n"}`, string(plugins[1].Config)) }) It("entities not defined in the declarative configuration should be deleted", func() { diff --git a/test/declarative/declarative_test.go b/test/declarative/declarative_test.go index f883a52..881d894 100644 --- a/test/declarative/declarative_test.go +++ b/test/declarative/declarative_test.go @@ -41,7 +41,7 @@ endpoints: - name: foo ` - invalidSourcePluginJSONSchemaJSONYAML = ` + invalidSourcePluginJSONSchemaConfigYAML = ` sources: - name: default-source path: / @@ -49,24 +49,15 @@ sources: plugins: - name: "jsonschema-validator" config: + default_schema: | + %s schemas: charge.succeed: - json: '%s' + schema: | + %s ` - invalidSourcePluginJSONSchemaFileYAML = ` -sources: - - name: default-source - path: / - methods: ["POST"] - plugins: - - name: "jsonschema-validator" - config: - schemas: - charge.succeed: - file: "%s" -` - invalidSourcePluginJSONSchemaURLYAML = ` + invalidSourcePluginJSONSchemaJSONYAML = ` sources: - name: default-source path: / @@ -74,9 +65,14 @@ sources: plugins: - name: "jsonschema-validator" config: + draft_version: 6 + default_schema: | + %s schemas: charge.succeed: - url: "http://localhost/charge.succeed.json" + schema: | + %s + reuse.default_schema: ` ) @@ -143,59 +139,30 @@ var _ = Describe("Declarative", Ordered, func() { assert.Equal(GinkgoT(), `{"message":"Request Validation","error":{"message":"request validation","fields":{"name":"unknown plugin name 'foo'"}}}`, string(resp.Body())) }) - It("should return 400 for invalid jsonschema-validator plugin config json string", func() { - resp, err := adminClient.R(). - SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaJSONYAML, "invalid jsonstring")). - Post("/workspaces/default/config/sync") - assert.Nil(GinkgoT(), err) - assert.Equal(GinkgoT(), 400, resp.StatusCode()) - assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"json":"value must be a valid json string"}}}}}`, - string(resp.Body())) - }) - - It("should return 400 for invalid jsonschema-validator plugin config jsonschema", func() { - resp, err := adminClient.R(). - SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaJSONYAML, `{"type":"invalidObject"}`)). - Post("/workspaces/default/config/sync") - assert.Nil(GinkgoT(), err) - assert.Equal(GinkgoT(), 400, resp.StatusCode()) - assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"json":"invalid jsonschema: unsupported 'type' value \"invalidObject\""}}}}}`, - string(resp.Body())) - }) - - It("should return 400 for invalid jsonschema-validator plugin config file", func() { + It("should return 400 for invalid jsonschema-validator plugin config", func() { resp, err := adminClient.R(). - SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaFileYAML, "./notexist.json")). + SetBody( + fmt.Sprintf(invalidSourcePluginJSONSchemaConfigYAML, "invalid jsonschema", "invalid jsonschema"), + ). Post("/workspaces/default/config/sync") assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"file":"value must be a valid exist file"}}}}}`, - string(resp.Body())) - }) - - It("should return 400 for invalid jsonschema-validator config file content", func() { - resp, err := adminClient.R(). - SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaFileYAML, "../fixtures/jsonschema/invalid.json")). - Post("/workspaces/default/config/sync") - assert.Nil(GinkgoT(), err) - assert.Equal(GinkgoT(), 400, resp.StatusCode()) - assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"file":"the content must be a valid json string"}}}}}`, + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"default_schema":"value must be a valid json string","draft_version":"required field missing","schemas[charge.succeed]":{"schema":"value must be a valid json string"}}}}}`, string(resp.Body())) }) - It("should return 400 for invalid source plugin config url", func() { + It("should return 400 for invalid jsonschema-validator plugin config jsonschema string", func() { resp, err := adminClient.R(). - SetBody(invalidSourcePluginJSONSchemaURLYAML). + SetBody(fmt.Sprintf(invalidSourcePluginJSONSchemaJSONYAML, + `{"type": "invlidObject","properties": {"id": { "type": "string"}}}`, + `{"type": "object","properties": {"id": { "type": "number", "format":"invalid"}}}`)). Post("/workspaces/default/config/sync") assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"schemas[charge.succeed]":{"url":"failed to fetch schema: Get \"http://localhost/charge.succeed.json\": dial tcp [::1]:80: connect: connection refused"}}}}}`, + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"default_schema":"unsupported 'type' value \"invlidObject\"","schemas[charge.succeed]":{"schema":"unsupported 'format' value \"invalid\""},"schemas[reuse.default_schema]":{"schema":"invalid due to reusing the default_schema definition"}}}}}`, string(resp.Body())) }) }) diff --git a/test/fixtures/webhookx.yml b/test/fixtures/webhookx.yml index b572943..eae3657 100644 --- a/test/fixtures/webhookx.yml +++ b/test/fixtures/webhookx.yml @@ -39,11 +39,18 @@ sources: function: "function handle() {}" - name: "jsonschema-validator" config: + draft_version: 6 + default_schema: | + { + "type": "object", + "properties": { + "id": { "type": "string" } + }, + "required": ["id"] + } schemas: charge.succeeded: - #file: "../fixtures/jsonschema/charge.succeed.json" - #url: "https://raw.githubusercontent.com/cchenggit/webhookx/refs/heads/feat/plugin-jsonschema-validator/test/fixtures/jsonschema/charge.succeed.json" - json: | + schema: | { "type": "object", "properties": { diff --git a/test/plugins/jsonschema_validator_test.go b/test/plugins/jsonschema_validator_test.go index 3078769..03ec564 100644 --- a/test/plugins/jsonschema_validator_test.go +++ b/test/plugins/jsonschema_validator_test.go @@ -2,7 +2,6 @@ package plugins import ( "context" - "fmt" "github.com/go-resty/resty/v2" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -38,130 +37,159 @@ var jsonString = `{ }` var _ = Describe("jsonschema-validator", Ordered, func() { - resources := map[string]*jsonschema_validator.SchemaResource{ - "json": { - JSONString: jsonString, - }, - "file": { - File: "../fixtures/jsonschema/charge.succeed.json", - }, - "url": { - URL: "https://raw.githubusercontent.com/cchenggit/webhookx/refs/heads/feat/plugin-jsonschema-validator/test/fixtures/jsonschema/charge.succeed.json", - }, - } - for key, val := range resources { - Context(key, func() { - var proxyClient *resty.Client - - var app *app.Application - var db *db.DB - - entitiesConfig := helper.EntitiesConfig{ - Endpoints: []*entities.Endpoint{factory.EndpointP()}, - Sources: []*entities.Source{factory.SourceP()}, - } - entitiesConfig.Plugins = []*entities.Plugin{ - factory.PluginP( - factory.WithPluginSourceID(entitiesConfig.Sources[0].ID), - factory.WithPluginName("jsonschema-validator"), - factory.WithPluginConfig(jsonschema_validator.Config{ - Schemas: map[string]*jsonschema_validator.SchemaResource{ - "charge.succeeded": val, + Context("schema string", func() { + var proxyClient *resty.Client + + var app *app.Application + var db *db.DB + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{factory.EndpointP()}, + Sources: []*entities.Source{factory.SourceP()}, + } + entitiesConfig.Plugins = []*entities.Plugin{ + factory.PluginP( + factory.WithPluginSourceID(entitiesConfig.Sources[0].ID), + factory.WithPluginName("jsonschema-validator"), + factory.WithPluginConfig(jsonschema_validator.Config{ + DraftVersion: 6, + DefaultSchema: jsonString, + Schemas: map[string]*jsonschema_validator.Schema{ + "charge.succeeded": { + Schema: jsonString, }, - }), - ), - } - - BeforeAll(func() { - db = helper.InitDB(true, &entitiesConfig) - proxyClient = helper.ProxyClient() - - app = utils.Must(helper.Start(map[string]string{ - "WEBHOOKX_ADMIN_LISTEN": "0.0.0.0:8080", - "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", - "WEBHOOKX_WORKER_ENABLED": "true", - })) - }) - - AfterAll(func() { - app.Stop() - }) - - It("sanity", func() { - body := `{"event_type": "charge.succeeded","data": {"id": "ch_1234567890","amount": 1000,"currency": "usd"}}` - resp, err := proxyClient.R(). - SetHeader("Content-Type", "application/json"). - SetBody(body). - Post("/") - Expect(err).To(BeNil()) - Expect(resp.StatusCode()).To(Equal(200)) - // get event from db - var event *entities.Event - assert.Eventually(GinkgoT(), func() bool { - list, err := db.Events.List(context.TODO(), &query.EventQuery{}) - if err != nil || len(list) != 1 { - return false - } - event = list[0] - return true - }, time.Second*5, time.Second) - assert.Equal(GinkgoT(), "charge.succeeded", event.EventType) - assert.JSONEq(GinkgoT(), `{"id": "ch_1234567890","amount": 1000,"currency": "usd"}`, string(event.Data)) - }) - - It("sanity if undeclared event type", func() { - body := `{"event_type": "unknown.event", "data":{"foo": "bar"}}` - resp, err := proxyClient.R(). - SetHeader("Content-Type", "application/json"). - SetBody(body). - Post("/") - Expect(err).To(BeNil()) - fmt.Print(string(resp.Body())) - Expect(resp.StatusCode()).To(Equal(200)) - - // get event from db - var event *entities.Event - assert.Eventually(GinkgoT(), func() bool { - list, err := db.Events.List(context.TODO(), &query.EventQuery{}) - if err != nil || len(list) == 0 { - return false - } - for _, item := range list { - if item.EventType == "unknown.event" { - event = item - return true - } + "reuse.default_schema": nil, + }, + }), + ), + } + + BeforeAll(func() { + db = helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + + app = utils.Must(helper.Start(map[string]string{ + "WEBHOOKX_ADMIN_LISTEN": "0.0.0.0:8080", + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_WORKER_ENABLED": "true", + })) + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity", func() { + body := `{"event_type": "charge.succeeded","data": {"id": "ch_1234567890","amount": 1000,"currency": "usd"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(200)) + // get event from db + var event *entities.Event + assert.Eventually(GinkgoT(), func() bool { + list, err := db.Events.List(context.TODO(), &query.EventQuery{}) + if err != nil || len(list) != 1 { + return false + } + event = list[0] + return true + }, time.Second*5, time.Second) + assert.Equal(GinkgoT(), "charge.succeeded", event.EventType) + assert.JSONEq(GinkgoT(), `{"id": "ch_1234567890","amount": 1000,"currency": "usd"}`, string(event.Data)) + }) + + It("sanity if undeclared event type", func() { + body := `{"event_type": "unknown.event", "data":{"foo": "bar"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(200)) + + // get event from db + var event *entities.Event + assert.Eventually(GinkgoT(), func() bool { + list, err := db.Events.List(context.TODO(), &query.EventQuery{}) + if err != nil || len(list) == 0 { + return false + } + for _, item := range list { + if item.EventType == "unknown.event" { + event = item + return true } + } + return false + }, time.Second*5, time.Second) + assert.Equal(GinkgoT(), "unknown.event", event.EventType) + assert.JSONEq(GinkgoT(), `{"foo": "bar"}`, string(event.Data)) + }) + + It("sanity if reuse default_schema", func() { + body := `{"event_type": "reuse.default_schema","data": {"id": "ch_1234567890","amount": 1000,"currency": "usd"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(200)) + // get event from db + var event *entities.Event + assert.Eventually(GinkgoT(), func() bool { + list, err := db.Events.List(context.TODO(), &query.EventQuery{}) + if err != nil || len(list) == 0 { return false - }, time.Second*5, time.Second) - assert.Equal(GinkgoT(), "unknown.event", event.EventType) - assert.JSONEq(GinkgoT(), `{"foo": "bar"}`, string(event.Data)) - }) - - It("invalid event - missing required field", func() { - body := `{"event_type": "charge.succeeded","data": {"amount": 1000,"currency": "usd"}}` - resp, err := proxyClient.R(). - SetHeader("Content-Type", "application/json"). - SetBody(body). - Post("/") - Expect(err).To(BeNil()) - Expect(resp.StatusCode()).To(Equal(400)) - Expect(string(resp.Body())).To(Equal(`{"message":"Request Validation","error":{"message":"request validation","fields":{"id":"required field missing"}}}`)) - }) - - It("invalid event - field type mismatch", func() { - body := `{"event_type": "charge.succeeded","data": {"id": "ch_1234567890","amount": "1000","currency": "usd"}}` - resp, err := proxyClient.R(). - SetHeader("Content-Type", "application/json"). - SetBody(body). - Post("/") - Expect(err).To(BeNil()) - Expect(resp.StatusCode()).To(Equal(400)) - Expect(string(resp.Body())).To(Equal(`{"message":"Request Validation","error":{"message":"request validation","fields":{"amount":"value must be an integer"}}}`)) - }) + } + for _, item := range list { + if item.EventType == "reuse.default_schema" { + event = item + return true + } + } + return true + }, time.Second*5, time.Second) + assert.Equal(GinkgoT(), "reuse.default_schema", event.EventType) + assert.JSONEq(GinkgoT(), `{"id": "ch_1234567890","amount": 1000,"currency": "usd"}`, string(event.Data)) }) - } + It("invalid event - missing required field", func() { + body := `{"event_type": "charge.succeeded","data": {"amount": 1000,"currency": "usd"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(400)) + Expect(string(resp.Body())).To(Equal(`{"message":"Request Validation","error":{"message":"request validation","fields":{"id":"required field missing"}}}`)) + }) + It("invalid event - field type mismatch", func() { + body := `{"event_type": "charge.succeeded","data": {"id": "ch_1234567890","amount": "1000","currency": "usd"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(400)) + Expect(string(resp.Body())).To(Equal(`{"message":"Request Validation","error":{"message":"request validation","fields":{"amount":"value must be an integer"}}}`)) + }) + + It("invalid event - reuse default schema", func() { + body := `{"event_type": "reuse.default_schema","data": {"id": "ch_1234567890","amount": "1000","currency": "usd"}}` + resp, err := proxyClient.R(). + SetHeader("Content-Type", "application/json"). + SetBody(body). + Post("/") + + Expect(err).To(BeNil()) + Expect(resp.StatusCode()).To(Equal(400)) + Expect(string(resp.Body())).To(Equal(`{"message":"Request Validation","error":{"message":"request validation","fields":{"amount":"value must be an integer"}}}`)) + }) + }) }) diff --git a/utils/hash.go b/utils/hash.go index d571c6f..c4590b8 100644 --- a/utils/hash.go +++ b/utils/hash.go @@ -5,7 +5,7 @@ import ( "encoding/hex" ) -func Hash256(s string) string { +func Sha256(s string) string { h := sha256.Sum256([]byte(s)) return hex.EncodeToString(h[:]) } diff --git a/webhookx.sample.yml b/webhookx.sample.yml index 8439a65..e79400b 100644 --- a/webhookx.sample.yml +++ b/webhookx.sample.yml @@ -35,7 +35,7 @@ sources: config: schemas: charge.succeeded: - json: | + schema: | { "type": "object", "properties": { From 79d33b26394e3c416a2f702e1854b7505b4425e3 Mon Sep 17 00:00:00 2001 From: cchenggit <654031023@qq.com> Date: Mon, 20 Oct 2025 00:15:31 +0800 Subject: [PATCH 3/6] chore: update README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index c90d4d3..5f21f83 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ WebhookX is an open-source webhooks gateway for message receiving, processing, a - `webhookx-signature`: Sign outbound requests with HMAC(SHA-256) by adding `Webhookx-Signature` and `Webhookx-Timestamp` headers. - `wasm`: Transform outbound requests using high-level languages such as AssemblyScript, Rust or TinyGo. See [plugin/wasm](plugins/wasm). - `function`: Customize inbound behavior with JavaScript, e.g. signature verification or request body transformation. + - `jsonschema-validator`: Validate event payloads against JSONSchema definitions. Up to Draft v6 is supported. - **Observability:** OpenTelemetry metrics and tracing for monitoring and troubleshooting. From beec762219629752b1eabede781e14e8bf1966ed Mon Sep 17 00:00:00 2001 From: cchenggit <654031023@qq.com> Date: Mon, 20 Oct 2025 20:37:24 +0800 Subject: [PATCH 4/6] chore: update config example --- webhookx.sample.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/webhookx.sample.yml b/webhookx.sample.yml index e79400b..0a3c793 100644 --- a/webhookx.sample.yml +++ b/webhookx.sample.yml @@ -33,6 +33,7 @@ sources: plugins: - name: "jsonschema-validator" config: + draft_version: 6 schemas: charge.succeeded: schema: | From 014d7ecf056dd90549c68b4b4c89f2725fa7bdc1 Mon Sep 17 00:00:00 2001 From: cchenggit <654031023@qq.com> Date: Mon, 20 Oct 2025 22:39:53 +0800 Subject: [PATCH 5/6] test: clean debug print --- test/declarative/declarative_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/declarative/declarative_test.go b/test/declarative/declarative_test.go index 881d894..d2d907f 100644 --- a/test/declarative/declarative_test.go +++ b/test/declarative/declarative_test.go @@ -100,7 +100,6 @@ var _ = Describe("Declarative", Ordered, func() { resp, err := adminClient.R(). SetBody(string(yaml)). Post("/workspaces/default/config/sync") - fmt.Print(string(resp.Body())) assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 200, resp.StatusCode()) }) From 442afd0c186f98cab3d467d34b7bb871e3bec8a0 Mon Sep 17 00:00:00 2001 From: cchenggit <654031023@qq.com> Date: Tue, 21 Oct 2025 23:55:37 +0800 Subject: [PATCH 6/6] update schema version config --- plugins/jsonschema_validator/plugin.go | 2 +- test/cmd/admin_test.go | 3 +-- test/declarative/declarative_test.go | 4 ++-- test/fixtures/webhookx.yml | 2 +- test/plugins/jsonschema_validator_test.go | 2 +- utils/validate.go | 6 ------ webhookx.sample.yml | 2 +- 7 files changed, 7 insertions(+), 14 deletions(-) diff --git a/plugins/jsonschema_validator/plugin.go b/plugins/jsonschema_validator/plugin.go index 644575d..dbb20d9 100644 --- a/plugins/jsonschema_validator/plugin.go +++ b/plugins/jsonschema_validator/plugin.go @@ -15,7 +15,7 @@ import ( ) type Config struct { - DraftVersion int `json:"draft_version" validate:"required,oneof=6"` + Draft string `json:"draft" validate:"required,oneof=6 default:6"` DefaultSchema string `json:"default_schema" validate:"omitempty,json,max=1048576"` Schemas map[string]*Schema `json:"schemas" validate:"dive"` } diff --git a/test/cmd/admin_test.go b/test/cmd/admin_test.go index 5ea9405..01957dc 100644 --- a/test/cmd/admin_test.go +++ b/test/cmd/admin_test.go @@ -101,8 +101,7 @@ var _ = Describe("admin", Ordered, func() { assert.Equal(GinkgoT(), `{"function": "function handle() {}"}`, string(plugins[0].Config)) assert.Equal(GinkgoT(), `jsonschema-validator`, plugins[1].Name) assert.Equal(GinkgoT(), true, plugins[1].Enabled) - - assert.Equal(GinkgoT(), `{"schemas": {"charge.succeeded": {"schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}, "draft_version": 6, "default_schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" }\n },\n \"required\": [\"id\"]\n}\n"}`, string(plugins[1].Config)) + assert.Equal(GinkgoT(), `{"draft": "6", "schemas": {"charge.succeeded": {"schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" },\n \"amount\": { \"type\": \"integer\", \"minimum\": 1 },\n \"currency\": { \"type\": \"string\", \"minLength\": 3, \"maxLength\": 6 }\n },\n \"required\": [\"id\", \"amount\", \"currency\"]\n}\n"}}, "default_schema": "{\n \"type\": \"object\",\n \"properties\": {\n \"id\": { \"type\": \"string\" }\n },\n \"required\": [\"id\"]\n}\n"}`, string(plugins[1].Config)) }) It("entities not defined in the declarative configuration should be deleted", func() { diff --git a/test/declarative/declarative_test.go b/test/declarative/declarative_test.go index d2d907f..c4910e9 100644 --- a/test/declarative/declarative_test.go +++ b/test/declarative/declarative_test.go @@ -65,7 +65,7 @@ sources: plugins: - name: "jsonschema-validator" config: - draft_version: 6 + draft: "6" default_schema: | %s schemas: @@ -148,7 +148,7 @@ var _ = Describe("Declarative", Ordered, func() { assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 400, resp.StatusCode()) assert.Equal(GinkgoT(), - `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"default_schema":"value must be a valid json string","draft_version":"required field missing","schemas[charge.succeed]":{"schema":"value must be a valid json string"}}}}}`, + `{"message":"Request Validation","error":{"message":"request validation","fields":{"config":{"default_schema":"value must be a valid json string","draft":"required field missing","schemas[charge.succeed]":{"schema":"value must be a valid json string"}}}}}`, string(resp.Body())) }) diff --git a/test/fixtures/webhookx.yml b/test/fixtures/webhookx.yml index eae3657..65da523 100644 --- a/test/fixtures/webhookx.yml +++ b/test/fixtures/webhookx.yml @@ -39,7 +39,7 @@ sources: function: "function handle() {}" - name: "jsonschema-validator" config: - draft_version: 6 + draft: "6" default_schema: | { "type": "object", diff --git a/test/plugins/jsonschema_validator_test.go b/test/plugins/jsonschema_validator_test.go index 03ec564..fc08900 100644 --- a/test/plugins/jsonschema_validator_test.go +++ b/test/plugins/jsonschema_validator_test.go @@ -53,7 +53,7 @@ var _ = Describe("jsonschema-validator", Ordered, func() { factory.WithPluginSourceID(entitiesConfig.Sources[0].ID), factory.WithPluginName("jsonschema-validator"), factory.WithPluginConfig(jsonschema_validator.Config{ - DraftVersion: 6, + Draft: "6", DefaultSchema: jsonString, Schemas: map[string]*jsonschema_validator.Schema{ "charge.succeeded": { diff --git a/utils/validate.go b/utils/validate.go index 5366ce0..2953c3b 100644 --- a/utils/validate.go +++ b/utils/validate.go @@ -50,12 +50,6 @@ func init() { RegisterFormatter("max", func(fe validator.FieldError) string { return fmt.Sprintf("length must be at most %s", fe.Param()) }) - RegisterFormatter("url", func(fe validator.FieldError) string { - return "value must be a valid url" - }) - RegisterFormatter("file", func(fe validator.FieldError) string { - return "value must be a valid exist file" - }) RegisterFormatter("json", func(fe validator.FieldError) string { return "value must be a valid json string" }) diff --git a/webhookx.sample.yml b/webhookx.sample.yml index 0a3c793..ed5378d 100644 --- a/webhookx.sample.yml +++ b/webhookx.sample.yml @@ -33,7 +33,7 @@ sources: plugins: - name: "jsonschema-validator" config: - draft_version: 6 + draft: "6" schemas: charge.succeeded: schema: |