// Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 package reports import ( "context" "fmt" "log/slog" "strings" "time" grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1" "github.com/absmach/magistrala/pkg/emailer" pkglog "github.com/absmach/magistrala/pkg/logger" "github.com/absmach/magistrala/pkg/reltime" "github.com/absmach/magistrala/pkg/ticker" "github.com/absmach/magistrala/reports/operations" "github.com/absmach/supermq" "github.com/absmach/supermq/pkg/authn" "github.com/absmach/supermq/pkg/errors" svcerr "github.com/absmach/supermq/pkg/errors/service" "github.com/absmach/supermq/pkg/policies" "github.com/absmach/supermq/pkg/roles" "github.com/absmach/supermq/pkg/transformers/senml" ) const limit = 1000 type report struct { repo Repository runInfo chan pkglog.RunInfo idp supermq.IDProvider email emailer.Emailer ticker ticker.Ticker readers grpcReadersV1.ReadersServiceClient defaultTemplate ReportTemplate converterURL string roles.ProvisionManageService } func NewService(repo Repository, runInfo chan pkglog.RunInfo, policy policies.Service, idp supermq.IDProvider, tck ticker.Ticker, emailer emailer.Emailer, readers grpcReadersV1.ReadersServiceClient, template ReportTemplate, converterURL string, availableActions []roles.Action, builtInRoles map[roles.BuiltInRoleName][]roles.Action) (Service, error) { rpms, err := roles.NewProvisionManageService(operations.EntityType, repo, policy, idp, availableActions, builtInRoles) if err != nil { return nil, err } return &report{ repo: repo, idp: idp, runInfo: runInfo, email: emailer, ticker: tck, readers: readers, defaultTemplate: template, converterURL: converterURL, ProvisionManageService: rpms, }, nil } func (r *report) AddReportConfig(ctx context.Context, session authn.Session, cfg ReportConfig) (retCfg ReportConfig, retErr error) { id, err := r.idp.ID() if err != nil { return ReportConfig{}, err } now := time.Now().UTC() cfg.ID = id cfg.CreatedAt = now cfg.CreatedBy = session.UserID cfg.DomainID = session.DomainID cfg.Status = EnabledStatus if cfg.Schedule.StartDateTime.IsZero() { cfg.Schedule.StartDateTime = now } cfg.Schedule.Time = cfg.Schedule.StartDateTime reportConfig, err := r.repo.AddReportConfig(ctx, cfg) if err != nil { return ReportConfig{}, errors.Wrap(svcerr.ErrCreateEntity, err) } defer func() { if retErr != nil { if errRollBack := r.repo.RemoveReportConfig(ctx, reportConfig.ID); errRollBack != nil { retErr = errors.Wrap(retErr, errors.Wrap(svcerr.ErrRollbackRepo, errRollBack)) } } }() newBuiltInRoleMembers := map[roles.BuiltInRoleName][]roles.Member{ BuiltInRoleAdmin: {roles.Member(session.UserID)}, } optionalPolicies := []policies.Policy{ { SubjectType: policies.DomainType, Subject: session.DomainID, Relation: policies.DomainRelation, ObjectType: operations.EntityType, Object: reportConfig.ID, }, } _, err = r.AddNewEntitiesRoles(ctx, session.DomainID, session.UserID, []string{reportConfig.ID}, optionalPolicies, newBuiltInRoleMembers) if err != nil { return ReportConfig{}, errors.Wrap(svcerr.ErrAddPolicies, err) } return reportConfig, nil } func (r *report) ViewReportConfig(ctx context.Context, session authn.Session, id string, withRoles bool) (ReportConfig, error) { var cfg ReportConfig var err error switch withRoles { case true: cfg, err = r.repo.RetrieveByIDWithRoles(ctx, id, session.UserID) default: cfg, err = r.repo.ViewReportConfig(ctx, id) } if err != nil { return ReportConfig{}, errors.Wrap(svcerr.ErrViewEntity, err) } return cfg, nil } func (r *report) UpdateReportConfig(ctx context.Context, session authn.Session, cfg ReportConfig) (ReportConfig, error) { cfg.UpdatedAt = time.Now().UTC() cfg.UpdatedBy = session.UserID reportConfig, err := r.repo.UpdateReportConfig(ctx, cfg) if err != nil { return ReportConfig{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } return reportConfig, nil } func (r *report) UpdateReportSchedule(ctx context.Context, session authn.Session, cfg ReportConfig) (ReportConfig, error) { cfg.UpdatedAt = time.Now().UTC() cfg.UpdatedBy = session.UserID cfg.Schedule.Time = cfg.Schedule.StartDateTime c, err := r.repo.UpdateReportSchedule(ctx, cfg) if err != nil { return ReportConfig{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } return c, nil } func (r *report) RemoveReportConfig(ctx context.Context, session authn.Session, id string) error { if err := r.repo.RemoveReportConfig(ctx, id); err != nil { return errors.Wrap(svcerr.ErrRemoveEntity, err) } return nil } func (r *report) ListReportsConfig(ctx context.Context, session authn.Session, pm PageMeta) (ReportConfigPage, error) { pm.Domain = session.DomainID if session.SuperAdmin { page, err := r.repo.ListAllReportsConfig(ctx, pm) if err != nil { return ReportConfigPage{}, errors.Wrap(svcerr.ErrViewEntity, err) } return page, nil } page, err := r.repo.ListUserReportsConfig(ctx, session.UserID, pm) if err != nil { return ReportConfigPage{}, errors.Wrap(svcerr.ErrViewEntity, err) } return page, nil } func (r *report) EnableReportConfig(ctx context.Context, session authn.Session, id string) (ReportConfig, error) { status, err := ToStatus(Enabled) if err != nil { return ReportConfig{}, err } cfg := ReportConfig{ ID: id, UpdatedAt: time.Now().UTC(), UpdatedBy: session.UserID, Status: status, } cfg, err = r.repo.UpdateReportConfigStatus(ctx, cfg) if err != nil { return ReportConfig{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } return cfg, nil } func (r *report) DisableReportConfig(ctx context.Context, session authn.Session, id string) (ReportConfig, error) { status, err := ToStatus(Disabled) if err != nil { return ReportConfig{}, err } cfg := ReportConfig{ ID: id, UpdatedAt: time.Now().UTC(), UpdatedBy: session.UserID, Status: status, } cfg, err = r.repo.UpdateReportConfigStatus(ctx, cfg) if err != nil { return ReportConfig{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } return cfg, nil } func (r *report) GenerateReport(ctx context.Context, session authn.Session, config ReportConfig, action ReportAction) (ReportPage, error) { config.DomainID = session.DomainID if action != ViewReport && action != DownloadReport && action != EmailReport { if config.Status != EnabledStatus { return ReportPage{}, svcerr.ErrInvalidStatus } } reportPage, err := r.generateReport(ctx, config, action) if err != nil { return ReportPage{}, err } return reportPage, nil } func (r *report) generateReport(ctx context.Context, cfg ReportConfig, action ReportAction) (ReportPage, error) { genReportFile, err := r.generateFileFunc(ctx, action, cfg.Config.FileFormat, cfg.ReportTemplate, cfg.Config.Timezone) if err != nil { return ReportPage{}, err } agg := grpcReadersV1.Aggregation_AGGREGATION_UNSPECIFIED switch cfg.Config.Aggregation.AggType { case AggregationMAX: agg = grpcReadersV1.Aggregation_MAX case AggregationMIN: agg = grpcReadersV1.Aggregation_MIN case AggregationCOUNT: agg = grpcReadersV1.Aggregation_COUNT case AggregationAVG: agg = grpcReadersV1.Aggregation_AVG case AggregationSUM: agg = grpcReadersV1.Aggregation_SUM } loc, err := resolveTimezone(cfg.Config.Timezone) if err != nil { r.runInfo <- pkglog.RunInfo{ Level: slog.LevelWarn, Message: fmt.Sprintf("failed to resolve timezone '%s', falling back to UTC: %s", cfg.Config.Timezone, err), Details: []slog.Attr{ slog.String("report_name", cfg.Name), slog.String("timezone", cfg.Config.Timezone), }, } } from, err := reltime.Parse(cfg.Config.From) if err != nil { return ReportPage{}, err } to, err := reltime.Parse(cfg.Config.To) if err != nil { return ReportPage{}, err } fromDisplay := from.In(loc) toDisplay := to.In(loc) pm := &grpcReadersV1.PageMetadata{ Aggregation: agg, Limit: limit, From: float64(from.UnixNano()), To: float64(to.UnixNano()), Interval: cfg.Config.Aggregation.Interval, } var mets []Metric var reports []Report for _, metric := range cfg.Metrics { switch { case len(metric.ClientIDs) != 0: for _, clientID := range metric.ClientIDs { mets = append(mets, Metric{ ChannelID: metric.ChannelID, ClientID: clientID, Name: metric.Name, Subtopic: metric.Subtopic, Protocol: metric.Protocol, Format: metric.Format, }) } default: mets = append(mets, Metric{ ChannelID: metric.ChannelID, Name: metric.Name, Subtopic: metric.Subtopic, Protocol: metric.Protocol, Format: metric.Format, }) } } for _, metric := range mets { sMsgs := []senml.Message{} pm.Offset = uint64(0) pm.Name = metric.Name if metric.ClientID != "" { pm.Publisher = metric.ClientID } if metric.Subtopic != "" { pm.Subtopic = metric.Subtopic } if metric.Protocol != "" { pm.Protocol = metric.Protocol } if metric.Format != "" { pm.Format = metric.Format } msgs, err := r.readers.ReadMessages(ctx, &grpcReadersV1.ReadMessagesReq{ ChannelId: metric.ChannelID, DomainId: cfg.DomainID, PageMetadata: pm, }) if err != nil { return ReportPage{}, err } for _, msg := range msgs.Messages { sMsgs = append(sMsgs, convertToSenml(msg.GetSenml())) } for msgs.GetTotal() > (pm.Offset + pm.Limit) { pm.Offset = pm.Offset + pm.Limit msgs, err := r.readers.ReadMessages(ctx, &grpcReadersV1.ReadMessagesReq{ ChannelId: metric.ChannelID, DomainId: cfg.DomainID, PageMetadata: pm, }) if err != nil { return ReportPage{}, err } for _, msg := range msgs.Messages { sMsgs = append(sMsgs, convertToSenml(msg.GetSenml())) } } reports = append(reports, convertToReports(metric, sMsgs)...) } switch { case genReportFile != nil: data, err := genReportFile(ctx, cfg.Config.Title, reports) if err != nil { return ReportPage{}, err } timeStr := strings.ReplaceAll(time.Now().Format(time.RFC3339), ":", "") filePrefix := cfg.Name if filePrefix == "" { filePrefix = "report" } fileName := fmt.Sprintf("%s_%s.%s", filePrefix, timeStr, cfg.Config.FileFormat.Extension()) file := ReportFile{ Name: fileName, Data: data, Format: cfg.Config.FileFormat, } switch action { case EmailReport: if err := r.emailReports(*cfg.Email, file); err != nil { return ReportPage{}, errors.Wrap(err, svcerr.ErrCreateEntity) } return ReportPage{}, nil default: return ReportPage{ File: file, }, nil } default: return ReportPage{ From: fromDisplay, To: toDisplay, Aggregation: cfg.Config.Aggregation, Total: uint64(len(reports)), Reports: reports, }, nil } } func (r *report) generateFileFunc(_ context.Context, action ReportAction, format Format, customTemplate ReportTemplate, timezone string) (func(context.Context, string, []Report) ([]byte, error), error) { switch action { case DownloadReport, EmailReport: switch format { case PDF: return func(ctx context.Context, title string, reports []Report) ([]byte, error) { return r.generatePDFReport(ctx, title, reports, customTemplate, timezone) }, nil case CSV: return func(ctx context.Context, title string, reports []Report) ([]byte, error) { return r.generateCSVReport(ctx, title, reports, timezone) }, nil default: return nil, errors.New("file format not supported") } default: return nil, nil } } func (r *report) emailReports(es EmailSetting, file ReportFile) error { if err := es.Validate(); err != nil { return errors.Wrap(svcerr.ErrMalformedEntity, err) } attachments := map[string][]byte{ file.Name: file.Data, } if err := r.email.SendEmailNotification( es.To, "", es.Subject, "", "", es.Content, "", attachments, ); err != nil { return err } return nil } func convertToSenml(g *grpcReadersV1.SenMLMessage) senml.Message { if g == nil { return senml.Message{} } return senml.Message{ Protocol: g.Base.GetProtocol(), Subtopic: g.Base.GetSubtopic(), Publisher: g.Base.GetPublisher(), Channel: g.Base.GetChannel(), Name: g.GetName(), Unit: g.GetUnit(), Time: g.GetTime(), UpdateTime: g.GetUpdateTime(), Value: g.Value, StringValue: g.StringValue, DataValue: g.DataValue, BoolValue: g.BoolValue, Sum: g.Sum, } } func convertToReports(metric Metric, senmlMsgs []senml.Message) []Report { if metric.ClientID != "" { return []Report{ { Metric: metric, Messages: senmlMsgs, }, } } return groupReportsByPublisher(metric, senmlMsgs) } func groupReportsByPublisher(metric Metric, sMsgs []senml.Message) []Report { publishers := map[string][]senml.Message{} for _, msg := range sMsgs { publishers[msg.Publisher] = append(publishers[msg.Publisher], msg) } var groupedReports []Report for publisher, messages := range publishers { gMetric := metric gMetric.ClientID = publisher groupedReports = append(groupedReports, Report{ Metric: gMetric, Messages: messages, }) } if len(groupedReports) == 0 { groupedReports = append(groupedReports, Report{ Metric: metric, Messages: []senml.Message{}, }) } return groupedReports } func (r *report) UpdateReportTemplate(ctx context.Context, session authn.Session, cfg ReportConfig) error { err := r.repo.UpdateReportTemplate(ctx, session.DomainID, cfg.ID, cfg.ReportTemplate) if err != nil { return errors.Wrap(svcerr.ErrUpdateEntity, err) } return nil } func (r *report) ViewReportTemplate(ctx context.Context, session authn.Session, id string) (ReportTemplate, error) { template, err := r.repo.ViewReportTemplate(ctx, session.DomainID, id) if err != nil { return "", errors.Wrap(svcerr.ErrCreateEntity, err) } return template, nil } func (r *report) DeleteReportTemplate(ctx context.Context, session authn.Session, id string) error { err := r.repo.DeleteReportTemplate(ctx, session.DomainID, id) if err != nil { return errors.Wrap(svcerr.ErrRemoveEntity, err) } return nil }