align with ATOM changes

Signed-off-by: Arvindh <arvindh91@gmail.com>
This commit is contained in:
Arvindh
2026-06-02 14:45:05 +05:30
parent 32319881a9
commit 78bcf0a1d5
22 changed files with 851 additions and 169 deletions
+132
View File
@@ -141,6 +141,138 @@ Magistrala provides a complete set of building blocks for IoT systems — from d
- Documentation focused on getting you running quickly
---
## Atom Integration Model
Magistrala uses **Atom** as the backend for identity, authorization, and the core catalog.
Atom is the source of truth for:
- domains
- users
- clients
- channels
- groups
- roles
- access policies
Magistrala services such as rules, alarms, and reports remain Magistrala services, but they use Atom for identity and authorization.
### Core Entity Mapping
| Magistrala concept | Atom concept | Meaning |
|--------------------|--------------|---------|
| Domain | Tenant | Isolation boundary for one organization, project, or environment |
| User | Entity with kind `human` | A person who logs in and uses the UI/API |
| Client | Entity with kind `device` | A device or application that sends/receives data |
| Channel | Resource with kind `channel` | A messaging/data path that clients can publish or subscribe to |
| Group | Group | A collection of users, clients, channels, or other grouped objects |
In simple terms:
```text
MG Domain = Atom Tenant
MG User = Atom Human Entity
MG Client = Atom Device Entity
MG Channel = Atom Channel Resource
MG Group = Atom Group
```
### Actions, Permission Blocks, Roles, and Assignments
Atom access control has these basic parts:
| Atom word | Simple meaning | Example |
|-----------|----------------|---------|
| Action | One permission verb | `read`, `write`, `delete`, `role.manage`, `policy.manage` |
| Permission Block | Where actions apply | all channels in domain `d1` can `read`, `publish` |
| Role | A bundle of permission blocks | `tenant-admin` bundles domain, role, and member access |
| Role Assignment | Who gets a role | give `user1` the `tenant-admin` role |
Read an assignment like this:
```text
Give <who> this <role>.
The role contains permission blocks that say where and what.
```
Example:
```text
Give user1 the tenant-admin role on domain d1.
```
That means:
```text
user1 can use the tenant-admin permissions inside domain d1.
```
### How MG Roles Work With Atom
MG UI shows actions such as:
- read
- update
- delete
- manage roles
- add/remove members
- publish
- subscribe
These are mapped to Atom actions:
| MG action | Atom action |
|-----------|-----------------|
| view/read | `read` |
| create/update/edit/connect | `write` |
| delete/remove | `delete` |
| manage roles | `role.manage` |
| add/remove members or access | `policy.manage` |
| channel publish | `publish` |
| channel subscribe | `subscribe` |
So when MG UI checks:
```text
Can user1 manage roles for client1?
```
Atom checks:
```text
Does user1 have role.manage on client1, or on the domain that contains client1?
```
When MG UI checks:
```text
Can user1 add a member to channel1?
```
Atom checks:
```text
Does user1 have policy.manage on channel1, or on the domain that contains channel1?
```
### Practical Rule
If a user is domain admin, they usually receive a tenant-scoped role in Atom.
That tenant-scoped role can allow them to manage objects inside the domain:
- clients
- channels
- groups
- rules
- alarms
- reports
For narrower access, create object-scoped roles. For example:
```text
Give user2 a reader role only on channel1.
```
Then user2 can read only that channel, not the whole domain.
## Installation
```bash
-12
View File
@@ -27,11 +27,9 @@ import (
"github.com/absmach/magistrala/pkg/permissions"
"github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
rconsumer "github.com/absmach/magistrala/pkg/re/events/consumer"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
rpostgres "github.com/absmach/magistrala/re/postgres"
"github.com/caarlos0/env/v11"
"golang.org/x/sync/errgroup"
)
@@ -51,8 +49,6 @@ type config struct {
InstanceID string `env:"MG_ALARMS_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
ESConsumerName string `env:"MG_ALARMS_EVENT_CONSUMER" envDefault:"alarms"`
PermissionsFile string `env:"MG_PERMISSIONS_FILE" envDefault:"permission.yaml"`
}
@@ -117,14 +113,6 @@ func main() {
logger.Info("AuthN configured to use Atom bearer tokens")
logger.Info("AuthZ configured to use Atom PDP")
am := smqauthn.NewAuthNMiddleware(atomauthn.NewAuthentication())
rdatabase := postgres.NewDatabase(db, dbConfig, tracer)
rrepo := rpostgres.NewRepository(rdatabase)
if err := rconsumer.RulesEventsSubscribe(ctx, rrepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil {
logger.Error(fmt.Sprintf("failed to subscribe to rules events: %s", err))
exitCode = 1
return
}
idp := uuid.New()
+43 -3
View File
@@ -9,6 +9,7 @@ package main
import (
"context"
"errors"
"fmt"
"log"
"net/http"
@@ -27,7 +28,7 @@ import (
atomauthn "github.com/absmach/magistrala/pkg/authn/atom"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging"
broker "github.com/absmach/magistrala/pkg/messaging/brokers"
fluxmqbroker "github.com/absmach/magistrala/pkg/messaging/fluxmq"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
@@ -53,6 +54,27 @@ type config struct {
InstanceID string `env:"MG_FLUXMQ_INSTANCE_ID" envDefault:""`
}
type fanoutPublisher struct {
publishers []messaging.Publisher
}
func (fp fanoutPublisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error {
for _, publisher := range fp.publishers {
if err := publisher.Publish(ctx, topic, msg); err != nil {
return err
}
}
return nil
}
func (fp fanoutPublisher) Close() error {
errs := make([]error, 0, len(fp.publishers))
for _, publisher := range fp.publishers {
errs = append(errs, publisher.Close())
}
return errors.Join(errs...)
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
@@ -153,13 +175,31 @@ func main() {
MaxHeaderBytes: grpcServerConfig.MaxHeaderBytes,
}
publisher, err := broker.NewPublisher(ctx, cfg.BrokerURL, broker.ConnectionName("fluxmq-ui-publish-proxy"))
messagePublisher, err := fluxmqbroker.NewUndeclaredPublisher(
ctx,
cfg.BrokerURL,
fluxmqbroker.ConnectionName("fluxmq-ui-message-publish-proxy"),
)
if err != nil {
logger.Error(fmt.Sprintf("failed to create publish proxy message publisher: %s", err))
exitCode = 1
return
}
defer publisher.Close()
defer messagePublisher.Close()
writerPublisher, err := fluxmqbroker.NewUndeclaredPublisher(
ctx,
cfg.BrokerURL,
fluxmqbroker.Prefix("writers"),
fluxmqbroker.ConnectionName("fluxmq-ui-publish-proxy"),
)
if err != nil {
logger.Error(fmt.Sprintf("failed to create publish proxy writer publisher: %s", err))
exitCode = 1
return
}
defer writerPublisher.Close()
publisher := fanoutPublisher{publishers: []messaging.Publisher{messagePublisher, writerPublisher}}
httpServerConfig := server.Config{Port: "9026"}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
+3 -2
View File
@@ -147,8 +147,8 @@ ATOM_JWKS_URL=http://atom:8080/.well-known/jwks.json
ATOM_JWT_ISSUER=http://nginx:80
ATOM_JWT_AUDIENCE=magistrala
ATOM_CORS_ALLOWED_ORIGINS=http://localhost:3000,http://localhost
ATOM_GRAPHQL_CONSOLE_ENABLED=true
ATOM_GRAPHQL_CONSOLE_DIST_DIR=/app/console/dist
ATOM_UI_RELEASE_TAG=latest
ATOM_UI_HTTP_PORT=3005
ATOM_SERVICE_TOKEN=
ATOM_SERVICE_USERNAME=mg-service
ATOM_SERVICE_SECRET=change-me-service-secret
@@ -756,6 +756,7 @@ MG_JOURNAL_URL=http://journal:9021
### UI Configuration
MG_UI_TYPE=mg
MG_UI_CLIENT_TYPE=client
MG_UI_BASE_PATH=/
MG_NEXTAUTH_BASE_PATH=/api/auth
NEXTAUTH_SECRET=4WdW0Z0tAOyQ/ZAI3YLVV/wNu+yUZXBLDDQ3AGrgfJ4=
+25 -2
View File
@@ -71,8 +71,6 @@ services:
ATOM_JWT_ISSUER: ${ATOM_JWT_ISSUER}
ATOM_JWT_AUDIENCE: ${ATOM_JWT_AUDIENCE}
ATOM_CORS_ALLOWED_ORIGINS: ${ATOM_CORS_ALLOWED_ORIGINS}
ATOM_GRAPHQL_CONSOLE_ENABLED: ${ATOM_GRAPHQL_CONSOLE_ENABLED}
ATOM_GRAPHQL_CONSOLE_DIST_DIR: ${ATOM_GRAPHQL_CONSOLE_DIST_DIR}
ATOM_INVITATION_REDIRECT: ${ATOM_INVITATION_REDIRECT}
ATOM_INVITATION_EXPIRY_SECS: ${ATOM_INVITATION_EXPIRY_SECS}
ATOM_SMTP_HOST: ${ATOM_SMTP_HOST}
@@ -82,6 +80,7 @@ services:
ATOM_SMTP_FROM: ${ATOM_SMTP_FROM}
ATOM_SMTP_TLS: ${ATOM_SMTP_TLS}
ADMIN_SECRET: ${ATOM_ADMIN_SECRET}
ATOM_SERVICE_SECRET: ${ATOM_SERVICE_SECRET}
ATOM_MIN_PASSWORD_CHARS: ${ATOM_MIN_PASSWORD_CHARS}
RUST_LOG: ${ATOM_RUST_LOG}
ports:
@@ -89,6 +88,20 @@ services:
networks:
- magistrala-base-net
atom-ui:
image: ghcr.io/absmach/atom-ui:${ATOM_UI_RELEASE_TAG:-latest}
container_name: magistrala-atom-ui
restart: on-failure
profiles: ["atom-ui"]
depends_on:
- atom
environment:
ATOM_GRAPHQL_URL: http://atom:8080/graphql
ports:
- ${ATOM_UI_HTTP_PORT:-3005}:3000
networks:
- magistrala-base-net
domains-db:
image: docker.io/postgres:18.0-alpine3.22
container_name: magistrala-domains-db
@@ -1300,6 +1313,7 @@ services:
ui:
image: ghcr.io/absmach/magistrala/ui-mg:${MG_RELEASE_TAG}
container_name: magistrala-ui
restart: unless-stopped
ports:
- 3000:3000
networks:
@@ -1330,6 +1344,7 @@ services:
MG_UI_BASE_PATH: ${MG_UI_BASE_PATH}
MG_NEXTAUTH_BASE_PATH: ${MG_NEXTAUTH_BASE_PATH}
MG_UI_TYPE: ${MG_UI_TYPE}
MG_UI_CLIENT_TYPE: ${MG_UI_CLIENT_TYPE}
MG_UI_BASEURL: ${MG_UI_BASEURL}
NEXTAUTH_URL: ${NEXTAUTH_URL}
NEXTAUTH_SECRET: ${NEXTAUTH_SECRET}
@@ -1575,6 +1590,8 @@ services:
ATOM_SERVICE_USERNAME: ${ATOM_SERVICE_USERNAME}
ATOM_SERVICE_SECRET: ${ATOM_SERVICE_SECRET}
ATOM_JWKS_URL: ${ATOM_JWKS_URL}
ATOM_JWT_ISSUER: ${ATOM_JWT_ISSUER}
ATOM_JWT_AUDIENCE: ${ATOM_JWT_AUDIENCE}
ATOM_ADMIN_TOKEN: ${ATOM_ADMIN_TOKEN}
ATOM_ADMIN_USERNAME: ${ATOM_ADMIN_USERNAME}
ATOM_ADMIN_SECRET: ${ATOM_ADMIN_SECRET}
@@ -1695,6 +1712,8 @@ services:
ATOM_ADMIN_USERNAME: ${ATOM_ADMIN_USERNAME}
ATOM_ADMIN_SECRET: ${ATOM_ADMIN_SECRET}
ATOM_JWKS_URL: ${ATOM_JWKS_URL}
ATOM_JWT_ISSUER: ${ATOM_JWT_ISSUER}
ATOM_JWT_AUDIENCE: ${ATOM_JWT_AUDIENCE}
ATOM_TIMEOUT: ${ATOM_TIMEOUT}
MG_RE_HTTP_PORT: ${MG_RE_HTTP_PORT}
MG_RE_HTTP_HOST: ${MG_RE_HTTP_HOST}
@@ -1778,6 +1797,8 @@ services:
ATOM_ADMIN_USERNAME: ${ATOM_ADMIN_USERNAME}
ATOM_ADMIN_SECRET: ${ATOM_ADMIN_SECRET}
ATOM_JWKS_URL: ${ATOM_JWKS_URL}
ATOM_JWT_ISSUER: ${ATOM_JWT_ISSUER}
ATOM_JWT_AUDIENCE: ${ATOM_JWT_AUDIENCE}
ATOM_TIMEOUT: ${ATOM_TIMEOUT}
MG_ALARMS_HTTP_PORT: ${MG_ALARMS_HTTP_PORT}
MG_ALARMS_HTTP_HOST: ${MG_ALARMS_HTTP_HOST}
@@ -1840,6 +1861,8 @@ services:
ATOM_ADMIN_USERNAME: ${ATOM_ADMIN_USERNAME}
ATOM_ADMIN_SECRET: ${ATOM_ADMIN_SECRET}
ATOM_JWKS_URL: ${ATOM_JWKS_URL}
ATOM_JWT_ISSUER: ${ATOM_JWT_ISSUER}
ATOM_JWT_AUDIENCE: ${ATOM_JWT_AUDIENCE}
ATOM_TIMEOUT: ${ATOM_TIMEOUT}
MG_REPORTS_HTTP_PORT: ${MG_REPORTS_HTTP_PORT}
MG_REPORTS_HTTP_HOST: ${MG_REPORTS_HTTP_HOST}
-12
View File
@@ -95,18 +95,6 @@ http {
proxy_pass http://$atom_upstream;
}
location ^~ /graphql/console {
include snippets/proxy-headers.conf;
add_header Access-Control-Expose-Headers Location;
proxy_pass http://$atom_upstream;
}
location = /graphql/playground {
include snippets/proxy-headers.conf;
add_header Access-Control-Expose-Headers Location;
proxy_pass http://$atom_upstream;
}
# Proxy pass to Atom custom GraphQL-backed endpoints
location ^~ /api/custom {
include snippets/proxy-headers.conf;
-12
View File
@@ -104,18 +104,6 @@ http {
proxy_pass http://$atom_upstream;
}
location ^~ /graphql/console {
include snippets/proxy-headers.conf;
add_header Access-Control-Expose-Headers Location;
proxy_pass http://$atom_upstream;
}
location = /graphql/playground {
include snippets/proxy-headers.conf;
add_header Access-Control-Expose-Headers Location;
proxy_pass http://$atom_upstream;
}
# Proxy pass to Atom custom GraphQL-backed endpoints
location ^~ /api/custom {
include snippets/proxy-headers.conf;
+1 -1
View File
@@ -108,7 +108,7 @@ func (s *connectServer) Authorize(ctx context.Context, req *connect.Request[auth
SubjectID: req.Msg.GetExternalId(),
Action: "connect",
ResourceID: channelID,
ObjectKind: atom.KindChannel,
ObjectKind: "resource",
ObjectID: channelID,
Context: map[string]any{
"domain_id": domainID,
+3 -6
View File
@@ -110,10 +110,7 @@ func (h publishHandler) publish(w http.ResponseWriter, r *http.Request) {
}
subtopic := cleanSubtopic(req.Subtopic)
topic := fmt.Sprintf("m/%s/c/%s", domainID, channelID)
if subtopic != "" {
topic = topic + "/" + subtopic
}
topic := messaging.EncodeTopicSuffix(domainID, channelID, subtopic)
msg := &messaging.Message{
Domain: domainID,
@@ -146,7 +143,7 @@ func (h publishHandler) ensureUserPublish(
SubjectID: userID,
Action: "publish",
ResourceID: channelID,
ObjectKind: atom.KindChannel,
ObjectKind: "resource",
ObjectID: channelID,
Context: map[string]any{
"domain_id": domainID,
@@ -199,7 +196,7 @@ func (h publishHandler) ensureClientPublisher(
SubjectID: clientID,
Action: "publish",
ResourceID: channelID,
ObjectKind: atom.KindChannel,
ObjectKind: "resource",
ObjectID: channelID,
Context: map[string]any{
"domain_id": domainID,
+27
View File
@@ -7,8 +7,10 @@ import (
"context"
"testing"
channelsv1 "github.com/absmach/magistrala/api/grpc/channels/v1"
"github.com/absmach/magistrala/internal/atom"
"github.com/absmach/magistrala/pkg/authn"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/errors"
"github.com/absmach/magistrala/pkg/policies"
"github.com/stretchr/testify/assert"
@@ -64,3 +66,28 @@ func TestAuthorizeDenied(t *testing.T) {
assert.True(t, errors.Contains(err, errors.ErrAuthorization))
}
func TestChannelsCompatAuthorizeBuildsResourceRequest(t *testing.T) {
client := &authzClient{res: atom.AuthzResponse{Allowed: true}}
compat := atom.NewChannelsCompat(client)
res, err := compat.Authorize(context.Background(), &channelsv1.AuthzReq{
ClientId: "domain-1_user-1",
DomainId: "domain-1",
Type: uint32(connections.Subscribe),
ChannelId: "channel-1",
})
assert.NoError(t, err)
assert.True(t, res.GetAuthorized())
assert.Equal(t, atom.AuthzRequest{
SubjectID: "user-1",
Action: "subscribe",
ResourceID: "channel-1",
ObjectKind: "resource",
ObjectID: "channel-1",
Context: map[string]any{
"domain_id": "domain-1",
},
}, client.req)
}
+117 -22
View File
@@ -265,12 +265,12 @@ func (c *Client) CheckAuthzWithToken(ctx context.Context, token string, req Auth
func (c *Client) ListCapabilities(ctx context.Context) (CapabilityList, error) {
var out struct {
Capabilities CapabilityList `json:"capabilities"`
Actions CapabilityList `json:"actions"`
}
err := c.graphQL(ctx, `query Capabilities {
capabilities { items { id name resource_kind: resourceKind description } }
err := c.graphQL(ctx, `query Actions {
actions { items { id name description } }
}`, nil, &out)
return out.Capabilities, err
return out.Actions, err
}
func (c *Client) CapabilityID(ctx context.Context, name string) (string, error) {
@@ -286,16 +286,69 @@ func (c *Client) CapabilityID(ctx context.Context, name string) (string, error)
return "", Error{StatusCode: http.StatusNotFound, Message: "capability " + name + " not found"}
}
func (c *Client) CreatePolicy(ctx context.Context, policy CreatePolicyBinding) (PolicyBinding, error) {
func (c *Client) CreatePermissionBlock(ctx context.Context, block CreatePermissionBlock) (PermissionBlock, error) {
var out struct {
CreatePolicy PolicyBinding `json:"createPolicy"`
CreatePermissionBlock PermissionBlock `json:"createPermissionBlock"`
}
err := c.graphQL(ctx, `mutation CreatePolicy($input: CreatePolicyInput!) {
createPolicy(input: $input) {
id tenant_id: tenantId subject_kind: subjectKind subject_id: subjectId grant_kind: grantKind grant_id: grantId scope_kind: scopeKind scope_ref: scopeRef effect conditions created_at: createdAt
err := c.graphQL(ctx, `mutation CreatePermissionBlock($input: CreatePermissionBlockInput!) {
createPermissionBlock(input: $input) {
id tenant_id: tenantId scope_mode: scopeMode object_kind: objectKind object_type: objectType object_id: objectId group_id: groupId effect conditions
actions { id name description }
}
}`, map[string]any{"input": policyInput(policy)}, &out)
return out.CreatePolicy, err
}`, map[string]any{"input": permissionBlockInput(block)}, &out)
return out.CreatePermissionBlock, err
}
func (c *Client) CreateDirectPolicy(ctx context.Context, policy CreateDirectPolicy) (DirectPolicy, error) {
var out struct {
CreateDirectPolicy DirectPolicy `json:"createDirectPolicy"`
}
err := c.graphQL(ctx, `mutation CreateDirectPolicy($input: CreateDirectPolicyInput!) {
createDirectPolicy(input: $input) {
id tenant_id: tenantId subject_kind: subjectKind subject_id: subjectId permission_block_id: permissionBlockId created_at: createdAt
permission_block: permissionBlock {
id tenant_id: tenantId scope_mode: scopeMode object_kind: objectKind object_type: objectType object_id: objectId group_id: groupId effect conditions
actions { id name description }
}
}
}`, map[string]any{"input": directPolicyInput(policy)}, &out)
return out.CreateDirectPolicy, err
}
func (c *Client) ListDirectPolicies(ctx context.Context, q DirectPolicyQuery) (DirectPolicyList, error) {
var out struct {
DirectPolicies DirectPolicyList `json:"directPolicies"`
}
err := c.graphQL(ctx, `query DirectPolicies($tenantId: ID, $subjectKind: SubjectKind, $subjectId: ID, $limit: Int, $offset: Int) {
directPolicies(tenantId: $tenantId, subjectKind: $subjectKind, subjectId: $subjectId, limit: $limit, offset: $offset) {
total
items {
id tenant_id: tenantId subject_kind: subjectKind subject_id: subjectId permission_block_id: permissionBlockId created_at: createdAt
permission_block: permissionBlock {
id tenant_id: tenantId scope_mode: scopeMode object_kind: objectKind object_type: objectType object_id: objectId group_id: groupId effect conditions
actions { id name description }
}
}
}
}`, directPolicyQueryVariables(q), &out)
return out.DirectPolicies, err
}
func (c *Client) DeleteDirectPolicy(ctx context.Context, id string) error {
return c.graphQL(ctx, `mutation DeleteDirectPolicy($id: ID!) { deleteDirectPolicy(id: $id) }`, map[string]any{"id": id}, nil)
}
func (c *Client) AuthorizedObjectIDs(ctx context.Context, q AuthorizedObjectIDsQuery) (AuthorizedObjectIDs, error) {
var out struct {
AuthorizedObjectIDs AuthorizedObjectIDs `json:"authorizedObjectIds"`
}
err := c.graphQL(ctx, `query AuthorizedObjectIDs($input: AuthorizedObjectIdsInput!) {
authorizedObjectIds(input: $input) {
ids
total
}
}`, authorizedObjectIDVariables(q), &out)
return out.AuthorizedObjectIDs, err
}
func (c *Client) LoginPassword(ctx context.Context, identifier, secret string) (LoginResponse, error) {
@@ -547,23 +600,65 @@ func authzInput(req AuthzRequest) map[string]any {
return input
}
func policyInput(policy CreatePolicyBinding) map[string]any {
func permissionBlockInput(block CreatePermissionBlock) map[string]any {
input := map[string]any{
"subjectKind": policy.SubjectKind,
"subjectId": policy.SubjectID,
"grantKind": policy.GrantKind,
"grantId": policy.GrantID,
"scopeKind": policy.ScopeKind,
"scopeMode": block.ScopeMode,
"actionIds": block.ActionIDs,
}
setIfNotEmpty(input, "tenantId", policy.TenantID)
setIfNotEmpty(input, "scopeRef", policy.ScopeRef)
setIfNotEmpty(input, "effect", policy.Effect)
if policy.Conditions != nil {
input["conditions"] = policy.Conditions
setIfNotEmpty(input, "tenantId", block.TenantID)
setIfNotEmpty(input, "objectKind", block.ObjectKind)
setIfNotEmpty(input, "objectType", block.ObjectType)
setIfNotEmpty(input, "objectId", block.ObjectID)
setIfNotEmpty(input, "groupId", block.GroupID)
setIfNotEmpty(input, "effect", block.Effect)
if block.Conditions != nil {
input["conditions"] = block.Conditions
}
return input
}
func directPolicyInput(policy CreateDirectPolicy) map[string]any {
input := map[string]any{
"subjectKind": policy.SubjectKind,
"subjectId": policy.SubjectID,
"permissionBlockId": policy.PermissionBlockID,
}
setIfNotEmpty(input, "tenantId", policy.TenantID)
return input
}
func directPolicyQueryVariables(q DirectPolicyQuery) map[string]any {
vars := map[string]any{}
setIfNotEmpty(vars, "tenantId", q.TenantID)
setIfNotEmpty(vars, "subjectKind", q.SubjectKind)
setIfNotEmpty(vars, "subjectId", q.SubjectID)
if q.Limit > 0 {
vars["limit"] = int(q.Limit)
}
if q.Offset > 0 {
vars["offset"] = int(q.Offset)
}
return vars
}
func authorizedObjectIDVariables(q AuthorizedObjectIDsQuery) map[string]any {
input := map[string]any{
"subjectId": q.SubjectID,
"action": q.Action,
"objectKind": q.ObjectKind,
}
setIfNotEmpty(input, "objectType", q.ObjectType)
setIfNotEmpty(input, "tenantId", q.TenantID)
setIfNotEmpty(input, "q", q.Q)
if q.Limit > 0 {
input["limit"] = int(q.Limit)
}
if q.Offset > 0 {
input["offset"] = int(q.Offset)
}
return map[string]any{"input": input}
}
func queryVariables(q Query) map[string]any {
vars := map[string]any{}
setIfNotEmpty(vars, "q", q.Q)
+1 -1
View File
@@ -120,7 +120,7 @@ func (c AtomChannelsCompat) Authorize(ctx context.Context, in *channelsv1.AuthzR
SubjectID: subjectID,
Action: action,
ResourceID: in.GetChannelId(),
ObjectKind: KindChannel,
ObjectKind: "resource",
ObjectID: in.GetChannelId(),
Context: map[string]any{
"domain_id": in.GetDomainId(),
+165 -31
View File
@@ -16,7 +16,15 @@ var errUnsupportedPolicyOperation = stderrors.New("atom policy service: unsuppor
type policyClient interface {
Authorizer
ListEntities(ctx context.Context, q Query) (EntityList, error)
AuthorizedObjectIDs(ctx context.Context, q AuthorizedObjectIDsQuery) (AuthorizedObjectIDs, error)
}
type policyWriter interface {
CapabilityID(ctx context.Context, name string) (string, error)
CreatePermissionBlock(ctx context.Context, block CreatePermissionBlock) (PermissionBlock, error)
CreateDirectPolicy(ctx context.Context, policy CreateDirectPolicy) (DirectPolicy, error)
ListDirectPolicies(ctx context.Context, q DirectPolicyQuery) (DirectPolicyList, error)
DeleteDirectPolicy(ctx context.Context, id string) error
}
type PolicyService struct {
@@ -27,20 +35,82 @@ func NewPolicyService(client policyClient) PolicyService {
return PolicyService{client: client}
}
func (ps PolicyService) AddPolicy(context.Context, policies.Policy) error {
return errUnsupportedPolicyOperation
func (ps PolicyService) AddPolicy(ctx context.Context, pr policies.Policy) error {
writer, ok := ps.client.(policyWriter)
if !ok {
return errUnsupportedPolicyOperation
}
capID, err := writer.CapabilityID(ctx, CapabilityName(pr.Permission))
if err != nil {
return err
}
block, err := writer.CreatePermissionBlock(ctx, CreatePermissionBlock{
TenantID: pr.Domain,
ScopeMode: policyGrantScopeMode(pr),
ObjectKind: policyGrantObjectKind(pr),
ObjectType: policyGrantObjectType(pr),
ObjectID: policyGrantObjectID(pr),
Effect: "allow",
Conditions: map[string]any{},
ActionIDs: []string{capID},
})
if err != nil {
return err
}
_, err = writer.CreateDirectPolicy(ctx, CreateDirectPolicy{
TenantID: pr.Domain,
SubjectKind: policyGrantSubjectKind(pr),
SubjectID: policySubjectID(pr),
PermissionBlockID: block.ID,
})
return err
}
func (ps PolicyService) AddPolicies(context.Context, []policies.Policy) error {
return errUnsupportedPolicyOperation
func (ps PolicyService) AddPolicies(ctx context.Context, prs []policies.Policy) error {
for _, pr := range prs {
if err := ps.AddPolicy(ctx, pr); err != nil {
return err
}
}
return nil
}
func (ps PolicyService) DeletePolicyFilter(context.Context, policies.Policy) error {
return errUnsupportedPolicyOperation
func (ps PolicyService) DeletePolicyFilter(ctx context.Context, pr policies.Policy) error {
writer, ok := ps.client.(policyWriter)
if !ok {
return errUnsupportedPolicyOperation
}
capID, err := writer.CapabilityID(ctx, CapabilityName(pr.Permission))
if err != nil {
return err
}
page, err := writer.ListDirectPolicies(ctx, DirectPolicyQuery{
TenantID: pr.Domain,
SubjectKind: policyGrantSubjectKind(pr),
SubjectID: policySubjectID(pr),
Limit: policyPageLimit,
})
if err != nil {
return err
}
for _, policy := range page.Items {
if !directPolicyMatches(policy, capID, pr) {
continue
}
if err := writer.DeleteDirectPolicy(ctx, policy.ID); err != nil {
return err
}
}
return nil
}
func (ps PolicyService) DeletePolicies(context.Context, []policies.Policy) error {
return errUnsupportedPolicyOperation
func (ps PolicyService) DeletePolicies(ctx context.Context, prs []policies.Policy) error {
for _, pr := range prs {
if err := ps.DeletePolicyFilter(ctx, pr); err != nil {
return err
}
}
return nil
}
func (ps PolicyService) ListObjects(ctx context.Context, pr policies.Policy, _ string, limit uint64) (policies.PolicyPage, error) {
@@ -62,34 +132,22 @@ func (ps PolicyService) ListAllObjects(ctx context.Context, pr policies.Policy)
var ids []string
for offset := uint64(0); ; offset += policyPageLimit {
page, err := ps.client.ListEntities(ctx, Query{
Kind: entityKind(KindClient),
Limit: policyPageLimit,
Offset: offset,
page, err := ps.client.AuthorizedObjectIDs(ctx, AuthorizedObjectIDsQuery{
SubjectID: policySubjectID(pr),
Action: CapabilityName(pr.Permission),
ObjectKind: policyObjectKind(pr),
ObjectType: entityKind(KindClient),
TenantID: pr.Domain,
Limit: policyPageLimit,
Offset: offset,
})
if err != nil {
return policies.PolicyPage{}, err
}
for _, entity := range page.Items {
allowed, err := ps.client.CheckAuthz(ctx, AuthzRequest{
SubjectID: policySubjectID(pr),
Action: pr.Permission,
ObjectKind: policyObjectKind(pr),
ObjectID: entity.ID,
Context: map[string]any{
"legacy_object_type": pr.ObjectType,
},
})
if err != nil {
return policies.PolicyPage{}, err
}
if allowed.Allowed {
ids = append(ids, entity.ID)
}
}
ids = append(ids, page.IDs...)
if uint64(len(page.Items)) < policyPageLimit || offset+uint64(len(page.Items)) >= page.Total {
if uint64(len(page.IDs)) < policyPageLimit || offset+uint64(len(page.IDs)) >= page.Total {
break
}
}
@@ -127,3 +185,79 @@ func isSupportedObjectList(pr policies.Policy) bool {
pr.ObjectType == policies.ClientType &&
pr.Permission == policies.ViewPermission
}
func policyGrantSubjectKind(pr policies.Policy) string {
if pr.SubjectType == policies.GroupType || pr.SubjectKind == policies.GroupsKind {
return "group"
}
return "entity"
}
func policyGrantScopeMode(pr policies.Policy) string {
switch pr.ObjectType {
case policies.PlatformType:
return "platform"
case policies.DomainType:
return "tenant"
default:
return "object"
}
}
func policyGrantObjectKind(pr policies.Policy) string {
if policyGrantScopeMode(pr) != "object" {
return ""
}
if pr.ObjectType == policies.ClientType {
return "entity"
}
return "resource"
}
func policyGrantObjectType(pr policies.Policy) string {
if policyGrantScopeMode(pr) != "object" {
return ""
}
if pr.ObjectType == policies.ClientType {
return "entity:" + entityKind(KindClient)
}
switch pr.ObjectType {
case policies.ChannelType:
return "resource:" + KindChannel
case policies.RulesType:
return "resource:" + KindRule
case policies.ReportsType:
return "resource:" + KindReport
case policies.AlarmsType:
return "resource:" + KindAlarm
case policies.GroupType:
return ""
default:
return ""
}
}
func policyGrantObjectID(pr policies.Policy) string {
if policyGrantScopeMode(pr) != "object" {
return ""
}
return policyResourceID(pr)
}
func directPolicyMatches(policy DirectPolicy, actionID string, pr policies.Policy) bool {
block := policy.PermissionBlock
if block.ID == "" || block.ScopeMode != policyGrantScopeMode(pr) {
return false
}
if block.ObjectKind != policyGrantObjectKind(pr) ||
block.ObjectType != policyGrantObjectType(pr) ||
block.ObjectID != policyGrantObjectID(pr) {
return false
}
for _, action := range block.Actions {
if action.ID == actionID {
return true
}
}
return false
}
+142 -20
View File
@@ -11,30 +11,63 @@ import (
)
type fakePolicyClient struct {
entities []Entity
allowed map[string]bool
checks []AuthzRequest
authorized AuthorizedObjectIDs
queries []AuthorizedObjectIDsQuery
capID string
blocks []CreatePermissionBlock
created []CreateDirectPolicy
policies []DirectPolicy
deleted []string
}
func (f *fakePolicyClient) ListEntities(context.Context, Query) (EntityList, error) {
return EntityList{
Items: f.entities,
Total: uint64(len(f.entities)),
func (f *fakePolicyClient) AuthorizedObjectIDs(_ context.Context, q AuthorizedObjectIDsQuery) (AuthorizedObjectIDs, error) {
f.queries = append(f.queries, q)
return f.authorized, nil
}
func (f *fakePolicyClient) CheckAuthz(context.Context, AuthzRequest) (AuthzResponse, error) {
return AuthzResponse{Allowed: true}, nil
}
func (f *fakePolicyClient) CapabilityID(context.Context, string) (string, error) {
if f.capID == "" {
return "cap-publish", nil
}
return f.capID, nil
}
func (f *fakePolicyClient) CreatePermissionBlock(_ context.Context, block CreatePermissionBlock) (PermissionBlock, error) {
f.blocks = append(f.blocks, block)
return PermissionBlock{
ID: "block-1",
TenantID: block.TenantID,
ScopeMode: block.ScopeMode,
ObjectKind: block.ObjectKind,
ObjectType: block.ObjectType,
ObjectID: block.ObjectID,
Effect: block.Effect,
Conditions: block.Conditions,
Actions: []Capability{{ID: block.ActionIDs[0]}},
}, nil
}
func (f *fakePolicyClient) CheckAuthz(_ context.Context, req AuthzRequest) (AuthzResponse, error) {
f.checks = append(f.checks, req)
return AuthzResponse{Allowed: f.allowed[req.ObjectID]}, nil
func (f *fakePolicyClient) CreateDirectPolicy(_ context.Context, policy CreateDirectPolicy) (DirectPolicy, error) {
f.created = append(f.created, policy)
return DirectPolicy{ID: "policy-1", PermissionBlockID: policy.PermissionBlockID}, nil
}
func TestPolicyServiceListAllObjectsFiltersByAtomAuthz(t *testing.T) {
func (f *fakePolicyClient) ListDirectPolicies(context.Context, DirectPolicyQuery) (DirectPolicyList, error) {
return DirectPolicyList{Items: f.policies, Total: uint64(len(f.policies))}, nil
}
func (f *fakePolicyClient) DeleteDirectPolicy(_ context.Context, id string) error {
f.deleted = append(f.deleted, id)
return nil
}
func TestPolicyServiceListAllObjectsUsesAtomAuthorizedObjectIds(t *testing.T) {
client := &fakePolicyClient{
entities: []Entity{
{ID: "client-1", Kind: entityKind(KindClient)},
{ID: "client-2", Kind: entityKind(KindClient)},
},
allowed: map[string]bool{"client-2": true},
authorized: AuthorizedObjectIDs{IDs: []string{"client-2"}, Total: 1},
}
svc := NewPolicyService(client)
@@ -51,11 +84,100 @@ func TestPolicyServiceListAllObjectsFiltersByAtomAuthz(t *testing.T) {
if len(page.Policies) != 1 || page.Policies[0] != "client-2" {
t.Fatalf("unexpected policies: %+v", page.Policies)
}
if len(client.checks) != 2 {
t.Fatalf("unexpected authz checks: %d", len(client.checks))
if len(client.queries) != 1 {
t.Fatalf("unexpected authorized object queries: %d", len(client.queries))
}
if client.checks[0].SubjectID != "user-1" || client.checks[0].Action != policies.ViewPermission || client.checks[0].ObjectKind != "entity" {
t.Fatalf("unexpected authz request: %+v", client.checks[0])
query := client.queries[0]
if query.SubjectID != "user-1" ||
query.Action != "read" ||
query.ObjectKind != "entity" ||
query.ObjectType != entityKind(KindClient) ||
query.TenantID != "domain-1" {
t.Fatalf("unexpected authorized object query: %+v", query)
}
}
func TestPolicyServiceAddPolicyCreatesInternalCapabilityPolicy(t *testing.T) {
client := &fakePolicyClient{capID: "cap-publish"}
svc := NewPolicyService(client)
err := svc.AddPolicy(context.Background(), policies.Policy{
Domain: "domain-1",
Subject: "domain-1_client-1",
SubjectType: policies.ClientType,
Object: "channel-1",
ObjectType: policies.ChannelType,
Permission: policies.PublishPermission,
})
if err != nil {
t.Fatalf("add policy failed: %v", err)
}
if len(client.blocks) != 1 || len(client.created) != 1 {
t.Fatalf("expected one permission block and direct policy, got %d/%d", len(client.blocks), len(client.created))
}
block := client.blocks[0]
if block.TenantID != "domain-1" ||
block.ScopeMode != "object" ||
block.ObjectKind != "resource" ||
block.ObjectType != "resource:channel" ||
block.ObjectID != "channel-1" ||
block.Effect != "allow" ||
len(block.ActionIDs) != 1 ||
block.ActionIDs[0] != "cap-publish" {
t.Fatalf("unexpected permission block: %+v", block)
}
created := client.created[0]
if created.TenantID != "domain-1" ||
created.SubjectKind != "entity" ||
created.SubjectID != "client-1" ||
created.PermissionBlockID != "block-1" {
t.Fatalf("unexpected direct policy: %+v", created)
}
}
func TestPolicyServiceDeletePolicyFilterRemovesMatchingCapabilityPolicy(t *testing.T) {
client := &fakePolicyClient{
capID: "cap-subscribe",
policies: []DirectPolicy{
{
ID: "keep",
PermissionBlock: PermissionBlock{
ID: "keep-block",
ScopeMode: "object",
ObjectKind: "resource",
ObjectType: "resource:channel",
ObjectID: "channel-1",
Actions: []Capability{{ID: "cap-other"}},
},
},
{
ID: "delete",
PermissionBlock: PermissionBlock{
ID: "delete-block",
ScopeMode: "object",
ObjectKind: "resource",
ObjectType: "resource:channel",
ObjectID: "channel-1",
Actions: []Capability{{ID: "cap-subscribe"}},
},
},
},
}
svc := NewPolicyService(client)
err := svc.DeletePolicyFilter(context.Background(), policies.Policy{
Domain: "domain-1",
Subject: "domain-1_client-1",
SubjectType: policies.ClientType,
Object: "channel-1",
ObjectType: policies.ChannelType,
Permission: policies.SubscribePermission,
})
if err != nil {
t.Fatalf("delete policy failed: %v", err)
}
if len(client.deleted) != 1 || client.deleted[0] != "delete" {
t.Fatalf("unexpected deleted policies: %+v", client.deleted)
}
}
+70 -26
View File
@@ -81,40 +81,84 @@ type AuthzResponse struct {
}
type Capability struct {
ID string `json:"id"`
Name string `json:"name"`
ResourceKind string `json:"resource_kind,omitempty"`
Description string `json:"description,omitempty"`
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
}
type CapabilityList struct {
Items []Capability `json:"items"`
}
type PolicyBinding struct {
ID string `json:"id"`
TenantID string `json:"tenant_id,omitempty"`
SubjectKind string `json:"subject_kind"`
SubjectID string `json:"subject_id"`
GrantKind string `json:"grant_kind"`
GrantID string `json:"grant_id"`
ScopeKind string `json:"scope_kind"`
ScopeRef string `json:"scope_ref,omitempty"`
Effect string `json:"effect"`
Conditions map[string]any `json:"conditions,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
type PermissionBlock struct {
ID string `json:"id"`
TenantID string `json:"tenant_id,omitempty"`
ScopeMode string `json:"scope_mode"`
ObjectKind string `json:"object_kind,omitempty"`
ObjectType string `json:"object_type,omitempty"`
ObjectID string `json:"object_id,omitempty"`
GroupID string `json:"group_id,omitempty"`
Effect string `json:"effect"`
Conditions map[string]any `json:"conditions,omitempty"`
Actions []Capability `json:"actions,omitempty"`
}
type CreatePolicyBinding struct {
TenantID string `json:"tenant_id,omitempty"`
SubjectKind string `json:"subject_kind"`
SubjectID string `json:"subject_id"`
GrantKind string `json:"grant_kind"`
GrantID string `json:"grant_id"`
ScopeKind string `json:"scope_kind"`
ScopeRef string `json:"scope_ref,omitempty"`
Effect string `json:"effect,omitempty"`
Conditions map[string]any `json:"conditions,omitempty"`
type CreatePermissionBlock struct {
TenantID string `json:"tenant_id,omitempty"`
ScopeMode string `json:"scope_mode"`
ObjectKind string `json:"object_kind,omitempty"`
ObjectType string `json:"object_type,omitempty"`
ObjectID string `json:"object_id,omitempty"`
GroupID string `json:"group_id,omitempty"`
Effect string `json:"effect,omitempty"`
Conditions map[string]any `json:"conditions,omitempty"`
ActionIDs []string `json:"action_ids"`
}
type DirectPolicy struct {
ID string `json:"id"`
TenantID string `json:"tenant_id,omitempty"`
SubjectKind string `json:"subject_kind"`
SubjectID string `json:"subject_id"`
PermissionBlockID string `json:"permission_block_id"`
PermissionBlock PermissionBlock `json:"permission_block,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
}
type CreateDirectPolicy struct {
TenantID string `json:"tenant_id,omitempty"`
SubjectKind string `json:"subject_kind"`
SubjectID string `json:"subject_id"`
PermissionBlockID string `json:"permission_block_id"`
}
type DirectPolicyQuery struct {
TenantID string
SubjectKind string
SubjectID string
Limit uint64
Offset uint64
}
type DirectPolicyList struct {
Items []DirectPolicy `json:"items"`
Total uint64 `json:"total"`
}
type AuthorizedObjectIDsQuery struct {
SubjectID string
Action string
ObjectKind string
ObjectType string
TenantID string
Q string
Limit uint64
Offset uint64
}
type AuthorizedObjectIDs struct {
IDs []string `json:"ids"`
Total uint64 `json:"total"`
}
type TokenClaims struct {
+9 -2
View File
@@ -39,7 +39,7 @@ func (am *authorizationMiddleware) RetrieveAll(ctx context.Context, session smqa
permission := readPermission
objectType := page.EntityType.String()
object := page.EntityID
subject := session.DomainUserID
subject := subjectID(session)
// If the entity is a user, we need to check if the user is an admin
if page.EntityType.String() == policies.UserType {
@@ -70,7 +70,7 @@ func (am *authorizationMiddleware) RetrieveClientTelemetry(ctx context.Context,
Domain: session.DomainID,
SubjectType: policies.UserType,
SubjectKind: policies.UsersKind,
Subject: session.DomainUserID,
Subject: subjectID(session),
Permission: readPermission,
ObjectType: policies.ClientType,
Object: clientID,
@@ -82,3 +82,10 @@ func (am *authorizationMiddleware) RetrieveClientTelemetry(ctx context.Context,
return am.svc.RetrieveClientTelemetry(ctx, session, clientID)
}
func subjectID(session smqauthn.Session) string {
if session.UserID != "" {
return session.UserID
}
return session.DomainUserID
}
+16 -4
View File
@@ -21,7 +21,17 @@ type publisher struct {
}
// NewPublisher creates a FluxMQ-backed message publisher.
func NewPublisher(_ context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) {
func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) {
return newPublisher(ctx, url, true, opts...)
}
// NewUndeclaredPublisher creates a FluxMQ-backed publisher without declaring
// the stream queue. Use it only when another service owns queue declaration.
func NewUndeclaredPublisher(ctx context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) {
return newPublisher(ctx, url, false, opts...)
}
func newPublisher(_ context.Context, url string, declare bool, opts ...messaging.Option) (messaging.Publisher, error) {
pub := &publisher{
options: defaultOptions(),
}
@@ -52,9 +62,11 @@ func NewPublisher(_ context.Context, url string, opts ...messaging.Option) (mess
if err := client.Connect(); err != nil {
return nil, err
}
if err := declareStream(client, pub.prefix); err != nil {
_ = client.Close()
return nil, err
if declare {
if err := declareStream(client, pub.prefix); err != nil {
_ = client.Close()
return nil, err
}
}
pub.client = client
+6 -6
View File
@@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
@@ -226,11 +225,12 @@ func messageFromDelivery(body []byte, headers map[string]any, ts time.Time, pref
protocol = "mqtt"
}
created := ts.UnixNano()
if s := stringHeader(headers, "created"); s != "" {
if v, err := strconv.ParseInt(s, 10, 64); err == nil {
created = v
}
created := time.Now().UnixNano()
if !ts.IsZero() {
created = ts.UnixNano()
}
if v, ok := int64Header(headers, "created"); ok {
created = v
}
return &messaging.Message{
+14 -2
View File
@@ -139,7 +139,7 @@ func TestMessageFromDelivery(t *testing.T) {
{
name: "use explicit publisher header when present",
body: []byte("raw"),
headers: map[string]any{"external_id": "tenant-user", "client_id": "client-22"},
headers: map[string]any{"external_id": "tenant-user", "client_id": "client-22", "created": int64(1710000000000000250)},
ts: time.Unix(1710000000, 250),
prefix: "m",
mqttTopic: "m/dom/c/ch",
@@ -151,7 +151,7 @@ func TestMessageFromDelivery(t *testing.T) {
Publisher: "tenant-user",
ClientId: "client-22",
Protocol: "mqtt",
Created: time.Unix(1710000000, 250).UnixNano(),
Created: 1710000000000000250,
},
},
{
@@ -214,3 +214,15 @@ func TestMessageFromDelivery(t *testing.T) {
})
}
}
func TestMessageFromDeliveryZeroTimestampFallsBackToNow(t *testing.T) {
before := time.Now().UnixNano()
got, err := messageFromDelivery([]byte("raw"), nil, time.Time{}, "m", "m/dom/c/ch")
after := time.Now().UnixNano()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got.Created < before || got.Created > after {
t.Fatalf("expected created timestamp between %d and %d, got %d", before, after, got.Created)
}
}
+27
View File
@@ -5,6 +5,7 @@ package fluxmq
import (
"fmt"
"strconv"
"strings"
fluxamqp "github.com/absmach/fluxmq/client/amqp"
@@ -113,6 +114,32 @@ func stringHeader(headers map[string]any, key string) string {
}
}
func int64Header(headers map[string]any, key string) (int64, bool) {
if headers == nil {
return 0, false
}
v, ok := headers[key]
if !ok {
return 0, false
}
switch val := v.(type) {
case int64:
return val, true
case int:
return int64(val), true
case int32:
return int64(val), true
case string:
parsed, err := strconv.ParseInt(val, 10, 64)
return parsed, err == nil
case []byte:
parsed, err := strconv.ParseInt(string(val), 10, 64)
return parsed, err == nil
default:
return 0, false
}
}
func declareStream(client *fluxamqp.Client, prefix string) error {
_, err := client.DeclareStreamQueue(&fluxamqp.StreamQueueOptions{
Name: prefix,
+7 -5
View File
@@ -3,14 +3,16 @@
package messaging
// ClientIdentity returns the transport client identifier carried by the message.
// It falls back to Publisher for backward compatibility with older messages.
// ClientIdentity returns the authenticated application identity carried by the
// message. FluxMQ stores the protocol connection identifier in client_id and the
// Atom/Magistrala entity identifier in publisher/external_id, so publisher wins
// when both are present.
func (m *Message) ClientIdentity() string {
if m == nil {
return ""
}
if clientID := m.GetClientId(); clientID != "" {
return clientID
if publisher := m.GetPublisher(); publisher != "" {
return publisher
}
return m.GetPublisher()
return m.GetClientId()
}
+43
View File
@@ -0,0 +1,43 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package messaging
import "testing"
func TestClientIdentity(t *testing.T) {
cases := []struct {
name string
msg *Message
want string
}{
{
name: "nil message",
msg: nil,
want: "",
},
{
name: "publisher wins over transport client id",
msg: &Message{
Publisher: "entity-1",
ClientId: "amqp091:connection",
},
want: "entity-1",
},
{
name: "fallback to transport client id for legacy messages",
msg: &Message{
ClientId: "legacy-client",
},
want: "legacy-client",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := tc.msg.ClientIdentity(); got != tc.want {
t.Fatalf("expected %q, got %q", tc.want, got)
}
})
}
}