diff --git a/telnet/options.go b/telnet/options.go new file mode 100644 index 0000000..e90ab1b --- /dev/null +++ b/telnet/options.go @@ -0,0 +1,7 @@ +package telnet + +type TELNETOption struct { + Us, UsQ, Them, ThemQ bool +} + +type TELNETOptions map[string]TELNETOption diff --git a/telnet/parser.go b/telnet/parser.go new file mode 100644 index 0000000..4e461d8 --- /dev/null +++ b/telnet/parser.go @@ -0,0 +1,40 @@ +package telnet + +import ( + "context" + "log" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" +) + +type TELNETParser struct { + s message.Subscriber + c context.Context + logger watermill.LoggerAdapter + to TELNETOptions +} + +func (tp *TELNETParser) Handle() { + messages, err := tp.s.Subscribe(tp.c, "telnet.raw") + if err != nil { + log.Fatalln("couldn't subscribe to telnet.raw") + } + + for message := range messages { + fields := make(watermill.LogFields) + fields["body"] = string(message.Payload) + fields["correlation_id"] = string(message.Metadata["correlation_id"]) + tp.logger.Trace("received raw TCP line", fields) + message.Ack() + } + +} + +func NewTELNETParser(c context.Context, s message.Subscriber, wml watermill.LoggerAdapter) *TELNETParser { + return &TELNETParser{ + c: c, + s: s, + logger: wml, + } +} diff --git a/telnet/telnet.go b/telnet/server.go similarity index 59% rename from telnet/telnet.go rename to telnet/server.go index 942a3df..63a4603 100644 --- a/telnet/telnet.go +++ b/telnet/server.go @@ -11,43 +11,6 @@ import ( "github.com/ThreeDotsLabs/watermill/message/router/middleware" ) -type TELNETOption struct { - Us, UsQ, Them, ThemQ bool -} - -type TELNETOptions map[string]TELNETOption - -type TELNETParser struct { - s message.Subscriber - c context.Context - logger watermill.LoggerAdapter - to TELNETOptions -} - -func (tp *TELNETParser) Handle() { - messages, err := tp.s.Subscribe(tp.c, "telnet.raw") - if err != nil { - log.Fatalln("couldn't subscribe to telnet.raw") - } - - for message := range messages { - fields := make(watermill.LogFields) - fields["body"] = string(message.Payload) - fields["correlation_id"] = string(message.Metadata["correlation_id"]) - tp.logger.Trace("received raw TCP line", fields) - message.Ack() - } - -} - -func NewTELNETParser(c context.Context, s message.Subscriber, wml watermill.LoggerAdapter) *TELNETParser { - return &TELNETParser{ - c: c, - s: s, - logger: wml, - } -} - type TELNETServer struct { l net.Listener p message.Publisher