diff --git a/cmd/echo/logger.go b/cmd/echo/logger.go new file mode 100644 index 0000000..82e9399 --- /dev/null +++ b/cmd/echo/logger.go @@ -0,0 +1,55 @@ +package main + +import ( + "log/slog" + + "github.com/ThreeDotsLabs/watermill" +) + +func LogFieldsToSlogAttrs(in watermill.LogFields) []slog.Attr { + attrs := make([]slog.Attr, 0) + for k, v := range in { + attrs = append(attrs, slog.Attr{ + Key: k, + Value: slog.AnyValue(v), + }) + } + return attrs +} + +type WrappedSlogger struct { + *slog.Logger +} + +func NewWrappedSlogger() *WrappedSlogger { + return &WrappedSlogger{ + slog.Default(), + } +} + +func (wl *WrappedSlogger) Error(msg string, err error, fields watermill.LogFields) { + attrs := LogFieldsToSlogAttrs(fields) + wl.LogAttrs(nil, slog.Level(-5), msg, attrs...) +} + +func (wl *WrappedSlogger) Info(msg string, fields watermill.LogFields) { + attrs := LogFieldsToSlogAttrs(fields) + wl.LogAttrs(nil, slog.LevelInfo, msg, attrs...) +} + +func (wl *WrappedSlogger) Debug(msg string, fields watermill.LogFields) { + attrs := LogFieldsToSlogAttrs(fields) + wl.LogAttrs(nil, slog.LevelDebug, msg, attrs...) +} + +func (wl *WrappedSlogger) Trace(msg string, fields watermill.LogFields) { + attrs := LogFieldsToSlogAttrs(fields) + wl.LogAttrs(nil, slog.Level(-5), msg, attrs...) +} + +func (wl *WrappedSlogger) With(fields watermill.LogFields) watermill.LoggerAdapter { + attrs := LogFieldsToSlogAttrs(fields) + l := slog.Default().With(attrs) + return &WrappedSlogger{l} + +} diff --git a/cmd/echo/server.go b/cmd/echo/server.go index a775548..ed6b0fc 100644 --- a/cmd/echo/server.go +++ b/cmd/echo/server.go @@ -19,7 +19,9 @@ func main() { } defer l.Close() wmLogger := watermill.NewStdLogger(false, false) - router, err := message.NewRouter(message.RouterConfig{}, wmLogger) + + logger := NewWrappedSlogger() + router, err := message.NewRouter(message.RouterConfig{}, logger) router.AddPlugin(plugin.SignalsHandler) @@ -31,8 +33,8 @@ func main() { pubSub := gochannel.NewGoChannel(gochannel.Config{}, wmLogger) ctx := context.Background() - ts := NewTELNETServer(ctx, l, pubSub) - tp := NewTELNETParser(ctx, pubSub) + ts := NewTELNETServer(ctx, l, pubSub, logger) + tp := NewTELNETParser(ctx, pubSub, logger) go tp.Handle() for { diff --git a/cmd/echo/telnet.go b/cmd/echo/telnet.go index 2a4013f..05432ea 100644 --- a/cmd/echo/telnet.go +++ b/cmd/echo/telnet.go @@ -8,11 +8,13 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" ) type TELNETParser struct { - s message.Subscriber - c context.Context + s message.Subscriber + c context.Context + logger watermill.LoggerAdapter } func (tp *TELNETParser) Handle() { @@ -22,30 +24,36 @@ func (tp *TELNETParser) Handle() { } for message := range messages { - log.Printf("%#+v\n", message) + fields := make(watermill.LogFields) + fields["body"] = string(message.Payload) + fields["correlation_id"] = string(message.Metadata["correlation_id"]) + tp.logger.Info("received raw TCP line", fields) message.Ack() } } -func NewTELNETParser(c context.Context, s message.Subscriber) *TELNETParser { +func NewTELNETParser(c context.Context, s message.Subscriber, wml watermill.LoggerAdapter) *TELNETParser { return &TELNETParser{ - c: c, - s: s, + c: c, + s: s, + logger: wml, } } type TELNETServer struct { - l net.Listener - p message.Publisher - c context.Context + l net.Listener + p message.Publisher + c context.Context + logger watermill.LoggerAdapter } -func NewTELNETServer(c context.Context, l net.Listener, p message.Publisher) *TELNETServer { +func NewTELNETServer(c context.Context, l net.Listener, p message.Publisher, wml watermill.LoggerAdapter) *TELNETServer { ts := TELNETServer{ - c: c, - l: l, - p: p, + c: c, + l: l, + p: p, + logger: wml, } return &ts @@ -55,12 +63,20 @@ func (ts *TELNETServer) Handle(conn net.Conn) { s := bufio.NewScanner(conn) defer conn.Close() + correlationID := watermill.NewUUID() + ts.logger = ts.logger.With( + watermill.LogFields{ + "correlation_id": correlationID}, + ) + for s.Scan() { t := s.Text() - err := ts.p.Publish("telnet.raw", message.NewMessage(watermill.NewUUID(), []byte(t))) + m := message.NewMessage(watermill.NewUUID(), []byte(t)) + middleware.SetCorrelationID(correlationID, m) + err := ts.p.Publish("telnet.raw", m) if err != nil { log.Fatalln("couldn't write to telnet.raw") } - log.Printf("Received user input: %q\n", t) + ts.logger.Info("received bytes over telnet", nil) } }