mirror of
https://github.com/ultravioletrs/cocos.git
synced 2026-06-23 04:10:25 +00:00
NOISSUE - Add health check (#288)
* add health check Signed-off-by: Sammy Oina <sammyoina@gmail.com> * add test case Signed-off-by: Sammy Oina <sammyoina@gmail.com> * fix lint and add test case Signed-off-by: Sammy Oina <sammyoina@gmail.com> * switch context Signed-off-by: Sammy Oina <sammyoina@gmail.com> --------- Signed-off-by: Sammy Oina <sammyoina@gmail.com>
This commit is contained in:
committed by
GitHub
parent
6043ad150b
commit
2a6fa8da25
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/fatih/color"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/ultravioletrs/cocos/agent/auth"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc/agent"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
@@ -32,6 +33,8 @@ func decodeErros(err error) error {
|
||||
case errors.Contains(err, auth.ErrSignatureVerificationFailed):
|
||||
return auth.ErrSignatureVerificationFailed
|
||||
|
||||
case errors.Contains(err, agent.ErrAgentServiceUnavailable):
|
||||
return agent.ErrAgentServiceUnavailable
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/fatih/color"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/ultravioletrs/cocos/agent/auth"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc/agent"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
@@ -46,6 +47,11 @@ func TestDecodeErros(t *testing.T) {
|
||||
input: errors.New("other error"),
|
||||
expected: errors.New("other error"),
|
||||
},
|
||||
{
|
||||
name: "Agent Service Unavailable",
|
||||
input: mgerrors.Wrap(agent.ErrAgentServiceUnavailable, errors.New("wrapped error")),
|
||||
expected: agent.ErrAgentServiceUnavailable,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
||||
+2
-1
@@ -3,6 +3,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -100,7 +101,7 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
agentGRPCClient, agentClient, err := agent.NewAgentClient(agentGRPCConfig)
|
||||
agentGRPCClient, agentClient, err := agent.NewAgentClient(context.Background(), agentGRPCConfig)
|
||||
if err != nil {
|
||||
message := color.New(color.FgRed).Sprintf("failed to create %s gRPC client : %s", svcName, err)
|
||||
rootCmd.Println(message)
|
||||
|
||||
@@ -30,6 +30,8 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/health"
|
||||
grpchealth "google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -51,6 +53,7 @@ type Server struct {
|
||||
registerService serviceRegister
|
||||
quoteProvider client.QuoteProvider
|
||||
authSvc auth.Authenticator
|
||||
health *health.Server
|
||||
}
|
||||
|
||||
type serviceRegister func(srv *grpc.Server)
|
||||
@@ -165,7 +168,10 @@ func (s *Server) Start() error {
|
||||
grpcServerOptions = append(grpcServerOptions, creds)
|
||||
|
||||
s.server = grpc.NewServer(grpcServerOptions...)
|
||||
s.health = health.NewServer()
|
||||
grpchealth.RegisterHealthServer(s.server, s.health)
|
||||
s.registerService(s.server)
|
||||
s.health.SetServingStatus(s.Name, grpchealth.HealthCheckResponse_SERVING)
|
||||
|
||||
go func() {
|
||||
errCh <- s.server.Serve(listener)
|
||||
@@ -185,6 +191,7 @@ func (s *Server) Stop() error {
|
||||
c := make(chan bool)
|
||||
go func() {
|
||||
defer close(c)
|
||||
s.health.Shutdown()
|
||||
s.server.GracefulStop()
|
||||
}()
|
||||
select {
|
||||
|
||||
@@ -3,16 +3,31 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/ultravioletrs/cocos/agent"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
grpchealth "google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
var ErrAgentServiceUnavailable = errors.New("agent service is unavailable")
|
||||
|
||||
// NewAgentClient creates new agent gRPC client instance.
|
||||
func NewAgentClient(cfg grpc.Config) (grpc.Client, agent.AgentServiceClient, error) {
|
||||
func NewAgentClient(ctx context.Context, cfg grpc.Config) (grpc.Client, agent.AgentServiceClient, error) {
|
||||
client, err := grpc.NewClient(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
health := grpchealth.NewHealthClient(client.Connection())
|
||||
resp, err := health.Check(ctx, &grpchealth.HealthCheckRequest{
|
||||
Service: "agent",
|
||||
})
|
||||
|
||||
if err != nil || resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING {
|
||||
return nil, nil, errors.Wrap(err, ErrAgentServiceUnavailable)
|
||||
}
|
||||
|
||||
return client, agent.NewAgentServiceClient(client.Connection()), nil
|
||||
}
|
||||
|
||||
@@ -0,0 +1,127 @@
|
||||
// Copyright (c) Ultraviolet
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/ultravioletrs/cocos/agent"
|
||||
agentgrpc "github.com/ultravioletrs/cocos/agent/api/grpc"
|
||||
"github.com/ultravioletrs/cocos/agent/mocks"
|
||||
pkggrpc "github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/health"
|
||||
grpchealth "google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
type TestServer struct {
|
||||
agent.UnimplementedAgentServiceServer
|
||||
server *grpc.Server
|
||||
health *health.Server
|
||||
port int
|
||||
listenAddr string
|
||||
}
|
||||
|
||||
func NewTestServer() (*TestServer, error) {
|
||||
listener, err := net.Listen("tcp", "localhost:0")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to listen: %v", err)
|
||||
}
|
||||
|
||||
addr := listener.Addr().(*net.TCPAddr)
|
||||
|
||||
server := grpc.NewServer()
|
||||
healthServer := health.NewServer()
|
||||
|
||||
ts := &TestServer{
|
||||
server: server,
|
||||
health: healthServer,
|
||||
port: addr.Port,
|
||||
listenAddr: fmt.Sprintf("localhost:%d", addr.Port),
|
||||
}
|
||||
|
||||
svc := new(mocks.Service)
|
||||
agent.RegisterAgentServiceServer(server, agentgrpc.NewServer(svc))
|
||||
grpchealth.RegisterHealthServer(server, healthServer)
|
||||
|
||||
go func() {
|
||||
if err := server.Serve(listener); err != nil {
|
||||
fmt.Printf("Server exited with error: %v\n", err)
|
||||
}
|
||||
}()
|
||||
|
||||
healthServer.SetServingStatus("agent", grpchealth.HealthCheckResponse_SERVING)
|
||||
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func (s *TestServer) Stop() {
|
||||
if s.server != nil {
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentClientIntegration(t *testing.T) {
|
||||
testServer, err := NewTestServer()
|
||||
require.NoError(t, err)
|
||||
defer testServer.Stop()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
serverRunning bool
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "successful connection",
|
||||
serverRunning: true,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "server not healthy",
|
||||
serverRunning: false,
|
||||
err: ErrAgentServiceUnavailable,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
if !tt.serverRunning {
|
||||
testServer.health.SetServingStatus("agent", grpchealth.HealthCheckResponse_NOT_SERVING)
|
||||
} else {
|
||||
testServer.health.SetServingStatus("agent", grpchealth.HealthCheckResponse_SERVING)
|
||||
}
|
||||
|
||||
cfg := pkggrpc.Config{
|
||||
URL: testServer.listenAddr,
|
||||
Timeout: 1,
|
||||
}
|
||||
|
||||
if !tt.serverRunning {
|
||||
cfg.URL = ""
|
||||
}
|
||||
|
||||
client, agentClient, err := NewAgentClient(ctx, cfg)
|
||||
assert.True(t, errors.Contains(err, tt.err))
|
||||
if err != nil {
|
||||
assert.Nil(t, client)
|
||||
assert.Nil(t, agentClient)
|
||||
return
|
||||
}
|
||||
|
||||
require.NotNil(t, client)
|
||||
require.NotNil(t, agentClient)
|
||||
defer client.Close()
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user