From 057f475fc520fd720291a4afd6017543d488d0f4 Mon Sep 17 00:00:00 2001 From: dragongoose <19649813+dragongoose@users.noreply.github.com> Date: Mon, 27 Mar 2023 08:33:51 -0400 Subject: [PATCH] Refactor and add improvements --- routes/chatIrcProxy.ts | 183 ++++++++++++++++++++++++++++ routes/proxyRoute.ts | 75 +----------- types/scraping/Chat.ts | 1 + util/scraping/chat/chat.ts | 234 ++++++++++++++++++++++++------------ util/scraping/chat/utils.ts | 2 +- 5 files changed, 345 insertions(+), 150 deletions(-) create mode 100644 routes/chatIrcProxy.ts diff --git a/routes/chatIrcProxy.ts b/routes/chatIrcProxy.ts new file mode 100644 index 0000000..facde92 --- /dev/null +++ b/routes/chatIrcProxy.ts @@ -0,0 +1,183 @@ +import { randomUUID } from 'crypto'; +import ws from 'ws'; +import { TwitchChat } from '../util/scraping/chat/chat'; + +interface ExtWebSocket extends ws { + id?: string; +} + +export class TwitchChatServer { + private chat: TwitchChat; + private clients: { [k:string]: ExtWebSocket[] }; + + constructor() { + this.clients = {}; + this.chat = new TwitchChat({ + login: { + username: 'justinfan23423', + password: 'none' + }, + channels: [] + }); + this.chat.connect() + this.chat.on('PRIVMSG', this.handlePrivateMessage.bind(this)); + this.chat.on('CLEARCHAT', this.handleClearChat.bind(this)); + this.chat.on('CLEARMSG', this.handleClearMessage.bind(this)); + this.chat.on('GLOBALUSERSTATE', this.handleGlobalUserState.bind(this)); + this.chat.on('HOSTTARGET', this.handleHostTarget.bind(this)); + this.chat.on('NOTICE', this.handleNotice.bind(this)); + this.chat.on('RECONNECT', this.handleReconnect.bind(this)); + this.chat.on('ROOMSTATE', this.handleRoomState.bind(this)); + this.chat.on('USERNOTICE', this.handleUserNotice.bind(this)); + this.chat.on('USERSTATE', this.handleUserState.bind(this)); + this.chat.on('WHISPER', this.handleWhisper.bind(this)); + } + + public async startWebSocketServer(server: ws.Server): Promise { + server.on('connection', (ws: ws) => { + const socket = ws as ExtWebSocket + socket.on('message', this.handleWebSocketMessage.bind(this, socket)); + socket.on('close', this.handleWebSocketClose.bind(this, socket)); + }); + } + + private handlePrivateMessage = async(username: string, type: string, channel: string, message: string, tags: Record) => { + const socketsToSend = await this.findClientsForStreamer(channel) + for(let socket of socketsToSend) { + let payload = { + type: 'PRIVMSG', + username, + channel, + message, + tags + } + socket.send(JSON.stringify(payload)) + } + } + + private handleClearChat = async(channel: string, username: string, tags: Record) => { + const socketsToSend = await this.findClientsForStreamer(channel) + for(let socket of socketsToSend) { + let payload = { + type: 'CLEARCHAT', + username, + channel, + tags + } + socket.send(JSON.stringify(payload)) + } + } + + private handleClearMessage = async(channel: string, messageId: string, tags: Record) => { + const socketsToSend = await this.findClientsForStreamer(channel) + for(let socket of socketsToSend) { + let payload = { + type: 'CLEARMSG', + channel, + tags + } + socket.send(JSON.stringify(payload)) + } + } + + private handleGlobalUserState = async() => { + // Do nothing for now + } + + private handleHostTarget = async(channel: string, viewers: number, target: string, tags: Record) => { + const socketsToSend = await this.findClientsForStreamer(channel) + for(let socket of socketsToSend) { + let payload = { + type: 'HOSTTARGET', + channel, + target, + viewers, + tags + } + socket.send(JSON.stringify(payload)) + } + } + + private handleNotice = async(channel: string, message: string) => { + // Do nothing for now + } + + private handleReconnect = async() => { + // Do nothing for now + } + + private handleRoomState = async(channel: string, isLive: boolean, tags: Record) => { + // Do nothing for now + } + + private handleUserNotice = async(channel: string, type: string, message: string, tags: { [id: string]: string }) => { + const socketsToSend = await this.findClientsForStreamer(channel) + for(let socket of socketsToSend) { + let payload = { + type: 'USERNOTICE', + channel, + message, + tags + } + socket.send(JSON.stringify(payload)) + } + } + + private handleUserState = async(channel: string, tags: { [id: string]: string }) => { + // Do nothing for now + } + + private handleWhisper = async(username: string, message: string, tags: { [id: string]: string }) => { + // Do nothing for now + } + + private async findClientsForStreamer(streamerName: string): Promise { + if(!this.clients[streamerName]) return Promise.reject('No clients following streamer') + + return this.clients[streamerName]; + } + + private async handleWebSocketMessage(socket: ExtWebSocket, message: ws.Data) { + const data = message.toString() + const splitted = data.split(' ') + + if (splitted.length < 2 || splitted[0] !== 'JOIN') { + socket.close(); + return; + } + + const streamersToJoin = splitted[1].split(',') + if (streamersToJoin.length > 1) { + socket.close(); + return; + } + + const id = randomUUID() + for (let streamer of streamersToJoin) { + this.chat.addStreamer(streamer) + + if (this.clients[streamer]) { + this.clients[streamer].push(socket) + } else { + this.clients[streamer] = [socket] + } + } + + socket.id = id + socket.send('OK') + } + + private handleWebSocketClose(socket: ExtWebSocket) { + if (socket.id) { + // Remove the socket from the list of clients following each streamer + Object.entries(this.clients).forEach(([streamer, sockets]) => { + this.clients[streamer] = sockets.filter(s => s !== socket); + if (this.clients[streamer].length === 0) { + // No more clients following this streamer, remove it from the chat + this.chat.removeStreamer(streamer); + delete this.clients[streamer]; + } + }); + } + } +} diff --git a/routes/proxyRoute.ts b/routes/proxyRoute.ts index 0a9446e..e536317 100644 --- a/routes/proxyRoute.ts +++ b/routes/proxyRoute.ts @@ -1,8 +1,8 @@ import { Router, Response, Request, NextFunction } from 'express' import { TwitchAPI } from '../util/scraping/extractor'; import ws, { WebSocket } from 'ws'; -import { TwitchChat } from '../util/scraping/chat/chat'; import { logger } from '../util/logger'; +import { TwitchChatServer } from './chatIrcProxy' const proxyRouter = Router(); const twitch = new TwitchAPI() @@ -72,77 +72,8 @@ proxyRouter.get('/stream/segment/:encodedUrl', async (req: Request, res: Respons -// IRC PROXY -interface ExtWebSocket extends WebSocket { - id: string; -} - -const chat = new TwitchChat({ - login: { - username: 'justinfan23423', - password: 'none' - }, - channels: [] -}) -chat.connect() - - const clients : { [k:string]: ExtWebSocket[] } = {} -import { randomUUID } from 'crypto'; - -const findClientsForStreamer = async (streamerName: string) => { - if(!clients[streamerName]) return Promise.reject(new Error('No clients following streamer')) - - return clients[streamerName] -} - +const twitchChatServer = new TwitchChatServer(); export const wsServer = new ws.Server({ noServer: true }); -wsServer.on('connection', (ws: ExtWebSocket) => { - const socket = ws as ExtWebSocket - socket.on('message', (message) => { - const data = message.toString() - const splitted = data.split(' ') - - if(splitted.length > 2) socket.close() - if(splitted[0] !== 'JOIN') socket.close() - - const streamersToJoin = splitted[1].split(',') - if(streamersToJoin.length > 1) socket.close() - - const id = randomUUID() - for (let streamer of streamersToJoin) { - chat.addStreamer(streamer) - - if(clients[streamer]) { - clients[streamer].push(socket) - } else { - clients[streamer] = [socket] - } - } - socket.id = id - socket.send('OK') - - - }); - - socket.on('close', () => { - if(socket.id) { - } - }) -}); - -chat.on('PRIVMSG', async (username, type, channel, message) => { - const socketsToSend = await findClientsForStreamer(channel) - for(let socket of socketsToSend) { - let payload = { - username, - type, - channel, - message - } - socket.send(JSON.stringify(payload)) - } -}) - - +twitchChatServer.startWebSocketServer(wsServer); export default proxyRouter \ No newline at end of file diff --git a/types/scraping/Chat.ts b/types/scraping/Chat.ts index 6d2fbdb..cdee99e 100644 --- a/types/scraping/Chat.ts +++ b/types/scraping/Chat.ts @@ -14,4 +14,5 @@ export interface Metadata { messageType: MessageType channel: string message: string + tags: { [k:string]:any } } diff --git a/util/scraping/chat/chat.ts b/util/scraping/chat/chat.ts index 2f7bcf3..d163030 100644 --- a/util/scraping/chat/chat.ts +++ b/util/scraping/chat/chat.ts @@ -1,92 +1,172 @@ import { EventEmitter } from 'stream'; -import WebSocket from 'ws' -import { TwitchChatOptions, Metadata, MessageType, MessageTypes } from '../../../types/scraping/Chat' +import WebSocket from 'ws'; +import { TwitchChatOptions, Metadata, MessageType, MessageTypes } from '../../../types/scraping/Chat'; import { parseUsername } from './utils'; import { logger } from '../../logger'; export declare interface TwitchChat { - on(event: 'PRIVMSG', listener: (username: string, messageType: MessageType, channel: string, message: string) => void): this + on(event: 'PRIVMSG', listener: (username: string, messageType: MessageType, channel: string, message: string, tags: Record) => void): this; + on(event: 'CLEARCHAT', listener: (channel: string, username: string, tags: Record) => void): this; + on(event: 'CLEARMSG', listener: (channel: string, messageID: string, tags: Record) => void): this; + on(event: 'GLOBALUSERSTATE', listener: () => void): this; + on(event: 'HOSTTARGET', listener: (channel: string, viewers: number, targetChannel: string, tags: Record) => void): this; + on(event: 'NOTICE', listener: (channel: string, message: string, messageType: MessageType, tags: Record) => void): this; + on(event: 'RECONNECT', listener: () => void): this; + on(event: 'ROOMSTATE', listener: (channel: string, isLive: boolean, tags: Record) => void): this; + on(event: 'USERNOTICE', listener: (channel: string, messageType: MessageType, message: string, tags: Record) => void): this; + on(event: 'USERSTATE', listener: (channel: string, tags: Record) => void): this; + on(event: 'WHISPER', listener: (username: string, message: string, tags: Record) => void): this; } -export class TwitchChat extends EventEmitter{ - public channels: string[] - private url = 'wss://irc-ws.chat.twitch.tv:443' - private ws: WebSocket | null; - private isConnected: boolean = false - private manualDisconnect: boolean = false +export class TwitchChat extends EventEmitter { + private channels: string[]; + private readonly url = 'wss://irc-ws.chat.twitch.tv:443'; + private ws: WebSocket | null = null; + private isConnected = false; + private manualDisconnect = false; - constructor(options: TwitchChatOptions) { - super() - this.channels = options.channels - this.ws = null + constructor(options: TwitchChatOptions) { + super(); + this.channels = options.channels; + } + + private parser(message: string) { + + /* + EXAMPLE MESSAGE + + */ + + const splitted = message.split(' '); + let unparsedTags = splitted[0] + let messageData = splitted.slice(1) + + const metadata: Metadata = { + username: parseUsername(messageData[0]), + messageType: messageData[1], + channel: messageData[2]?.replace('#', '') || '', + message: messageData.slice(3, messageData.length).join(" ").replace(':', ''), + tags: {}, + }; + + const tags = unparsedTags.split(';').reduce((prev: Record, curr: string) => { + const [key, value] = curr.split('='); + prev[key] = value; + return prev; + }, {}); + metadata.tags = tags; + + switch (metadata.messageType) { + case 'PRIVMSG': + this.emit('PRIVMSG', metadata.username, metadata.messageType, metadata.channel, metadata.message, tags); + break; + case 'CLEARCHAT': + this.emit('CLEARCHAT', metadata.channel, metadata.username, tags); + break; + case 'CLEARMSG': + const messageID = metadata.tags['target-msg-id'] || ''; + this.emit('CLEARMSG', metadata.channel, messageID, tags); + break; + case 'GLOBALUSERSTATE': + this.emit('GLOBALUSERSTATE'); + break; + case 'HOSTTARGET': + const [targetChannel, viewersStr] = metadata.message.split(' '); + const viewers = parseInt(viewersStr); + this.emit('HOSTTARGET', metadata.channel, viewers, targetChannel, tags); + break; + case 'NOTICE': + this.emit('NOTICE', metadata.channel, metadata.message, metadata.tags['msg-id'] || '', tags); + break; + case 'RECONNECT': + this.emit('RECONNECT'); + break; + case 'ROOMSTATE': + const isLive = metadata.tags['room-id'] ? true : false; + this.emit('ROOMSTATE', metadata.channel, isLive, metadata.tags, tags); + break; + case 'USERNOTICE': + this.emit('USERNOTICE', metadata.channel, metadata.tags['msg-id'] || '', metadata.message, metadata.tags); + break; + case 'USERSTATE': + this.emit('USERSTATE', metadata.channel, metadata.tags); + break; + case 'WHISPER': + this.emit('WHISPER', metadata.username, metadata.message, metadata.tags); + break; + } + } + + private connectToTwitch() { + this.ws = new WebSocket(this.url); + + this.ws.onclose = () => { + logger.info('Disconnected from Twitch IRC'); + logger.info(`Subscribed to channels ${this.channels}`); + + if (this.manualDisconnect) { + return; + } + + const disconnectMessage = { + type: 'SERVERMSG', + message: 'Disconnected', + }; + + this.emit(JSON.stringify(disconnectMessage)); + + this.ws = null; + this.isConnected = false; + this.connectToTwitch(); + }; + + this.ws.onopen = () => { + if (!this.ws) { + return; + } + + this.ws.send('CAP REQ :twitch.tv/membership'); + this.ws.send('CAP REQ :twitch.tv/tags'); + this.ws.send('CAP REQ :twitch.tv/commands'); + this.ws.send('PASS none'); + this.ws.send('NICK justinfan333333333333'); + + for (const channel of this.channels) { + this.ws.send(`JOIN #${channel}`); + } + + this.ws.on('message', (data) => { + const message = data.toString(); + this.parser(message); + }); + + this.isConnected = true; + }; + } + + public async connect() { + this.connectToTwitch(); + } + + public addStreamer(streamerName: string): void { + if (!this.isConnected || this.channels.includes(streamerName)) { + return; } - private parser() { - this.ws?.on('message', (data) => { - console.log(this.channels) - let normalData = data.toString() - let splitted = normalData.split(":") - - let metadata = splitted[1].split(' ') - let message = splitted[2] + this.channels.push(streamerName); + this.ws?.send(`JOIN #${streamerName}`); + } - if(!MessageTypes.includes(metadata[1])) return; - - let parsedMetadata: Metadata = { - username: parseUsername(metadata[0]), - messageType: metadata[1], - channel: metadata[2].replace('#', ''), - message: message - } - - this.createEmit(parsedMetadata) - }) + public removeStreamer(streamerName: string): void { + if (!this.isConnected) { + return; } - private createEmit(data: Metadata) { - this.emit(data.messageType, ...Object.values(data)) + const index = this.channels.indexOf(streamerName); + + if (index >= 0) { + this.channels.splice(index, 1); + this.ws?.send(`PART #${streamerName}`); } - - public async connect() { - console.log('ss') - this.ws = new WebSocket(this.url) - this.isConnected = true - - this.ws.onclose = () => { - logger.info('Disconnected from twitch IRC'), - logger.info(`Subscribed to channels ${this.channels}`) - if(this.manualDisconnect) return - const toEmit = { - type: 'SERVERMSG', - message: 'Disconnected' - } - this.emit(JSON.stringify(toEmit)) - - this.ws = null - this.isConnected = false - this.connect() - } - - this.ws.onopen = () => { - if(this.ws) { - this.ws.send('PASS none') - this.ws.send('NICK justinfan333333333333') - - for(let channel of this.channels) { - this.ws.send(`JOIN #${channel}`) - } - - this.parser() - return Promise.resolve() - } - } - - } - - public addStreamer(streamerName: string) { - if(!this.isConnected) return; - this.channels.push(streamerName) - this.ws!.send(`JOIN #${streamerName}`) - } - + } } diff --git a/util/scraping/chat/utils.ts b/util/scraping/chat/utils.ts index cadfcd0..8affb39 100644 --- a/util/scraping/chat/utils.ts +++ b/util/scraping/chat/utils.ts @@ -1,4 +1,4 @@ export const parseUsername = (rawUsername: String) => { const splitted = rawUsername.split('!') - return splitted[0] + return splitted[0].replace(':', '') } \ No newline at end of file