Skip to content

Commit d093862

Browse files
authored
[PRODCRE-1522] register schemas using chip config (#2330)
1 parent 615aaf1 commit d093862

File tree

15 files changed

+2095
-1522
lines changed

15 files changed

+2095
-1522
lines changed

.github/workflows/framework-dockercompose-tests.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ jobs:
66
test:
77
defaults:
88
run:
9-
working-directory: framework/examples/myproject
9+
working-directory: framework/examples/chip_ingress
1010
runs-on: ubuntu-latest
1111
permissions:
1212
id-token: write
@@ -66,11 +66,12 @@ jobs:
6666
- name: Install dependencies
6767
run: go mod download
6868

69-
- name: Run System Tests
69+
- name: Run Docker Compose Tests
7070
if: steps.changes.outputs.src == 'true'
7171
env:
7272
CTF_CONFIGS: ${{ matrix.test.config }}
73-
CHIP_INGRESS_IMAGE: ${{ secrets.AWS_ACCOUNT_ID_PROD }}.dkr.ecr.us-west-2.amazonaws.com/atlas-chip-ingress:qa-latest
73+
CHIP_INGRESS_IMAGE: ${{ secrets.AWS_ACCOUNT_ID_PROD }}.dkr.ecr.us-west-2.amazonaws.com/atlas-chip-ingress:da84cb72d3a160e02896247d46ab4b9806ebee2f
74+
CHIP_CONFIG_IMAGE: ${{ secrets.AWS_ACCOUNT_ID_PROD }}.dkr.ecr.us-west-2.amazonaws.com/atlas-chip-config:7b4e9ee68fd1c737dd3480b5a3ced0188f29b969
7475
CTF_LOG_LEVEL: debug
7576
run: |
7677
go test -timeout ${{ matrix.test.timeout }} -v -count ${{ matrix.test.count }} -run ${{ matrix.test.name }}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
golang 1.25.3
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
package chipingressset
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"maps"
8+
"os"
9+
"path"
10+
"path/filepath"
11+
"slices"
12+
"strings"
13+
14+
"github.com/jhump/protocompile"
15+
"google.golang.org/protobuf/reflect/protoreflect"
16+
17+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
18+
)
19+
20+
// code copied from: https://github.com/smartcontractkit/atlas/blob/master/chip-cli/config/config.go and https://github.com/smartcontractkit/atlas/blob/master/chip-cli/config/proto_validator.go
21+
// reason: avoid dependency on the private atlas module
22+
func convertToPbSchemas(schemas map[string]*schema, domain string) []*pb.Schema {
23+
pbSchemas := make([]*pb.Schema, len(schemas))
24+
25+
for i, schema := range slices.Collect(maps.Values(schemas)) {
26+
27+
pbReferences := make([]*pb.SchemaReference, len(schema.References))
28+
for j, reference := range schema.References {
29+
pbReferences[j] = &pb.SchemaReference{
30+
Subject: fmt.Sprintf("%s-%s", domain, reference.Entity),
31+
Name: reference.Name,
32+
// Explicitly omit Version, this tells chip-config to use the latest version of the schema for this reference
33+
}
34+
}
35+
36+
pbSchema := &pb.Schema{
37+
Subject: fmt.Sprintf("%s-%s", domain, schema.Entity),
38+
Schema: schema.SchemaContent,
39+
References: pbReferences,
40+
}
41+
42+
// If the schema has metadata, we need to add pb metadata to the schema
43+
if schema.Metadata.Stores != nil {
44+
45+
stores := make(map[string]*pb.Store, len(schema.Metadata.Stores))
46+
for key, store := range schema.Metadata.Stores {
47+
stores[key] = &pb.Store{
48+
Index: store.Index,
49+
Partition: store.Partition,
50+
}
51+
}
52+
53+
pbSchema.Metadata = &pb.MetaData{
54+
Stores: stores,
55+
}
56+
}
57+
58+
pbSchemas[i] = pbSchema
59+
}
60+
61+
return pbSchemas
62+
}
63+
64+
type registrationConfig struct {
65+
Domain string `json:"domain"`
66+
Schemas []schema `json:"schemas"`
67+
}
68+
69+
type schema struct {
70+
Entity string `json:"entity"`
71+
Path string `json:"path"`
72+
References []schemaReference `json:"references,omitempty"`
73+
SchemaContent string
74+
Metadata metadata `json:"metadata,omitempty"`
75+
}
76+
77+
type metadata struct {
78+
Stores map[string]store `json:"stores"`
79+
}
80+
81+
type store struct {
82+
Index []string `json:"index"`
83+
Partition []string `json:"partition"`
84+
}
85+
86+
type schemaReference struct {
87+
Name string `json:"name"`
88+
Entity string `json:"entity"`
89+
Path string `json:"path"`
90+
}
91+
92+
func parseSchemaConfig(configFilePath, schemaDir string) (*registrationConfig, map[string]*schema, error) {
93+
cfg, err := readConfig(configFilePath)
94+
if err != nil {
95+
return nil, nil, err
96+
}
97+
98+
if err := validateEntityNames(cfg, schemaDir); err != nil {
99+
return nil, nil, fmt.Errorf("entity name validation failed: %w", err)
100+
}
101+
102+
// Our end goal is to generate a schema registration request to chip config
103+
// We will use a map to store the schemas by entity and path
104+
// this is because more than one schema may reference the same schema
105+
// technically, since SR is idempotent, this is not strictly necessary, as duplicate registrations are noop
106+
schemas := make(map[string]*schema)
107+
108+
for _, s := range cfg.Schemas {
109+
// For each of the schemas, we need to get the references schema content
110+
for _, reference := range s.References {
111+
112+
// read schema contents
113+
refSchemaContent, err := os.ReadFile(path.Join(schemaDir, reference.Path))
114+
if err != nil {
115+
return nil, nil, fmt.Errorf("error reading schema: %v", err)
116+
}
117+
118+
// generate key with entity and path since other schemas may also reference this schema
119+
key := fmt.Sprintf("%s:%s", reference.Entity, reference.Path)
120+
121+
// if the schema already exists, skip it
122+
if _, ok := schemas[key]; ok {
123+
continue
124+
}
125+
126+
schemas[key] = &schema{
127+
Entity: reference.Entity,
128+
Path: reference.Path,
129+
SchemaContent: string(refSchemaContent),
130+
}
131+
}
132+
133+
// add the root schema to the map
134+
schemaContent, err := os.ReadFile(path.Join(schemaDir, s.Path))
135+
if err != nil {
136+
return nil, nil, fmt.Errorf("error reading schema: %v", err)
137+
}
138+
139+
key := fmt.Sprintf("%s:%s", s.Entity, s.Path)
140+
// if the schema already exists, that means it is referenced by another schema.
141+
// so we just need to add the references to the existing schema in the map
142+
if existing, ok := schemas[key]; ok {
143+
existing.References = append(existing.References, s.References...)
144+
continue
145+
}
146+
147+
schemas[key] = &schema{
148+
Entity: s.Entity,
149+
Path: s.Path,
150+
SchemaContent: string(schemaContent),
151+
References: s.References,
152+
}
153+
154+
}
155+
156+
return cfg, schemas, nil
157+
}
158+
159+
func readConfig(path string) (*registrationConfig, error) {
160+
f, err := os.Open(path)
161+
if err != nil {
162+
return nil, fmt.Errorf("failed to open config file '%s': %w", path, err)
163+
}
164+
defer f.Close()
165+
166+
var cfg registrationConfig
167+
168+
dErr := json.NewDecoder(f).Decode(&cfg)
169+
if dErr != nil {
170+
return nil, fmt.Errorf("failed to decode config: %w", dErr)
171+
}
172+
173+
return &cfg, nil
174+
}
175+
176+
// validateEntityNames validates that all entity names in the config match the fully qualified
177+
// protobuf names (package.MessageName) from their corresponding proto files.
178+
// It collects all validation errors and returns them together for better user experience.
179+
func validateEntityNames(cfg *registrationConfig, schemaDir string) error {
180+
var errors []string
181+
182+
for _, schema := range cfg.Schemas {
183+
if err := validateEntityName(schema.Entity, schema.Path, schemaDir); err != nil {
184+
errors = append(errors, fmt.Sprintf(" - schema '%s': %s", schema.Path, err))
185+
}
186+
187+
for _, ref := range schema.References {
188+
if err := validateEntityName(ref.Entity, ref.Path, schemaDir); err != nil {
189+
errors = append(errors, fmt.Sprintf(" - referenced schema '%s': %s", ref.Path, err))
190+
}
191+
}
192+
}
193+
194+
if len(errors) > 0 {
195+
return fmt.Errorf("entity name validation failed with %d error(s):\n%s", len(errors), strings.Join(errors, "\n"))
196+
}
197+
198+
return nil
199+
}
200+
201+
func validateEntityName(entityName, protoPath, schemaDir string) error {
202+
fullPath := path.Join(schemaDir, protoPath)
203+
204+
// Find the message descriptor that matches the entity name
205+
msgDesc, err := findMessageDescriptor(fullPath, entityName)
206+
if err != nil {
207+
return fmt.Errorf("failed to find message descriptor in '%s': %w", protoPath, err)
208+
}
209+
210+
// Extract the expected entity name from the message descriptor
211+
expectedEntity := string(msgDesc.FullName())
212+
if entityName != expectedEntity {
213+
return fmt.Errorf(
214+
"entity name mismatch in chip.json:\n"+
215+
" Proto file: %s\n"+
216+
" Expected: %s\n"+
217+
" Got: %s\n"+
218+
" \n"+
219+
" The entity name must be the fully qualified protobuf name: {package}.{MessageName}",
220+
protoPath,
221+
expectedEntity,
222+
entityName,
223+
)
224+
}
225+
226+
return nil
227+
}
228+
229+
// findMessageDescriptor finds a message descriptor by name (either full name or short name)
230+
// This matches the logic in chip-ingress/internal/serde/message.go
231+
func findMessageDescriptor(filePath, targetMessageName string) (protoreflect.MessageDescriptor, error) {
232+
compiler := protocompile.Compiler{
233+
Resolver: &protocompile.SourceResolver{
234+
ImportPaths: getImportPaths(filePath, 3),
235+
},
236+
}
237+
238+
filename := filepath.Base(filePath)
239+
fds, err := compiler.Compile(context.Background(), filename)
240+
if err != nil {
241+
return nil, fmt.Errorf("failed to compile proto file: %w", err)
242+
}
243+
244+
if len(fds) == 0 {
245+
return nil, fmt.Errorf("no file descriptors found")
246+
}
247+
248+
// Search through all file descriptors for the target message
249+
for _, fd := range fds {
250+
messages := fd.Messages()
251+
for i := range messages.Len() {
252+
msgDesc := messages.Get(i)
253+
254+
// Match by full name (e.g., "package.MessageName") or short name (e.g., "MessageName")
255+
if string(msgDesc.FullName()) == targetMessageName || string(msgDesc.Name()) == targetMessageName {
256+
return msgDesc, nil
257+
}
258+
}
259+
}
260+
261+
return nil, fmt.Errorf("message descriptor not found for name: %s", targetMessageName)
262+
}
263+
264+
func getImportPaths(path string, depth int) []string {
265+
paths := make([]string, 0, depth+1)
266+
paths = append(paths, filepath.Dir(path))
267+
268+
currentPath := path
269+
for i := 0; i < depth; i++ {
270+
currentPath = filepath.Dir(currentPath)
271+
paths = append(paths, currentPath)
272+
}
273+
return paths
274+
}

0 commit comments

Comments
 (0)