more refactoring

main
Nick Dumas 1 year ago
parent 3566309c3d
commit 71ee479873

@ -1,62 +1,44 @@
package main package main
import ( import (
"bufio"
"context" "context"
"log" "log"
"net" "net"
"github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
) )
type TELNETServer struct { func main() {
l net.Listener l, err := net.Listen("tcp", ":5555")
}
func NewTELNETServer(l net.Listener) *TELNETServer {
ts := TELNETServer{
l: l,
}
return &ts
}
func (ts *TELNETServer) Run(ctx context.Context) {
for {
conn, err := ts.l.Accept()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer l.Close()
wmLogger := watermill.NewStdLogger(false, false)
router, err := message.NewRouter(message.RouterConfig{}, wmLogger)
go func(c net.Conn) { router.AddPlugin(plugin.SignalsHandler)
s := bufio.NewScanner(c)
defer c.Close()
for s.Scan() { router.AddMiddleware(
t := s.Text() middleware.CorrelationID,
err := ts.Publish("raw_telnet", message.NewMessage(watermill.NewUUID(), []byte(t))) middleware.Recoverer,
if err != nil { )
log.Fatalln("couldn't write to raw_telnet queue")
}
log.Printf("Received user input: %q\n", t)
}
}(conn)
}
}
func (ts *TELNETServer) Publish(topic string, messages ...*message.Message) error { pubSub := gochannel.NewGoChannel(gochannel.Config{}, wmLogger)
return nil ctx := context.Background()
} ts := NewTELNETServer(ctx, l, pubSub)
func main() { for {
l, err := net.Listen("tcp", ":5555") conn, err := ts.l.Accept()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer l.Close()
ts := NewTELNETServer(l) go ts.Handle(conn)
ts.Run(context.Background()) }
} }

@ -0,0 +1,44 @@
package main
import (
"bufio"
"context"
"log"
"net"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)
type TELNETParser struct {
}
type TELNETServer struct {
l net.Listener
p message.Publisher
c context.Context
}
func NewTELNETServer(c context.Context, l net.Listener, p message.Publisher) *TELNETServer {
ts := TELNETServer{
c: c,
l: l,
p: p,
}
return &ts
}
func (ts *TELNETServer) Handle(conn net.Conn) {
s := bufio.NewScanner(conn)
defer conn.Close()
for s.Scan() {
t := s.Text()
err := ts.p.Publish("telnet.raw", message.NewMessage(watermill.NewUUID(), []byte(t)))
if err != nil {
log.Fatalln("couldn't write to raw_telnet queue")
}
log.Printf("Received user input: %q\n", t)
}
}

@ -5,8 +5,12 @@ go 1.19
require github.com/ThreeDotsLabs/watermill v1.3.3 require github.com/ThreeDotsLabs/watermill v1.3.3
require ( require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/google/uuid v1.3.0 // indirect github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/oklog/ulid v1.3.1 // indirect github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
) )

@ -1,11 +1,17 @@
github.com/ThreeDotsLabs/watermill v1.3.3 h1:ulVgkk7n/hhWmqKKJrquWJWwIDGr9LXJ8W5XmwgSDJ4= github.com/ThreeDotsLabs/watermill v1.3.3 h1:ulVgkk7n/hhWmqKKJrquWJWwIDGr9LXJ8W5XmwgSDJ4=
github.com/ThreeDotsLabs/watermill v1.3.3/go.mod h1:FUH1a4BEmr5UCmCtg7CIYvEL11mdeVBDp1404+eLP+c= github.com/ThreeDotsLabs/watermill v1.3.3/go.mod h1:FUH1a4BEmr5UCmCtg7CIYvEL11mdeVBDp1404+eLP+c=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
@ -13,5 +19,10 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

Loading…
Cancel
Save