package main import ( "bufio" "context" "log" "net" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" ) type TELNETParser struct { s message.Subscriber c context.Context logger watermill.LoggerAdapter } func (tp *TELNETParser) Handle() { messages, err := tp.s.Subscribe(tp.c, "telnet.raw") if err != nil { log.Fatalln("couldn't subscribe to telnet.raw") } for message := range messages { fields := make(watermill.LogFields) fields["body"] = string(message.Payload) fields["correlation_id"] = string(message.Metadata["correlation_id"]) tp.logger.Info("received raw TCP line", fields) message.Ack() } } func NewTELNETParser(c context.Context, s message.Subscriber, wml watermill.LoggerAdapter) *TELNETParser { return &TELNETParser{ c: c, s: s, logger: wml, } } type TELNETServer struct { l net.Listener p message.Publisher c context.Context logger watermill.LoggerAdapter } func NewTELNETServer(c context.Context, l net.Listener, p message.Publisher, wml watermill.LoggerAdapter) *TELNETServer { ts := TELNETServer{ c: c, l: l, p: p, logger: wml, } return &ts } func (ts *TELNETServer) Handle(conn net.Conn) { s := bufio.NewScanner(conn) defer conn.Close() correlationID := watermill.NewUUID() ts.logger = ts.logger.With( watermill.LogFields{ "correlation_id": correlationID}, ) for s.Scan() { t := s.Text() m := message.NewMessage(watermill.NewUUID(), []byte(t)) middleware.SetCorrelationID(correlationID, m) err := ts.p.Publish("telnet.raw", m) if err != nil { log.Fatalln("couldn't write to telnet.raw") } ts.logger.Info("received bytes over telnet", nil) } }