diff --git a/clinics/service.go b/clinics/service.go index 8ef3adaa7..101dc0acd 100644 --- a/clinics/service.go +++ b/clinics/service.go @@ -28,6 +28,7 @@ type Client interface { ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error) SyncEHRData(ctx context.Context, clinicID string) error GetPatients(ctx context.Context, clinicId string, userToken string, params *clinic.ListPatientsParams, injectedParams url.Values) ([]clinic.Patient, error) + GetPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) } type config struct { @@ -155,7 +156,7 @@ func (d *defaultClient) SharePatientAccount(ctx context.Context, clinicID, patie } if response.StatusCode() == http.StatusConflict { // User is already shared with the clinic - return d.getPatient(ctx, clinicID, patientID) + return d.GetPatient(ctx, clinicID, patientID) } if response.StatusCode() != http.StatusOK { err = errors.Preparedf(ErrorCodeClinicClientFailure, @@ -182,7 +183,7 @@ func (d *defaultClient) SyncEHRData(ctx context.Context, clinicID string) error return nil } -func (d *defaultClient) getPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) { +func (d *defaultClient) GetPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) { response, err := d.httpClient.GetPatientWithResponse(ctx, clinic.ClinicId(clinicID), clinic.PatientId(patientID)) if err != nil { return nil, err diff --git a/conditionalnotifications/claimaccount/processor.go b/conditionalnotifications/claimaccount/processor.go new file mode 100644 index 000000000..7ca6514e0 --- /dev/null +++ b/conditionalnotifications/claimaccount/processor.go @@ -0,0 +1,129 @@ +package claimaccount + +import ( + "context" + "fmt" + "time" + + "github.com/tidepool-org/platform/conditionalnotifications" + "github.com/tidepool-org/platform/pointer" + "github.com/tidepool-org/platform/structure" + "github.com/tidepool-org/platform/work" +) + +const ( + processorType = "org.tidepool.processors.claim.account" + quantity = 4 + frequency = time.Minute + processingTimeoutSeconds = 60 +) + +// NewGroupID returns a string suitable for [work.Work.GroupID] that is meant +// to group related claim account notifications together so they can all be +// deleted if the condition to send them is no longer active. For example, if a +// user has already claimed their account but there is a pending notification +// that hasn't been processed yetm the processor should delete all work items +// of the same group id when it is time to process the item. +func NewGroupID(userId string) string { + return fmt.Sprintf("%s:%s", processorType, userId) +} + +type Metadata struct { + ClinicId string `json:"clinicId,omitempty"` + UserId string `json:"userId,omitempty"` + WhenToSend time.Time `json:"whenToSend,omitzero"` +} + +func (d *Metadata) Parse(parser structure.ObjectParser) { + d.ClinicId = pointer.ToString(parser.String("clinicId")) + d.UserId = pointer.ToString(parser.String("userId")) + d.WhenToSend = pointer.ToTime(parser.Time("whenToSend", time.RFC3339Nano)) +} + +func (d *Metadata) Validate(validator structure.Validator) { + validator.String("clinicId", &d.ClinicId).NotEmpty() + validator.String("userId", &d.UserId).NotEmpty() +} + +func NewWorkCreate(notBefore time.Time, metadata Metadata) *work.Create { + return &work.Create{ + Type: processorType, + SerialID: pointer.FromString(metadata.UserId), + GroupID: pointer.FromString(NewGroupID(metadata.UserId)), + ProcessingTimeout: processingTimeoutSeconds, + ProcessingAvailableTime: notBefore, + Metadata: fromClaimAccountData(metadata), + } +} + +type processor struct { + dependencies conditionalnotifications.Dependencies +} + +func NewProcessor(dependencies conditionalnotifications.Dependencies) *processor { + return &processor{ + dependencies: dependencies, + } +} + +func (p *processor) Type() string { + return processorType +} + +func (p *processor) Quantity() int { + return quantity +} + +func (p *processor) Frequency() time.Duration { + return frequency +} + +func (p *processor) Process(ctx context.Context, wrk *work.Work, updater work.ProcessingUpdater) work.ProcessResult { + data, err := toClaimAccountData(wrk) + if err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + + patient, err := p.dependencies.Clinics.GetPatient(ctx, data.ClinicId, data.UserId) + if err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + if patient == nil { + return conditionalnotifications.NewFailingResult(fmt.Errorf(`unable to find patient with userId "%v"`, data.UserId), wrk) + } + if pointer.ToString(patient.Email) == "" { + return conditionalnotifications.NewFailingResult(fmt.Errorf(`unable to find email for patient with userId "%v"`, data.UserId), wrk) + } + // If user already claimed they will no longer have the custodian field set + if patient != nil && (patient.Permissions == nil || patient.Permissions.Custodian == nil) { + return *work.NewProcessResultDelete() + } + + if _, err := p.dependencies.Confirmations.ResendAccountSignupWithResponse(ctx, *patient.Email); err != nil { + return conditionalnotifications.NewFailingResult(fmt.Errorf(`unable to resend account signup email: %w`, err), wrk) + } + return *work.NewProcessResultDelete() +} + +func toClaimAccountData(wrk *work.Work) (*Metadata, error) { + wrk.EnsureMetadata() + var data Metadata + if userId, ok := wrk.Metadata["userId"].(string); ok { + data.UserId = userId + } else { + return nil, fmt.Errorf(`expected field "userId" to exist and be a string, received %T`, wrk.Metadata["userId"]) + } + if clinicId, ok := wrk.Metadata["clinicId"].(string); ok { + data.ClinicId = clinicId + } else { + return nil, fmt.Errorf(`expected field "clinicId" to exist and be a string, received %T`, wrk.Metadata["clinicId"]) + } + return &data, nil +} + +func fromClaimAccountData(data Metadata) map[string]any { + return map[string]any{ + "userId": data.UserId, + "clinicId": data.ClinicId, + } +} diff --git a/conditionalnotifications/connectaccount/processor.go b/conditionalnotifications/connectaccount/processor.go new file mode 100644 index 000000000..4ed2e5c3a --- /dev/null +++ b/conditionalnotifications/connectaccount/processor.go @@ -0,0 +1,173 @@ +package connectaccount + +import ( + "context" + "fmt" + "time" + + "github.com/tidepool-org/go-common/events" + "github.com/tidepool-org/platform/conditionalnotifications" + "github.com/tidepool-org/platform/data/source" + "github.com/tidepool-org/platform/pointer" + "github.com/tidepool-org/platform/structure" + "github.com/tidepool-org/platform/work" +) + +const ( + processorType = "org.tidepool.processors.connect.account" + quantity = 4 + frequency = time.Minute + processingTimeoutSeconds = 60 +) + +// NewGroupID returns a string suitable for [work.Work.GroupID] for batch deletions. +func NewGroupID(email, providerName string) string { + return fmt.Sprintf("%s:%s:%s", processorType, email, providerName) +} + +type Metadata struct { + ClinicId string `json:"clinicId,omitempty"` + Email string `json:"email,omitempty"` + EmailTemplate string `json:"emailTemplate,omitempty"` + PatientName string `json:"patientName,omitempty"` + ProviderName string `json:"providerName,omitempty"` + RestrictedTokenId string `json:"restrictedTokenId,omitempty"` + UserId string `json:"userId,omitempty"` + WhenToSend time.Time `json:"whenToSend,omitzero"` +} + +type processor struct { + dependencies conditionalnotifications.Dependencies +} + +func NewWorkCreate(notBefore time.Time, metadata Metadata) *work.Create { + return &work.Create{ + Type: processorType, + SerialID: pointer.FromString(metadata.UserId), + GroupID: pointer.FromString(NewGroupID(metadata.Email, metadata.ProviderName)), + ProcessingTimeout: processingTimeoutSeconds, + ProcessingAvailableTime: notBefore, + Metadata: fromConnectAccountData(metadata), + } +} + +func (d *Metadata) Parse(parser structure.ObjectParser) { + d.ClinicId = pointer.ToString(parser.String("clinicId")) + d.Email = pointer.ToString(parser.String("email")) + d.EmailTemplate = pointer.ToString(parser.String("emailTemplate")) + d.PatientName = pointer.ToString(parser.String("patientName")) + d.ProviderName = pointer.ToString(parser.String("providerName")) + d.RestrictedTokenId = pointer.ToString(parser.String("restrictedTokenId")) + d.UserId = pointer.ToString(parser.String("userId")) + d.WhenToSend = pointer.ToTime(parser.Time("whenToSend", time.RFC3339Nano)) +} + +func (d *Metadata) Validate(validator structure.Validator) { + validator.String("clinicId", &d.ClinicId).NotEmpty() + validator.String("email", &d.Email).NotEmpty() + validator.String("emailTemplate", &d.EmailTemplate).NotEmpty() + validator.String("patientName", &d.PatientName).NotEmpty() + validator.String("providerName", &d.ProviderName).NotEmpty() + validator.String("restrictedTokenId", &d.RestrictedTokenId).NotEmpty() + validator.String("userId", &d.UserId).NotEmpty() +} + +func NewProcessor(dependencies conditionalnotifications.Dependencies) *processor { + return &processor{ + dependencies: dependencies, + } +} + +func (p *processor) Type() string { + return processorType +} + +func (p *processor) Quantity() int { + return quantity +} + +func (p *processor) Frequency() time.Duration { + return frequency +} + +func (p *processor) Process(ctx context.Context, wrk *work.Work, updater work.ProcessingUpdater) work.ProcessResult { + data, err := toConnectAccountData(wrk) + if err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + + user, err := p.dependencies.Users.Get(ctx, data.UserId) + if err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + if user == nil || user.Username == nil { + return conditionalnotifications.NewFailingResult(fmt.Errorf(`unable to find user for userId "%s"`, data.UserId), wrk) + } + filter := source.NewFilter() + filter.ProviderName = pointer.FromStringArray([]string{data.ProviderName}) + filter.State = pointer.FromStringArray([]string{"connected"}) + connectedDataSources, err := p.dependencies.DataSources.List(ctx, data.UserId, filter, nil) + if err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + if len(connectedDataSources) > 0 { + // User now has a connected dataSource so no email to send. + return *work.NewProcessResultDelete() + } + + emailVars := map[string]string{ + "RestrictedTokenId": data.RestrictedTokenId, + "PatientName": data.PatientName, + "ProviderName": data.ProviderName, + } + templateEvent := events.SendEmailTemplateEvent{ + Recipient: *user.Username, + Template: data.EmailTemplate, + Variables: emailVars, + } + if err := p.dependencies.Mailer.SendEmailTemplate(ctx, templateEvent); err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + return *work.NewProcessResultDelete() +} + +func toConnectAccountData(wrk *work.Work) (*Metadata, error) { + wrk.EnsureMetadata() + var data Metadata + if userId, ok := wrk.Metadata["userId"].(string); ok { + data.UserId = userId + } else { + return nil, fmt.Errorf(`expected field "userId" to exist and be a string, received %T`, wrk.Metadata["userId"]) + } + if providerName, ok := wrk.Metadata["providerName"].(string); ok { + data.ProviderName = providerName + } else { + return nil, fmt.Errorf(`expected field "providerName" to exist and be a string, received %T`, wrk.Metadata["providerName"]) + } + if patientName, ok := wrk.Metadata["patientName"].(string); ok { + data.PatientName = patientName + } else { + return nil, fmt.Errorf(`expected field "patientName" to exist and be a string, received %T`, wrk.Metadata["patientName"]) + } + if restrictedTokenId, ok := wrk.Metadata["restrictedTokenId"].(string); ok { + data.RestrictedTokenId = restrictedTokenId + } else { + return nil, fmt.Errorf(`expected field "restrictedTokenId" to exist and be a string, received %T`, wrk.Metadata["restrictedTokenId"]) + } + if emailTemplate, ok := wrk.Metadata["emailTemplate"].(string); ok { + data.EmailTemplate = emailTemplate + } else { + return nil, fmt.Errorf(`expected field "emailTemplate" to exist and be a string, received %T`, wrk.Metadata["emailTemplate"]) + } + return &data, nil +} + +func fromConnectAccountData(data Metadata) map[string]any { + return map[string]any{ + "userId": data.UserId, + "providerName": data.ProviderName, + "patientName": data.PatientName, + "restrictedTokenId": data.RestrictedTokenId, + "emailTemplate": data.EmailTemplate, + } +} diff --git a/conditionalnotifications/connectionissues/processor.go b/conditionalnotifications/connectionissues/processor.go new file mode 100644 index 000000000..6b39d7572 --- /dev/null +++ b/conditionalnotifications/connectionissues/processor.go @@ -0,0 +1,164 @@ +package connectionissues + +import ( + "context" + "fmt" + "time" + + "github.com/tidepool-org/go-common/events" + "github.com/tidepool-org/platform/pointer" + "github.com/tidepool-org/platform/work" + + "github.com/tidepool-org/platform/conditionalnotifications" + "github.com/tidepool-org/platform/structure" +) + +const ( + processorType = "org.tidepool.processors.device.connection.issues" + quantity = 2 + frequency = time.Minute + processingTimeoutSeconds = 60 +) + +// NewGroupID returns a string suitable for [work.Work.GroupID] for batch deletions. +func NewGroupID(dataSourceId string) string { + return fmt.Sprintf("%s:%s", processorType, dataSourceId) +} + +type processor struct { + dependencies conditionalnotifications.Dependencies +} + +type Metadata struct { + DataSourceState string `json:"dataSourceState,omitempty"` + DataSourceId string `json:"dataSourceId,omitempty"` + EmailTemplate string `json:"emailTemplate,omitempty"` + PatientName string `json:"patientName,omitempty"` + ProviderName string `json:"providerName,omitempty"` + RestrictedTokenId string `json:"restrictedTokenId,omitempty"` + UserId string `json:"userId,omitempty"` +} + +func (d *Metadata) Parse(parser structure.ObjectParser) { + d.DataSourceState = pointer.ToString(parser.String("dataSourceState")) + d.DataSourceId = pointer.ToString(parser.String("dataSourceId")) + d.EmailTemplate = pointer.ToString(parser.String("emailTemplate")) + d.PatientName = pointer.ToString(parser.String("patientName")) + d.ProviderName = pointer.ToString(parser.String("providerName")) + d.RestrictedTokenId = pointer.ToString(parser.String("restrictedTokenId")) + d.UserId = pointer.ToString(parser.String("userId")) +} + +func (d *Metadata) Validate(validator structure.Validator) { + validator.String("dataSourceState", &d.DataSourceState).NotEmpty() + validator.String("dataSourceId", &d.DataSourceId).NotEmpty() + validator.String("emailTemplate", &d.EmailTemplate).NotEmpty() + validator.String("patientName", &d.PatientName).NotEmpty() + validator.String("providerName", &d.ProviderName).NotEmpty() + validator.String("userId", &d.UserId).NotEmpty() +} + +func NewWorkCreate(metadata Metadata) *work.Create { + return &work.Create{ + Type: processorType, + SerialID: pointer.FromString(metadata.UserId), + GroupID: pointer.FromString(NewGroupID(metadata.DataSourceId)), + ProcessingTimeout: processingTimeoutSeconds, + Metadata: fromMetadata(metadata), + } +} + +func NewProcessor(dependencies conditionalnotifications.Dependencies) *processor { + return &processor{ + dependencies: dependencies, + } +} + +func (p *processor) Type() string { + return processorType +} + +func (p *processor) Quantity() int { + return quantity +} + +func (p *processor) Frequency() time.Duration { + return frequency +} + +func (p *processor) Process(ctx context.Context, wrk *work.Work, updater work.ProcessingUpdater) work.ProcessResult { + data, err := toMetadata(wrk) + if err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + + user, err := p.dependencies.Users.Get(ctx, data.UserId) + if err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + if user == nil || user.Username == nil { + return conditionalnotifications.NewFailingResult(fmt.Errorf(`unable to find user for userId "%s"`, data.UserId), wrk) + } + + emailVars := map[string]string{ + "RestrictedTokenId": data.RestrictedTokenId, + "PatientName": data.PatientName, + "ProviderName": data.ProviderName, + } + templateEvent := events.SendEmailTemplateEvent{ + Recipient: *user.Username, + Template: data.EmailTemplate, + Variables: emailVars, + } + if err := p.dependencies.Mailer.SendEmailTemplate(ctx, templateEvent); err != nil { + return conditionalnotifications.NewFailingResult(err, wrk) + } + return *work.NewProcessResultDelete() +} + +func toMetadata(wrk *work.Work) (*Metadata, error) { + wrk.EnsureMetadata() + var data Metadata + if userId, ok := wrk.Metadata["userId"].(string); ok { + data.UserId = userId + } else { + return nil, fmt.Errorf(`expected field "userId" to exist and be a string, received %T`, wrk.Metadata["userId"]) + } + if providerName, ok := wrk.Metadata["providerName"].(string); ok { + data.ProviderName = providerName + } else { + return nil, fmt.Errorf(`expected field "providerName" to exist and be a string, received %T`, wrk.Metadata["providerName"]) + } + if dataSourceState, ok := wrk.Metadata["dataSourceState"].(string); ok { + data.DataSourceState = dataSourceState + } else { + return nil, fmt.Errorf(`expected field "dataSourceState" to exist and be a string, received %T`, wrk.Metadata["dataSourceState"]) + } + if patientName, ok := wrk.Metadata["patientName"].(string); ok { + data.PatientName = patientName + } else { + return nil, fmt.Errorf(`expected field "patientName" to exist and be a string, received %T`, wrk.Metadata["patientName"]) + } + if restrictedTokenId, ok := wrk.Metadata["restrictedTokenId"].(string); ok { + data.RestrictedTokenId = restrictedTokenId + } else { + return nil, fmt.Errorf(`expected field "restrictedTokenId" to exist and be a string, received %T`, wrk.Metadata["restrictedTokenId"]) + } + if emailTemplate, ok := wrk.Metadata["emailTemplate"].(string); ok { + data.EmailTemplate = emailTemplate + } else { + return nil, fmt.Errorf(`expected field "emailTemplate" to exist and be a string, received %T`, wrk.Metadata["emailTemplate"]) + } + return &data, nil +} + +func fromMetadata(data Metadata) map[string]any { + return map[string]any{ + "userId": data.UserId, + "providerName": data.ProviderName, + "dataSourceState": data.DataSourceState, + "patientName": data.PatientName, + "restrictedTokenId": data.RestrictedTokenId, + "emailTemplate": data.EmailTemplate, + } +} diff --git a/conditionalnotifications/notifications.go b/conditionalnotifications/notifications.go new file mode 100644 index 000000000..f308b5592 --- /dev/null +++ b/conditionalnotifications/notifications.go @@ -0,0 +1,46 @@ +package conditionalnotifications + +import ( + "math/rand" + "time" + + confirmationClient "github.com/tidepool-org/hydrophone/client" + "github.com/tidepool-org/platform/auth" + "github.com/tidepool-org/platform/clinics" + dataSourceStore "github.com/tidepool-org/platform/data/source/store/structured" + "github.com/tidepool-org/platform/errors" + "github.com/tidepool-org/platform/mailer" + "github.com/tidepool-org/platform/pointer" + "github.com/tidepool-org/platform/user" + "github.com/tidepool-org/platform/work" +) + +const ( + baseRetryDuration = 1 * time.Minute + retryDurationJitter = 5 * time.Second +) + +type Dependencies struct { + Auth auth.RestrictedTokenAccessor + Clinics clinics.Client + Confirmations confirmationClient.ClientWithResponsesInterface + DataSources dataSourceStore.DataSourcesRepository + Mailer mailer.Mailer + Users user.Client + Worker work.Client +} + +func NewFailingResult(err error, wrk *work.Work) work.ProcessResult { + failingRetryCount := pointer.DefaultInt(wrk.FailingRetryCount, 0) + 1 + return *work.NewProcessResultFailing(work.FailingUpdate{ + FailingError: errors.Serializable{Error: err}, + FailingRetryCount: pointer.DefaultInt(wrk.FailingRetryCount, 0) + 1, + FailingRetryTime: time.Now().Add(retryDuration(failingRetryCount)), + }) +} + +func retryDuration(retryCount int) time.Duration { + fallbackFactor := time.Duration(1 << (retryCount - 1)) + retryDurationJitter := int64(retryDurationJitter * fallbackFactor) + return baseRetryDuration*fallbackFactor + time.Duration(rand.Int63n(2*retryDurationJitter)-retryDurationJitter) +} diff --git a/data/service/api/v1/email_notifications.go b/data/service/api/v1/email_notifications.go new file mode 100644 index 000000000..cfe03e640 --- /dev/null +++ b/data/service/api/v1/email_notifications.go @@ -0,0 +1,93 @@ +package v1 + +import ( + "net/http" + "time" + + dataService "github.com/tidepool-org/platform/data/service" + "github.com/tidepool-org/platform/request" + serviceApi "github.com/tidepool-org/platform/service/api" + + "github.com/tidepool-org/platform/conditionalnotifications/claimaccount" + "github.com/tidepool-org/platform/conditionalnotifications/connectaccount" + "github.com/tidepool-org/platform/conditionalnotifications/connectionissues" +) + +func NotificationRoutes() []dataService.Route { + return []dataService.Route{ + dataService.Post("/v1/notifications/account/claims", queueClaimAccountNotification, serviceApi.RequireServer), + dataService.Post("/v1/notifications/account/connections", queueConnectAccountNotification, serviceApi.RequireServer), + dataService.Post("/v1/notifications/device/issues", sendDeviceIssuesNotification, serviceApi.RequireServer), + } +} + +func queueClaimAccountNotification(dataServiceContext dataService.Context) { + res := dataServiceContext.Response() + req := dataServiceContext.Request() + + responder := request.MustNewResponder(res, req) + + var data claimaccount.Metadata + if err := request.DecodeRequestBody(req.Request, &data); err != nil { + request.MustNewResponder(res, req).Error(http.StatusBadRequest, err) + return + } + + notBefore := data.WhenToSend + if notBefore.IsZero() { + notBefore = time.Now().Add(time.Hour * 24 * 7) + } + + createDetails := claimaccount.NewWorkCreate(notBefore, data) + _, err := dataServiceContext.WorkClient().Create(req.Context(), createDetails) + if err != nil { + responder.Error(http.StatusInternalServerError, err) + return + } + + responder.Empty(http.StatusCreated) +} + +func queueConnectAccountNotification(dataServiceContext dataService.Context) { + res := dataServiceContext.Response() + req := dataServiceContext.Request() + + responder := request.MustNewResponder(res, req) + + var data connectaccount.Metadata + if err := request.DecodeRequestBody(req.Request, &data); err != nil { + request.MustNewResponder(res, req).Error(http.StatusBadRequest, err) + return + } + + createDetails := connectaccount.NewWorkCreate(time.Now().Add(time.Hour*24*7), data) + _, err := dataServiceContext.WorkClient().Create(req.Context(), createDetails) + if err != nil { + responder.Error(http.StatusInternalServerError, err) + return + } + + responder.Empty(http.StatusCreated) +} + +func sendDeviceIssuesNotification(dataServiceContext dataService.Context) { + res := dataServiceContext.Response() + req := dataServiceContext.Request() + + responder := request.MustNewResponder(res, req) + + var data connectionissues.Metadata + if err := request.DecodeRequestBody(req.Request, &data); err != nil { + request.MustNewResponder(res, req).Error(http.StatusBadRequest, err) + return + } + + createDetails := connectionissues.NewWorkCreate(data) + _, err := dataServiceContext.WorkClient().Create(req.Context(), createDetails) + if err != nil { + responder.Error(http.StatusInternalServerError, err) + return + } + + responder.Empty(http.StatusCreated) +} diff --git a/data/service/api/v1/v1.go b/data/service/api/v1/v1.go index 954882e1e..3c4d1f9aa 100644 --- a/data/service/api/v1/v1.go +++ b/data/service/api/v1/v1.go @@ -33,6 +33,7 @@ func Routes() []service.Route { routes = append(routes, SourcesRoutes()...) routes = append(routes, SummaryRoutes()...) routes = append(routes, AlertsRoutes()...) + routes = append(routes, NotificationRoutes()...) routes = append(routes, abbottServiceApiV1.Routes()...) return routes diff --git a/data/service/service/standard.go b/data/service/service/standard.go index 85d31e0df..c87d4b98f 100644 --- a/data/service/service/standard.go +++ b/data/service/service/standard.go @@ -3,9 +3,11 @@ package service import ( "context" "log" + "net/http" "os" "github.com/IBM/sarama" + "github.com/kelseyhightower/envconfig" eventsCommon "github.com/tidepool-org/go-common/events" @@ -13,7 +15,9 @@ import ( abbottProvider "github.com/tidepool-org/platform-plugin-abbott/abbott/provider" abbottWork "github.com/tidepool-org/platform-plugin-abbott/abbott/work" + confirmationClient "github.com/tidepool-org/hydrophone/client" "github.com/tidepool-org/platform/application" + "github.com/tidepool-org/platform/auth" "github.com/tidepool-org/platform/clinics" dataDeduplicatorDeduplicator "github.com/tidepool-org/platform/data/deduplicator/deduplicator" dataDeduplicatorFactory "github.com/tidepool-org/platform/data/deduplicator/factory" @@ -29,6 +33,7 @@ import ( "github.com/tidepool-org/platform/errors" "github.com/tidepool-org/platform/events" logInternal "github.com/tidepool-org/platform/log" + "github.com/tidepool-org/platform/mailer" metricClient "github.com/tidepool-org/platform/metric/client" oauthProvider "github.com/tidepool-org/platform/oauth/provider" "github.com/tidepool-org/platform/permission" @@ -41,8 +46,16 @@ import ( summaryClient "github.com/tidepool-org/platform/summary/client" syncTaskMongo "github.com/tidepool-org/platform/synctask/store/mongo" "github.com/tidepool-org/platform/twiist" + "github.com/tidepool-org/platform/user" + userClient "github.com/tidepool-org/platform/user/client" + "github.com/tidepool-org/platform/work" workService "github.com/tidepool-org/platform/work/service" workStoreStructuredMongo "github.com/tidepool-org/platform/work/store/structured/mongo" + + notifications "github.com/tidepool-org/platform/conditionalnotifications" + "github.com/tidepool-org/platform/conditionalnotifications/claimaccount" + "github.com/tidepool-org/platform/conditionalnotifications/connectaccount" + "github.com/tidepool-org/platform/conditionalnotifications/connectionissues" ) type Standard struct { @@ -55,13 +68,16 @@ type Standard struct { syncTaskStore *syncTaskMongo.Store workStructuredStore *workStoreStructuredMongo.Store dataDeduplicatorFactory *dataDeduplicatorFactory.Factory - clinicsClient *clinics.Client + clinicsClient clinics.Client dataClient *Client dataRawClient *dataRawService.Client dataSourceClient *dataSourceServiceClient.Client + mailerClient mailer.Mailer summaryClient *summaryClient.Client workClient *workService.Client abbottClient *abbottClient.Client + userClient user.Client + confirmationClient confirmationClient.ClientWithResponsesInterface workCoordinator *workService.Coordinator userEventsHandler events.Runner twiistServiceAccountAuthorizer *twiist.ServiceAccountAuthorizer @@ -116,9 +132,18 @@ func (s *Standard) Initialize(provider application.Provider) error { if err := s.initializeDataSourceClient(); err != nil { return err } + if err := s.initializeMailerClient(); err != nil { + return err + } + if err := s.initializeUserClient(); err != nil { + return err + } if err := s.initializeSummaryClient(); err != nil { return err } + if err := s.initializeConfirmationClient(); err != nil { + return err + } if err := s.initializeWorkClient(); err != nil { return err } @@ -470,7 +495,7 @@ func (s *Standard) initializeClinicsClient() error { if err != nil { return errors.Wrap(err, "unable to create clinics client") } - s.clinicsClient = &clnt + s.clinicsClient = clnt return nil } @@ -511,6 +536,65 @@ func (s *Standard) initializeDataSourceClient() error { return nil } +func (s *Standard) initializeMailerClient() error { + s.Logger().Debug("Initializing mailer client") + client, err := mailer.Client() + if err != nil { + return errors.Wrap(err, "unable to create mailer client") + } + s.mailerClient = client + return nil +} + +func (s *Standard) initializeUserClient() error { + s.Logger().Debug("Initializing user client") + client, err := userClient.NewDefaultClient(userClient.Params{ + ConfigReporter: s.ConfigReporter(), + Logger: s.Logger(), + UserAgent: s.UserAgent(), + }) + if err != nil { + return errors.Wrap(err, "unable to create user client") + } + s.userClient = client + return nil +} + +type confirmationClientConfig struct { + ServiceAddress string `envconfig:"TIDEPOOL_CONFIRMATION_CLIENT_ADDRESS"` +} + +func (c *confirmationClientConfig) Load() error { + return envconfig.Process("", c) +} + +func (s *Standard) initializeConfirmationClient() error { + s.Logger().Debug("Initializing confirmation client") + + cfg := &confirmationClientConfig{} + if err := cfg.Load(); err != nil { + return errors.Wrap(err, "unable to load confirmations client config") + } + + opts := confirmationClient.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error { + token, err := s.AuthClient().ServerSessionToken() + if err != nil { + return err + } + + req.Header.Add(auth.TidepoolSessionTokenHeaderKey, token) + return nil + }) + + client, err := confirmationClient.NewClientWithResponses(cfg.ServiceAddress, opts) + if err != nil { + return errors.Wrap(err, "unable to create confirmations client") + } + s.confirmationClient = client + + return nil +} + func (s *Standard) initializeSummaryClient() error { s.Logger().Debug("Creating summarizer registry") @@ -618,6 +702,25 @@ func (s *Standard) initializeWorkCoordinator() error { return errors.Wrap(err, "unable to register abbott processors") } + notificationDependencies := notifications.Dependencies{ + Auth: s.AuthClient(), + Clinics: s.clinicsClient, + Confirmations: s.confirmationClient, + DataSources: s.dataSourceStructuredStore.NewDataSourcesRepository(), + Mailer: s.mailerClient, + Users: s.userClient, + Worker: s.workClient, + } + scheduledProcessors := []work.Processor{ + claimaccount.NewProcessor(notificationDependencies), + connectaccount.NewProcessor(notificationDependencies), + connectionissues.NewProcessor(notificationDependencies), + } + + if err = s.workCoordinator.RegisterProcessors(scheduledProcessors); err != nil { + return errors.Wrap(err, "unable to register conditional notifications processors") + } + s.Logger().Debug("Starting work coordinator") s.workCoordinator.Start() diff --git a/work/service/coordinator.go b/work/service/coordinator.go index e373cf557..de1b9408c 100644 --- a/work/service/coordinator.go +++ b/work/service/coordinator.go @@ -45,7 +45,6 @@ type Coordinator struct { managerContext context.Context managerCancelFunc context.CancelFunc managerWaitGroup sync.WaitGroup - timer *time.Timer } func NewCoordinator(logger log.Logger, serverSessionTokenProvider ServerSessionTokenProvider, workClient WorkClient) (*Coordinator, error) { @@ -156,9 +155,6 @@ func (c *Coordinator) startManager() { go func() { defer c.managerWaitGroup.Done() - c.startTimer() - defer c.stopTimer() - for { select { case <-c.managerContext.Done(): // Drain and complete any interrupted tasks @@ -167,13 +163,10 @@ func (c *Coordinator) startManager() { } return case completion := <-c.workersCompletionChannel: - c.stopTimer() c.completeWork(completion) c.requestAndDispatchWork() - c.startTimer() - case <-c.timer.C: + case <-c.tick(): c.requestAndDispatchWork() - c.startTimer() } } }() @@ -331,23 +324,10 @@ func (c *Coordinator) completeWork(completion *coordinatorProcessingCompletion) } } -func (c *Coordinator) startTimer() { +func (c *Coordinator) tick() <-chan time.Time { jitter := int64(float64(c.frequency) * CoordinatorDelayJitter) frequencyWithJitter := c.frequency + time.Duration(rand.Int63n(jitter*2+1)-jitter) - if c.timer == nil { - c.timer = time.NewTimer(frequencyWithJitter) - } else { - c.timer.Reset(frequencyWithJitter) - } -} - -func (c *Coordinator) stopTimer() { - if c.timer != nil { - if !c.timer.Stop() { - <-c.timer.C - } - c.timer = nil - } + return time.After(frequencyWithJitter) } type coordinatorProcessingIdentifier struct {