mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:20:18 +00:00
+26
-1
@@ -1 +1,26 @@
|
||||
build/
|
||||
# Build artifacts
|
||||
/build/
|
||||
*.exe
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
|
||||
# Test artifacts
|
||||
*.test
|
||||
*.out
|
||||
coverage.html
|
||||
|
||||
# IDE
|
||||
.idea/
|
||||
.vscode/
|
||||
*.swp
|
||||
*.swo
|
||||
*~
|
||||
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
|
||||
# Temporary files
|
||||
*.tmp
|
||||
*.log
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
# MQTT Broker Makefile
|
||||
|
||||
# Build configuration
|
||||
BUILD_DIR := build
|
||||
BINARY := mqttd
|
||||
GO := go
|
||||
|
||||
# Build flags
|
||||
LDFLAGS := -s -w
|
||||
GOFLAGS := -trimpath
|
||||
|
||||
# Default target
|
||||
.PHONY: all
|
||||
all: build
|
||||
|
||||
# Build the broker binary
|
||||
.PHONY: build
|
||||
build: $(BUILD_DIR)/$(BINARY)
|
||||
|
||||
$(BUILD_DIR)/$(BINARY): cmd/broker/main.go $(shell find . -name '*.go' -not -path './build/*')
|
||||
@mkdir -p $(BUILD_DIR)
|
||||
$(GO) build $(GOFLAGS) -ldflags "$(LDFLAGS)" -o $(BUILD_DIR)/$(BINARY) ./cmd/broker
|
||||
|
||||
# Run the broker
|
||||
.PHONY: run
|
||||
run: build
|
||||
$(BUILD_DIR)/$(BINARY) --log=info
|
||||
|
||||
# Run with debug logging
|
||||
.PHONY: run-debug
|
||||
run-debug: build
|
||||
$(BUILD_DIR)/$(BINARY) --log=debug
|
||||
|
||||
# Run tests
|
||||
.PHONY: test
|
||||
test:
|
||||
$(GO) test -v ./...
|
||||
|
||||
# Run tests with coverage
|
||||
.PHONY: test-cover
|
||||
test-cover:
|
||||
$(GO) test -coverprofile=$(BUILD_DIR)/coverage.out ./...
|
||||
$(GO) tool cover -html=$(BUILD_DIR)/coverage.out -o $(BUILD_DIR)/coverage.html
|
||||
|
||||
# Run linter
|
||||
.PHONY: lint
|
||||
lint:
|
||||
golangci-lint run ./...
|
||||
|
||||
# Format code
|
||||
.PHONY: fmt
|
||||
fmt:
|
||||
$(GO) fmt ./...
|
||||
goimports -w .
|
||||
|
||||
# Clean build artifacts
|
||||
.PHONY: clean
|
||||
clean:
|
||||
rm -rf $(BUILD_DIR)
|
||||
|
||||
# Install dependencies
|
||||
.PHONY: deps
|
||||
deps:
|
||||
$(GO) mod tidy
|
||||
$(GO) mod download
|
||||
|
||||
# Show help
|
||||
.PHONY: help
|
||||
help:
|
||||
@echo "MQTT Broker Makefile"
|
||||
@echo ""
|
||||
@echo "Targets:"
|
||||
@echo " build Build the broker binary to $(BUILD_DIR)/$(BINARY)"
|
||||
@echo " run Build and run the broker"
|
||||
@echo " run-debug Build and run with debug logging"
|
||||
@echo " test Run all tests"
|
||||
@echo " test-cover Run tests with coverage report"
|
||||
@echo " lint Run golangci-lint"
|
||||
@echo " fmt Format code"
|
||||
@echo " clean Remove build artifacts"
|
||||
@echo " deps Download and tidy dependencies"
|
||||
@echo " help Show this help message"
|
||||
@@ -0,0 +1,159 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/dborovcanin/mqtt/broker"
|
||||
"github.com/dborovcanin/mqtt/packets"
|
||||
"github.com/dborovcanin/mqtt/transport"
|
||||
)
|
||||
|
||||
func main() {
|
||||
addr := flag.String("addr", ":1883", "MQTT broker listen address")
|
||||
logLevel := flag.String("log", "info", "Log level (debug, info, warn, error)")
|
||||
flag.Parse()
|
||||
|
||||
// Setup structured logging
|
||||
level := parseLogLevel(*logLevel)
|
||||
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: level,
|
||||
}))
|
||||
slog.SetDefault(logger)
|
||||
|
||||
slog.Info("starting MQTT broker", "addr", *addr)
|
||||
|
||||
// Create broker server
|
||||
server := broker.NewServer()
|
||||
|
||||
// Create TCP frontend
|
||||
tcp, err := transport.NewTCPFrontend(*addr)
|
||||
if err != nil {
|
||||
slog.Error("failed to create TCP frontend", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Wrap with logging if debug level
|
||||
var frontend broker.Frontend = tcp
|
||||
if level == slog.LevelDebug {
|
||||
frontend = &loggingFrontend{Frontend: tcp, logger: logger}
|
||||
}
|
||||
|
||||
// Add frontend to server
|
||||
if err := server.AddFrontend(frontend); err != nil {
|
||||
slog.Error("failed to add frontend", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
slog.Info("MQTT broker started", "addr", tcp.Addr().String())
|
||||
|
||||
// Wait for shutdown signal
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
sig := <-sigCh
|
||||
|
||||
slog.Info("shutting down", "signal", sig.String())
|
||||
|
||||
if err := server.Close(); err != nil {
|
||||
slog.Error("error during shutdown", "error", err)
|
||||
}
|
||||
|
||||
slog.Info("broker stopped")
|
||||
}
|
||||
|
||||
func parseLogLevel(level string) slog.Level {
|
||||
switch level {
|
||||
case "debug":
|
||||
return slog.LevelDebug
|
||||
case "warn":
|
||||
return slog.LevelWarn
|
||||
case "error":
|
||||
return slog.LevelError
|
||||
default:
|
||||
return slog.LevelInfo
|
||||
}
|
||||
}
|
||||
|
||||
// loggingFrontend wraps a Frontend to add packet logging.
|
||||
type loggingFrontend struct {
|
||||
broker.Frontend
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (f *loggingFrontend) Serve(handler broker.ConnectionHandler) error {
|
||||
return f.Frontend.Serve(&loggingHandler{
|
||||
handler: handler,
|
||||
logger: f.logger,
|
||||
})
|
||||
}
|
||||
|
||||
// loggingHandler wraps ConnectionHandler to log connections.
|
||||
type loggingHandler struct {
|
||||
handler broker.ConnectionHandler
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (h *loggingHandler) HandleConnection(conn broker.Connection) {
|
||||
h.logger.Debug("new connection", "remote", conn.RemoteAddr().String())
|
||||
|
||||
// Wrap connection for packet logging
|
||||
logged := &loggingConnection{
|
||||
Connection: conn,
|
||||
logger: h.logger,
|
||||
}
|
||||
|
||||
h.handler.HandleConnection(logged)
|
||||
}
|
||||
|
||||
// loggingConnection wraps Connection to log packets.
|
||||
type loggingConnection struct {
|
||||
broker.Connection
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (c *loggingConnection) ReadPacket() (packets.ControlPacket, error) {
|
||||
pkt, err := c.Connection.ReadPacket()
|
||||
if err != nil {
|
||||
if err.Error() != "EOF" {
|
||||
c.logger.Debug("read error", "remote", c.RemoteAddr().String(), "error", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.logger.Debug("packet received",
|
||||
"remote", c.RemoteAddr().String(),
|
||||
"type", packetTypeName(pkt.Type()),
|
||||
"packet", pkt.String(),
|
||||
)
|
||||
return pkt, nil
|
||||
}
|
||||
|
||||
func (c *loggingConnection) WritePacket(pkt packets.ControlPacket) error {
|
||||
c.logger.Debug("packet sent",
|
||||
"remote", c.RemoteAddr().String(),
|
||||
"type", packetTypeName(pkt.Type()),
|
||||
"packet", pkt.String(),
|
||||
)
|
||||
return c.Connection.WritePacket(pkt)
|
||||
}
|
||||
|
||||
func (c *loggingConnection) Close() error {
|
||||
c.logger.Debug("connection closed", "remote", c.RemoteAddr().String())
|
||||
return c.Connection.Close()
|
||||
}
|
||||
|
||||
// packetTypeName returns a human-readable packet type name.
|
||||
func packetTypeName(t byte) string {
|
||||
names := []string{
|
||||
"RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK",
|
||||
"PUBREC", "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK",
|
||||
"UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT", "AUTH",
|
||||
}
|
||||
if int(t) < len(names) {
|
||||
return names[t]
|
||||
}
|
||||
return "UNKNOWN"
|
||||
}
|
||||
+512
-566
File diff suppressed because it is too large
Load Diff
+488
-826
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user