diff --git a/config/config.go b/config/config.go index d4f6af32..bc7befce 100644 --- a/config/config.go +++ b/config/config.go @@ -22,10 +22,11 @@ import ( "os" "strings" - "github.com/delving/hub3/ikuzo/logger" homedir "github.com/mitchellh/go-homedir" "github.com/rs/zerolog" "github.com/spf13/viper" + + "github.com/delving/hub3/ikuzo/logger" ) var ( @@ -156,20 +157,23 @@ type HTTP struct { // RDF holds all the configuration for SPARQL queries and RDF conversions type RDF struct { - SparqlEnabled bool `json:"sparqlEnabled"` // Enable the SPARQL proxy - SparqlHost string `json:"sparqlHost"` // the base-url to the SPARQL endpoint including the scheme and the port - SparqlPath string `json:"sparqlPath"` // the relative path of the endpoint. This can should contain the database name that is injected when the sparql endpoint is build - SparqlUpdatePath string `json:"sparqlUpdatePath"` // the relative path of the update endpoint. This can should contain the database name that is injected when the sparql endpoint is build - GraphStorePath string `json:"dataPath"` // the relative GraphStore path of the endpoint. This can should contain the database name that is injected when the sparql endpoint is build - BaseURL string `json:"baseUrl"` // the RDF baseUrl used for minting new URIs (should not include scheme) - BaseScheme string `json:"baseScheme"` // the scheme (http or https) used in the baseURL - RDFStoreEnabled bool `json:"rdfStoreEnabled"` // Store to Triple Store while saving RDF + SparqlEnabled bool `json:"sparqlEnabled,omitempty"` // Enable the SPARQL proxy + SparqlHost string `json:"sparqlHost,omitempty"` // the base-url to the SPARQL endpoint including the scheme and the port + SparqlUsername string + SparqlPassword string + SparqlPath string `json:"sparqlPath,omitempty"` // the relative path of the endpoint. 
This should contain the database name that is injected when the sparql endpoint is built + SparqlUpdatePath string `json:"sparqlUpdatePath,omitempty"` // the relative path of the update endpoint. This should contain the database name that is injected when the sparql endpoint is built + GraphStorePath string `json:"dataPath,omitempty"` // the relative GraphStore path of the endpoint. This should contain the database name that is injected when the sparql endpoint is built + BaseURL string `json:"baseUrl,omitempty"` // the RDF baseUrl used for minting new URIs (should not include scheme) + BaseScheme string `json:"baseScheme,omitempty"` // the scheme (http or https) used in the baseURL + RDFStoreEnabled bool `json:"rdfStoreEnabled,omitempty"` // Store to Triple Store while saving RDF + StoreSparqlDeltas bool `json:"storeSparqlDeltas,omitempty"` + Tags string `json:"tags,omitempty" mapstructure:"tags"` + DefaultFormat string `json:"defaultFormat,omitempty"` + RDFStoreTags []string `json:"rdfStoreTags,omitempty"` // the tags that trigger storage in the triple-store // the RDF entryPoints. Lookups are made on the fully qualified URIs. It is sometimes needed to support other baseUrls as well. // The entry-points need to be fully qualified, i.e. with their scheme. 
- RoutedEntryPoints []string `json:"RoutedEntryPoints"` - Tags string `json:"tags" mapstructure:"tags"` - DefaultFormat string `json:"defaultFormat"` - RDFStoreTags []string `json:"rdfStoreTags"` // the tags that trigger storage in the triple-store + RoutedEntryPoints []string `json:"RoutedEntryPoints,omitempty"` } func (rdf *RDF) HasStoreTag(tags []string) bool { @@ -262,7 +266,7 @@ type EAD struct { GenreFormDefault string `json:"genreFormDefault"` TreeFields []string `json:"treeFields"` SearchFields []string `json:"searchFields"` - Genreforms []string `json:"genreforms"` + Genreforms []string `json:"genreforms"` } func setDefaults() { diff --git a/docker-compose.yml b/docker-compose.yml index 500aa513..874bbce1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,20 +19,49 @@ services: networks: - esnet + minio: + image: minio/minio + ports: + - "9010:9000" + - "9001:9001" + volumes: + - ./docker-data/minio_storage:/data + environment: + MINIO_ROOT_USER: minio + MINIO_ROOT_PASSWORD: Strong#Pass#2023 + MINIO_ACCESS_KEY: "kPxADQXcHFbR9i1V" + MINIO_SECRET_KEY: "LiwsjNxulYJVyey50U1ypTabt8BiMWiU" + command: server --console-address ":9001" /data + fuseki: image: zacanbot/fuseki container_name: hub3-fuseki environment: - JVM_ARGS: -Xmx1g + - JVM_ARGS=-Xmx1g + - ADMIN_PASSWORD=pw123 ports: - "3033:3030" volumes: - ./docker-data/fuseki/data:/data/fuseki - environment: - - ADMIN_PASSWORD=pw123 command: ["/jena-fuseki/fuseki-server", "--set", "tdb:unionDefaultGraph=true"] + redis: + image: "redis:alpine" + + command: redis-server --requirepass sOmE_sEcUrE_pAsS + + ports: + - "6379:6379" + + volumes: + - $PWD/docker-data/redis-data:/var/lib/redis + - $PWD/redis.conf:/usr/local/etc/redis/redis.conf + + environment: + - REDIS_REPLICATION_MODE=master + + nats: image: nats-streaming ports: @@ -60,6 +89,5 @@ services: environment: POSTGRES_PASSWORD: pw123 - networks: esnet: diff --git a/go.mod b/go.mod index bbcac779..ac50348a 100644 --- a/go.mod +++ b/go.mod @@ 
-22,6 +22,8 @@ require ( github.com/go-chi/docgen v1.0.5 github.com/go-chi/render v1.0.1 github.com/go-git/go-git/v5 v5.4.3-0.20210630082519-b4368b2a2ca4 + github.com/go-redis/redis v6.15.9+incompatible + github.com/go-redis/redis/v8 v8.11.5 github.com/golang-migrate/migrate/v4 v4.15.1 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.8 @@ -40,10 +42,12 @@ require ( github.com/lib/pq v1.10.6 github.com/linkeddata/gojsonld v0.0.0-20170418210642-4f5db6791326 github.com/mailgun/groupcache v1.3.0 + github.com/marcboeker/go-duckdb v1.2.2 github.com/matryer/is v1.4.0 github.com/microcosm-cc/bluemonday v1.0.19 github.com/mitchellh/go-homedir v1.1.0 github.com/nats-io/stan.go v0.10.2 + github.com/oklog/ulid v1.3.1 github.com/olivere/elastic/v7 v7.0.32 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.18.1 @@ -67,7 +71,7 @@ require ( github.com/tidwall/gjson v1.12.1 github.com/valyala/fasthttp v1.35.0 golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde - golang.org/x/text v0.3.7 + golang.org/x/text v0.7.0 google.golang.org/protobuf v1.28.1 ) @@ -91,6 +95,7 @@ require ( github.com/containerd/containerd v1.6.8 // indirect github.com/containerd/continuity v0.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/djherbis/buffer v1.2.0 // indirect github.com/djherbis/nio/v3 v3.0.1 // indirect github.com/docker/distribution v2.8.1+incompatible // indirect @@ -126,7 +131,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/compress v1.16.0 // indirect github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -162,7 +167,7 @@ require ( 
github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/rychipman/easylex v0.0.0-20160129204217-49ee7767142f // indirect github.com/satori/go.uuid v1.2.0 // indirect - github.com/sergi/go-diff v1.2.0 // indirect + github.com/sergi/go-diff v1.3.1 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/snabb/diagio v1.0.0 // indirect github.com/spf13/afero v1.9.2 // indirect @@ -179,9 +184,9 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.10.0 // indirect - golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect - golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b // indirect - golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect + golang.org/x/crypto v0.6.0 // indirect + golang.org/x/net v0.7.0 // indirect + golang.org/x/sys v0.5.0 // indirect golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect google.golang.org/genproto v0.0.0-20220829175752-36a9c930ecbf // indirect google.golang.org/grpc v1.49.0 // indirect diff --git a/go.sum b/go.sum index 1123b5ca..f793f358 100644 --- a/go.sum +++ b/go.sum @@ -367,6 +367,8 @@ github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27 github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0= github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dhui/dktest v0.3.7 
h1:jWjWgHAPDAdqgUr7lAsB3bqB2DKWC3OaA+isfekjRew= github.com/dhui/dktest v0.3.7/go.mod h1:nYMOkafiA07WchSwKnKFUSbGMb2hMm5DrCGiXYG6gwM= @@ -379,7 +381,6 @@ github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyG github.com/docker/cli v0.0.0-20191017083524-a8ff7f821017/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v0.0.0-20190905152932-14b96e55d84c/go.mod h1:0+TTO4EOBfRPhZXAeF1Vu+W3hHZ8eLp8PgKVZlcvtFY= github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= @@ -508,6 +509,8 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= @@ -848,8 +851,8 @@ github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8 github.com/klauspost/compress v1.13.6/go.mod 
h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= +github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49 h1:P6Mw09IOeKKS4klYhjzHzaEx2RcNshynjfDhzCQ8BoE= github.com/knakk/digest v0.0.0-20160404164910-fd45becddc49/go.mod h1:dQr9I8Xw26daWGE/crxUleRxmpFI5uhfedWqRNHHq0c= github.com/knakk/rdf v0.0.0-20190304171630-8521bf4c5042 h1:Vzdm5hdlLdpJOKK+hKtkV5u7xGZmNW6aUBjGcTfwx84= @@ -895,6 +898,8 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/marcboeker/go-duckdb v1.2.2 h1:Qy5yW83qAcZgsEmGo+pkEZeZvxA2dzuQNPLh7wcorb0= +github.com/marcboeker/go-duckdb v1.2.2/go.mod h1:wm91jO2GNKa6iO9NTcjXIRsW+/ykPoJbQcHSXhdAl28= github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= github.com/markbates/pkger v0.15.1/go.mod h1:0JoVlrol20BSywW79rN3kdFFsE5xYM+rSCQDXbLhiuI= github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= @@ -1003,6 +1008,7 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 
h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E= @@ -1181,8 +1187,8 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod github.com/segmentio/ksuid v1.0.3 h1:FoResxvleQwYiPAVKe1tMUlEirodZqlqglIuFsdDntY= github.com/segmentio/ksuid v1.0.3/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= -github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= -github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= @@ -1373,8 +1379,8 @@ golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto 
v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM= -golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1487,8 +1493,8 @@ golang.org/x/net v0.0.0-20211013171255-e13a2654a71e/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b h1:ZmngSVLe/wycRns9MKikG9OWIEjGcGAkacif7oYQaUY= -golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1641,11 +1647,11 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= -golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1654,8 +1660,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time 
v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/hub3.toml b/hub3.toml index ea8ea960..a6b50c3d 100644 --- a/hub3.toml +++ b/hub3.toml @@ -1,55 +1,27 @@ -[org.dcn] -domains = ["localhost:3000"] +[org.nl-hana] +customID = "NL-HaNA" +domains = ["localhost:3000", "hub3.nl-hana:3000"] default = true -[[org.dcn.sitemaps]] +[[org.nl-hana.sitemaps]] id = "all" baseURL = "http://localhost:3000" filters = "meta.tags:narthex" -[org.dcn.oaipmh] +[org.nl-hana.oaipmh] enabled = true adminEmails = ["info@delving.eu"] repositoryName = "DCN OAI-PMH repository" -[org.hub3] -domains = ["localhost:3001"] -customID = "hub3" -default = true - - -################################### -# NIOD # -################################### -[org.niod] -domains = ["hub3.niod:3000"] - -[org.niod.elasticsearch] -# indexTypes enabled types for the bulk index service -indexTypes = ["v2"] -# configuration for MimimumShouldMatch -minimumShouldMatch = "2<70%" -shards = 3 -replicas = 2 -# the name of the index. If empty it is the name of the OrgId -# indexName = "dcn" -# if non-empty digital objects will be indexed in a dedicated v2 index -# digitalObjectSuffix = "scans" - -[org.niod.sparql] -enabled = true -# the fully qualified URL including the port -sparqlHost = "http://localhost:3033" -# the path to the SPARQL endpoint. A '%s' can be inserted in the path to be replaced with the orgId. -sparqlPath = "/%s/sparql" -# sparqlUpdate path is the path to the write endpoint. A '%s' can be inserted in the path to be replaced with the orgId. 
-sparqlUpdatePath = "/%s/update" -# dataPath is the path to the GraphStore Protocol endpoint -dataPath = "/%s/data" -# Enable storing of the RDF records in the triple store specified in sparqlUpdatePath (default: false) -storeRecords = false -userName = "" -password = "" +[bulk] +dbPath = "hub3-bulksvc2.db" +storeRequests = false +[bulk.minio] +endpoint = "127.0.0.1:9010" +accessKeyId = "kPxADQXcHFbR9i1V" +secretAccessKey = "LiwsjNxulYJVyey50U1ypTabt8BiMWiU" +useSSL = false +bucketName = "bulk-svc" [http] # all the configuration for the http sub-command @@ -137,7 +109,7 @@ apikey = '' [logging] devmode = true sentryDSN = "" -level = "debug" +level = "info" withCaller = true consoleLogger = true # "*" ignores all 404 @@ -148,9 +120,13 @@ exclude404Path = [] [rdf] # Enable storing of the RDF records in the triple store specified in sparqlUpdatePath (default: false) -rdfStoreEnabled = false +rdfStoreEnabled = true +# enable storing only changed triples in the triple store +storeSparqlDeltas = true # the fully qualified URL including the port sparqlHost = "http://localhost:3033" +sparqlUsername = "admin" +sparqlPassword = "pw123" # the path to the SPARQL endpoint. A '%s' can be inserted in the path to be replaced with the orgId. 
#sparqlPath = "/bigdata/namespace/%s/sparql" sparqlPath = "/%s/sparql" diff --git a/hub3/fragments/graph.go b/hub3/fragments/graph.go index 7dc53d6b..3357e10a 100644 --- a/hub3/fragments/graph.go +++ b/hub3/fragments/graph.go @@ -36,13 +36,16 @@ func (fg *FragmentGraph) Marshal() ([]byte, error) { return json.Marshal(fg) } -func (fg *FragmentGraph) Reader() (io.Reader, error) { +func (fg *FragmentGraph) Reader() (int, io.Reader, error) { + // TODO: idempotency is an issue + fg.Meta.Modified = 0 + fg.Meta.Revision = 0 b, err := json.MarshalIndent(fg, "", " ") if err != nil { - return nil, err + return 0, nil, err } - return bytes.NewReader(b), nil + return len(b), bytes.NewReader(b), nil } func (fg *FragmentGraph) IndexMessage() (*domainpb.IndexMessage, error) { diff --git a/hub3/fragments/rdfstream.go b/hub3/fragments/rdfstream.go index 9cdc9ca6..f243d187 100644 --- a/hub3/fragments/rdfstream.go +++ b/hub3/fragments/rdfstream.go @@ -23,10 +23,11 @@ import ( "log" "strings" - c "github.com/delving/hub3/config" - "github.com/delving/hub3/ikuzo/domain/domainpb" rdf "github.com/kiivihal/gon3" r "github.com/kiivihal/rdf2go" + + c "github.com/delving/hub3/config" + "github.com/delving/hub3/ikuzo/domain/domainpb" ) // parseTurtleFile creates a graph from an uploaded file @@ -241,6 +242,8 @@ func (upl *RDFUploader) IndexFragments(bi BulkIndex) (int, error) { Triples: triples.String(), NamedGraphURI: fg.Meta.NamedGraphURI, Spec: fg.Meta.Spec, + HubID: fg.Meta.HubID, + OrgID: fg.Meta.OrgID, SpecRevision: revision, } triples.Reset() diff --git a/hub3/fragments/resource.go b/hub3/fragments/resource.go index d146e44a..6348561c 100644 --- a/hub3/fragments/resource.go +++ b/hub3/fragments/resource.go @@ -25,15 +25,16 @@ import ( "time" "unicode" + r "github.com/kiivihal/rdf2go" + elastic "github.com/olivere/elastic/v7" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + c "github.com/delving/hub3/config" "github.com/delving/hub3/hub3/index" 
"github.com/delving/hub3/ikuzo/rdf" "github.com/delving/hub3/ikuzo/search" "github.com/delving/hub3/ikuzo/storage/x/memory" - r "github.com/kiivihal/rdf2go" - elastic "github.com/olivere/elastic/v7" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" ) const ( @@ -543,7 +544,6 @@ func (tp *TreePaging) CalculatePaging() { tp.HasNext = true tp.PageNext = tp.PageLast + 1 } - return } func (tp *TreePaging) setFirstLastPage() { @@ -568,7 +568,6 @@ func (tp *TreePaging) setFirstLastPage() { } tp.PageFirst = min tp.PageLast = max - return } // TreePageEntry contains information how to merge pages from different responses. @@ -887,7 +886,7 @@ func CreateDateRange(period string) (IndexRange, error) { ir.Greater, _ = padYears(parts[0], true) ir.Less, _ = padYears(parts[1], false) default: - return ir, fmt.Errorf("Unable to create data range for: %#v", parts) + return ir, fmt.Errorf("unable to create data range for: %#v", parts) } if err := ir.Valid(); err != nil { @@ -897,11 +896,11 @@ func CreateDateRange(period string) (IndexRange, error) { return ir, nil } -func padYears(year string, start bool) (string, error) { - parts := strings.Split(year, "-") +func padYears(date string, start bool) (string, error) { + parts := strings.Split(date, "-") switch len(parts) { case 3: - return year, nil + return date, nil case 2: year := parts[0] month := parts[1] @@ -929,15 +928,15 @@ func padYears(year string, start bool) (string, error) { return fmt.Sprintf("%s-12-31", year), nil } default: - // try to hyphenate the date - date, err := hyphenateDate(year) + // try to hyphenate the formattedDate + formattedDate, err := hyphenateDate(year) if err != nil { return "", err } - return padYears(date, start) + return padYears(formattedDate, start) } } - return "", fmt.Errorf("unsupported case for padding: %s", year) + return "", fmt.Errorf("unsupported case for padding: %s", date) } // hyphenateDate converts a string of date string into the hyphenated form. 
@@ -951,7 +950,7 @@ func hyphenateDate(date string) (string, error) { case 8: return fmt.Sprintf("%s-%s-%s", date[:4], date[4:6], date[6:]), nil } - return "", fmt.Errorf("Unable to hyphenate date string: %#v", date) + return "", fmt.Errorf("unable to hyphenate date string: %#v", date) } func splitPeriod(c rune) bool { @@ -976,12 +975,12 @@ func (fr *FragmentResource) GetLabel() (label, language string) { // SetContextLevels sets FragmentReferrerContext to each level from the root func (rm *ResourceMap) SetContextLevels(subjectURI string) (map[string]*FragmentResource, error) { if len(rm.resources) == 0 { - return nil, fmt.Errorf("ResourceMap cannot be empty for subjecURI: %s", subjectURI) + return nil, fmt.Errorf("subjectURI %q must be present in ResourceMap", subjectURI) } subject, ok := rm.GetResource(subjectURI) if !ok { - return nil, fmt.Errorf("Subject %s is not part of the graph", subjectURI) + return nil, fmt.Errorf("subject %s is not part of the graph", subjectURI) } linkedObjects := map[string]*FragmentResource{} @@ -1110,9 +1109,9 @@ func (re *ResourceEntry) AsTriple(subject rdf.Subject) (*rdf.Triple, error) { case re.Language != "": object, err = rdf.NewLiteralWithLang(re.Value, re.Language) case re.DataType != "": - dt, err := rdf.NewIRI(re.DataType) - if err != nil { - return nil, err + dt, iriErr := rdf.NewIRI(re.DataType) + if iriErr != nil { + return nil, iriErr } object, err = rdf.NewLiteralWithType(re.Value, dt) default: @@ -1183,7 +1182,7 @@ func NewResourceMap(orgID string, g *r.Graph) (*ResourceMap, error) { } if g.Len() == 0 { - return rm, fmt.Errorf("The graph cannot be empty") + return rm, fmt.Errorf("the graph cannot be empty") } seen := 0 @@ -1309,8 +1308,6 @@ func (f *Fragment) SetPath(contextPath string) { f.NestedPath = append(f.NestedPath, path) f.Path = append(f.Path, typedLabel) } - - return } // CreateTriple creates a *rdf2go.Triple from a Fragment diff --git a/hub3/fragments/sparql.go b/hub3/fragments/sparql.go index 
223612ff..e0bd36fb 100644 --- a/hub3/fragments/sparql.go +++ b/hub3/fragments/sparql.go @@ -26,16 +26,30 @@ import ( "text/template" "time" - "github.com/delving/hub3/config" + "github.com/OneOfOne/xxhash" "github.com/parnurzeal/gorequest" + + "github.com/delving/hub3/config" ) // SparqlUpdate contains the elements to perform a SPARQL update query type SparqlUpdate struct { - Triples string `json:"triples"` - NamedGraphURI string `json:"graphUri"` - Spec string `json:"datasetSpec"` - SpecRevision int `json:"specRevision"` + Triples string `json:"triples,omitempty"` + NamedGraphURI string `json:"graphUri,omitempty"` + OrgID string `json:"orgID,omitempty"` + HubID string `json:"hubID,omitempty"` + Spec string `json:"datasetSpec,omitempty"` + SpecRevision int `json:"specRevision,omitempty"` + RDFHash string `json:"rdfHash,omitempty"` + SkipDrop bool +} + +func (su *SparqlUpdate) GetHash() string { + if su.RDFHash != "" { + return su.RDFHash + } + + return hash(su.Triples) } // TripleCount counts the number of Ntriples in a string @@ -69,6 +83,7 @@ func executeTemplate(tmplString string, name string, model interface{}) string { func (su SparqlUpdate) String() string { t := `GRAPH <{{.NamedGraphURI}}> { <{{.NamedGraphURI}}> "{{.Spec}}" . + <{{.NamedGraphURI}}> "{{.RDFHash}}" . <{{.NamedGraphURI}}> "{{.SpecRevision}}"^^ . 
{{ .Triples }} }` @@ -86,11 +101,13 @@ func RDFBulkInsert(orgID string, sparqlUpdates []SparqlUpdate) (int, []error) { triplesStored := 0 for i, v := range sparqlUpdates { strs[i] = v.String() - graphs[i] = fmt.Sprintf("DROP GRAPH <%s>;", v.NamedGraphURI) + if !v.SkipDrop { + graphs[i] = fmt.Sprintf("DROP GRAPH <%s>;", v.NamedGraphURI) + } count, err := v.TripleCount() if err != nil { log.Printf("Unable to count triples: %s", err) - return 0, []error{fmt.Errorf("Unable to count triples for %s because :%s", strs[i], err)} + return 0, []error{fmt.Errorf("unable to count triples for %s because :%s", strs[i], err)} } triplesStored += count } @@ -105,6 +122,9 @@ // UpdateViaSparql is a post to sparql function that tasks a valid SPARQL update query func UpdateViaSparql(orgID, update string) []error { request := gorequest.New() + if config.Config.SparqlPassword != "" && config.Config.SparqlUsername != "" { + request = request.SetBasicAuth(config.Config.SparqlUsername, config.Config.SparqlPassword) + } postURL := config.Config.GetSparqlUpdateEndpoint(orgID, "") parameters := url.Values{} @@ -123,8 +143,19 @@ if resp.StatusCode != 200 && resp.StatusCode != 201 { // log.Println(body) log.Printf("unable to store sparqlUpdate: %s", update) - log.Println(resp) + // log.Println(resp) return []error{fmt.Errorf("store error for SparqlUpdate:%s", body)} } return nil } + +type hasher string + +func hash(input string) string { hash := xxhash.Checksum64([]byte(input)) return fmt.Sprintf("%d", hash) } + +func getHash(input fmt.Stringer) hasher { return hasher(hash(input.String())) } diff --git a/hub3/models/dataset.go b/hub3/models/dataset.go index fa1a5805..d7512942 100644 --- a/hub3/models/dataset.go +++ b/hub3/models/dataset.go @@ -23,11 +23,12 @@ import ( "time" "github.com/asdine/storm/q" + wp "github.com/gammazero/workerpool" + 
"github.com/rs/zerolog/log" + c "github.com/delving/hub3/config" "github.com/delving/hub3/hub3/fragments" "github.com/delving/hub3/hub3/index" - wp "github.com/gammazero/workerpool" - "github.com/rs/zerolog/log" elastic "github.com/olivere/elastic/v7" ) @@ -719,7 +720,7 @@ func (ds DataSet) deleteAllIndexRecords(ctx context.Context, wp *wp.WorkerPool) // DropOrphans removes all records of different revision that the current from the attached datastores func (ds DataSet) DropOrphans(ctx context.Context, p *elastic.BulkProcessor, wp *wp.WorkerPool) (bool, error) { - if c.Config.RDF.RDFStoreEnabled { + if c.Config.RDF.RDFStoreEnabled && !c.Config.RDF.StoreSparqlDeltas { ok, err := ds.deleteGraphsOrphans() if !ok || err != nil { log.Warn().Msgf("Unable to remove RDF orphan graphs from spec %s: %s", ds.Spec, err) diff --git a/ikuzo/ikuzoctl/cmd/config/bulk.go b/ikuzo/ikuzoctl/cmd/config/bulk.go new file mode 100644 index 00000000..b002d0f6 --- /dev/null +++ b/ikuzo/ikuzoctl/cmd/config/bulk.go @@ -0,0 +1,13 @@ +package config + +type Bulk struct { + DBPath string `json:"dbPath,omitempty"` + StoreRequests bool + Minio struct { + Endpoint string `json:"endpoint,omitempty"` + AccessKeyID string `json:"accessKeyID,omitempty"` + SecretAccessKey string `json:"secretAccessKey,omitempty"` + UseSSL bool `json:"useSSL,omitempty"` + BucketName string `json:"bucketName,omitempty"` + } `json:"minio,omitempty"` +} diff --git a/ikuzo/ikuzoctl/cmd/config/config.go b/ikuzo/ikuzoctl/cmd/config/config.go index 49c44b10..aa41a368 100644 --- a/ikuzo/ikuzoctl/cmd/config/config.go +++ b/ikuzo/ikuzoctl/cmd/config/config.go @@ -17,13 +17,14 @@ package config import ( "fmt" + "github.com/pacedotdev/oto/otohttp" + "github.com/spf13/viper" + "github.com/delving/hub3/ikuzo" "github.com/delving/hub3/ikuzo/domain" "github.com/delving/hub3/ikuzo/logger" "github.com/delving/hub3/ikuzo/service/organization" "github.com/delving/hub3/ikuzo/service/x/index" - "github.com/pacedotdev/oto/otohttp" - 
"github.com/spf13/viper" ) type Option interface { @@ -51,6 +52,7 @@ type Config struct { NDE map[string]NDECfg `json:"nde"` NDERegister NDE `json:"-" toml:"-"` RDF `json:"rdf"` + Bulk `json:"bulk"` Sitemap `json:"sitemap"` oto *otohttp.Server } diff --git a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go index aa95e173..3fb2ea04 100644 --- a/ikuzo/ikuzoctl/cmd/config/elasticsearch.go +++ b/ikuzo/ikuzoctl/cmd/config/elasticsearch.go @@ -19,6 +19,9 @@ import ( "fmt" "sync" + "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/rs/zerolog/log" + "github.com/delving/hub3/ikuzo" "github.com/delving/hub3/ikuzo/domain" es "github.com/delving/hub3/ikuzo/driver/elasticsearch" @@ -27,8 +30,6 @@ import ( "github.com/delving/hub3/ikuzo/service/x/bulk" "github.com/delving/hub3/ikuzo/service/x/esproxy" "github.com/delving/hub3/ikuzo/service/x/index" - "github.com/elastic/go-elasticsearch/v8/esutil" - "github.com/rs/zerolog/log" ) type ElasticSearch struct { @@ -122,9 +123,11 @@ func (e *ElasticSearch) AddOptions(cfg *Config) error { bulk.SetIndexService(is), bulk.SetIndexTypes(e.IndexTypes...), bulk.SetPostHookService(postHooks...), + bulk.SetBlobConfig(cfg.Bulk.Minio), + bulk.SetLogRequests(cfg.Bulk.StoreRequests), ) if bulkErr != nil { - return fmt.Errorf("unable to create bulk service; %w", isErr) + return fmt.Errorf("unable to create bulk service; %w", bulkErr) } cfg.options = append( diff --git a/ikuzo/ikuzoctl/cmd/index.go b/ikuzo/ikuzoctl/cmd/index.go index 24a0c4f6..aeb1b34f 100644 --- a/ikuzo/ikuzoctl/cmd/index.go +++ b/ikuzo/ikuzoctl/cmd/index.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -28,9 +28,10 @@ import ( "syscall" "time" - "github.com/delving/hub3/ikuzo/service/x/bulk" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + + "github.com/delving/hub3/ikuzo/service/x/bulk" ) // indexCmd represents the index command @@ -77,7 +78,7 @@ func indexRecords() error { bulk.SetIndexTypes(cfg.ElasticSearch.IndexTypes...), ) if bulkErr != nil { - return fmt.Errorf("unable to create bulk service; %w", isErr) + return fmt.Errorf("unable to create bulk service; %w", bulkErr) } parser := bulkSvc.NewParser() diff --git a/ikuzo/ikuzoctl/cmd/serve.go b/ikuzo/ikuzoctl/cmd/serve.go index d3064c46..0654e4f2 100644 --- a/ikuzo/ikuzoctl/cmd/serve.go +++ b/ikuzo/ikuzoctl/cmd/serve.go @@ -19,10 +19,11 @@ import ( stdlog "log" - "github.com/delving/hub3/hub3/server/http/handlers" - "github.com/delving/hub3/ikuzo" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + + "github.com/delving/hub3/hub3/server/http/handlers" + "github.com/delving/hub3/ikuzo" ) //go:embed static @@ -50,6 +51,7 @@ func serve() { options, err := cfg.Options() if err != nil { + stdlog.Printf("unable to create options: %s", err) log.Fatal(). Err(err). Stack(). 
diff --git a/ikuzo/service/x/bulk/db.go b/ikuzo/service/x/bulk/db.go new file mode 100644 index 00000000..d34610ab --- /dev/null +++ b/ikuzo/service/x/bulk/db.go @@ -0,0 +1,29 @@ +package bulk + +import "github.com/delving/hub3/hub3/fragments" + +func (s *Service) getPreviousUpdates(ids []string) ([]*fragments.SparqlUpdate, error) { + return []*fragments.SparqlUpdate{}, nil +} + +func (s *Service) storeUpdatedHashes(diffs []*DiffConfig) error { + return nil +} + +func (s *Service) incrementRevisionForSeen(ids []string) error { + return nil +} + +type sparqlOrphan struct { + NamedGraphURI string + OrgID string + DatasetID string +} + +func (s *Service) findOrphans(orgID, dataSetID string, revision int) ([]sparqlOrphan, error) { + return []sparqlOrphan{}, nil +} + +func (s *Service) dropOrphans(orphans []sparqlOrphan) error { + return nil +} diff --git a/ikuzo/service/x/bulk/delta.go b/ikuzo/service/x/bulk/delta.go new file mode 100644 index 00000000..0f4ad9b7 --- /dev/null +++ b/ikuzo/service/x/bulk/delta.go @@ -0,0 +1,114 @@ +package bulk + +import ( + "context" + "fmt" + "strings" + + "github.com/go-redis/redis/v8" + "github.com/rs/zerolog/log" + + "github.com/delving/hub3/hub3/fragments" +) + +const ( + rdfType = "rdf" + esType = "es" +) + +type redisStore struct { + orgID string + spec string + revision int + previousRevision int + c *redis.Client +} + +func (rs *redisStore) datasetKey() string { + return fmt.Sprintf("%s:ds:%s", rs.orgID, rs.spec) +} + +func (rs *redisStore) getStoreKey(hubID string) string { + return hubID +} + +func (rs *redisStore) storeRDFData(su fragments.SparqlUpdate) error { + ctx := context.Background() + key := rs.getStoreKey(su.HubID) + err := rs.c.HMSet(ctx, key, map[string]interface{}{ + // rdfType: su.Triples, + "graphURI": su.NamedGraphURI, + }).Err() + if err != nil { + return err + } + + return nil +} + +func (rs *redisStore) addID(hubID, setType, hash string) (bool, error) { + ctx := context.Background() + err := rs.c.SAdd(ctx, 
rs.revisionSetName(setType, false), hubID).Err() + if err != nil { + return false, err + } + hasKey := hubID + ":" + hash + if err := rs.c.SAdd(ctx, rs.revisionSetName(setType, true), hasKey).Err(); err != nil { + return false, err + } + + return rs.c.SIsMember(ctx, rs.previousSetName(setType, true), hasKey).Result() +} + +func (rs *redisStore) revisionSetName(setType string, withHash bool) string { + key := fmt.Sprintf("%s:rev:%d:%s", rs.datasetKey(), rs.revision, setType) + if withHash { + key += ":hash" + } + return key +} + +func (rs *redisStore) previousSetName(setType string, withHash bool) string { + key := fmt.Sprintf("%s:rev:%d:%s", rs.datasetKey(), rs.previousRevision, setType) + if withHash { + key += ":hash" + } + return key +} + +func (rs *redisStore) dropOrphansQuery(orphans []string) (string, error) { + var sb strings.Builder + for _, orphan := range orphans { + key := rs.getStoreKey(orphan) + log.Printf("orphan key: %s", key) + res, err := rs.c.HGet(context.Background(), key, "graphURI").Result() + if err != nil { + return "", fmt.Errorf("unable to get hash key: %w", err) + } + if res != "" { + sb.WriteString(fmt.Sprintf("drop graph <%s> ;\n", res)) + } + } + + return sb.String(), nil +} + +func (rs *redisStore) findOrphans(setType string) ([]string, error) { + ctx := context.Background() + return rs.c.SDiff(ctx, rs.previousSetName(setType, false), rs.revisionSetName(setType, false)).Result() +} + +func (rs *redisStore) SetRevision(current, previous int) error { + rs.revision = current + rs.previousRevision = previous + ctx := context.Background() + err := rs.c.HMSet(ctx, rs.datasetKey(), map[string]interface{}{ + "currentRevision": current, + "previousRevision": previous, + }).Err() + if err != nil { + return err + } + + return nil +} diff --git a/ikuzo/service/x/bulk/handle_upload.go b/ikuzo/service/x/bulk/handle_upload.go index df83c89e..2be1ea73 100644 --- a/ikuzo/service/x/bulk/handle_upload.go +++ b/ikuzo/service/x/bulk/handle_upload.go @@ 
-3,10 +3,11 @@ package bulk import ( "net/http" - "github.com/delving/hub3/hub3/fragments" - "github.com/delving/hub3/ikuzo/domain" "github.com/go-chi/render" "github.com/rs/zerolog/log" + + "github.com/delving/hub3/hub3/fragments" + "github.com/delving/hub3/ikuzo/domain" ) // bulkApi receives bulkActions in JSON form (1 per line) and processes them in @@ -55,6 +56,8 @@ func (s *Service) NewParser() *Parser { indexTypes: s.indexTypes, bi: s.index, sparqlUpdates: []fragments.SparqlUpdate{}, + s: s, + graphs: map[string]*fragments.FragmentBuilder{}, } if len(s.postHooks) != 0 { diff --git a/ikuzo/service/x/bulk/options.go b/ikuzo/service/x/bulk/options.go index 9903d555..462ac741 100644 --- a/ikuzo/service/x/bulk/options.go +++ b/ikuzo/service/x/bulk/options.go @@ -7,6 +7,21 @@ import ( type Option func(*Service) error +type BlobConfig struct { + Endpoint string `json:"endpoint,omitempty"` + AccessKeyID string `json:"accessKeyID,omitempty"` + SecretAccessKey string `json:"secretAccessKey,omitempty"` + UseSSL bool `json:"useSSL,omitempty"` + BucketName string `json:"bucketName,omitempty"` +} + +func SetBlobConfig(cfg BlobConfig) Option { + return func(s *Service) error { + s.blobCfg = cfg + return nil + } +} + func SetIndexService(is *index.Service) Option { return func(s *Service) error { s.index = is @@ -21,6 +36,13 @@ func SetIndexTypes(indexTypes ...string) Option { } } +func SetLogRequests(enable bool) Option { + return func(s *Service) error { + s.logRequests = enable + return nil + } +} + func SetPostHookService(hooks ...domain.PostHookService) Option { return func(s *Service) error { for _, hook := range hooks { diff --git a/ikuzo/service/x/bulk/parser.go b/ikuzo/service/x/bulk/parser.go index a5dd7f44..39712dad 100644 --- a/ikuzo/service/x/bulk/parser.go +++ b/ikuzo/service/x/bulk/parser.go @@ -22,20 +22,25 @@ import ( "errors" "fmt" "io" + "os" + "sort" "strings" "sync" "sync/atomic" + "time" + "github.com/oklog/ulid" "github.com/rs/zerolog" + 
"github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" + "github.com/delving/hub3/config" "github.com/delving/hub3/hub3/fragments" "github.com/delving/hub3/hub3/models" "github.com/delving/hub3/ikuzo/domain" "github.com/delving/hub3/ikuzo/domain/domainpb" "github.com/delving/hub3/ikuzo/service/x/index" - "github.com/rs/zerolog/log" - "golang.org/x/sync/errgroup" rdf "github.com/kiivihal/rdf2go" ) @@ -50,13 +55,28 @@ type Parser struct { sparqlUpdates []fragments.SparqlUpdate // store all the triples here for bulk insert postHooks []*domain.PostHookItem m sync.RWMutex + s *Service + graphs map[string]*fragments.FragmentBuilder // TODO: probably remove this later + rawRequest bytes.Buffer + store *redisStore +} + +func (p *Parser) setUpdateDataset(ds *models.DataSet) { + p.m.Lock() + p.ds = ds + p.m.Unlock() +} + +func (p *Parser) dataset() *models.DataSet { + p.m.RLock() + defer p.m.RUnlock() + return p.ds } func (p *Parser) Parse(ctx context.Context, r io.Reader) error { ctx, done := context.WithCancel(ctx) g, gctx := errgroup.WithContext(ctx) _ = gctx - defer done() workers := 4 @@ -64,7 +84,9 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { actions := make(chan Request) g.Go(func() error { - defer close(actions) + defer func() { + close(actions) + }() scanner := bufio.NewScanner(r) buf := make([]byte, 0, 64*1024) @@ -74,6 +96,11 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { var req Request b := scanner.Bytes() + if p.s.logRequests { + b = append(b, '\n') + p.rawRequest.Write(b) + } + if err := json.Unmarshal(b, &req); err != nil { atomic.AddUint64(&p.stats.JSONErrors, 1) log.Error().Str("svc", "bulk").Err(err).Msg("json parse error") @@ -81,6 +108,11 @@ func (p *Parser) Parse(ctx context.Context, r io.Reader) error { atomic.AddUint64(&p.stats.JSONErrors, 1) continue } + if p.dataset() == nil { + p.once.Do(func() { + p.setDataSet(&req) + }) + } select { case actions <- req: @@ -128,18 +160,143 @@ func (p *Parser) 
Parse(ctx context.Context, r io.Reader) error { log.Warn().Err(err).Msg("context canceled during bulk indexing") } + if p.s.logRequests { + if err := p.storeRequest(); err != nil { + p.s.log.Warn().Err(err).Msg("unable to store request for debugging") + } + } + + p.s.log.Info().Msgf("graphs: %d", len(p.graphs)) + p.s.log.Info().Msgf("%#v", config.Config.RDF) + if config.Config.RDF.RDFStoreEnabled { - if errs := p.RDFBulkInsert(); errs != nil { - return errs[0] + if config.Config.RDF.StoreSparqlDeltas { + if err := p.StoreGraphDeltas(); err != nil { + return err + } + } else { + if errs := p.RDFBulkInsert(); errs != nil { + return errs[0] + } + } + } + + return nil +} + +// TODO: implement this +func (p *Parser) StoreGraphDeltas() error { + p.s.log.Info().Str("storeName", p.store.revisionSetName(rdfType, false)).Int("graphs", len(p.sparqlUpdates)).Msg("store deltas") + updates := []fragments.SparqlUpdate{} + for _, su := range p.sparqlUpdates { + known, err := p.store.addID(su.HubID, rdfType, su.GetHash()) + if err != nil { + return err + } + if known { + continue } + p.s.log.Info().Msgf("unknown hubID: %s", su.HubID) + updates = append(updates, su) + if err := p.store.storeRDFData(su); err != nil { + return err + } + } + + p.s.log.Info().Int("submitted", len(p.sparqlUpdates)).Int("changed", len(updates)).Msg("after delta check") + + p.sparqlUpdates = updates + if errs := p.RDFBulkInsert(); errs != nil { + return errs[0] + } + + return nil +} + +// TODO: remove this +func (p *Parser) StoreGraphDeltasOld() error { + ids := []string{} + for _, su := range p.sparqlUpdates { + ids = append(ids, su.HubID) + } + + // get previously stored updates + previousUpdates, err := p.s.getPreviousUpdates(ids) + if err != nil { + return err + } + lookUp := map[string]*fragments.SparqlUpdate{} + for _, prev := range previousUpdates { + lookUp[prev.HubID] = prev + } + + // check which updates have changed + changed := []*DiffConfig{} + for _, current := range p.sparqlUpdates { + prev, 
ok := lookUp[current.HubID] + if !ok { + // new so add it + changed = append(changed, &DiffConfig{su: &current}) + continue + } + + if prev.RDFHash == current.RDFHash { + // same hash so skip it + continue + } + + changed = append(changed, &DiffConfig{su: &current, previousTriples: prev.Triples, previousHash: prev.RDFHash}) + } + + var sparqlUpdate strings.Builder + for _, cfg := range changed { + update, err := diffAsSparqlUpdate(cfg) + if err != nil { + return err + } + sparqlUpdate.WriteString(update) + } + + // update the diffs in the triple store + errs := fragments.UpdateViaSparql(p.dataset().OrgID, sparqlUpdate.String()) + if errs != nil { + return errs[0] + } + + // store the graphs in the S3 bucket + // if err := p.StoreGraphs(changed); err != nil { + // return err + // } + + // store the update hashes + if err := p.s.storeUpdatedHashes(changed); err != nil { + return err + } + + // update revision for all seen ids + if err := p.s.incrementRevisionForSeen(ids); err != nil { + return err } return nil } +// storeRequest writes the raw bulk request to disk for inspection and reuse +func (p *Parser) storeRequest() error { + u, err := ulid.New(ulid.Now(), nil) + if err != nil { + p.s.log.Error().Err(err).Msg("unable to create ulid") + return err + } + + path := fmt.Sprintf("/tmp/%s_%s_%d_%s.ldjson", p.ds.OrgID, p.ds.Spec, p.ds.Revision, u.String()) + return os.WriteFile(path, p.rawRequest.Bytes(), os.ModePerm) +} + // RDFBulkInsert inserts all triples from the bulkRequest in one SPARQL update statement func (p *Parser) RDFBulkInsert() []error { triplesStored, errs := fragments.RDFBulkInsert(p.ds.OrgID, p.sparqlUpdates) + p.stats.GraphsStored = uint64(len(p.sparqlUpdates)) p.sparqlUpdates = nil p.stats.TriplesStored = uint64(triplesStored) @@ -156,11 +313,11 @@ func containsString(s []string, e string) bool { return false } -func (p *Parser) setDataSet(req *Request) { - ds, _, dsError := models.GetOrCreateDataSet(req.OrgID, req.DatasetID) - if dsError != nil { - // log error - 
return +func (p *Parser) setDataSet(req *Request) error { + ds, _, dsErr := models.GetOrCreateDataSet(req.OrgID, req.DatasetID) + if dsErr != nil { + p.s.log.Error().Err(dsErr).Msg("unable to get or create dataset") + return dsErr } if ds.RecordType == "" { @@ -187,8 +344,39 @@ func (p *Parser) setDataSet(req *Request) { p.stats.Spec = req.DatasetID p.stats.OrgID = req.OrgID - req.Revision = ds.Revision - p.ds = ds + p.stats.SpecRevision = uint64(ds.Revision) + p.setUpdateDataset(ds) + + p.store = &redisStore{ + orgID: req.OrgID, + spec: req.DatasetID, + c: p.s.rc, + } + + return nil +} + +func (p *Parser) dropGraphOrphans() error { + p.s.log.Info().Msg("dropping orphans") + orphans, err := p.store.findOrphans(rdfType) + if err != nil { + return err + } + p.s.log.Info().Int("orphanCount", len(orphans)).Msgf("%#v", orphans) + if len(orphans) == 0 { + return nil + } + + updateQuery, err := p.store.dropOrphansQuery(orphans) + if err != nil { + return err + } + + errs := fragments.UpdateViaSparql(p.stats.OrgID, updateQuery) + if len(errs) != 0 { + return errs[0] + } + return nil } func (p *Parser) dropOrphans(req *Request) error { @@ -203,6 +391,25 @@ func (p *Parser) dropOrphans(req *Request) error { return err } + if config.Config.RDF.RDFStoreEnabled { + if config.Config.RDF.StoreSparqlDeltas { + go func() { + // block for orphanWait seconds to allow cluster to be in sync + timer := time.NewTimer(time.Second * time.Duration(5)) + <-timer.C + if err := p.dropGraphOrphans(); err != nil { + p.s.log.Error().Err(err).Msg("unable to drop graph orphans") + } + }() + } else { + p.s.log.Info().Msg("wrong orphan") + _, err := models.DeleteGraphsOrphansBySpec(p.ds.OrgID, p.ds.Spec, p.ds.Revision) + if err != nil { + return err + } + } + } + return nil } @@ -219,17 +426,32 @@ func addLogger(datasetID string) zerolog.Logger { } } -func (p *Parser) process(ctx context.Context, req *Request) error { - p.once.Do(func() { p.setDataSet(req) }) +func (p *Parser) IncrementRevision() 
(int, error) { + previous := p.ds.Revision + ds, err := p.dataset().IncrementRevision() + if err != nil { + return 0, err + } + + if err := p.store.SetRevision(ds.Revision, previous); err != nil { + return 0, err + } + + p.setUpdateDataset(ds) + + p.stats.SpecRevision = uint64(ds.Revision) + return ds.Revision, nil +} + +func (p *Parser) process(ctx context.Context, req *Request) error { subLogger := addLogger(req.DatasetID) - if p.ds == nil { + if p.dataset() == nil { return fmt.Errorf("unable to get dataset") } req.Revision = p.ds.Revision - // TODO(kiivihal): add logger switch req.Action { case "index": @@ -239,13 +461,12 @@ func (p *Parser) process(ctx context.Context, req *Request) error { return err } case "increment_revision": - ds, err := p.ds.IncrementRevision() + revision, err := p.IncrementRevision() if err != nil { subLogger.Error().Err(err).Str("datasetID", req.DatasetID).Msg("Unable to increment DataSet") - return err } - subLogger.Info().Str("datasetID", req.DatasetID).Int("revision", ds.Revision).Msg("Incremented dataset") + subLogger.Info().Str("datasetID", req.DatasetID).Int("revision", revision).Msg("Incremented dataset") case "clear_orphans", "drop_orphans": // clear triples if err := p.dropOrphans(req); err != nil { @@ -328,6 +549,10 @@ func (p *Parser) Publish(ctx context.Context, req *Request) error { } } + p.m.Lock() + p.graphs[fb.FragmentGraph().Meta.HubID] = fb + p.m.Unlock() + for _, indexType := range p.indexTypes { switch indexType { case "v1": @@ -393,10 +618,16 @@ func (p *Parser) AppendRDFBulkRequest(req *Request, g *rdf.Graph) error { Triples: b.String(), NamedGraphURI: req.NamedGraphURI, Spec: req.DatasetID, - SpecRevision: req.Revision, + HubID: req.HubID, + OrgID: req.OrgID, + SpecRevision: req.Revision, // TODO: This can only be removed after the orphan control is fixed } + su.GetHash() + + p.m.Lock() p.sparqlUpdates = append(p.sparqlUpdates, su) + p.m.Unlock() return nil } @@ -410,6 +641,7 @@ type Stats struct { RecordsStored 
uint64 `json:"recordsStored"` // originally json was records_stored JSONErrors uint64 `json:"jsonErrors"` TriplesStored uint64 `json:"triplesStored"` + GraphsStored uint64 `json:"graphsStored"` PostHooksSubmitted uint64 `json:"postHooksSubmitted"` // ContentHashMatches uint64 `json:"contentHashMatches"` // originally json was content_hash_matches } @@ -430,7 +662,9 @@ func encodeTerm(iterm rdf.Term) string { func serializeNTriples(g *rdf.Graph, w io.Writer) error { var err error - for triple := range g.IterTriples() { + triples := []string{} + + for triple := range g.IterTriplesOrdered() { s := encodeTerm(triple.Subject) if strings.HasPrefix(s, " "{{.Spec}}" . + // <{{.NamedGraphURI}}> "{{.RDFHash}}" . + // <{{.NamedGraphURI}}> "{{.SpecRevision}}"^^ . + + if len(removed) != 0 { + // Construct the DELETE statement + sb.WriteString("DELETE DATA {\n") + sb.WriteString("GRAPH <") + sb.WriteString(cfg.su.NamedGraphURI) + sb.WriteString("> { ") + for _, triple := range removed { + sb.WriteString(triple + "\n") + } + sb.WriteString(" }\n") + sb.WriteString("}\n") + } + + if len(added) != 0 { + // Construct the INSERT statement + sb.WriteString("INSERT DATA {\n") + sb.WriteString("GRAPH <") + sb.WriteString(cfg.su.NamedGraphURI) + sb.WriteString("> { ") + // TODO: generate from DiffConfig + // if su.Spec != "" { + // sb.WriteString(fmt.Sprintf("<%s> \"%s\" .\n", su.NamedGraphURI, su.Spec)) + // sb.WriteString(fmt.Sprintf("<%s> \"%s\" .\n", su.NamedGraphURI, su.RDFHash)) + // } + for _, triple := range added { + sb.WriteString(triple + "\n") + } + sb.WriteString(" }\n") + sb.WriteString("}\n") + } + + return sb.String() +} + +func diffTriples(a, b string) (insertedLines, deletedLines []string) { + return diffStrings(getSortedLines(a), getSortedLines(b)) +} + +func diffStrings(a, b []string) (added, removed []string) { + mapA := make(map[string]bool) + for _, val := range a { + mapA[val] = true + } + mapB := make(map[string]bool) + for _, val := range b { + mapB[val] = true 
+ } + + for val := range mapB { + if !mapA[val] { + added = append(added, val) + } + } + + for val := range mapA { + if !mapB[val] { + removed = append(removed, val) + } + } + + sort.Strings(added) + sort.Strings(removed) + + return added, removed +} + +func getSortedLines(s string) []string { + lines := make([]string, 0) + scanner := bufio.NewScanner(strings.NewReader(s)) + for scanner.Scan() { + line := scanner.Text() + if strings.TrimSpace(line) != "" { + lines = append(lines, line) + } + } + sort.Strings(lines) + return lines +} diff --git a/ikuzo/service/x/bulk/sparql_test.go b/ikuzo/service/x/bulk/sparql_test.go new file mode 100644 index 00000000..a22a759a --- /dev/null +++ b/ikuzo/service/x/bulk/sparql_test.go @@ -0,0 +1,88 @@ +package bulk + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/matryer/is" +) + +func TestDiffTriples(t *testing.T) { + t.Run("inserted and deleted", func(t *testing.T) { + is := is.New(t) + + a := `<1> "title" . + <1> "subject". + <1> "subject2". + <1> "123" . + <1> <2> . + ` + + b := `<1> "title 2" . + <1> "subject". + <1> "123" . + <1> <2> . + <1> "subject3". + ` + + inserted, deleted := diffTriples(a, b) + t.Logf("inserted: %#v", inserted) + t.Logf("deleted: %#v", deleted) + is.Equal(len(inserted), 2) + is.Equal(len(deleted), 2) + is.Equal(inserted, []string{"\t<1> \"subject3\".", "<1> \"title 2\" ."}) + is.Equal(deleted, []string{"\t<1> \"subject2\".", "<1> \"title\" ."}) + }) + t.Run("no changes", func(t *testing.T) { + is := is.New(t) + + a := `<1> "title" . + <1> "subject". + <1> "subject2". + <1> "123" . + <1> <2> . + ` + inserted, deleted := diffTriples(a, a) + is.Equal(len(inserted), 0) + is.Equal(len(deleted), 0) + }) + t.Run("no changes", func(t *testing.T) { + is := is.New(t) + + a := `<1> "title" . + <1> "subject". + <1> "subject2". + <1> "123" . + <1> <2> . 
+ ` + inserted, deleted := diffTriples(``, a) + is.Equal(len(inserted), 5) + is.Equal(len(deleted), 0) + }) +} + +func TestDiffAsSparqlUpdate(t *testing.T) { + is := is.New(t) + a := `<1> "title" . + <1> "subject". + <1> "subject2". + <1> "123" . + <1> <2> . + ` + + b := `<1> "title 2" . + <1> "subject". + <1> "123" . + <1> <2> . + <1> "subject3". + ` + + want := "DELETE DATA {\nGRAPH { \t<1> \"subject2\".\n<1> \"title\" .\n }\n}\nINSERT DATA {\nGRAPH { \"123\" .\n\t<1> \"subject3\".\n<1> \"title 2\" .\n }\n}\n" + + got, err := diffAsSparqlUpdate(a, b, "urn:123/graph", "123") + t.Logf("query: %#v", got) + is.NoErr(err) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("diffAsSparqlUpdate() mismatch (-want +got):\n%s", diff) + } +}