From 53739fbc1d8552c66aa166bdd9a31cbc863deee2 Mon Sep 17 00:00:00 2001 From: Nick Dumas Date: Wed, 23 Aug 2023 13:16:42 -0400 Subject: [PATCH] messages are being passed along the queue --- cmd/echo/server.go | 2 ++ cmd/echo/telnet.go | 24 +++++++++++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/cmd/echo/server.go b/cmd/echo/server.go index 48ba7b8..a775548 100644 --- a/cmd/echo/server.go +++ b/cmd/echo/server.go @@ -32,6 +32,8 @@ func main() { ctx := context.Background() ts := NewTELNETServer(ctx, l, pubSub) + tp := NewTELNETParser(ctx, pubSub) + go tp.Handle() for { conn, err := ts.l.Accept() diff --git a/cmd/echo/telnet.go b/cmd/echo/telnet.go index 10635c2..2a4013f 100644 --- a/cmd/echo/telnet.go +++ b/cmd/echo/telnet.go @@ -11,6 +11,28 @@ import ( ) type TELNETParser struct { + s message.Subscriber + c context.Context +} + +func (tp *TELNETParser) Handle() { + messages, err := tp.s.Subscribe(tp.c, "telnet.raw") + if err != nil { + log.Fatalln("couldn't subscribe to telnet.raw") + } + + for message := range messages { + log.Printf("%#+v\n", message) + message.Ack() + } + +} + +func NewTELNETParser(c context.Context, s message.Subscriber) *TELNETParser { + return &TELNETParser{ + c: c, + s: s, + } } type TELNETServer struct { @@ -37,7 +59,7 @@ func (ts *TELNETServer) Handle(conn net.Conn) { t := s.Text() err := ts.p.Publish("telnet.raw", message.NewMessage(watermill.NewUUID(), []byte(t))) if err != nil { - log.Fatalln("couldn't write to raw_telnet queue") + log.Fatalln("couldn't write to telnet.raw") } log.Printf("Received user input: %q\n", t) }