package main import ( "bufio" "context" "log" "net" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) type TELNETParser struct { s message.Subscriber c context.Context } func (tp *TELNETParser) Handle() { messages, err := tp.s.Subscribe(tp.c, "telnet.raw") if err != nil { log.Fatalln("couldn't subscribe to telnet.raw") } for message := range messages { log.Printf("%#+v\n", message) message.Ack() } } func NewTELNETParser(c context.Context, s message.Subscriber) *TELNETParser { return &TELNETParser{ c: c, s: s, } } type TELNETServer struct { l net.Listener p message.Publisher c context.Context } func NewTELNETServer(c context.Context, l net.Listener, p message.Publisher) *TELNETServer { ts := TELNETServer{ c: c, l: l, p: p, } return &ts } func (ts *TELNETServer) Handle(conn net.Conn) { s := bufio.NewScanner(conn) defer conn.Close() for s.Scan() { t := s.Text() err := ts.p.Publish("telnet.raw", message.NewMessage(watermill.NewUUID(), []byte(t))) if err != nil { log.Fatalln("couldn't write to telnet.raw") } log.Printf("Received user input: %q\n", t) } }