mirror of
https://github.com/absmach/magistrala.git
synced 2026-06-23 04:10:28 +00:00
SMQ-2801 - Add health check endpoint to HTTP adapter (#3140)
Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
@@ -21,3 +21,14 @@ func sendMessageEndpoint() endpoint.Endpoint {
|
||||
return publishMessageRes{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func healthCheckEndpoint() endpoint.Endpoint {
|
||||
return func(ctx context.Context, request any) (any, error) {
|
||||
req := request.(healthCheckReq)
|
||||
if err := req.validate(); err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
|
||||
return healthCheckRes{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,3 +345,124 @@ func TestPublish(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthCheck(t *testing.T) {
|
||||
clients := new(climocks.ClientsServiceClient)
|
||||
authn := new(authnMocks.Authentication)
|
||||
channels := new(chmocks.ChannelsServiceClient)
|
||||
domains := new(dmocks.DomainsServiceClient)
|
||||
clientKey := "client_key"
|
||||
invalidKey := invalidValue
|
||||
validToken := "token"
|
||||
invalidToken := "invalid_token"
|
||||
svc, _, err := newService(authn, clients, channels, domains)
|
||||
assert.Nil(t, err, fmt.Sprintf("failed to create service with err: %v", err))
|
||||
target := newTargetHTTPServer()
|
||||
defer target.Close()
|
||||
ts, err := newProxyHTPPServer(svc, target)
|
||||
assert.Nil(t, err, fmt.Sprintf("failed to create proxy server with err: %v", err))
|
||||
|
||||
defer ts.Close()
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
domainID string
|
||||
clientID string
|
||||
clientType string
|
||||
key string
|
||||
status int
|
||||
basicAuth bool
|
||||
bearerToken bool
|
||||
authnErr error
|
||||
authnRes *grpcClientsV1.AuthnRes
|
||||
authnRes1 smqauthn.Session
|
||||
}{
|
||||
{
|
||||
desc: "health check successfully",
|
||||
domainID: domainID,
|
||||
key: clientKey,
|
||||
status: http.StatusOK,
|
||||
authnRes: &grpcClientsV1.AuthnRes{Id: clientID, Authenticated: true},
|
||||
},
|
||||
{
|
||||
desc: "health check with basic auth",
|
||||
domainID: domainID,
|
||||
key: clientKey,
|
||||
basicAuth: true,
|
||||
status: http.StatusOK,
|
||||
authnRes: &grpcClientsV1.AuthnRes{Id: clientID, Authenticated: true},
|
||||
},
|
||||
{
|
||||
desc: "health check with invalid key",
|
||||
domainID: domainID,
|
||||
key: invalidKey,
|
||||
status: http.StatusUnauthorized,
|
||||
authnRes: &grpcClientsV1.AuthnRes{Authenticated: false},
|
||||
},
|
||||
{
|
||||
desc: "health check with invalid basic auth",
|
||||
domainID: domainID,
|
||||
key: invalidKey,
|
||||
basicAuth: true,
|
||||
status: http.StatusUnauthorized,
|
||||
authnRes: &grpcClientsV1.AuthnRes{Authenticated: false},
|
||||
},
|
||||
{
|
||||
desc: "health check with valid bearer token",
|
||||
domainID: domainID,
|
||||
key: validToken,
|
||||
bearerToken: true,
|
||||
status: http.StatusOK,
|
||||
authnRes1: smqauthn.Session{UserID: userID},
|
||||
},
|
||||
{
|
||||
desc: "health check with invalid bearer token",
|
||||
domainID: domainID,
|
||||
key: invalidToken,
|
||||
bearerToken: true,
|
||||
status: http.StatusUnauthorized,
|
||||
authnRes1: smqauthn.Session{},
|
||||
authnErr: svcerr.ErrAuthentication,
|
||||
},
|
||||
{
|
||||
desc: "health check with empty key",
|
||||
domainID: domainID,
|
||||
key: "",
|
||||
status: http.StatusBadRequest,
|
||||
},
|
||||
{
|
||||
desc: "health check with empty domain ID",
|
||||
domainID: "",
|
||||
key: clientKey,
|
||||
status: http.StatusBadRequest,
|
||||
},
|
||||
{
|
||||
desc: "health check with invalid domain ID",
|
||||
domainID: invalidValue,
|
||||
key: clientKey,
|
||||
status: http.StatusUnauthorized,
|
||||
authnRes: &grpcClientsV1.AuthnRes{},
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
clientsCall := clients.On("Authenticate", mock.Anything, &grpcClientsV1.AuthnReq{Token: smqauthn.AuthPack(smqauthn.DomainAuth, tc.domainID, tc.key)}).Return(tc.authnRes, tc.authnErr)
|
||||
authCall := authn.On("Authenticate", mock.Anything, tc.key).Return(tc.authnRes1, tc.authnErr)
|
||||
domainsCall := domains.On("RetrieveIDByRoute", mock.Anything, mock.Anything).Return(&grpcCommonV1.RetrieveEntityRes{Entity: &grpcCommonV1.EntityBasic{Id: tc.domainID}}, nil)
|
||||
req := testRequest{
|
||||
client: ts.Client(),
|
||||
method: http.MethodPost,
|
||||
url: fmt.Sprintf("%s/hc/%s", ts.URL, tc.domainID),
|
||||
token: tc.key,
|
||||
basicAuth: tc.basicAuth,
|
||||
bearerToken: tc.bearerToken,
|
||||
}
|
||||
res, err := req.make()
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err))
|
||||
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode))
|
||||
clientsCall.Unset()
|
||||
authCall.Unset()
|
||||
domainsCall.Unset()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,3 +23,19 @@ func (req publishReq) validate() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type healthCheckReq struct {
|
||||
domain string
|
||||
token string
|
||||
}
|
||||
|
||||
func (req healthCheckReq) validate() error {
|
||||
if req.token == "" {
|
||||
return apiutil.ErrBearerKey
|
||||
}
|
||||
if req.domain == "" {
|
||||
return apiutil.ErrMissingDomainID
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
+18
-1
@@ -9,7 +9,10 @@ import (
|
||||
"github.com/absmach/supermq"
|
||||
)
|
||||
|
||||
var _ supermq.Response = (*publishMessageRes)(nil)
|
||||
var (
|
||||
_ supermq.Response = (*publishMessageRes)(nil)
|
||||
_ supermq.Response = (*healthCheckRes)(nil)
|
||||
)
|
||||
|
||||
type publishMessageRes struct{}
|
||||
|
||||
@@ -24,3 +27,17 @@ func (res publishMessageRes) Headers() map[string]string {
|
||||
func (res publishMessageRes) Empty() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type healthCheckRes struct{}
|
||||
|
||||
func (res healthCheckRes) Code() int {
|
||||
return http.StatusOK
|
||||
}
|
||||
|
||||
func (res healthCheckRes) Headers() map[string]string {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
func (res healthCheckRes) Empty() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -47,6 +47,14 @@ func MakeHandler(logger *slog.Logger, instanceID string) http.Handler {
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "publish").ServeHTTP)
|
||||
|
||||
r.Post("/hc/{domain}", otelhttp.NewHandler(kithttp.NewServer(
|
||||
healthCheckEndpoint(),
|
||||
decodeHealthCheckRequest,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "health_check").ServeHTTP)
|
||||
|
||||
r.Get("/health", supermq.Health("http", instanceID))
|
||||
r.Handle("/metrics", promhttp.Handler())
|
||||
|
||||
@@ -78,3 +86,21 @@ func decodeRequest(_ context.Context, r *http.Request) (any, error) {
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func decodeHealthCheckRequest(_ context.Context, r *http.Request) (any, error) {
|
||||
var req healthCheckReq
|
||||
req.domain = chi.URLParam(r, "domain")
|
||||
_, pass, ok := r.BasicAuth()
|
||||
switch {
|
||||
case ok:
|
||||
req.token = pass
|
||||
case !ok:
|
||||
req.token = r.Header.Get(authzHeaderKey)
|
||||
}
|
||||
|
||||
if err := req.validate(); err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
+7
-1
@@ -113,7 +113,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
return errors.Wrap(errFailedPublish, errClientNotInitialized)
|
||||
}
|
||||
|
||||
domainID, channelID, subtopic, _, err := h.parser.ParsePublishTopic(ctx, *topic, true)
|
||||
domainID, channelID, subtopic, topicType, err := h.parser.ParsePublishTopic(ctx, *topic, true)
|
||||
if err != nil {
|
||||
return errors.Wrap(errMalformedTopic, err)
|
||||
}
|
||||
@@ -146,6 +146,12 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
return mgate.NewHTTPProxyError(http.StatusUnauthorized, svcerr.ErrAuthentication)
|
||||
}
|
||||
|
||||
// Health topics are not published to message broker.
|
||||
if topicType == messaging.HealthType {
|
||||
h.logger.Info(fmt.Sprintf(logInfoPublished, clientType, clientID, *topic))
|
||||
return nil
|
||||
}
|
||||
|
||||
msg := messaging.Message{
|
||||
Protocol: protocol,
|
||||
Domain: domainID,
|
||||
|
||||
+33
-15
@@ -32,24 +32,24 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
clientID = "513d02d2-16c1-4f23-98be-9e12f8fee898"
|
||||
clientID1 = "513d02d2-16c1-4f23-98be-9e12f8fee899"
|
||||
clientKey = "password"
|
||||
chanID = "123e4567-e89b-12d3-a456-000000000001"
|
||||
invalidID = "invalidID"
|
||||
invalidValue = "invalidValue"
|
||||
invalidChannelIDTopic = "m/**/c"
|
||||
clientID = "513d02d2-16c1-4f23-98be-9e12f8fee898"
|
||||
clientKey = "password"
|
||||
chanID = "123e4567-e89b-12d3-a456-000000000001"
|
||||
invalidValue = "invalidValue"
|
||||
)
|
||||
|
||||
var (
|
||||
domainID = testsutil.GenerateUUID(&testing.T{})
|
||||
topicMsg = "/m/%s/c/%s"
|
||||
subtopicMsg = "/m/%s/c/%s/subtopic"
|
||||
topic = fmt.Sprintf(topicMsg, domainID, chanID)
|
||||
subtopic = fmt.Sprintf(subtopicMsg, domainID, chanID)
|
||||
invalidTopic = invalidValue
|
||||
payload = []byte("[{'n':'test-name', 'v': 1.2}]")
|
||||
sessionClient = session.Session{
|
||||
domainID = testsutil.GenerateUUID(&testing.T{})
|
||||
topicMsg = "/m/%s/c/%s"
|
||||
subtopicMsg = "/m/%s/c/%s/subtopic"
|
||||
hcTopicFmt = "/hc/%s"
|
||||
hcTopic = fmt.Sprintf(hcTopicFmt, domainID)
|
||||
topic = fmt.Sprintf(topicMsg, domainID, chanID)
|
||||
subtopic = fmt.Sprintf(subtopicMsg, domainID, chanID)
|
||||
invalidTopic = invalidValue
|
||||
invalidHCTopic = "/hc"
|
||||
payload = []byte("[{'n':'test-name', 'v': 1.2}]")
|
||||
sessionClient = session.Session{
|
||||
ID: clientID,
|
||||
Password: []byte(clientKey),
|
||||
}
|
||||
@@ -346,6 +346,24 @@ func TestPublish(t *testing.T) {
|
||||
publishErr: errors.New("failed to publish"),
|
||||
err: errFailedPublishToMsgBroker,
|
||||
},
|
||||
{
|
||||
desc: "publish health check with token successfully",
|
||||
topic: &hcTopic,
|
||||
payload: &payload,
|
||||
password: validToken,
|
||||
session: &tokenSession,
|
||||
authNRes1: smqauthn.Session{DomainUserID: validID, UserID: validID, DomainID: validID},
|
||||
authNErr: nil,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
desc: "publish health check with invalid topic",
|
||||
topic: &invalidHCTopic,
|
||||
status: http.StatusBadRequest,
|
||||
password: validToken,
|
||||
session: &tokenSession,
|
||||
err: errMalformedTopic,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user