diff --git a/cmd/echo/server.go b/cmd/echo/server.go index fe7ef68..48ba7b8 100644 --- a/cmd/echo/server.go +++ b/cmd/echo/server.go @@ -1,62 +1,44 @@ package main import ( - "bufio" "context" "log" "net" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "github.com/ThreeDotsLabs/watermill/message/router/plugin" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" ) -type TELNETServer struct { - l net.Listener -} - -func NewTELNETServer(l net.Listener) *TELNETServer { - ts := TELNETServer{ - l: l, +func main() { + l, err := net.Listen("tcp", ":5555") + if err != nil { + log.Fatal(err) } + defer l.Close() + wmLogger := watermill.NewStdLogger(false, false) + router, err := message.NewRouter(message.RouterConfig{}, wmLogger) - return &ts -} + router.AddPlugin(plugin.SignalsHandler) + + router.AddMiddleware( + middleware.CorrelationID, + middleware.Recoverer, + ) + + pubSub := gochannel.NewGoChannel(gochannel.Config{}, wmLogger) + + ctx := context.Background() + ts := NewTELNETServer(ctx, l, pubSub) -func (ts *TELNETServer) Run(ctx context.Context) { for { conn, err := ts.l.Accept() if err != nil { log.Fatal(err) } - go func(c net.Conn) { - s := bufio.NewScanner(c) - defer c.Close() - - for s.Scan() { - t := s.Text() - err := ts.Publish("raw_telnet", message.NewMessage(watermill.NewUUID(), []byte(t))) - if err != nil { - log.Fatalln("couldn't write to raw_telnet queue") - } - log.Printf("Received user input: %q\n", t) - } - }(conn) + go ts.Handle(conn) } } - -func (ts *TELNETServer) Publish(topic string, messages ...*message.Message) error { - - return nil -} - -func main() { - l, err := net.Listen("tcp", ":5555") - if err != nil { - log.Fatal(err) - } - defer l.Close() - - ts := NewTELNETServer(l) - ts.Run(context.Background()) -} diff --git a/cmd/echo/telnet.go b/cmd/echo/telnet.go new file mode 100644 index 0000000..10635c2 --- /dev/null +++ b/cmd/echo/telnet.go @@ -0,0 +1,44 @@ +package main + +import ( + "bufio" + "context" + "log" + "net" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" +) + +type TELNETParser struct { +} + +type TELNETServer struct { + l net.Listener + p message.Publisher + c context.Context +} + +func NewTELNETServer(c context.Context, l net.Listener, p message.Publisher) *TELNETServer { + ts := TELNETServer{ + c: c, + l: l, + p: p, + } + + return &ts +} + +func (ts *TELNETServer) Handle(conn net.Conn) { + s := bufio.NewScanner(conn) + defer conn.Close() + + for s.Scan() { + t := s.Text() + err := ts.p.Publish("telnet.raw", message.NewMessage(watermill.NewUUID(), []byte(t))) + if err != nil { + log.Fatalln("couldn't write to raw_telnet queue") + } + log.Printf("Received user input: %q\n", t) + } +} diff --git a/go.mod b/go.mod index 0d9246a..3f6781d 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,12 @@ go 1.19 require github.com/ThreeDotsLabs/watermill v1.3.3 require ( + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/sony/gobreaker v0.5.0 // indirect ) diff --git a/go.sum b/go.sum index 62d054d..50afbbb 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,17 @@ github.com/ThreeDotsLabs/watermill v1.3.3 h1:ulVgkk7n/hhWmqKKJrquWJWwIDGr9LXJ8W5XmwgSDJ4= github.com/ThreeDotsLabs/watermill v1.3.3/go.mod h1:FUH1a4BEmr5UCmCtg7CIYvEL11mdeVBDp1404+eLP+c= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= @@ -13,5 +19,10 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg= +github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=