diff --git a/extractor/chat/chatRelayClientHandler.go b/extractor/chat/chatRelayClientHandler.go new file mode 100644 index 0000000..4a85f41 --- /dev/null +++ b/extractor/chat/chatRelayClientHandler.go @@ -0,0 +1,88 @@ +package chat + +import ( + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + uuid "github.com/satori/go.uuid" +) + +var ClientStreamerList map[string]Client + +type Client struct { + ID string + Conn *websocket.Conn + send chan string + FollowingStreamers map[string]bool +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func newClient(conn *websocket.Conn) *Client { + client := &Client{ + ID: uuid.NewV1().String(), + Conn: conn, + send: make(chan string, 256), + FollowingStreamers: map[string]bool{}, + } + + ClientHandler.AddClient(client) + return client +} + +func (c *Client) Read() { + defer c.Conn.Close() + + for { + _, msg, err := c.Conn.ReadMessage() + if err != nil { + DisconnectClient(c) + break + } + ClientMessageHandler(c, string(msg)) + } +} + +func (c *Client) Write() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case message := <-c.send: + err := c.Conn.WriteMessage(websocket.TextMessage, []byte(message)) + if err != nil { + break + } + } + } +} + +func (c *Client) Close() { + close(c.send) + c.Conn.Close() +} + +func DisconnectClient(c *Client) { + ClientHandler.DeleteClient(c.ID) +} + +func ServeWS(context *gin.Context) { + ws, err := upgrader.Upgrade(context.Writer, context.Request, nil) + if err != nil { + return + } + + client := newClient(ws) + + go client.Write() + go client.Read() +} diff --git a/extractor/chat/chatRelayClientMessageHandler.go b/extractor/chat/chatRelayClientMessageHandler.go new file mode 100644 index 0000000..67d4f2e --- /dev/null +++ b/extractor/chat/chatRelayClientMessageHandler.go @@ -0,0 +1,19 @@ +package chat + +import ( + "strings" +) + +func ClientMessageHandler(client *Client, msg string) { + splitMsg := strings.Split(msg, " ") + + if len(splitMsg) == 2 && splitMsg[0] == "JOIN" { + client.send <- "OK" + client.FollowingStreamers[splitMsg[1]] = true + FollowStreamer(splitMsg[1]) + return + } + + client.send <- "Invalid request" + client.Close() +} diff --git a/extractor/chat/chatRelayClientSorter.go b/extractor/chat/chatRelayClientSorter.go new file mode 100644 index 0000000..cfdb0ef --- /dev/null +++ b/extractor/chat/chatRelayClientSorter.go @@ -0,0 +1,31 @@ +package chat + +type ClientMap map[*Client]bool + +var ClientHandler = ClientMap{} + +func (c ClientMap) AddClient(client *Client) { + c[client] = true +} + +func (c ClientMap) DeleteClient(ID string) { + for client := range c { + if client.ID == ID { + delete(c, client) + } + } +} + +func (c ClientMap) FindClientsByStreamer(streamer string) []*Client { + var clients []*Client + for client := range c { + // check if the client is following the given streamer + for s, _ := range client.FollowingStreamers { + if s == streamer { + clients = append(clients, client) + break + } + } + } + return clients +} diff --git a/extractor/chat/chatRelayServer.go b/extractor/chat/chatRelayServer.go deleted file mode 100644 index b16753f..0000000 --- a/extractor/chat/chatRelayServer.go +++ /dev/null @@ -1,24 +0,0 @@ -package chat - -import ( - "net" - - "github.com/gin-gonic/gin" - "github.com/gobwas/ws/wsutil" -) - -func CreateWebsocketServer(conn net.Conn, context *gin.Context) { - defer conn.Close() - - for { - msg, op, err := wsutil.ReadClientData(conn) - if err != nil { - break - } - - err = wsutil.WriteServerMessage(conn, op, msg) - if err != nil { - break - } - } -} diff --git a/extractor/chat/twitchChat.go b/extractor/chat/twitchChat.go new file mode 100644 index 0000000..3f9cac2 --- /dev/null +++ b/extractor/chat/twitchChat.go @@ -0,0 +1,105 @@ +package chat + +import ( + "errors" + "fmt" + "log" + "net/url" + "strings" + + "github.com/gorilla/websocket" +) + +var u = url.URL{Scheme: "wss", Host: "irc-ws.chat.twitch.tv:443", Path: "/"} +var conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil) + +// If the connection is ready to subscribe to channels +var connectionReady = false +var streamersFollowing = map[string]bool{} + +func BeginTwitchChatConnection() { + log.Println("Connecting to Twitch IRC") + if err != nil { + log.Fatal(err) + } + + defer conn.Close() + + // Authenticate with server + err = sendMessage("CAP REQ :twitch.tv/membership twitch.tv/tags twitch.tv/commands") + err = sendMessage("PASS none") + err = sendMessage("NICK justinfan333333333333") + if err != nil { + log.Printf("Failed to send message: %v", err) + } + + // Follow streamers + for streamer := range streamersFollowing { + sendMessage("JOIN #" + streamer) + } + + // Continue to listen for incoming messages from the server. + for { + _, msg, err := conn.ReadMessage() + if err != nil { + log.Printf("Failed to read message: %v", err) + break + } + + parseMessage(string(msg)) + RemoveUnusedStreamers() + + fmt.Println(ClientHandler.FindClientsByStreamer("dino_xx")) + + //fmt.Println("Received message from server:", string(msg)) + } +} + +func sendMessage(msg string) error { + return conn.WriteMessage(websocket.TextMessage, []byte(msg)) +} + +func parseMessage(msg string) error { + if !connectionReady && strings.Contains(msg, "001") { + connectionReady = true + log.Println("Authenticated with Twitch IRC!") + } + + return nil +} + +func FollowStreamer(streamerName string) error { + err := sendMessage("JOIN #" + strings.ToLower(streamerName)) + if err != nil { + return err + } + streamersFollowing[streamerName] = true + + return nil +} + +func SendMessage(msg string) error { + if connectionReady { + return conn.WriteMessage(websocket.TextMessage, []byte(msg)) + } + + return errors.New("connection not ready") +} + +// function to check and remove unused streamers from server map +func RemoveUnusedStreamers() { + for streamer := range streamersFollowing { + found := false + // iterate over each client and their streamer map + for client, _ := range ClientHandler { + if client.FollowingStreamers[streamer] == true { + found = true + break + } + } + if !found { + // remove streamer from server map + delete(streamersFollowing, streamer) + } + } +} diff --git a/main.go b/main.go index 0131a00..f352351 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,9 @@ package main import ( "fmt" + "log" "os" + "safetwitch-backend/extractor/chat" "safetwitch-backend/routes" "strings" @@ -33,17 +35,19 @@ func notFoundHandler(context *gin.Context) { } func main() { + log.Println("Starting Safetwitch...") // check for env env := os.Getenv("URL") if env == "" { - fmt.Println("ENV Variable 'URL' is not present") - os.Exit(10) + log.Fatalln("ENV Variable 'URL' is not present") } + go chat.BeginTwitchChatConnection() + router := gin.Default() router.Use(ErrorHandler) routes.SetRoutes(router) - router.NoRoute(notFoundHandler) + log.Println("Safetwitch API running") router.Run() } diff --git a/routes/root/root.go b/routes/root/root.go index 76c7afe..783087d 100644 --- a/routes/root/root.go +++ b/routes/root/root.go @@ -5,7 +5,6 @@ import ( "safetwitch-backend/extractor/chat" "github.com/gin-gonic/gin" - "github.com/gobwas/ws" ) func Routes(route *gin.Engine) { @@ -14,12 +13,7 @@ func Routes(route *gin.Engine) { auth.GET("/", func(context *gin.Context) { upgradeHeader := context.Request.Header.Get("Upgrade") if upgradeHeader == "websocket" { - conn, _, _, err := ws.UpgradeHTTP(context.Request, context.Writer) - if err != nil { - context.Error(err) - } - - go chat.CreateWebsocketServer(conn, context) + chat.ServeWS(context) } else { context.JSON(200, extractor.FormatMessage("SafeTwitch backend running", true)) }