From c8828b33dae48681945e2ed2c5ba442d9a6741d1 Mon Sep 17 00:00:00 2001 From: Sjoerd Siebinga Date: Tue, 25 Apr 2023 09:40:12 +0200 Subject: [PATCH 1/7] feat(bulk): added duckdb database to bulk service --- docker-compose.yml | 5 +- go.mod | 1 + go.sum | 2 + hub3.toml | 50 ++++--------------- ikuzo/ikuzoctl/cmd/config/bulk.go | 11 +++++ ikuzo/ikuzoctl/cmd/config/config.go | 6 ++- ikuzo/ikuzoctl/cmd/config/elasticsearch.go | 6 ++- ikuzo/ikuzoctl/cmd/serve.go | 6 ++- ikuzo/service/x/bulk/options.go | 11 +++++ ikuzo/service/x/bulk/service.go | 57 ++++++++++++++++++++-- 10 files changed, 103 insertions(+), 52 deletions(-) create mode 100644 ikuzo/ikuzoctl/cmd/config/bulk.go diff --git a/docker-compose.yml b/docker-compose.yml index 500aa513..7a82a859 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,13 +23,12 @@ services: image: zacanbot/fuseki container_name: hub3-fuseki environment: - JVM_ARGS: -Xmx1g + - JVM_ARGS=-Xmx1g + - ADMIN_PASSWORD=pw123 ports: - "3033:3030" volumes: - ./docker-data/fuseki/data:/data/fuseki - environment: - - ADMIN_PASSWORD=pw123 command: ["/jena-fuseki/fuseki-server", "--set", "tdb:unionDefaultGraph=true"] diff --git a/go.mod b/go.mod index bbcac779..3d558b24 100644 --- a/go.mod +++ b/go.mod @@ -130,6 +130,7 @@ require ( github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/marcboeker/go-duckdb v1.2.2 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect diff --git a/go.sum b/go.sum index 1123b5ca..5eb69334 100644 --- a/go.sum +++ b/go.sum @@ -895,6 +895,8 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= github.com/mailru/easyjson v0.7.7 
h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/marcboeker/go-duckdb v1.2.2 h1:Qy5yW83qAcZgsEmGo+pkEZeZvxA2dzuQNPLh7wcorb0= +github.com/marcboeker/go-duckdb v1.2.2/go.mod h1:wm91jO2GNKa6iO9NTcjXIRsW+/ykPoJbQcHSXhdAl28= github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= github.com/markbates/pkger v0.15.1/go.mod h1:0JoVlrol20BSywW79rN3kdFFsE5xYM+rSCQDXbLhiuI= github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= diff --git a/hub3.toml b/hub3.toml index ea8ea960..a6597b4d 100644 --- a/hub3.toml +++ b/hub3.toml @@ -1,53 +1,23 @@ -[org.dcn] -domains = ["localhost:3000"] +[org.nl-hana] +customID = "NL-HaNA" +domains = ["localhost:3000", "hub3.nl-hana:3000"] default = true -[[org.dcn.sitemaps]] +[[org.nl-hana.sitemaps]] id = "all" baseURL = "http://localhost:3000" filters = "meta.tags:narthex" -[org.dcn.oaipmh] +[org.nl-hana.oaipmh] enabled = true adminEmails = ["info@delving.eu"] repositoryName = "DCN OAI-PMH repository" -[org.hub3] -domains = ["localhost:3001"] -customID = "hub3" -default = true - - -################################### -# NIOD # -################################### -[org.niod] -domains = ["hub3.niod:3000"] - -[org.niod.elasticsearch] -# indexTypes enabled types for the bulk index service -indexTypes = ["v2"] -# configuration for MimimumShouldMatch -minimumShouldMatch = "2<70%" -shards = 3 -replicas = 2 -# the name of the index. If empty it is the name of the OrgId -# indexName = "dcn" -# if non-empty digital objects will be indexed in a dedicated v2 index -# digitalObjectSuffix = "scans" - -[org.niod.sparql] -enabled = true -# the fully qualified URL including the port -sparqlHost = "http://localhost:3033" -# the path to the SPARQL endpoint. A '%s' can be inserted in the path to be replaced with the orgId. 
-sparqlPath = "/%s/sparql" -# sparqlUpdate path is the path to the write endpoint. A '%s' can be inserted in the path to be replaced with the orgId. -sparqlUpdatePath = "/%s/update" -# dataPath is the path to the GraphStore Protocol endpoint -dataPath = "/%s/data" -# Enable storing of the RDF records in the triple store specified in sparqlUpdatePath (default: false) -storeRecords = false +[bulk] +dbPath = "hub3-bulksvc2.db" +[bulk.minio] +host = "" +bucket = "" userName = "" password = "" diff --git a/ikuzo/ikuzoctl/cmd/config/bulk.go b/ikuzo/ikuzoctl/cmd/config/bulk.go new file mode 100644 index 00000000..8b613448 --- /dev/null +++ b/ikuzo/ikuzoctl/cmd/config/bulk.go @@ -0,0 +1,11 @@ +package config + +type Bulk struct { + DBPath string `json:"dbPath,omitempty"` + Minio struct { + Host string `json:"host,omitempty"` + Bucket string `json:"bucket,omitempty"` + UserName string `json:"userName,omitempty"` + Password string `json:"password,omitempty"` + } `json:"minio,omitempty"` +} diff --git a/ikuzo/ikuzoctl/cmd/config/config.go b/ikuzo/ikuzoctl/cmd/config/config.go index 49c44b10..aa41a368 100644 --- a/ikuzo/ikuzoctl/cmd/config/config.go +++ b/ikuzo/ikuzoctl/cmd/config/config.go @@ -17,13 +17,14 @@ package config import ( "fmt" + "github.com/pacedotdev/oto/otohttp" + "github.com/spf13/viper" + "github.com/delving/hub3/ikuzo" "github.com/delving/hub3/ikuzo/domain" "github.com/delving/hub3/ikuzo/logger" "github.com/delving/hub3/ikuzo/service/organization" "github.com/delving/hub3/ikuzo/service/x/index" - "github.com/pacedotdev/oto/otohttp" - "github.com/spf13/viper" ) type Option interface { @@ -51,6 +52,7 @@ type Config struct { NDE map[string]NDECfg `json:"nde"` NDERegister NDE `json:"-" toml:"-"` RDF `json:"rdf"` + Bulk `json:"bulk"` Sitemap `json:"sitemap"` oto *otohttp.Server } diff --git a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go index aa95e173..e2cd60da 100644 --- a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go +++ 
b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go @@ -19,6 +19,9 @@ import ( "fmt" "sync" + "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/rs/zerolog/log" + "github.com/delving/hub3/ikuzo" "github.com/delving/hub3/ikuzo/domain" es "github.com/delving/hub3/ikuzo/driver/elasticsearch" @@ -27,8 +30,6 @@ import ( "github.com/delving/hub3/ikuzo/service/x/bulk" "github.com/delving/hub3/ikuzo/service/x/esproxy" "github.com/delving/hub3/ikuzo/service/x/index" - "github.com/elastic/go-elasticsearch/v8/esutil" - "github.com/rs/zerolog/log" ) type ElasticSearch struct { @@ -122,6 +123,7 @@ func (e *ElasticSearch) AddOptions(cfg *Config) error { bulk.SetIndexService(is), bulk.SetIndexTypes(e.IndexTypes...), bulk.SetPostHookService(postHooks...), + bulk.SetDBPath(cfg.Bulk.DBPath), ) if bulkErr != nil { return fmt.Errorf("unable to create bulk service; %w", isErr) diff --git a/ikuzo/ikuzoctl/cmd/serve.go b/ikuzo/ikuzoctl/cmd/serve.go index d3064c46..0654e4f2 100644 --- a/ikuzo/ikuzoctl/cmd/serve.go +++ b/ikuzo/ikuzoctl/cmd/serve.go @@ -19,10 +19,11 @@ import ( stdlog "log" - "github.com/delving/hub3/hub3/server/http/handlers" - "github.com/delving/hub3/ikuzo" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + + "github.com/delving/hub3/hub3/server/http/handlers" + "github.com/delving/hub3/ikuzo" ) //go:embed static @@ -50,6 +51,7 @@ func serve() { options, err := cfg.Options() if err != nil { + stdlog.Printf("unable to create options: %s", err) log.Fatal(). Err(err). Stack(). 
diff --git a/ikuzo/service/x/bulk/options.go b/ikuzo/service/x/bulk/options.go index 9903d555..ef0ee427 100644 --- a/ikuzo/service/x/bulk/options.go +++ b/ikuzo/service/x/bulk/options.go @@ -7,6 +7,17 @@ import ( type Option func(*Service) error +func SetDBPath(path string) Option { + return func(s *Service) error { + if path == "" { + return nil + } + + s.dbPath = path + return nil + } +} + func SetIndexService(is *index.Service) Option { return func(s *Service) error { s.index = is diff --git a/ikuzo/service/x/bulk/service.go b/ikuzo/service/x/bulk/service.go index a6084f2a..1a06e050 100644 --- a/ikuzo/service/x/bulk/service.go +++ b/ikuzo/service/x/bulk/service.go @@ -16,12 +16,19 @@ package bulk import ( "context" + "database/sql" "net/http" - "github.com/delving/hub3/ikuzo/domain" - "github.com/delving/hub3/ikuzo/service/x/index" + stdlog "log" + + _ "github.com/marcboeker/go-duckdb" + "github.com/go-chi/chi" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/delving/hub3/ikuzo/domain" + "github.com/delving/hub3/ikuzo/service/x/index" ) var _ domain.Service = (*Service)(nil) @@ -32,12 +39,15 @@ type Service struct { postHooks map[string][]domain.PostHookService log zerolog.Logger orgs domain.OrgConfigRetriever + dbPath string + db *sql.DB } func NewService(options ...Option) (*Service, error) { s := &Service{ indexTypes: []string{"v2"}, postHooks: map[string][]domain.PostHookService{}, + dbPath: "hub3-bulksvc.db", } // apply options @@ -47,9 +57,49 @@ func NewService(options ...Option) (*Service, error) { } } + if err := s.setupDB(); err != nil { + stdlog.Printf("unable to setup db: %q", err) + return nil, err + } + return s, nil } +func (s *Service) setupDB() error { + db, err := sql.Open("duckdb", s.dbPath+"?access_mode=READ_WRITE") + if err != nil { + s.log.Error().Err(err).Msgf("unable to open duckdb at %s", s.dbPath) + return err + } + + pingErr := db.Ping() + if pingErr != nil { + return pingErr + } + s.db = db + + if setupErr := 
s.setupTables(); setupErr != nil { + return setupErr + } + + s.log.Info().Str("path", s.dbPath).Msg("started duckdb for bulk service") + log.Printf("started duckdb for bulk service; %q", s.dbPath) + + return nil +} + +func (s *Service) setupTables() error { + query := `create table if not exists dataset ( + orgID text, + datasetID text, + published boolean +); +create unique index if not exists org_dataset_idx ON dataset (orgID, datasetID); + ` + _, err := s.db.Exec(query) + return err +} + func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { router := chi.NewRouter() s.Routes("", router) @@ -57,10 +107,11 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (s *Service) Shutdown(ctx context.Context) error { + s.db.Close() return s.index.Shutdown(ctx) } func (s *Service) SetServiceBuilder(b *domain.ServiceBuilder) { - s.log = b.Logger.With().Str("svc", "sitemap").Logger() + s.log = b.Logger.With().Str("svc", "bulk").Logger() s.orgs = b.Orgs } From edb25ac87a072295b7d30b9baeaffdcc67f78466 Mon Sep 17 00:00:00 2001 From: Sjoerd Siebinga Date: Tue, 25 Apr 2023 10:33:01 +0200 Subject: [PATCH 2/7] feat(bulk): added support for minio blob-storage to bulk service --- docker-compose.yml | 15 +++++- go.mod | 17 ++++--- go.sum | 33 +++++++++----- hub3.toml | 9 ++-- ikuzo/ikuzoctl/cmd/config/bulk.go | 9 ++-- ikuzo/ikuzoctl/cmd/config/elasticsearch.go | 3 +- ikuzo/ikuzoctl/cmd/index.go | 7 +-- ikuzo/service/x/bulk/options.go | 15 ++++++ ikuzo/service/x/bulk/service.go | 53 ++++++++++++++++++++++ 9 files changed, 131 insertions(+), 30 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 7a82a859..bbb27569 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,20 @@ services: networks: - esnet + minio: + image: minio/minio + ports: + - "9000:9000" + - "9001:9001" + volumes: + - ./docker-data/minio_storage:/data + environment: + MINIO_ROOT_USER: minio + MINIO_ROOT_PASSWORD: Strong#Pass#2023 + 
MINIO_ACCESS_KEY: "kPxADQXcHFbR9i1V" + MINIO_SECRET_KEY: "LiwsjNxulYJVyey50U1ypTabt8BiMWiU" + command: server --console-address ":9001" /data + fuseki: image: zacanbot/fuseki container_name: hub3-fuseki @@ -59,6 +73,5 @@ services: environment: POSTGRES_PASSWORD: pw123 - networks: esnet: diff --git a/go.mod b/go.mod index 3d558b24..df188eb5 100644 --- a/go.mod +++ b/go.mod @@ -40,8 +40,10 @@ require ( github.com/lib/pq v1.10.6 github.com/linkeddata/gojsonld v0.0.0-20170418210642-4f5db6791326 github.com/mailgun/groupcache v1.3.0 + github.com/marcboeker/go-duckdb v1.2.2 github.com/matryer/is v1.4.0 github.com/microcosm-cc/bluemonday v1.0.19 + github.com/minio/minio-go/v7 v7.0.26 github.com/mitchellh/go-homedir v1.1.0 github.com/nats-io/stan.go v0.10.2 github.com/olivere/elastic/v7 v7.0.32 @@ -67,7 +69,7 @@ require ( github.com/tidwall/gjson v1.12.1 github.com/valyala/fasthttp v1.35.0 golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde - golang.org/x/text v0.3.7 + golang.org/x/text v0.7.0 google.golang.org/protobuf v1.28.1 ) @@ -96,6 +98,7 @@ require ( github.com/docker/distribution v2.8.1+incompatible // indirect github.com/docker/docker v20.10.9+incompatible // indirect github.com/docker/go-units v0.4.0 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/fatih/color v1.13.0 // indirect @@ -126,16 +129,18 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/compress v1.16.0 // indirect + github.com/klauspost/cpuid/v2 v2.0.12 // indirect github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect - 
github.com/marcboeker/go-duckdb v1.2.2 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/mattn/go-sqlite3 v1.14.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/sys/mount v0.2.0 // indirect github.com/moby/sys/mountinfo v0.6.2 // indirect @@ -180,9 +185,9 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.10.0 // indirect - golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect - golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b // indirect - golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect + golang.org/x/crypto v0.6.0 // indirect + golang.org/x/net v0.7.0 // indirect + golang.org/x/sys v0.5.0 // indirect golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect google.golang.org/genproto v0.0.0-20220829175752-36a9c930ecbf // indirect google.golang.org/grpc v1.49.0 // indirect diff --git a/go.sum b/go.sum index 5eb69334..b25ecb9c 100644 --- a/go.sum +++ b/go.sum @@ -379,7 +379,6 @@ github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyG github.com/docker/cli v0.0.0-20191017083524-a8ff7f821017/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v0.0.0-20190905152932-14b96e55d84c/go.mod h1:0+TTO4EOBfRPhZXAeF1Vu+W3hHZ8eLp8PgKVZlcvtFY= github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= 
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= @@ -400,6 +399,7 @@ github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNE github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elastic/go-elasticsearch/v8 v8.0.0-20200716073932-4f0b75746dc1 h1:fp+4nNyJtg/qflGLXJQivQJS1VTRq2RlC7LZ9bnCejM= @@ -848,8 +848,12 @@ github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= +github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.0.4/go.mod 
h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.0.12 h1:p9dKCg8i4gmOxtv35DvrYoWqYzQrvEVdjQ762Y0OqZE= +github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49 h1:P6Mw09IOeKKS4klYhjzHzaEx2RcNshynjfDhzCQ8BoE= github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49/go.mod h1:dQr9I8Xw26daWGE/crxUleRxmpFI5uhfedWqRNHHq0c= github.com/knakk/rdf v0.0.0-20190304171630-8521bf4c5042 h1:Vzdm5hdlLdpJOKK+hKtkV5u7xGZmNW6aUBjGcTfwx84= @@ -944,6 +948,12 @@ github.com/microcosm-cc/bluemonday v1.0.19/go.mod h1:QNzV2UbLK2/53oIIwTOyLUSABMk github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.26 h1:D0HK+8793etZfRY/vHhDmFaP+vmT41K3K4JV9vmZCBQ= +github.com/minio/minio-go/v7 v7.0.26/go.mod h1:x81+AX5gHSfCSqw7jxRKHvxUXMlE5uKX0Vb75Xk5yYg= +github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= +github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -1375,8 +1385,8 @@ golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto 
v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM= -golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1489,8 +1499,8 @@ golang.org/x/net v0.0.0-20211013171255-e13a2654a71e/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b h1:ZmngSVLe/wycRns9MKikG9OWIEjGcGAkacif7oYQaUY= -golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1643,11 +1653,11 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= -golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1656,8 +1666,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod 
h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/hub3.toml b/hub3.toml index a6597b4d..0a8e8ccf 100644 --- a/hub3.toml +++ b/hub3.toml @@ -16,10 +16,11 @@ repositoryName = "DCN OAI-PMH repository" [bulk] dbPath = "hub3-bulksvc2.db" [bulk.minio] -host = "" -bucket = "" -userName = "" -password = "" +endpoint = "127.0.0.1:9000" +accessKeyId = "kPxADQXcHFbR9i1V" +secretAccessKey = "LiwsjNxulYJVyey50U1ypTabt8BiMWiU" +useSSL = false +bucketName = "bulk-svc" [http] # all the configuration for the http sub-command diff --git a/ikuzo/ikuzoctl/cmd/config/bulk.go b/ikuzo/ikuzoctl/cmd/config/bulk.go index 8b613448..5ebcab1e 100644 --- a/ikuzo/ikuzoctl/cmd/config/bulk.go +++ b/ikuzo/ikuzoctl/cmd/config/bulk.go @@ -3,9 +3,10 @@ package config type Bulk struct { DBPath string `json:"dbPath,omitempty"` Minio struct { - Host string `json:"host,omitempty"` - Bucket string `json:"bucket,omitempty"` - UserName string `json:"userName,omitempty"` - Password string `json:"password,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + AccessKeyID string `json:"accessKeyID,omitempty"` + SecretAccessKey string `json:"secretAccessKey,omitempty"` + UseSSL bool `json:"useSSL,omitempty"` + BucketName string `json:"bucketName,omitempty"` } `json:"minio,omitempty"` } diff --git a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go index e2cd60da..b82c8eff 100644 --- a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go +++ b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go @@ -124,9 
+124,10 @@ func (e *ElasticSearch) AddOptions(cfg *Config) error { bulk.SetIndexTypes(e.IndexTypes...), bulk.SetPostHookService(postHooks...), bulk.SetDBPath(cfg.Bulk.DBPath), + bulk.SetBlobConfig(cfg.Bulk.Minio), ) if bulkErr != nil { - return fmt.Errorf("unable to create bulk service; %w", isErr) + return fmt.Errorf("unable to create bulk service; %w", bulkErr) } cfg.options = append( diff --git a/ikuzo/ikuzoctl/cmd/index.go b/ikuzo/ikuzoctl/cmd/index.go index 24a0c4f6..aeb1b34f 100644 --- a/ikuzo/ikuzoctl/cmd/index.go +++ b/ikuzo/ikuzoctl/cmd/index.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -28,9 +28,10 @@ import ( "syscall" "time" - "github.com/delving/hub3/ikuzo/service/x/bulk" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + + "github.com/delving/hub3/ikuzo/service/x/bulk" ) // indexCmd represents the index command @@ -77,7 +78,7 @@ func indexRecords() error { bulk.SetIndexTypes(cfg.ElasticSearch.IndexTypes...), ) if bulkErr != nil { - return fmt.Errorf("unable to create bulk service; %w", isErr) + return fmt.Errorf("unable to create bulk service; %w", bulkErr) } parser := bulkSvc.NewParser() diff --git a/ikuzo/service/x/bulk/options.go b/ikuzo/service/x/bulk/options.go index ef0ee427..7b61e9a0 100644 --- a/ikuzo/service/x/bulk/options.go +++ b/ikuzo/service/x/bulk/options.go @@ -18,6 +18,21 @@ func SetDBPath(path string) Option { } } +type BlobConfig struct { + Endpoint string `json:"endpoint,omitempty"` + AccessKeyID string `json:"accessKeyID,omitempty"` + SecretAccessKey string `json:"secretAccessKey,omitempty"` + UseSSL bool `json:"useSSL,omitempty"` + BucketName string 
`json:"bucketName,omitempty"` +} + +func SetBlobConfig(cfg BlobConfig) Option { + return func(s *Service) error { + s.blobCfg = cfg + return nil + } +} + func SetIndexService(is *index.Service) Option { return func(s *Service) error { s.index = is diff --git a/ikuzo/service/x/bulk/service.go b/ikuzo/service/x/bulk/service.go index 1a06e050..d098f642 100644 --- a/ikuzo/service/x/bulk/service.go +++ b/ikuzo/service/x/bulk/service.go @@ -17,11 +17,14 @@ package bulk import ( "context" "database/sql" + "fmt" "net/http" stdlog "log" _ "github.com/marcboeker/go-duckdb" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" "github.com/go-chi/chi" "github.com/rs/zerolog" @@ -41,6 +44,9 @@ type Service struct { orgs domain.OrgConfigRetriever dbPath string db *sql.DB + ctx context.Context + blobCfg BlobConfig + mc *minio.Client } func NewService(options ...Option) (*Service, error) { @@ -48,6 +54,7 @@ func NewService(options ...Option) (*Service, error) { indexTypes: []string{"v2"}, postHooks: map[string][]domain.PostHookService{}, dbPath: "hub3-bulksvc.db", + ctx: context.Background(), } // apply options @@ -62,6 +69,10 @@ func NewService(options ...Option) (*Service, error) { return nil, err } + if blobSetupErr := s.setupBlobStorage(); blobSetupErr != nil { + return nil, blobSetupErr + } + return s, nil } @@ -100,6 +111,48 @@ create unique index if not exists org_dataset_idx ON dataset (orgID, datasetID); return err } +func (s *Service) setupBlobStorage() error { + if s.blobCfg.Endpoint == "" { + return fmt.Errorf("blob storage config must have endpoint") + } + // Initialize minio client object. 
+ minioClient, err := minio.New(s.blobCfg.Endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(s.blobCfg.AccessKeyID, s.blobCfg.SecretAccessKey, ""), + Secure: s.blobCfg.UseSSL, + }) + if err != nil { + return err + } + + s.mc = minioClient + + err = minioClient.MakeBucket(s.ctx, s.blobCfg.BucketName, minio.MakeBucketOptions{}) + if err != nil { + // Check to see if we already own this bucket (which happens if you run this twice) + exists, errBucketExists := minioClient.BucketExists(s.ctx, s.blobCfg.BucketName) + if errBucketExists == nil && exists { + log.Printf("We already own %s\n", s.blobCfg.BucketName) + } else { + return err + } + } else { + s.log.Info().Msgf("Successfully created %s\n", s.blobCfg.BucketName) + } + + bucketCfg, err := s.mc.GetBucketVersioning(s.ctx, s.blobCfg.BucketName) + if err != nil { + return err + } + + if !bucketCfg.Enabled() { + if err := s.mc.EnableVersioning(s.ctx, s.blobCfg.BucketName); err != nil { + return fmt.Errorf("versioning must be enabled for bucket %s; %w", s.blobCfg.BucketName, err) + } + } + + return nil +} + func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { router := chi.NewRouter() s.Routes("", router) From b7d7be9724043c467f745413a97e2701f5221bff Mon Sep 17 00:00:00 2001 From: Sjoerd Siebinga Date: Tue, 25 Apr 2023 12:17:25 +0200 Subject: [PATCH 3/7] feat(bulk): added support for delta based sparql update queries --- go.mod | 2 +- go.sum | 4 +- ikuzo/service/x/bulk/sparql.go | 104 ++++++++++++++++++++++++++++ ikuzo/service/x/bulk/sparql_test.go | 88 +++++++++++++++++++++++ 4 files changed, 195 insertions(+), 3 deletions(-) create mode 100644 ikuzo/service/x/bulk/sparql.go create mode 100644 ikuzo/service/x/bulk/sparql_test.go diff --git a/go.mod b/go.mod index df188eb5..67b3f0d1 100644 --- a/go.mod +++ b/go.mod @@ -168,7 +168,7 @@ require ( github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/rychipman/easylex v0.0.0-20160129204217-49ee7767142f // indirect github.com/satori/go.uuid
v1.2.0 // indirect - github.com/sergi/go-diff v1.2.0 // indirect + github.com/sergi/go-diff v1.3.1 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/snabb/diagio v1.0.0 // indirect github.com/spf13/afero v1.9.2 // indirect diff --git a/go.sum b/go.sum index b25ecb9c..3b1e912f 100644 --- a/go.sum +++ b/go.sum @@ -1193,8 +1193,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod github.com/segmentio/ksuid v1.0.3 h1:FoResxvleQwYiPAVKe1tMUlEirodZqlqglIuFsdDntY= github.com/segmentio/ksuid v1.0.3/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= -github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= -github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= diff --git a/ikuzo/service/x/bulk/sparql.go b/ikuzo/service/x/bulk/sparql.go new file mode 100644 index 00000000..06fda3e4 --- /dev/null +++ b/ikuzo/service/x/bulk/sparql.go @@ -0,0 +1,104 @@ +package bulk + +import ( + "bufio" + "fmt" + "sort" + "strings" +) + +func diffAsSparqlUpdate(previous, current, graphURI, spec string) (updateQuery string, err error) { + if graphURI == "" { + return "", fmt.Errorf("graphURI cannot be empty: %q", graphURI) + } + added, removed := diffTriples(previous, current) + + updateQuery = generateSPARQLUpdateQuery(added, removed, graphURI, spec) + + return updateQuery, nil +} + +func 
generateSPARQLUpdateQuery(added, removed []string, graphURI, spec string) string { + var sb strings.Builder + + if len(added) == 0 && len(removed) == 0 { + return "" + } + + if len(removed) != 0 { + // Construct the DELETE statement + sb.WriteString("DELETE DATA {\n") + sb.WriteString("GRAPH <") + sb.WriteString(graphURI) + sb.WriteString("> { ") + for _, triple := range removed { + sb.WriteString(triple + "\n") + } + sb.WriteString(" }\n") + sb.WriteString("}\n") + + } + + if len(added) != 0 { + // Construct the INSERT statement + sb.WriteString("INSERT DATA {\n") + sb.WriteString("GRAPH <") + sb.WriteString(graphURI) + sb.WriteString("> { ") + if spec != "" { + sb.WriteString(fmt.Sprintf("<%s> \"%s\" .\n", graphURI, spec)) + } + for _, triple := range added { + sb.WriteString(triple + "\n") + } + sb.WriteString(" }\n") + sb.WriteString("}\n") + } + + return sb.String() +} + +func diffTriples(a, b string) (insertedLines, deletedLines []string) { + return diffStrings(getSortedLines(a), getSortedLines(b)) +} + +func diffStrings(a, b []string) (added, removed []string) { + mapA := make(map[string]bool) + for _, val := range a { + mapA[val] = true + } + mapB := make(map[string]bool) + for _, val := range b { + mapB[val] = true + } + + for val := range mapB { + if !mapA[val] { + added = append(added, val) + } + } + + for val := range mapA { + if !mapB[val] { + removed = append(removed, val) + } + } + + sort.Strings(added) + sort.Strings(removed) + + return added, removed +} + +func getSortedLines(s string) []string { + lines := make([]string, 0) + scanner := bufio.NewScanner(strings.NewReader(s)) + for scanner.Scan() { + line := scanner.Text() + if strings.TrimSpace(line) != "" { + lines = append(lines, line) + } + } + sort.Strings(lines) + return lines +} diff --git a/ikuzo/service/x/bulk/sparql_test.go b/ikuzo/service/x/bulk/sparql_test.go new file mode 100644 index 00000000..a22a759a --- /dev/null +++ b/ikuzo/service/x/bulk/sparql_test.go @@ -0,0 +1,88 @@ +package 
bulk + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/matryer/is" +) + +func TestDiffTriples(t *testing.T) { + t.Run("inserted and deleted", func(t *testing.T) { + is := is.New(t) + + a := `<1> "title" . + <1> "subject". + <1> "subject2". + <1> "123" . + <1> <2> . + ` + + b := `<1> "title 2" . + <1> "subject". + <1> "123" . + <1> <2> . + <1> "subject3". + ` + + inserted, deleted := diffTriples(a, b) + t.Logf("inserted: %#v", inserted) + t.Logf("deleted: %#v", deleted) + is.Equal(len(inserted), 2) + is.Equal(len(deleted), 2) + is.Equal(inserted, []string{"\t<1> \"subject3\".", "<1> \"title 2\" ."}) + is.Equal(deleted, []string{"\t<1> \"subject2\".", "<1> \"title\" ."}) + }) + t.Run("no changes", func(t *testing.T) { + is := is.New(t) + + a := `<1> "title" . + <1> "subject". + <1> "subject2". + <1> "123" . + <1> <2> . + ` + inserted, deleted := diffTriples(a, a) + is.Equal(len(inserted), 0) + is.Equal(len(deleted), 0) + }) + t.Run("no changes", func(t *testing.T) { + is := is.New(t) + + a := `<1> "title" . + <1> "subject". + <1> "subject2". + <1> "123" . + <1> <2> . + ` + inserted, deleted := diffTriples(``, a) + is.Equal(len(inserted), 5) + is.Equal(len(deleted), 0) + }) +} + +func TestDiffAsSparqlUpdate(t *testing.T) { + is := is.New(t) + a := `<1> "title" . + <1> "subject". + <1> "subject2". + <1> "123" . + <1> <2> . + ` + + b := `<1> "title 2" . + <1> "subject". + <1> "123" . + <1> <2> . + <1> "subject3". 
+ ` + + want := "DELETE DATA {\nGRAPH { \t<1> \"subject2\".\n<1> \"title\" .\n }\n}\nINSERT DATA {\nGRAPH { \"123\" .\n\t<1> \"subject3\".\n<1> \"title 2\" .\n }\n}\n" + + got, err := diffAsSparqlUpdate(a, b, "urn:123/graph", "123") + t.Logf("query: %#v", got) + is.NoErr(err) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("diffAsSparqlUpdate() mismatch (-want +got):\n%s", diff) + } +} From b8ae4cea8765f29cced68c5bde6dc660d7d7c1e9 Mon Sep 17 00:00:00 2001 From: Sjoerd Siebinga Date: Thu, 27 Apr 2023 10:34:19 +0200 Subject: [PATCH 4/7] chore(docker-compose.yml): change minio port from 9000 to 9010 feat(go.mod): add oklog/ulid v1.3.1 dependency chore(hub3.toml): change minio endpoint port from 9000 to 9010 refactor(hub3/fragments/graph.go): modify Reader method to return the length of the byte array refactor(rdfstream.go): reorganize import statements feat(rdfstream.go): add HubID and OrgID to the fragment metadata in IndexFragments() function refactor(resource.go): fix typo in CreateDateRange error message refactor(resource.go): rename year variable to date in padYears function refactor(resource.go): rename formattedDate variable in padYears function refactor(resource.go): fix typo in hyphenateDate error message refactor(resource.go): rename splitDate function to splitPeriod for clarity refactor(resource.go): improve error messages in SetContextLevels and NewResourceMap functions fix(resource.go): fix typo in error message in SetContextLevels function refactor(sparql.go): add omitempty to SparqlUpdate struct fields feat(config/bulk.go): add StoreRequests field to Bulk struct feat(config/elasticsearch.go): add LogRequests field to BulkConfig struct feat(handle_upload.go): add GetGraph method to Service struct feat(options.go): add SetLogRequests option to Service struct feat(parser.go): add support for logging raw requests in bulk parser service feat(parser.go): add support for storing bulk request to disk for debugging feat(parser.go): add support for 
storing graphs to MinIO fix(parser.go): fix variable naming inconsistency in setDataSet function refactor(parser.go): remove unused code and comments feat(parser.go): add HubID and OrgID to RDF bulk request fix(parser.go): use IterTriplesOrdered instead of IterTriples to serialize triples in order refactor(service.go): reformat code for better readability feat(service.go): add logRequests boolean option to NewService function --- docker-compose.yml | 2 +- go.mod | 1 + go.sum | 1 + hub3.toml | 3 +- hub3/fragments/graph.go | 9 +- hub3/fragments/rdfstream.go | 7 +- hub3/fragments/resource.go | 43 ++++---- hub3/fragments/sparql.go | 14 ++- ikuzo/ikuzoctl/cmd/config/bulk.go | 5 +- ikuzo/ikuzoctl/cmd/config/elasticsearch.go | 1 + ikuzo/service/x/bulk/handle_upload.go | 21 +++- ikuzo/service/x/bulk/options.go | 7 ++ ikuzo/service/x/bulk/parser.go | 117 ++++++++++++++++++--- ikuzo/service/x/bulk/service.go | 21 ++-- 14 files changed, 191 insertions(+), 61 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index bbb27569..51144daa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,7 +22,7 @@ services: minio: image: minio/minio ports: - - "9000:9000" + - "9010:9000" - "9001:9001" volumes: - ./docker-data/minio_storage:/data diff --git a/go.mod b/go.mod index 67b3f0d1..1e0c1f57 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/minio/minio-go/v7 v7.0.26 github.com/mitchellh/go-homedir v1.1.0 github.com/nats-io/stan.go v0.10.2 + github.com/oklog/ulid v1.3.1 github.com/olivere/elastic/v7 v7.0.32 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.18.1 diff --git a/go.sum b/go.sum index 3b1e912f..becd00a1 100644 --- a/go.sum +++ b/go.sum @@ -1015,6 +1015,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail 
v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E= diff --git a/hub3.toml b/hub3.toml index 0a8e8ccf..08d6cd15 100644 --- a/hub3.toml +++ b/hub3.toml @@ -15,8 +15,9 @@ repositoryName = "DCN OAI-PMH repository" [bulk] dbPath = "hub3-bulksvc2.db" +storeRequests = false [bulk.minio] -endpoint = "127.0.0.1:9000" +endpoint = "127.0.0.1:9010" accessKeyId = "kPxADQXcHFbR9i1V" secretAccessKey = "LiwsjNxulYJVyey50U1ypTabt8BiMWiU" useSSL = false diff --git a/hub3/fragments/graph.go b/hub3/fragments/graph.go index 7dc53d6b..268fa60c 100644 --- a/hub3/fragments/graph.go +++ b/hub3/fragments/graph.go @@ -36,13 +36,16 @@ func (fg *FragmentGraph) Marshal() ([]byte, error) { return json.Marshal(fg) } -func (fg *FragmentGraph) Reader() (io.Reader, error) { +func (fg *FragmentGraph) Reader() (int, io.Reader, error) { + // TODO: idempotency is an issue + // fg.Meta.Modified = 0 + // fg.Meta.Revision = 0 b, err := json.MarshalIndent(fg, "", " ") if err != nil { - return nil, err + return 0, nil, err } - return bytes.NewReader(b), nil + return len(b), bytes.NewReader(b), nil } func (fg *FragmentGraph) IndexMessage() (*domainpb.IndexMessage, error) { diff --git a/hub3/fragments/rdfstream.go b/hub3/fragments/rdfstream.go index 9cdc9ca6..f243d187 100644 --- a/hub3/fragments/rdfstream.go +++ b/hub3/fragments/rdfstream.go @@ -23,10 +23,11 @@ import ( "log" "strings" - c "github.com/delving/hub3/config" - "github.com/delving/hub3/ikuzo/domain/domainpb" rdf "github.com/kiivihal/gon3" r "github.com/kiivihal/rdf2go" + + c "github.com/delving/hub3/config" + "github.com/delving/hub3/ikuzo/domain/domainpb" ) // 
parseTurtleFile creates a graph from an uploaded file @@ -241,6 +242,8 @@ func (upl *RDFUploader) IndexFragments(bi BulkIndex) (int, error) { Triples: triples.String(), NamedGraphURI: fg.Meta.NamedGraphURI, Spec: fg.Meta.Spec, + HubID: fg.Meta.HubID, + OrgID: fg.Meta.OrgID, SpecRevision: revision, } triples.Reset() diff --git a/hub3/fragments/resource.go b/hub3/fragments/resource.go index d146e44a..6348561c 100644 --- a/hub3/fragments/resource.go +++ b/hub3/fragments/resource.go @@ -25,15 +25,16 @@ import ( "time" "unicode" + r "github.com/kiivihal/rdf2go" + elastic "github.com/olivere/elastic/v7" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + c "github.com/delving/hub3/config" "github.com/delving/hub3/hub3/index" "github.com/delving/hub3/ikuzo/rdf" "github.com/delving/hub3/ikuzo/search" "github.com/delving/hub3/ikuzo/storage/x/memory" - r "github.com/kiivihal/rdf2go" - elastic "github.com/olivere/elastic/v7" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" ) const ( @@ -543,7 +544,6 @@ func (tp *TreePaging) CalculatePaging() { tp.HasNext = true tp.PageNext = tp.PageLast + 1 } - return } func (tp *TreePaging) setFirstLastPage() { @@ -568,7 +568,6 @@ func (tp *TreePaging) setFirstLastPage() { } tp.PageFirst = min tp.PageLast = max - return } // TreePageEntry contains information how to merge pages from different responses. 
@@ -887,7 +886,7 @@ func CreateDateRange(period string) (IndexRange, error) { ir.Greater, _ = padYears(parts[0], true) ir.Less, _ = padYears(parts[1], false) default: - return ir, fmt.Errorf("Unable to create data range for: %#v", parts) + return ir, fmt.Errorf("unable to create data range for: %#v", parts) } if err := ir.Valid(); err != nil { @@ -897,11 +896,11 @@ func CreateDateRange(period string) (IndexRange, error) { return ir, nil } -func padYears(year string, start bool) (string, error) { - parts := strings.Split(year, "-") +func padYears(date string, start bool) (string, error) { + parts := strings.Split(date, "-") switch len(parts) { case 3: - return year, nil + return date, nil case 2: year := parts[0] month := parts[1] @@ -929,15 +928,15 @@ func padYears(year string, start bool) (string, error) { return fmt.Sprintf("%s-12-31", year), nil } default: - // try to hyphenate the date - date, err := hyphenateDate(year) + // try to hyphenate the formattedDate + formattedDate, err := hyphenateDate(year) if err != nil { return "", err } - return padYears(date, start) + return padYears(formattedDate, start) } } - return "", fmt.Errorf("unsupported case for padding: %s", year) + return "", fmt.Errorf("unsupported case for padding: %s", date) } // hyphenateDate converts a string of date string into the hyphenated form. 
@@ -951,7 +950,7 @@ func hyphenateDate(date string) (string, error) { case 8: return fmt.Sprintf("%s-%s-%s", date[:4], date[4:6], date[6:]), nil } - return "", fmt.Errorf("Unable to hyphenate date string: %#v", date) + return "", fmt.Errorf("unable to hyphenate date string: %#v", date) } func splitPeriod(c rune) bool { @@ -976,12 +975,12 @@ func (fr *FragmentResource) GetLabel() (label, language string) { // SetContextLevels sets FragmentReferrerContext to each level from the root func (rm *ResourceMap) SetContextLevels(subjectURI string) (map[string]*FragmentResource, error) { if len(rm.resources) == 0 { - return nil, fmt.Errorf("ResourceMap cannot be empty for subjecURI: %s", subjectURI) + return nil, fmt.Errorf("subjectURI %q must be present in ResourceMap", subjectURI) } subject, ok := rm.GetResource(subjectURI) if !ok { - return nil, fmt.Errorf("Subject %s is not part of the graph", subjectURI) + return nil, fmt.Errorf("subject %s is not part of the graph", subjectURI) } linkedObjects := map[string]*FragmentResource{} @@ -1110,9 +1109,9 @@ func (re *ResourceEntry) AsTriple(subject rdf.Subject) (*rdf.Triple, error) { case re.Language != "": object, err = rdf.NewLiteralWithLang(re.Value, re.Language) case re.DataType != "": - dt, err := rdf.NewIRI(re.DataType) - if err != nil { - return nil, err + dt, iriErr := rdf.NewIRI(re.DataType) + if iriErr != nil { + return nil, iriErr } object, err = rdf.NewLiteralWithType(re.Value, dt) default: @@ -1183,7 +1182,7 @@ func NewResourceMap(orgID string, g *r.Graph) (*ResourceMap, error) { } if g.Len() == 0 { - return rm, fmt.Errorf("The graph cannot be empty") + return rm, fmt.Errorf("the graph cannot be empty") } seen := 0 @@ -1309,8 +1308,6 @@ func (f *Fragment) SetPath(contextPath string) { f.NestedPath = append(f.NestedPath, path) f.Path = append(f.Path, typedLabel) } - - return } // CreateTriple creates a *rdf2go.Triple from a Fragment diff --git a/hub3/fragments/sparql.go b/hub3/fragments/sparql.go index 
223612ff..acf32177 100644 --- a/hub3/fragments/sparql.go +++ b/hub3/fragments/sparql.go @@ -26,16 +26,20 @@ import ( "text/template" "time" - "github.com/delving/hub3/config" "github.com/parnurzeal/gorequest" + + "github.com/delving/hub3/config" ) // SparqlUpdate contains the elements to perform a SPARQL update query type SparqlUpdate struct { - Triples string `json:"triples"` - NamedGraphURI string `json:"graphUri"` - Spec string `json:"datasetSpec"` - SpecRevision int `json:"specRevision"` + Triples string `json:"triples,omitempty"` + PreviousTriples string `json:"previousTriples,omitempty"` + NamedGraphURI string `json:"graphUri,omitempty"` + OrgID string `json:"orgID,omitempty"` + HubID string `json:"hubID,omitempty"` + Spec string `json:"datasetSpec,omitempty"` + SpecRevision int `json:"specRevision,omitempty"` } // TripleCount counts the number of Ntriples in a string diff --git a/ikuzo/ikuzoctl/cmd/config/bulk.go b/ikuzo/ikuzoctl/cmd/config/bulk.go index 5ebcab1e..b002d0f6 100644 --- a/ikuzo/ikuzoctl/cmd/config/bulk.go +++ b/ikuzo/ikuzoctl/cmd/config/bulk.go @@ -1,8 +1,9 @@ package config type Bulk struct { - DBPath string `json:"dbPath,omitempty"` - Minio struct { + DBPath string `json:"dbPath,omitempty"` + StoreRequests bool + Minio struct { Endpoint string `json:"endpoint,omitempty"` AccessKeyID string `json:"accessKeyID,omitempty"` SecretAccessKey string `json:"secretAccessKey,omitempty"` diff --git a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go index b82c8eff..b4d27f96 100644 --- a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go +++ b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go @@ -125,6 +125,7 @@ func (e *ElasticSearch) AddOptions(cfg *Config) error { bulk.SetPostHookService(postHooks...), bulk.SetDBPath(cfg.Bulk.DBPath), bulk.SetBlobConfig(cfg.Bulk.Minio), + bulk.SetLogRequests(cfg.Bulk.StoreRequests), ) if bulkErr != nil { return fmt.Errorf("unable to create bulk service; %w", bulkErr) diff --git 
a/ikuzo/service/x/bulk/handle_upload.go b/ikuzo/service/x/bulk/handle_upload.go index df83c89e..ab63f381 100644 --- a/ikuzo/service/x/bulk/handle_upload.go +++ b/ikuzo/service/x/bulk/handle_upload.go @@ -1,12 +1,16 @@ package bulk import ( + "fmt" "net/http" + "strings" - "github.com/delving/hub3/hub3/fragments" - "github.com/delving/hub3/ikuzo/domain" "github.com/go-chi/render" + "github.com/minio/minio-go/v7" "github.com/rs/zerolog/log" + + "github.com/delving/hub3/hub3/fragments" + "github.com/delving/hub3/ikuzo/domain" ) // bulkApi receives bulkActions in JSON form (1 per line) and processes them in @@ -55,6 +59,8 @@ func (s *Service) NewParser() *Parser { indexTypes: s.indexTypes, bi: s.index, sparqlUpdates: []fragments.SparqlUpdate{}, + s: s, + graphs: map[string]*fragments.FragmentBuilder{}, } if len(s.postHooks) != 0 { @@ -63,3 +69,14 @@ func (s *Service) NewParser() *Parser { return p } + +func (s *Service) GetGraph(hubID string, versionID string) error { + parts := strings.Split(hubID, "_") + path := fmt.Sprintf("%s/%s/fg/%s.json", parts[0], parts[1], parts[2]) + _, err := s.mc.GetObject(s.ctx, s.blobCfg.BucketName, path, minio.GetObjectOptions{VersionID: versionID}) + if err != nil { + return err + } + + return nil +} diff --git a/ikuzo/service/x/bulk/options.go b/ikuzo/service/x/bulk/options.go index 7b61e9a0..4e1dbe36 100644 --- a/ikuzo/service/x/bulk/options.go +++ b/ikuzo/service/x/bulk/options.go @@ -47,6 +47,13 @@ func SetIndexTypes(indexTypes ...string) Option { } } +func SetLogRequests(enable bool) Option { + return func(s *Service) error { + s.logRequests = enable + return nil + } +} + func SetPostHookService(hooks ...domain.PostHookService) Option { return func(s *Service) error { for _, hook := range hooks { diff --git a/ikuzo/service/x/bulk/parser.go b/ikuzo/service/x/bulk/parser.go index a5dd7f44..738d245b 100644 --- a/ikuzo/service/x/bulk/parser.go +++ b/ikuzo/service/x/bulk/parser.go @@ -22,20 +22,25 @@ import ( "errors" "fmt" "io" + "os" 
"strings" "sync" "sync/atomic" + "time" + "github.com/minio/minio-go/v7" + "github.com/oklog/ulid" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" + "github.com/delving/hub3/config" "github.com/delving/hub3/hub3/fragments" "github.com/delving/hub3/hub3/models" "github.com/delving/hub3/ikuzo/domain" "github.com/delving/hub3/ikuzo/domain/domainpb" "github.com/delving/hub3/ikuzo/service/x/index" - "github.com/rs/zerolog/log" - "golang.org/x/sync/errgroup" rdf "github.com/kiivihal/rdf2go" ) @@ -50,13 +55,15 @@ type Parser struct { sparqlUpdates []fragments.SparqlUpdate // store all the triples here for bulk insert postHooks []*domain.PostHookItem m sync.RWMutex + s *Service + graphs map[string]*fragments.FragmentBuilder // TODO: probably remove this later + rawRequest bytes.Buffer } func (p *Parser) Parse(ctx context.Context, r io.Reader) error { ctx, done := context.WithCancel(ctx) g, gctx := errgroup.WithContext(ctx) _ = gctx - defer done() workers := 4 @@ -64,7 +71,9 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { actions := make(chan Request) g.Go(func() error { - defer close(actions) + defer func() { + close(actions) + }() scanner := bufio.NewScanner(r) buf := make([]byte, 0, 64*1024) @@ -74,6 +83,11 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { var req Request b := scanner.Bytes() + if p.s.logRequests { + b = append(b, '\n') + p.rawRequest.Write(b) + } + if err := json.Unmarshal(b, &req); err != nil { atomic.AddUint64(&p.stats.JSONErrors, 1) log.Error().Str("svc", "bulk").Err(err).Msg("json parse error") @@ -82,6 +96,8 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { continue } + p.once.Do(func() { p.setDataSet(&req) }) + select { case actions <- req: case <-gctx.Done(): @@ -128,12 +144,84 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { log.Warn().Err(err).Msg("context canceled during bulk indexing") } + if p.s.logRequests { + if err := 
p.storeRequest(); err != nil { + p.s.log.Warn().Err(err).Msg("unable to store request for debugging") + } + } + + p.s.log.Info().Msgf("graphs: %d", len(p.graphs)) + if config.Config.RDF.RDFStoreEnabled { if errs := p.RDFBulkInsert(); errs != nil { return errs[0] } } + if err := p.StoreGraphs(); err != nil { + return err + } + + return nil +} + +// logRequest logs the bulk request to disk for inspection and reuse +func (p *Parser) storeRequest() error { + u, err := ulid.New(ulid.Now(), nil) + if err != nil { + p.s.log.Error().Err(err).Msg("unable to create ulid") + return err + } + + path := fmt.Sprintf("/tmp/%s_%s_%d_%s.ldjson", p.ds.OrgID, p.ds.Spec, p.ds.Revision, u.String()) + return os.WriteFile(path, p.rawRequest.Bytes(), os.ModePerm) +} + +func (p *Parser) StoreGraphs() error { + opts := minio.SnowballOptions{ + Opts: minio.PutObjectOptions{ContentType: "application/json"}, + // Keep in memory. We use this since we have small total payload. + InMemory: true, + // Compress data when uploading to a MinIO host. 
+ Compress: true, + } + + input := make(chan minio.SnowballObject, 10) + + go func() { + defer close(input) + for _, su := range p.sparqlUpdates { + + key := fmt.Sprintf("%s/%s/fg/%s.json", su.OrgID, su.Spec, su.HubID) + input <- minio.SnowballObject{ + Key: key, + Size: int64(len(su.Triples)), + ModTime: time.Now(), + Content: strings.NewReader(su.Triples), + } + } + + // TODO: store sparql update with content hash and store it under conten-hash + + // for _, fb := range p.graphs { + // fg := fb.FragmentGraph() + // + // key := fmt.Sprintf("%s/%s/fg/%s.json", fg.Meta.OrgID, fg.Meta.Spec, fg.Meta.HubID) + // size, r, _ := fg.Reader() + // input <- minio.SnowballObject{ + // Key: key, + // Size: int64(size), + // ModTime: time.Now(), + // Content: r, + // } + // } + }() + + err := p.s.mc.PutObjectsSnowball(context.TODO(), p.s.blobCfg.BucketName, opts, input) + if err != nil { + return err + } + return nil } @@ -157,9 +245,9 @@ func containsString(s []string, e string) bool { } func (p *Parser) setDataSet(req *Request) { - ds, _, dsError := models.GetOrCreateDataSet(req.OrgID, req.DatasetID) - if dsError != nil { - // log error + ds, _, dsErr := models.GetOrCreateDataSet(req.OrgID, req.DatasetID) + if dsErr != nil { + p.s.log.Error().Err(dsErr).Msg("unable to get or create dataset") return } @@ -220,8 +308,6 @@ func addLogger(datasetID string) zerolog.Logger { } func (p *Parser) process(ctx context.Context, req *Request) error { - p.once.Do(func() { p.setDataSet(req) }) - subLogger := addLogger(req.DatasetID) if p.ds == nil { @@ -229,7 +315,6 @@ func (p *Parser) process(ctx context.Context, req *Request) error { } req.Revision = p.ds.Revision - // TODO(kiivihal): add logger switch req.Action { case "index": @@ -328,6 +413,10 @@ func (p *Parser) Publish(ctx context.Context, req *Request) error { } } + p.m.Lock() + p.graphs[fb.FragmentGraph().Meta.HubID] = fb + p.m.Unlock() + for _, indexType := range p.indexTypes { switch indexType { case "v1": @@ -393,10 +482,14 @@ 
func (p *Parser) AppendRDFBulkRequest(req *Request, g *rdf.Graph) error { Triples: b.String(), NamedGraphURI: req.NamedGraphURI, Spec: req.DatasetID, - SpecRevision: req.Revision, + HubID: req.HubID, + OrgID: req.OrgID, + // SpecRevision: req.Revision, // TODO: This can only be removed after the orphan control is fixed } + p.m.Lock() p.sparqlUpdates = append(p.sparqlUpdates, su) + p.m.Unlock() return nil } @@ -430,7 +523,7 @@ func encodeTerm(iterm rdf.Term) string { func serializeNTriples(g *rdf.Graph, w io.Writer) error { var err error - for triple := range g.IterTriples() { + for triple := range g.IterTriplesOrdered() { s := encodeTerm(triple.Subject) if strings.HasPrefix(s, " Date: Tue, 16 May 2023 10:11:37 +0200 Subject: [PATCH 5/7] wip(bulk): basic scaffolding for delta storage workflow --- config/config.go | 30 ++++---- hub3/fragments/graph.go | 4 +- hub3/fragments/sparql.go | 37 +++++++-- hub3/models/dataset.go | 6 +- ikuzo/service/x/bulk/db.go | 29 +++++++ ikuzo/service/x/bulk/parser.go | 137 ++++++++++++++++++++++++++------- ikuzo/service/x/bulk/sparql.go | 42 +++++++--- 7 files changed, 219 insertions(+), 66 deletions(-) create mode 100644 ikuzo/service/x/bulk/db.go diff --git a/config/config.go b/config/config.go index d4f6af32..7b40b3ca 100644 --- a/config/config.go +++ b/config/config.go @@ -22,10 +22,11 @@ import ( "os" "strings" - "github.com/delving/hub3/ikuzo/logger" homedir "github.com/mitchellh/go-homedir" "github.com/rs/zerolog" "github.com/spf13/viper" + + "github.com/delving/hub3/ikuzo/logger" ) var ( @@ -156,20 +157,21 @@ type HTTP struct { // RDF holds all the configuration for SPARQL queries and RDF conversions type RDF struct { - SparqlEnabled bool `json:"sparqlEnabled"` // Enable the SPARQL proxy - SparqlHost string `json:"sparqlHost"` // the base-url to the SPARQL endpoint including the scheme and the port - SparqlPath string `json:"sparqlPath"` // the relative path of the endpoint. 
This can should contain the database name that is injected when the sparql endpoint is build - SparqlUpdatePath string `json:"sparqlUpdatePath"` // the relative path of the update endpoint. This can should contain the database name that is injected when the sparql endpoint is build - GraphStorePath string `json:"dataPath"` // the relative GraphStore path of the endpoint. This can should contain the database name that is injected when the sparql endpoint is build - BaseURL string `json:"baseUrl"` // the RDF baseUrl used for minting new URIs (should not include scheme) - BaseScheme string `json:"baseScheme"` // the scheme (http or https) used in the baseURL - RDFStoreEnabled bool `json:"rdfStoreEnabled"` // Store to Triple Store while saving RDF + SparqlEnabled bool `json:"sparqlEnabled,omitempty"` // Enable the SPARQL proxy + SparqlHost string `json:"sparqlHost,omitempty"` // the base-url to the SPARQL endpoint including the scheme and the port + SparqlPath string `json:"sparqlPath,omitempty"` // the relative path of the endpoint. This can should contain the database name that is injected when the sparql endpoint is build + SparqlUpdatePath string `json:"sparqlUpdatePath,omitempty"` // the relative path of the update endpoint. This can should contain the database name that is injected when the sparql endpoint is build + GraphStorePath string `json:"dataPath,omitempty"` // the relative GraphStore path of the endpoint. 
This can should contain the database name that is injected when the sparql endpoint is build + BaseURL string `json:"baseUrl,omitempty"` // the RDF baseUrl used for minting new URIs (should not include scheme) + BaseScheme string `json:"baseScheme,omitempty"` // the scheme (http or https) used in the baseURL + RDFStoreEnabled bool `json:"rdfStoreEnabled,omitempty"` // Store to Triple Store while saving RDF + StoreSparqlDeltas bool `json:"storeSparqlDeltas,omitempty"` + Tags string `json:"tags,omitempty" mapstructure:"tags"` + DefaultFormat string `json:"defaultFormat,omitempty"` + RDFStoreTags []string `json:"rdfStoreTags,omitempty"` // the tags that trigger storage in the triple-store // the RDF entryPoints. Lookups are made on the fully qualified URIs. It is sometimes needed to support other baseUrls as well. // The entry-points need to be fully qualified, i.e. with their scheme. - RoutedEntryPoints []string `json:"RoutedEntryPoints"` - Tags string `json:"tags" mapstructure:"tags"` - DefaultFormat string `json:"defaultFormat"` - RDFStoreTags []string `json:"rdfStoreTags"` // the tags that trigger storage in the triple-store + RoutedEntryPoints []string `json:"RoutedEntryPoints,omitempty"` } func (rdf *RDF) HasStoreTag(tags []string) bool { @@ -262,7 +264,7 @@ type EAD struct { GenreFormDefault string `json:"genreFormDefault"` TreeFields []string `json:"treeFields"` SearchFields []string `json:"searchFields"` - Genreforms []string `json:"genreforms"` + Genreforms []string `json:"genreforms"` } func setDefaults() { diff --git a/hub3/fragments/graph.go b/hub3/fragments/graph.go index 268fa60c..3357e10a 100644 --- a/hub3/fragments/graph.go +++ b/hub3/fragments/graph.go @@ -38,8 +38,8 @@ func (fg *FragmentGraph) Marshal() ([]byte, error) { func (fg *FragmentGraph) Reader() (int, io.Reader, error) { // TODO: idempotency is an issue - // fg.Meta.Modified = 0 - // fg.Meta.Revision = 0 + fg.Meta.Modified = 0 + fg.Meta.Revision = 0 b, err := json.MarshalIndent(fg, "", " ") 
if err != nil { return 0, nil, err diff --git a/hub3/fragments/sparql.go b/hub3/fragments/sparql.go index acf32177..c8a8443e 100644 --- a/hub3/fragments/sparql.go +++ b/hub3/fragments/sparql.go @@ -26,6 +26,7 @@ import ( "text/template" "time" + "github.com/OneOfOne/xxhash" "github.com/parnurzeal/gorequest" "github.com/delving/hub3/config" @@ -33,13 +34,21 @@ import ( // SparqlUpdate contains the elements to perform a SPARQL update query type SparqlUpdate struct { - Triples string `json:"triples,omitempty"` - PreviousTriples string `json:"previousTriples,omitempty"` - NamedGraphURI string `json:"graphUri,omitempty"` - OrgID string `json:"orgID,omitempty"` - HubID string `json:"hubID,omitempty"` - Spec string `json:"datasetSpec,omitempty"` - SpecRevision int `json:"specRevision,omitempty"` + Triples string `json:"triples,omitempty"` + NamedGraphURI string `json:"graphUri,omitempty"` + OrgID string `json:"orgID,omitempty"` + HubID string `json:"hubID,omitempty"` + Spec string `json:"datasetSpec,omitempty"` + SpecRevision int `json:"specRevision,omitempty"` + RDFHash string `json:"rdfHash,omitempty"` +} + +func (su *SparqlUpdate) GetHash() string { + if su.RDFHash != "" { + return su.RDFHash + } + + return hash(su.Triples) } // TripleCount counts the number of Ntriples in a string @@ -73,6 +82,7 @@ func executeTemplate(tmplString string, name string, model interface{}) string { func (su SparqlUpdate) String() string { t := `GRAPH <{{.NamedGraphURI}}> { <{{.NamedGraphURI}}> "{{.Spec}}" . + <{{.NamedGraphURI}}> "{{.RDFHash}}" . <{{.NamedGraphURI}}> "{{.SpecRevision}}"^^ . 
{{ .Triples }} }` @@ -94,7 +104,7 @@ func RDFBulkInsert(orgID string, sparqlUpdates []SparqlUpdate) (int, []error) { count, err := v.TripleCount() if err != nil { log.Printf("Unable to count triples: %s", err) - return 0, []error{fmt.Errorf("Unable to count triples for %s because :%s", strs[i], err)} + return 0, []error{fmt.Errorf("unable to count triples for %s because :%s", strs[i], err)} } triplesStored += count } @@ -132,3 +142,14 @@ func UpdateViaSparql(orgID, update string) []error { } return nil } + +type hasher string + +func hash(input string) string { + hash := xxhash.Checksum64([]byte(input)) + return fmt.Sprintf("%d", hash) +} + +func getHash(input fmt.Stringer) hasher { + return hasher(hash(input.String())) +} diff --git a/hub3/models/dataset.go b/hub3/models/dataset.go index fa1a5805..115eed18 100644 --- a/hub3/models/dataset.go +++ b/hub3/models/dataset.go @@ -23,11 +23,12 @@ import ( "time" "github.com/asdine/storm/q" + wp "github.com/gammazero/workerpool" + "github.com/rs/zerolog/log" + c "github.com/delving/hub3/config" "github.com/delving/hub3/hub3/fragments" "github.com/delving/hub3/hub3/index" - wp "github.com/gammazero/workerpool" - "github.com/rs/zerolog/log" elastic "github.com/olivere/elastic/v7" ) @@ -720,6 +721,7 @@ func (ds DataSet) deleteAllIndexRecords(ctx context.Context, wp *wp.WorkerPool) // DropOrphans removes all records of different revision that the current from the attached datastores func (ds DataSet) DropOrphans(ctx context.Context, p *elastic.BulkProcessor, wp *wp.WorkerPool) (bool, error) { if c.Config.RDF.RDFStoreEnabled { + // TODO: only do this for non-delta ok, err := ds.deleteGraphsOrphans() if !ok || err != nil { log.Warn().Msgf("Unable to remove RDF orphan graphs from spec %s: %s", ds.Spec, err) diff --git a/ikuzo/service/x/bulk/db.go b/ikuzo/service/x/bulk/db.go new file mode 100644 index 00000000..d34610ab --- /dev/null +++ b/ikuzo/service/x/bulk/db.go @@ -0,0 +1,29 @@ +package bulk + +import 
"github.com/delving/hub3/hub3/fragments" + +func (s *Service) getPreviousUpdates(ids []string) ([]*fragments.SparqlUpdate, error) { + return []*fragments.SparqlUpdate{}, nil +} + +func (s *Service) storeUpdatedHashes(diffs []*DiffConfig) error { + return nil +} + +func (s *Service) incrementRevisionForSeen(ids []string) error { + return nil +} + +type sparqlOrphan struct { + NamedGraphURI string + OrgID string + DatasetID string +} + +func (s *Service) findOrphans(orgID, dataSetID string, revision int) ([]sparqlOrphan, error) { + return []sparqlOrphan{}, nil +} + +func (s *Service) dropOrphans(orphans []sparqlOrphan) error { + return nil +} diff --git a/ikuzo/service/x/bulk/parser.go b/ikuzo/service/x/bulk/parser.go index 738d245b..8becb951 100644 --- a/ikuzo/service/x/bulk/parser.go +++ b/ikuzo/service/x/bulk/parser.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "os" + "sort" "strings" "sync" "sync/atomic" @@ -153,12 +154,82 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { p.s.log.Info().Msgf("graphs: %d", len(p.graphs)) if config.Config.RDF.RDFStoreEnabled { - if errs := p.RDFBulkInsert(); errs != nil { - return errs[0] + if config.Config.RDF.StoreSparqlDeltas { + if err := p.StoreGraphDeltas(); err != nil { + return err + } + } else { + if errs := p.RDFBulkInsert(); errs != nil { + return errs[0] + } + } + } + + return nil +} + +func (p *Parser) StoreGraphDeltas() error { + // extract the ids and get a query for all stored entries + ids := []string{} + for _, su := range p.sparqlUpdates { + ids = append(ids, su.HubID) + } + + // get previously stored updates + previousUpdates, err := p.s.getPreviousUpdates(ids) + if err != nil { + return err + } + lookUp := map[string]*fragments.SparqlUpdate{} + for _, prev := range previousUpdates { + lookUp[prev.HubID] = prev + } + + // check which updates have changed + changed := []*DiffConfig{} + for _, current := range p.sparqlUpdates { + prev, ok := lookUp[current.HubID] + if !ok { + // new so add it + 
changed = append(changed, &DiffConfig{su: ¤t}) + continue + } + + if prev.RDFHash == current.RDFHash { + // same has so skip it + continue + } + + changed = append(changed, &DiffConfig{su: ¤t, previousTriples: prev.Triples, previousHash: prev.RDFHash}) + } + + var sparqlUpdate strings.Builder + for _, cfg := range changed { + update, err := diffAsSparqlUpdate(cfg) + if err != nil { + return err } + sparqlUpdate.WriteString(update) + } + + // update the diffs in the triple store + errs := fragments.UpdateViaSparql(p.ds.OrgID, sparqlUpdate.String()) + if errs != nil { + return errs[0] } - if err := p.StoreGraphs(); err != nil { + // store the graphs in the S3 bucket + if err := p.StoreGraphs(changed); err != nil { + return err + } + + // store the update hashes + if err := p.s.storeUpdatedHashes(changed); err != nil { + return err + } + + // update revision for all seen ids + if err := p.s.incrementRevisionForSeen(ids); err != nil { return err } @@ -177,7 +248,7 @@ func (p *Parser) storeRequest() error { return os.WriteFile(path, p.rawRequest.Bytes(), os.ModePerm) } -func (p *Parser) StoreGraphs() error { +func (p *Parser) StoreGraphs(diffs []*DiffConfig) error { opts := minio.SnowballOptions{ Opts: minio.PutObjectOptions{ContentType: "application/json"}, // Keep in memory. We use this since we have small total payload. 
@@ -190,31 +261,16 @@ func (p *Parser) StoreGraphs() error { go func() { defer close(input) - for _, su := range p.sparqlUpdates { + for _, diff := range diffs { - key := fmt.Sprintf("%s/%s/fg/%s.json", su.OrgID, su.Spec, su.HubID) + key := fmt.Sprintf("%s/%s/rdf/%s.ntriples", diff.su.OrgID, diff.su.Spec, diff.su.GetHash()) input <- minio.SnowballObject{ Key: key, - Size: int64(len(su.Triples)), + Size: int64(len(diff.su.Triples)), ModTime: time.Now(), - Content: strings.NewReader(su.Triples), + Content: strings.NewReader(diff.su.Triples), } } - - // TODO: store sparql update with content hash and store it under conten-hash - - // for _, fb := range p.graphs { - // fg := fb.FragmentGraph() - // - // key := fmt.Sprintf("%s/%s/fg/%s.json", fg.Meta.OrgID, fg.Meta.Spec, fg.Meta.HubID) - // size, r, _ := fg.Reader() - // input <- minio.SnowballObject{ - // Key: key, - // Size: int64(size), - // ModTime: time.Now(), - // Content: r, - // } - // } }() err := p.s.mc.PutObjectsSnowball(context.TODO(), p.s.blobCfg.BucketName, opts, input) @@ -291,6 +347,23 @@ func (p *Parser) dropOrphans(req *Request) error { return err } + if config.Config.RDF.RDFStoreEnabled { + if config.Config.RDF.StoreSparqlDeltas { + orphans, err := p.s.findOrphans(p.ds.OrgID, p.ds.Spec, p.ds.Revision) + if err != nil { + return err + } + if err := p.s.dropOrphans(orphans); err != nil { + return err + } + } else { + _, err := models.DeleteGraphsOrphansBySpec(p.ds.OrgID, p.ds.Spec, p.ds.Revision) + if err != nil { + return err + } + } + } + return nil } @@ -484,9 +557,11 @@ func (p *Parser) AppendRDFBulkRequest(req *Request, g *rdf.Graph) error { Spec: req.DatasetID, HubID: req.HubID, OrgID: req.OrgID, - // SpecRevision: req.Revision, // TODO: This can only be removed after the orphan control is fixed + SpecRevision: req.Revision, // TODO: This can only be removed after the orphan control is fixed } + su.GetHash() + p.m.Lock() p.sparqlUpdates = append(p.sparqlUpdates, su) p.m.Unlock() @@ -523,6 +598,8 
@@ func encodeTerm(iterm rdf.Term) string { func serializeNTriples(g *rdf.Graph, w io.Writer) error { var err error + triples := []string{} + for triple := range g.IterTriplesOrdered() { s := encodeTerm(triple.Subject) if strings.HasPrefix(s, " "{{.Spec}}" . + // <{{.NamedGraphURI}}> "{{.RDFHash}}" . + // <{{.NamedGraphURI}}> "{{.SpecRevision}}"^^ . if len(removed) != 0 { // Construct the DELETE statement sb.WriteString("DELETE DATA {\n") sb.WriteString("GRAPH <") - sb.WriteString(graphURI) + sb.WriteString(cfg.su.NamedGraphURI) sb.WriteString("> { ") for _, triple := range removed { sb.WriteString(triple + "\n") } sb.WriteString(" }\n") sb.WriteString("}\n") - } if len(added) != 0 { // Construct the INSERT statement sb.WriteString("INSERT DATA {\n") sb.WriteString("GRAPH <") - sb.WriteString(graphURI) + sb.WriteString(cfg.su.NamedGraphURI) sb.WriteString("> { ") - if spec != "" { - sb.WriteString(fmt.Sprintf("<%s> \"%s\" .\n", graphURI, spec)) - } + // TODO: generate from DiffConfig + // if su.Spec != "" { + // sb.WriteString(fmt.Sprintf("<%s> \"%s\" .\n", su.NamedGraphURI, su.Spec)) + // sb.WriteString(fmt.Sprintf("<%s> \"%s\" .\n", su.NamedGraphURI, su.RDFHash)) + // } for _, triple := range added { sb.WriteString(triple + "\n") } From 81ac24d3ba30bff281308d2170260a9cf7c9ea95 Mon Sep 17 00:00:00 2001 From: Sjoerd Siebinga Date: Wed, 24 May 2023 08:06:12 +0200 Subject: [PATCH 6/7] wip(delta.go): initial implementation based on redis refactor(config): remove unused SparqlUsername and SparqlPassword fields from RDF configuration feat(config): enable storing only changed triples in the triple store feat(bulk): add support for storing RDF data in Redis for delta updates feat(bulk): add support for finding and dropping orphaned graphs in Redis for delta updates refactor(bulk): remove unused SetDBPath option refactor(parser.go): remove unused imports and variables feat(parser.go): add setUpdateDataset and dataset methods to safely access and modify the dataset 
feat(parser.go): add storeGraphDeltasOld method to be removed later feat(parser.go): add storeGraphDeltas method to store graph deltas in redis and S3 feat(parser.go): add dropGraphOrphans method to drop orphan graphs from redis and triple store feat(parser.go): add incrementRevision method to increment the revision of the dataset feat(parser.go): add process method to process requests and increment revisions feat(parser.go): add stats field to Stats struct to track graphs stored refactor(service.go): remove unused imports and variables feat(service.go): add Redis support to the bulk service to store and retrieve data --- config/config.go | 6 +- docker-compose.yml | 16 ++ hub3.toml | 8 +- hub3/fragments/sparql.go | 10 +- hub3/models/dataset.go | 3 +- ikuzo/ikuzoctl/cmd/config/elasticsearch.go | 1 - ikuzo/service/x/bulk/delta.go | 114 +++++++++++++ ikuzo/service/x/bulk/handle_upload.go | 14 -- ikuzo/service/x/bulk/options.go | 11 -- ikuzo/service/x/bulk/parser.go | 176 ++++++++++++++------- ikuzo/service/x/bulk/service.go | 96 ++--------- 11 files changed, 283 insertions(+), 172 deletions(-) create mode 100644 ikuzo/service/x/bulk/delta.go diff --git a/config/config.go b/config/config.go index 7b40b3ca..bc7befce 100644 --- a/config/config.go +++ b/config/config.go @@ -157,8 +157,10 @@ type HTTP struct { // RDF holds all the configuration for SPARQL queries and RDF conversions type RDF struct { - SparqlEnabled bool `json:"sparqlEnabled,omitempty"` // Enable the SPARQL proxy - SparqlHost string `json:"sparqlHost,omitempty"` // the base-url to the SPARQL endpoint including the scheme and the port + SparqlEnabled bool `json:"sparqlEnabled,omitempty"` // Enable the SPARQL proxy + SparqlHost string `json:"sparqlHost,omitempty"` // the base-url to the SPARQL endpoint including the scheme and the port + SparqlUsername string + SparqlPassword string SparqlPath string `json:"sparqlPath,omitempty"` // the relative path of the endpoint. 
This can should contain the database name that is injected when the sparql endpoint is build SparqlUpdatePath string `json:"sparqlUpdatePath,omitempty"` // the relative path of the update endpoint. This can should contain the database name that is injected when the sparql endpoint is build GraphStorePath string `json:"dataPath,omitempty"` // the relative GraphStore path of the endpoint. This can should contain the database name that is injected when the sparql endpoint is build diff --git a/docker-compose.yml b/docker-compose.yml index 51144daa..874bbce1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -46,6 +46,22 @@ services: command: ["/jena-fuseki/fuseki-server", "--set", "tdb:unionDefaultGraph=true"] + redis: + image: "redis:alpine" + + command: redis-server --requirepass sOmE_sEcUrE_pAsS + + ports: + - "6379:6379" + + volumes: + - $PWD/docker-data/redis-data:/var/lib/redis + - $PWD/redis.conf:/usr/local/etc/redis/redis.conf + + environment: + - REDIS_REPLICATION_MODE=master + + nats: image: nats-streaming ports: diff --git a/hub3.toml b/hub3.toml index 08d6cd15..a6b50c3d 100644 --- a/hub3.toml +++ b/hub3.toml @@ -109,7 +109,7 @@ apikey = '' [logging] devmode = true sentryDSN = "" -level = "debug" +level = "info" withCaller = true consoleLogger = true # "*" ignores all 404 @@ -120,9 +120,13 @@ exclude404Path = [] [rdf] # Enable storing of the RDF records in the triple store specified in sparqlUpdatePath (default: false) -rdfStoreEnabled = false +rdfStoreEnabled = true +# enable storing only changed triples in the triple store +storeSparqlDeltas = true # the fully qualified URL including the port sparqlHost = "http://localhost:3033" +sparqlUsername = "admin" +sparqlPassword = "pw123" # the path to the SPARQL endpoint. A '%s' can be inserted in the path to be replaced with the orgId. 
#sparqlPath = "/bigdata/namespace/%s/sparql" sparqlPath = "/%s/sparql" diff --git a/hub3/fragments/sparql.go b/hub3/fragments/sparql.go index c8a8443e..e0bd36fb 100644 --- a/hub3/fragments/sparql.go +++ b/hub3/fragments/sparql.go @@ -41,6 +41,7 @@ type SparqlUpdate struct { Spec string `json:"datasetSpec,omitempty"` SpecRevision int `json:"specRevision,omitempty"` RDFHash string `json:"rdfHash,omitempty"` + SkipDrop bool } func (su *SparqlUpdate) GetHash() string { @@ -100,7 +101,9 @@ func RDFBulkInsert(orgID string, sparqlUpdates []SparqlUpdate) (int, []error) { triplesStored := 0 for i, v := range sparqlUpdates { strs[i] = v.String() - graphs[i] = fmt.Sprintf("DROP GRAPH <%s>;", v.NamedGraphURI) + if v.SkipDrop { + graphs[i] = fmt.Sprintf("DROP GRAPH <%s>;", v.NamedGraphURI) + } count, err := v.TripleCount() if err != nil { log.Printf("Unable to count triples: %s", err) @@ -119,6 +122,9 @@ func RDFBulkInsert(orgID string, sparqlUpdates []SparqlUpdate) (int, []error) { // UpdateViaSparql is a post to sparql function that tasks a valid SPARQL update query func UpdateViaSparql(orgID, update string) []error { request := gorequest.New() + if config.Config.SparqlPassword != "" && config.Config.SparqlUsername != "" { + request = request.SetBasicAuth(config.Config.SparqlUsername, config.Config.SparqlPassword) + } postURL := config.Config.GetSparqlUpdateEndpoint(orgID, "") parameters := url.Values{} @@ -137,7 +143,7 @@ func UpdateViaSparql(orgID, update string) []error { if resp.StatusCode != 200 && resp.StatusCode != 201 { // log.Println(body) log.Printf("unable to store sparqlUpdate: %s", update) - log.Println(resp) + // log.Println(resp) return []error{fmt.Errorf("store error for SparqlUpdate:%s", body)} } return nil diff --git a/hub3/models/dataset.go b/hub3/models/dataset.go index 115eed18..d7512942 100644 --- a/hub3/models/dataset.go +++ b/hub3/models/dataset.go @@ -720,8 +720,7 @@ func (ds DataSet) deleteAllIndexRecords(ctx context.Context, wp *wp.WorkerPool) // 
DropOrphans removes all records of different revision that the current from the attached datastores func (ds DataSet) DropOrphans(ctx context.Context, p *elastic.BulkProcessor, wp *wp.WorkerPool) (bool, error) { - if c.Config.RDF.RDFStoreEnabled { - // TODO: only do this for non-delta + if c.Config.RDF.RDFStoreEnabled && !c.Config.RDF.StoreSparqlDeltas { ok, err := ds.deleteGraphsOrphans() if !ok || err != nil { log.Warn().Msgf("Unable to remove RDF orphan graphs from spec %s: %s", ds.Spec, err) diff --git a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go index b4d27f96..3fb2ea04 100644 --- a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go +++ b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go @@ -123,7 +123,6 @@ func (e *ElasticSearch) AddOptions(cfg *Config) error { bulk.SetIndexService(is), bulk.SetIndexTypes(e.IndexTypes...), bulk.SetPostHookService(postHooks...), - bulk.SetDBPath(cfg.Bulk.DBPath), bulk.SetBlobConfig(cfg.Bulk.Minio), bulk.SetLogRequests(cfg.Bulk.StoreRequests), ) diff --git a/ikuzo/service/x/bulk/delta.go b/ikuzo/service/x/bulk/delta.go new file mode 100644 index 00000000..0f4ad9b7 --- /dev/null +++ b/ikuzo/service/x/bulk/delta.go @@ -0,0 +1,114 @@ +package bulk + +import ( + "context" + "fmt" + "strings" + + "github.com/go-redis/redis/v8" + "github.com/rs/zerolog/log" + + "github.com/delving/hub3/hub3/fragments" +) + +const ( + rdfType = "rdf" + esType = "es" +) + +type redisStore struct { + orgID string + spec string + revision int + previousRevision int + c *redis.Client +} + +func (rs *redisStore) datasetKey() string { + return fmt.Sprintf("%s:ds:%s", rs.orgID, rs.spec) +} + +func (rs *redisStore) getStoreKey(hubID string) string { + return hubID +} + +func (rs *redisStore) storeRDFData(su fragments.SparqlUpdate) error { + ctx := context.Background() + key := rs.getStoreKey(su.HubID) + err := rs.c.HMSet(ctx, key, map[string]interface{}{ + // rdfType: su.Triples, + "graphURI": su.NamedGraphURI, + }).Err() + if 
err != nil { + return err + } + + return nil +} + +func (rs *redisStore) addID(hubID, setType, hash string) (bool, error) { + ctx := context.Background() + err := rs.c.SAdd(ctx, rs.revisionSetName(setType, false), hubID).Err() + if err != nil { + return false, err + } + hasKey := hubID + ":" + hash + if err := rs.c.SAdd(ctx, rs.revisionSetName(setType, true), hasKey).Err(); err != nil { + return false, err + } + + return rs.c.SIsMember(ctx, rs.previousSetName(setType, true), hasKey).Result() +} + +func (rs *redisStore) revisionSetName(setType string, withHash bool) string { + key := fmt.Sprintf("%s:rev:%d:%s", rs.datasetKey(), rs.revision, setType) + if withHash { + key += ":hash" + } + return key +} + +func (rs *redisStore) previousSetName(setType string, withHash bool) string { + key := fmt.Sprintf("%s:rev:%d:%s", rs.datasetKey(), rs.previousRevision, setType) + if withHash { + key += ":hash" + } + return key +} + +func (rs *redisStore) dropOrphansQuery(orphans []string) (string, error) { + var sb strings.Builder + for _, orphan := range orphans { + key := rs.getStoreKey(orphan) + log.Printf("orphan key: %s", key) + res, err := rs.c.HGet(context.Background(), key, "graphURI").Result() + if err != nil { + return "", fmt.Errorf("unable to get hash key: %w", err) + } + if res != "" { + sb.WriteString(fmt.Sprintf("drop graph <%s> ;\n", res)) + } + } + + return sb.String(), nil +} + +func (rs *redisStore) findOrphans(setType string) ([]string, error) { + ctx := context.Background() + return rs.c.SDiff(ctx, rs.previousSetName(setType, false), rs.revisionSetName(setType, false)).Result() +} + +func (rs *redisStore) SetRevision(current, previous int) error { + rs.revision = current + rs.previousRevision = previous + ctx := context.Background() + err := rs.c.HMSet(ctx, rs.datasetKey(), map[string]interface{}{ + "currentRevision": current, + "previousRevision": previous, + }).Err() + if err != nil { + return err + } + + return nil +} diff --git 
a/ikuzo/service/x/bulk/handle_upload.go b/ikuzo/service/x/bulk/handle_upload.go index ab63f381..2be1ea73 100644 --- a/ikuzo/service/x/bulk/handle_upload.go +++ b/ikuzo/service/x/bulk/handle_upload.go @@ -1,12 +1,9 @@ package bulk import ( - "fmt" "net/http" - "strings" "github.com/go-chi/render" - "github.com/minio/minio-go/v7" "github.com/rs/zerolog/log" "github.com/delving/hub3/hub3/fragments" @@ -69,14 +66,3 @@ func (s *Service) NewParser() *Parser { return p } - -func (s *Service) GetGraph(hubID string, versionID string) error { - parts := strings.Split(hubID, "_") - path := fmt.Sprintf("%s/%s/fg/%s.json", parts[0], parts[1], parts[2]) - _, err := s.mc.GetObject(s.ctx, s.blobCfg.BucketName, path, minio.GetObjectOptions{VersionID: versionID}) - if err != nil { - return err - } - - return nil -} diff --git a/ikuzo/service/x/bulk/options.go b/ikuzo/service/x/bulk/options.go index 4e1dbe36..462ac741 100644 --- a/ikuzo/service/x/bulk/options.go +++ b/ikuzo/service/x/bulk/options.go @@ -7,17 +7,6 @@ import ( type Option func(*Service) error -func SetDBPath(path string) Option { - return func(s *Service) error { - if path == "" { - return nil - } - - s.dbPath = path - return nil - } -} - type BlobConfig struct { Endpoint string `json:"endpoint,omitempty"` AccessKeyID string `json:"accessKeyID,omitempty"` diff --git a/ikuzo/service/x/bulk/parser.go b/ikuzo/service/x/bulk/parser.go index 8becb951..39712dad 100644 --- a/ikuzo/service/x/bulk/parser.go +++ b/ikuzo/service/x/bulk/parser.go @@ -29,7 +29,6 @@ import ( "sync/atomic" "time" - "github.com/minio/minio-go/v7" "github.com/oklog/ulid" "github.com/rs/zerolog" @@ -59,6 +58,19 @@ type Parser struct { s *Service graphs map[string]*fragments.FragmentBuilder // TODO: probably remove this later rawRequest bytes.Buffer + store *redisStore +} + +func (p *Parser) setUpdateDataset(ds *models.DataSet) { + p.m.Lock() + p.ds = ds + p.m.Unlock() +} + +func (p *Parser) dataset() *models.DataSet { + p.m.RLock() + defer p.m.RUnlock() 
+ return p.ds } func (p *Parser) Parse(ctx context.Context, r io.Reader) error { @@ -96,8 +108,11 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { atomic.AddUint64(&p.stats.JSONErrors, 1) continue } - - p.once.Do(func() { p.setDataSet(&req) }) + if p.dataset() == nil { + p.once.Do(func() { + p.setDataSet(&req) + }) + } select { case actions <- req: @@ -152,6 +167,7 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { } p.s.log.Info().Msgf("graphs: %d", len(p.graphs)) + p.s.log.Info().Msgf("%#v", config.Config.RDF) if config.Config.RDF.RDFStoreEnabled { if config.Config.RDF.StoreSparqlDeltas { @@ -168,8 +184,37 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { return nil } +// TODO: implement this func (p *Parser) StoreGraphDeltas() error { - // extract the ids and get a query for all stored entries + p.s.log.Info().Str("storeName", p.store.revisionSetName(rdfType, false)).Int("graphs", len(p.sparqlUpdates)).Msg("store deltas") + updates := []fragments.SparqlUpdate{} + for _, su := range p.sparqlUpdates { + known, err := p.store.addID(su.HubID, rdfType, su.GetHash()) + if err != nil { + return err + } + if known { + continue + } + p.s.log.Info().Msgf("unknown hubID: %s", su.HubID) + updates = append(updates, su) + if err := p.store.storeRDFData(su); err != nil { + return err + } + } + + p.s.log.Info().Int("submitted", len(p.sparqlUpdates)).Int("changed", len(updates)).Msg("after delta check") + + p.sparqlUpdates = updates + if errs := p.RDFBulkInsert(); errs != nil { + return errs[0] + } + + return nil +} + +// TODO: remove this +func (p *Parser) StoreGraphDeltasOld() error { ids := []string{} for _, su := range p.sparqlUpdates { ids = append(ids, su.HubID) @@ -213,15 +258,15 @@ func (p *Parser) StoreGraphDeltas() error { } // update the diffs in the triple store - errs := fragments.UpdateViaSparql(p.ds.OrgID, sparqlUpdate.String()) + errs := fragments.UpdateViaSparql(p.dataset().OrgID, sparqlUpdate.String()) if 
errs != nil { return errs[0] } // store the graphs in the S3 bucket - if err := p.StoreGraphs(changed); err != nil { - return err - } + // if err := p.StoreGraphs(changed); err != nil { + // return err + // } // store the update hashes if err := p.s.storeUpdatedHashes(changed); err != nil { @@ -248,42 +293,10 @@ func (p *Parser) storeRequest() error { return os.WriteFile(path, p.rawRequest.Bytes(), os.ModePerm) } -func (p *Parser) StoreGraphs(diffs []*DiffConfig) error { - opts := minio.SnowballOptions{ - Opts: minio.PutObjectOptions{ContentType: "application/json"}, - // Keep in memory. We use this since we have small total payload. - InMemory: true, - // Compress data when uploading to a MinIO host. - Compress: true, - } - - input := make(chan minio.SnowballObject, 10) - - go func() { - defer close(input) - for _, diff := range diffs { - - key := fmt.Sprintf("%s/%s/rdf/%s.ntriples", diff.su.OrgID, diff.su.Spec, diff.su.GetHash()) - input <- minio.SnowballObject{ - Key: key, - Size: int64(len(diff.su.Triples)), - ModTime: time.Now(), - Content: strings.NewReader(diff.su.Triples), - } - } - }() - - err := p.s.mc.PutObjectsSnowball(context.TODO(), p.s.blobCfg.BucketName, opts, input) - if err != nil { - return err - } - - return nil -} - // RDFBulkInsert inserts all triples from the bulkRequest in one SPARQL update statement func (p *Parser) RDFBulkInsert() []error { triplesStored, errs := fragments.RDFBulkInsert(p.ds.OrgID, p.sparqlUpdates) + p.stats.GraphsStored = uint64(len(p.sparqlUpdates)) p.sparqlUpdates = nil p.stats.TriplesStored = uint64(triplesStored) @@ -300,11 +313,11 @@ func containsString(s []string, e string) bool { return false } -func (p *Parser) setDataSet(req *Request) { +func (p *Parser) setDataSet(req *Request) error { ds, _, dsErr := models.GetOrCreateDataSet(req.OrgID, req.DatasetID) if dsErr != nil { p.s.log.Error().Err(dsErr).Msg("unable to get or create dataset") - return + return dsErr } if ds.RecordType == "" { @@ -331,8 +344,39 @@ func 
(p *Parser) setDataSet(req *Request) { p.stats.Spec = req.DatasetID p.stats.OrgID = req.OrgID - req.Revision = ds.Revision - p.ds = ds + p.stats.SpecRevision = uint64(ds.Revision) + p.setUpdateDataset(ds) + + p.store = &redisStore{ + orgID: req.OrgID, + spec: req.DatasetID, + c: p.s.rc, + } + + return nil +} + +func (p *Parser) dropGraphOrphans() error { + p.s.log.Info().Msg("dropping orphans") + orphans, err := p.store.findOrphans(rdfType) + if err != nil { + return err + } + p.s.log.Info().Int("orphanCount", len(orphans)).Msgf("%#v", orphans) + if len(orphans) == 0 { + return nil + } + + updateQuery, err := p.store.dropOrphansQuery(orphans) + if err != nil { + return err + } + + errs := fragments.UpdateViaSparql(p.stats.OrgID, updateQuery) + if len(errs) != 0 { + return errs[0] + } + return nil } func (p *Parser) dropOrphans(req *Request) error { @@ -349,14 +393,16 @@ func (p *Parser) dropOrphans(req *Request) error { if config.Config.RDF.RDFStoreEnabled { if config.Config.RDF.StoreSparqlDeltas { - orphans, err := p.s.findOrphans(p.ds.OrgID, p.ds.Spec, p.ds.Revision) - if err != nil { - return err - } - if err := p.s.dropOrphans(orphans); err != nil { - return err - } + go func() { + // block for orphanWait seconds to allow cluster to be in sync + timer := time.NewTimer(time.Second * time.Duration(5)) + <-timer.C + if err := p.dropGraphOrphans(); err != nil { + p.s.log.Error().Err(err).Msg("unable to drop graph orphans") + } + }() } else { + p.s.log.Info().Msg("wrong orphan") _, err := models.DeleteGraphsOrphansBySpec(p.ds.OrgID, p.ds.Spec, p.ds.Revision) if err != nil { return err @@ -380,10 +426,28 @@ func addLogger(datasetID string) zerolog.Logger { } } +func (p *Parser) IncrementRevision() (int, error) { + previous := p.ds.Revision + ds, err := p.dataset().IncrementRevision() + if err != nil { + return 0, err + } + + if err := p.store.SetRevision(ds.Revision, previous); err != nil { + return 0, err + } + + p.setUpdateDataset(ds) + + p.stats.SpecRevision = 
uint64(ds.Revision) + + return ds.Revision, nil +} + func (p *Parser) process(ctx context.Context, req *Request) error { subLogger := addLogger(req.DatasetID) - if p.ds == nil { + if p.dataset() == nil { return fmt.Errorf("unable to get dataset") } @@ -397,13 +461,12 @@ func (p *Parser) process(ctx context.Context, req *Request) error { return err } case "increment_revision": - ds, err := p.ds.IncrementRevision() + revision, err := p.IncrementRevision() if err != nil { subLogger.Error().Err(err).Str("datasetID", req.DatasetID).Msg("Unable to increment DataSet") - return err } - subLogger.Info().Str("datasetID", req.DatasetID).Int("revision", ds.Revision).Msg("Incremented dataset") + subLogger.Info().Str("datasetID", req.DatasetID).Int("revision", revision).Msg("Incremented dataset") case "clear_orphans", "drop_orphans": // clear triples if err := p.dropOrphans(req); err != nil { @@ -578,6 +641,7 @@ type Stats struct { RecordsStored uint64 `json:"recordsStored"` // originally json was records_stored JSONErrors uint64 `json:"jsonErrors"` TriplesStored uint64 `json:"triplesStored"` + GraphsStored uint64 `json:"graphsStored"` PostHooksSubmitted uint64 `json:"postHooksSubmitted"` // ContentHashMatches uint64 `json:"contentHashMatches"` // originally json was content_hash_matches } diff --git a/ikuzo/service/x/bulk/service.go b/ikuzo/service/x/bulk/service.go index 5248068c..72ed4908 100644 --- a/ikuzo/service/x/bulk/service.go +++ b/ikuzo/service/x/bulk/service.go @@ -16,19 +16,16 @@ package bulk import ( "context" - "database/sql" "fmt" "net/http" stdlog "log" + "github.com/go-redis/redis/v8" _ "github.com/marcboeker/go-duckdb" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" "github.com/go-chi/chi" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "github.com/delving/hub3/ikuzo/domain" "github.com/delving/hub3/ikuzo/service/x/index" @@ -42,19 +39,16 @@ type Service struct { postHooks map[string][]domain.PostHookService log 
zerolog.Logger orgs domain.OrgConfigRetriever - dbPath string - db *sql.DB ctx context.Context blobCfg BlobConfig - mc *minio.Client logRequests bool + rc *redis.Client } func NewService(options ...Option) (*Service, error) { s := &Service{ indexTypes: []string{"v2"}, postHooks: map[string][]domain.PostHookService{}, - dbPath: "hub3-bulksvc.db", ctx: context.Background(), } @@ -65,91 +59,30 @@ func NewService(options ...Option) (*Service, error) { } } - if err := s.setupDB(); err != nil { - stdlog.Printf("unable to setup db: %q", err) + if err := s.setupRedis(); err != nil { + stdlog.Printf("unable to setup redis: %q", err) return nil, err } - if blobSetupErr := s.setupBlobStorage(); blobSetupErr != nil { - return nil, blobSetupErr - } - return s, nil } -func (s *Service) setupDB() error { - db, err := sql.Open("duckdb", s.dbPath+"?access_mode=READ_WRITE") - if err != nil { - s.log.Error().Err(err).Msgf("unable to open duckdb at %s", s.dbPath) - return err - } - - pingErr := db.Ping() - if pingErr != nil { - return pingErr - } - s.db = db - - if setupErr := s.setupTables(); setupErr != nil { - return setupErr - } - - s.log.Info().Str("path", s.dbPath).Msg("started duckdb for bulk service") - log.Printf("started duckdb for bulk service; %q", s.dbPath) - - return nil -} - -func (s *Service) setupTables() error { - query := `create table if not exists dataset ( - orgID text, - datasetID text, - published boolean, -); -create unique index if not exists org_dataset_idx ON dataset (orgID, datasetID); - ` - _, err := s.db.Exec(query) - return err -} - -func (s *Service) setupBlobStorage() error { - if s.blobCfg.Endpoint == "" { - return fmt.Errorf("blob storage config must have endpoint") - } - // Initialize minio client object. 
- minioClient, err := minio.New(s.blobCfg.Endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(s.blobCfg.AccessKeyID, s.blobCfg.SecretAccessKey, ""), - Secure: s.blobCfg.UseSSL, +func (s *Service) setupRedis() error { + // Create a new Redis client + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", // Redis server address + Password: "sOmE_sEcUrE_pAsS", // Redis password (if required) + DB: 1, // Redis database index }) - if err != nil { - return err - } - - s.mc = minioClient - err = minioClient.MakeBucket(s.ctx, s.blobCfg.BucketName, minio.MakeBucketOptions{}) - if err != nil { - // Check to see if we already own this bucket (which happens if you run this twice) - exists, errBucketExists := minioClient.BucketExists(s.ctx, s.blobCfg.BucketName) - if errBucketExists == nil && exists { - log.Printf("We already own %s\n", s.blobCfg.BucketName) - } else { - return err - } - } else { - s.log.Info().Msgf("Successfully created %s\n", s.blobCfg.BucketName) - } - - bucketCfg, err := s.mc.GetBucketVersioning(s.ctx, s.blobCfg.BucketName) + // Ping the Redis server to check if it's running + pong, err := client.Ping(context.Background()).Result() if err != nil { return err } + fmt.Println("Connected to Redis:", pong) - if !bucketCfg.Enabled() { - if err := s.mc.EnableVersioning(s.ctx, s.blobCfg.BucketName); err != nil { - return fmt.Errorf("version must be enabled for bucket %s; %w", s.blobCfg.BucketName, err) - } - } + s.rc = client return nil } @@ -161,7 +94,6 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (s *Service) Shutdown(ctx context.Context) error { - s.db.Close() return s.index.Shutdown(ctx) } From ae86dcd708640b6ab25ededeb7746a878f8c5380 Mon Sep 17 00:00:00 2001 From: Sjoerd Siebinga Date: Wed, 24 May 2023 08:08:45 +0200 Subject: [PATCH 7/7] chore(go.mod): update dependencies and remove unused ones to keep the project up to date and reduce clutter chore(go.sum): update dependencies --- go.mod | 8 +++----- 
go.sum | 15 ++++----------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 1e0c1f57..ac50348a 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,8 @@ require ( github.com/go-chi/docgen v1.0.5 github.com/go-chi/render v1.0.1 github.com/go-git/go-git/v5 v5.4.3-0.20210630082519-b4368b2a2ca4 + github.com/go-redis/redis v6.15.9+incompatible + github.com/go-redis/redis/v8 v8.11.5 github.com/golang-migrate/migrate/v4 v4.15.1 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.8 @@ -43,7 +45,6 @@ require ( github.com/marcboeker/go-duckdb v1.2.2 github.com/matryer/is v1.4.0 github.com/microcosm-cc/bluemonday v1.0.19 - github.com/minio/minio-go/v7 v7.0.26 github.com/mitchellh/go-homedir v1.1.0 github.com/nats-io/stan.go v0.10.2 github.com/oklog/ulid v1.3.1 @@ -94,12 +95,12 @@ require ( github.com/containerd/containerd v1.6.8 // indirect github.com/containerd/continuity v0.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/djherbis/buffer v1.2.0 // indirect github.com/djherbis/nio/v3 v3.0.1 // indirect github.com/docker/distribution v2.8.1+incompatible // indirect github.com/docker/docker v20.10.9+incompatible // indirect github.com/docker/go-units v0.4.0 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/fatih/color v1.13.0 // indirect @@ -131,7 +132,6 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/klauspost/compress v1.16.0 // indirect - github.com/klauspost/cpuid/v2 v2.0.12 // indirect github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -140,8 +140,6 @@ require 
( github.com/mattn/go-runewidth v0.0.13 // indirect github.com/mattn/go-sqlite3 v1.14.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect - github.com/minio/md5-simd v1.1.2 // indirect - github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/sys/mount v0.2.0 // indirect github.com/moby/sys/mountinfo v0.6.2 // indirect diff --git a/go.sum b/go.sum index becd00a1..f793f358 100644 --- a/go.sum +++ b/go.sum @@ -367,6 +367,8 @@ github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27 github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0= github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dhui/dktest v0.3.7 h1:jWjWgHAPDAdqgUr7lAsB3bqB2DKWC3OaA+isfekjRew= github.com/dhui/dktest v0.3.7/go.mod h1:nYMOkafiA07WchSwKnKFUSbGMb2hMm5DrCGiXYG6gwM= @@ -399,7 +401,6 @@ github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNE github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dustin/go-humanize v1.0.0 
h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elastic/go-elasticsearch/v8 v8.0.0-20200716073932-4f0b75746dc1 h1:fp+4nNyJtg/qflGLXJQivQJS1VTRq2RlC7LZ9bnCejM= @@ -508,6 +509,8 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= @@ -850,10 +853,6 @@ github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.0.12 h1:p9dKCg8i4gmOxtv35DvrYoWqYzQrvEVdjQ762Y0OqZE= -github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/knakk/digest 
v0.0.0-20160404164910-fd45becddc49 h1:P6Mw09IOeKKS4klYhjzHzaEx2RcNshynjfDhzCQ8BoE= github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49/go.mod h1:dQr9I8Xw26daWGE/crxUleRxmpFI5uhfedWqRNHHq0c= github.com/knakk/rdf v0.0.0-20190304171630-8521bf4c5042 h1:Vzdm5hdlLdpJOKK+hKtkV5u7xGZmNW6aUBjGcTfwx84= @@ -948,12 +947,6 @@ github.com/microcosm-cc/bluemonday v1.0.19/go.mod h1:QNzV2UbLK2/53oIIwTOyLUSABMk github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= -github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= -github.com/minio/minio-go/v7 v7.0.26 h1:D0HK+8793etZfRY/vHhDmFaP+vmT41K3K4JV9vmZCBQ= -github.com/minio/minio-go/v7 v7.0.26/go.mod h1:x81+AX5gHSfCSqw7jxRKHvxUXMlE5uKX0Vb75Xk5yYg= -github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= -github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=