mirror of
https://codeberg.org/SafeTwitch/safetwitch-backend.git
synced 2025-01-18 02:12:28 -05:00
Refactor and add improvements
This commit is contained in:
parent
68a33c41ca
commit
057f475fc5
5 changed files with 345 additions and 150 deletions
183
routes/chatIrcProxy.ts
Normal file
183
routes/chatIrcProxy.ts
Normal file
|
@ -0,0 +1,183 @@
|
|||
import { randomUUID } from 'crypto';
|
||||
import ws from 'ws';
|
||||
import { TwitchChat } from '../util/scraping/chat/chat';
|
||||
|
||||
interface ExtWebSocket extends ws {
|
||||
id?: string;
|
||||
}
|
||||
|
||||
export class TwitchChatServer {
|
||||
private chat: TwitchChat;
|
||||
private clients: { [k:string]: ExtWebSocket[] };
|
||||
|
||||
constructor() {
|
||||
this.clients = {};
|
||||
this.chat = new TwitchChat({
|
||||
login: {
|
||||
username: 'justinfan23423',
|
||||
password: 'none'
|
||||
},
|
||||
channels: []
|
||||
});
|
||||
this.chat.connect()
|
||||
this.chat.on('PRIVMSG', this.handlePrivateMessage.bind(this));
|
||||
this.chat.on('CLEARCHAT', this.handleClearChat.bind(this));
|
||||
this.chat.on('CLEARMSG', this.handleClearMessage.bind(this));
|
||||
this.chat.on('GLOBALUSERSTATE', this.handleGlobalUserState.bind(this));
|
||||
this.chat.on('HOSTTARGET', this.handleHostTarget.bind(this));
|
||||
this.chat.on('NOTICE', this.handleNotice.bind(this));
|
||||
this.chat.on('RECONNECT', this.handleReconnect.bind(this));
|
||||
this.chat.on('ROOMSTATE', this.handleRoomState.bind(this));
|
||||
this.chat.on('USERNOTICE', this.handleUserNotice.bind(this));
|
||||
this.chat.on('USERSTATE', this.handleUserState.bind(this));
|
||||
this.chat.on('WHISPER', this.handleWhisper.bind(this));
|
||||
}
|
||||
|
||||
public async startWebSocketServer(server: ws.Server): Promise<void> {
|
||||
server.on('connection', (ws: ws) => {
|
||||
const socket = ws as ExtWebSocket
|
||||
socket.on('message', this.handleWebSocketMessage.bind(this, socket));
|
||||
socket.on('close', this.handleWebSocketClose.bind(this, socket));
|
||||
});
|
||||
}
|
||||
|
||||
private handlePrivateMessage = async(username: string, type: string, channel: string, message: string, tags: Record<string,string>) => {
|
||||
const socketsToSend = await this.findClientsForStreamer(channel)
|
||||
for(let socket of socketsToSend) {
|
||||
let payload = {
|
||||
type: 'PRIVMSG',
|
||||
username,
|
||||
channel,
|
||||
message,
|
||||
tags
|
||||
}
|
||||
socket.send(JSON.stringify(payload))
|
||||
}
|
||||
}
|
||||
|
||||
private handleClearChat = async(channel: string, username: string, tags: Record<string,string>) => {
|
||||
const socketsToSend = await this.findClientsForStreamer(channel)
|
||||
for(let socket of socketsToSend) {
|
||||
let payload = {
|
||||
type: 'CLEARCHAT',
|
||||
username,
|
||||
channel,
|
||||
tags
|
||||
}
|
||||
socket.send(JSON.stringify(payload))
|
||||
}
|
||||
}
|
||||
|
||||
private handleClearMessage = async(channel: string, messageId: string, tags: Record<string,string>) => {
|
||||
const socketsToSend = await this.findClientsForStreamer(channel)
|
||||
for(let socket of socketsToSend) {
|
||||
let payload = {
|
||||
type: 'CLEARMSG',
|
||||
channel,
|
||||
tags
|
||||
}
|
||||
socket.send(JSON.stringify(payload))
|
||||
}
|
||||
}
|
||||
|
||||
private handleGlobalUserState = async() => {
|
||||
// Do nothing for now
|
||||
}
|
||||
|
||||
private handleHostTarget = async(channel: string, viewers: number, target: string, tags: Record<string,string>) => {
|
||||
const socketsToSend = await this.findClientsForStreamer(channel)
|
||||
for(let socket of socketsToSend) {
|
||||
let payload = {
|
||||
type: 'HOSTTARGET',
|
||||
channel,
|
||||
target,
|
||||
viewers,
|
||||
tags
|
||||
}
|
||||
socket.send(JSON.stringify(payload))
|
||||
}
|
||||
}
|
||||
|
||||
private handleNotice = async(channel: string, message: string) => {
|
||||
// Do nothing for now
|
||||
}
|
||||
|
||||
private handleReconnect = async() => {
|
||||
// Do nothing for now
|
||||
}
|
||||
|
||||
private handleRoomState = async(channel: string, isLive: boolean, tags: Record<string, string>) => {
|
||||
// Do nothing for now
|
||||
}
|
||||
|
||||
private handleUserNotice = async(channel: string, type: string, message: string, tags: { [id: string]: string }) => {
|
||||
const socketsToSend = await this.findClientsForStreamer(channel)
|
||||
for(let socket of socketsToSend) {
|
||||
let payload = {
|
||||
type: 'USERNOTICE',
|
||||
channel,
|
||||
message,
|
||||
tags
|
||||
}
|
||||
socket.send(JSON.stringify(payload))
|
||||
}
|
||||
}
|
||||
|
||||
private handleUserState = async(channel: string, tags: { [id: string]: string }) => {
|
||||
// Do nothing for now
|
||||
}
|
||||
|
||||
private handleWhisper = async(username: string, message: string, tags: { [id: string]: string }) => {
|
||||
// Do nothing for now
|
||||
}
|
||||
|
||||
private async findClientsForStreamer(streamerName: string): Promise<ExtWebSocket[]> {
|
||||
if(!this.clients[streamerName]) return Promise.reject('No clients following streamer')
|
||||
|
||||
return this.clients[streamerName];
|
||||
}
|
||||
|
||||
private async handleWebSocketMessage(socket: ExtWebSocket, message: ws.Data) {
|
||||
const data = message.toString()
|
||||
const splitted = data.split(' ')
|
||||
|
||||
if (splitted.length < 2 || splitted[0] !== 'JOIN') {
|
||||
socket.close();
|
||||
return;
|
||||
}
|
||||
|
||||
const streamersToJoin = splitted[1].split(',')
|
||||
if (streamersToJoin.length > 1) {
|
||||
socket.close();
|
||||
return;
|
||||
}
|
||||
|
||||
const id = randomUUID()
|
||||
for (let streamer of streamersToJoin) {
|
||||
this.chat.addStreamer(streamer)
|
||||
|
||||
if (this.clients[streamer]) {
|
||||
this.clients[streamer].push(socket)
|
||||
} else {
|
||||
this.clients[streamer] = [socket]
|
||||
}
|
||||
}
|
||||
|
||||
socket.id = id
|
||||
socket.send('OK')
|
||||
}
|
||||
|
||||
private handleWebSocketClose(socket: ExtWebSocket) {
|
||||
if (socket.id) {
|
||||
// Remove the socket from the list of clients following each streamer
|
||||
Object.entries(this.clients).forEach(([streamer, sockets]) => {
|
||||
this.clients[streamer] = sockets.filter(s => s !== socket);
|
||||
if (this.clients[streamer].length === 0) {
|
||||
// No more clients following this streamer, remove it from the chat
|
||||
this.chat.removeStreamer(streamer);
|
||||
delete this.clients[streamer];
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
import { Router, Response, Request, NextFunction } from 'express'
|
||||
import { TwitchAPI } from '../util/scraping/extractor';
|
||||
import ws, { WebSocket } from 'ws';
|
||||
import { TwitchChat } from '../util/scraping/chat/chat';
|
||||
import { logger } from '../util/logger';
|
||||
import { TwitchChatServer } from './chatIrcProxy'
|
||||
|
||||
const proxyRouter = Router();
|
||||
const twitch = new TwitchAPI()
|
||||
|
@ -72,77 +72,8 @@ proxyRouter.get('/stream/segment/:encodedUrl', async (req: Request, res: Respons
|
|||
|
||||
|
||||
|
||||
// IRC PROXY
|
||||
interface ExtWebSocket extends WebSocket {
|
||||
id: string;
|
||||
}
|
||||
|
||||
const chat = new TwitchChat({
|
||||
login: {
|
||||
username: 'justinfan23423',
|
||||
password: 'none'
|
||||
},
|
||||
channels: []
|
||||
})
|
||||
chat.connect()
|
||||
|
||||
const clients : { [k:string]: ExtWebSocket[] } = {}
|
||||
import { randomUUID } from 'crypto';
|
||||
|
||||
const findClientsForStreamer = async (streamerName: string) => {
|
||||
if(!clients[streamerName]) return Promise.reject(new Error('No clients following streamer'))
|
||||
|
||||
return clients[streamerName]
|
||||
}
|
||||
|
||||
const twitchChatServer = new TwitchChatServer();
|
||||
export const wsServer = new ws.Server({ noServer: true });
|
||||
wsServer.on('connection', (ws: ExtWebSocket) => {
|
||||
const socket = ws as ExtWebSocket
|
||||
socket.on('message', (message) => {
|
||||
const data = message.toString()
|
||||
const splitted = data.split(' ')
|
||||
|
||||
if(splitted.length > 2) socket.close()
|
||||
if(splitted[0] !== 'JOIN') socket.close()
|
||||
|
||||
const streamersToJoin = splitted[1].split(',')
|
||||
if(streamersToJoin.length > 1) socket.close()
|
||||
|
||||
const id = randomUUID()
|
||||
for (let streamer of streamersToJoin) {
|
||||
chat.addStreamer(streamer)
|
||||
|
||||
if(clients[streamer]) {
|
||||
clients[streamer].push(socket)
|
||||
} else {
|
||||
clients[streamer] = [socket]
|
||||
}
|
||||
}
|
||||
socket.id = id
|
||||
socket.send('OK')
|
||||
|
||||
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
if(socket.id) {
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
chat.on('PRIVMSG', async (username, type, channel, message) => {
|
||||
const socketsToSend = await findClientsForStreamer(channel)
|
||||
for(let socket of socketsToSend) {
|
||||
let payload = {
|
||||
username,
|
||||
type,
|
||||
channel,
|
||||
message
|
||||
}
|
||||
socket.send(JSON.stringify(payload))
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
twitchChatServer.startWebSocketServer(wsServer);
|
||||
|
||||
export default proxyRouter
|
|
@ -14,4 +14,5 @@ export interface Metadata {
|
|||
messageType: MessageType
|
||||
channel: string
|
||||
message: string
|
||||
tags: { [k:string]:any }
|
||||
}
|
||||
|
|
|
@ -1,92 +1,172 @@
|
|||
import { EventEmitter } from 'stream';
|
||||
import WebSocket from 'ws'
|
||||
import { TwitchChatOptions, Metadata, MessageType, MessageTypes } from '../../../types/scraping/Chat'
|
||||
import WebSocket from 'ws';
|
||||
import { TwitchChatOptions, Metadata, MessageType, MessageTypes } from '../../../types/scraping/Chat';
|
||||
import { parseUsername } from './utils';
|
||||
import { logger } from '../../logger';
|
||||
|
||||
export declare interface TwitchChat {
|
||||
on(event: 'PRIVMSG', listener: (username: string, messageType: MessageType, channel: string, message: string) => void): this
|
||||
on(event: 'PRIVMSG', listener: (username: string, messageType: MessageType, channel: string, message: string, tags: Record<string,string>) => void): this;
|
||||
on(event: 'CLEARCHAT', listener: (channel: string, username: string, tags: Record<string,string>) => void): this;
|
||||
on(event: 'CLEARMSG', listener: (channel: string, messageID: string, tags: Record<string,string>) => void): this;
|
||||
on(event: 'GLOBALUSERSTATE', listener: () => void): this;
|
||||
on(event: 'HOSTTARGET', listener: (channel: string, viewers: number, targetChannel: string, tags: Record<string,string>) => void): this;
|
||||
on(event: 'NOTICE', listener: (channel: string, message: string, messageType: MessageType, tags: Record<string,string>) => void): this;
|
||||
on(event: 'RECONNECT', listener: () => void): this;
|
||||
on(event: 'ROOMSTATE', listener: (channel: string, isLive: boolean, tags: Record<string, string>) => void): this;
|
||||
on(event: 'USERNOTICE', listener: (channel: string, messageType: MessageType, message: string, tags: Record<string, string>) => void): this;
|
||||
on(event: 'USERSTATE', listener: (channel: string, tags: Record<string, string>) => void): this;
|
||||
on(event: 'WHISPER', listener: (username: string, message: string, tags: Record<string, string>) => void): this;
|
||||
}
|
||||
|
||||
export class TwitchChat extends EventEmitter{
|
||||
public channels: string[]
|
||||
private url = 'wss://irc-ws.chat.twitch.tv:443'
|
||||
private ws: WebSocket | null;
|
||||
private isConnected: boolean = false
|
||||
private manualDisconnect: boolean = false
|
||||
export class TwitchChat extends EventEmitter {
|
||||
private channels: string[];
|
||||
private readonly url = 'wss://irc-ws.chat.twitch.tv:443';
|
||||
private ws: WebSocket | null = null;
|
||||
private isConnected = false;
|
||||
private manualDisconnect = false;
|
||||
|
||||
constructor(options: TwitchChatOptions) {
|
||||
super()
|
||||
this.channels = options.channels
|
||||
this.ws = null
|
||||
constructor(options: TwitchChatOptions) {
|
||||
super();
|
||||
this.channels = options.channels;
|
||||
}
|
||||
|
||||
private parser(message: string) {
|
||||
|
||||
/*
|
||||
EXAMPLE MESSAGE
|
||||
|
||||
*/
|
||||
|
||||
const splitted = message.split(' ');
|
||||
let unparsedTags = splitted[0]
|
||||
let messageData = splitted.slice(1)
|
||||
|
||||
const metadata: Metadata = {
|
||||
username: parseUsername(messageData[0]),
|
||||
messageType: messageData[1],
|
||||
channel: messageData[2]?.replace('#', '') || '',
|
||||
message: messageData.slice(3, messageData.length).join(" ").replace(':', ''),
|
||||
tags: {},
|
||||
};
|
||||
|
||||
const tags = unparsedTags.split(';').reduce((prev: Record<string, string>, curr: string) => {
|
||||
const [key, value] = curr.split('=');
|
||||
prev[key] = value;
|
||||
return prev;
|
||||
}, {});
|
||||
metadata.tags = tags;
|
||||
|
||||
switch (metadata.messageType) {
|
||||
case 'PRIVMSG':
|
||||
this.emit('PRIVMSG', metadata.username, metadata.messageType, metadata.channel, metadata.message, tags);
|
||||
break;
|
||||
case 'CLEARCHAT':
|
||||
this.emit('CLEARCHAT', metadata.channel, metadata.username, tags);
|
||||
break;
|
||||
case 'CLEARMSG':
|
||||
const messageID = metadata.tags['target-msg-id'] || '';
|
||||
this.emit('CLEARMSG', metadata.channel, messageID, tags);
|
||||
break;
|
||||
case 'GLOBALUSERSTATE':
|
||||
this.emit('GLOBALUSERSTATE');
|
||||
break;
|
||||
case 'HOSTTARGET':
|
||||
const [targetChannel, viewersStr] = metadata.message.split(' ');
|
||||
const viewers = parseInt(viewersStr);
|
||||
this.emit('HOSTTARGET', metadata.channel, viewers, targetChannel, tags);
|
||||
break;
|
||||
case 'NOTICE':
|
||||
this.emit('NOTICE', metadata.channel, metadata.message, metadata.tags['msg-id'] || '', tags);
|
||||
break;
|
||||
case 'RECONNECT':
|
||||
this.emit('RECONNECT');
|
||||
break;
|
||||
case 'ROOMSTATE':
|
||||
const isLive = metadata.tags['room-id'] ? true : false;
|
||||
this.emit('ROOMSTATE', metadata.channel, isLive, metadata.tags, tags);
|
||||
break;
|
||||
case 'USERNOTICE':
|
||||
this.emit('USERNOTICE', metadata.channel, metadata.tags['msg-id'] || '', metadata.message, metadata.tags);
|
||||
break;
|
||||
case 'USERSTATE':
|
||||
this.emit('USERSTATE', metadata.channel, metadata.tags);
|
||||
break;
|
||||
case 'WHISPER':
|
||||
this.emit('WHISPER', metadata.username, metadata.message, metadata.tags);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private connectToTwitch() {
|
||||
this.ws = new WebSocket(this.url);
|
||||
|
||||
this.ws.onclose = () => {
|
||||
logger.info('Disconnected from Twitch IRC');
|
||||
logger.info(`Subscribed to channels ${this.channels}`);
|
||||
|
||||
if (this.manualDisconnect) {
|
||||
return;
|
||||
}
|
||||
|
||||
const disconnectMessage = {
|
||||
type: 'SERVERMSG',
|
||||
message: 'Disconnected',
|
||||
};
|
||||
|
||||
this.emit(JSON.stringify(disconnectMessage));
|
||||
|
||||
this.ws = null;
|
||||
this.isConnected = false;
|
||||
this.connectToTwitch();
|
||||
};
|
||||
|
||||
this.ws.onopen = () => {
|
||||
if (!this.ws) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.ws.send('CAP REQ :twitch.tv/membership');
|
||||
this.ws.send('CAP REQ :twitch.tv/tags');
|
||||
this.ws.send('CAP REQ :twitch.tv/commands');
|
||||
this.ws.send('PASS none');
|
||||
this.ws.send('NICK justinfan333333333333');
|
||||
|
||||
for (const channel of this.channels) {
|
||||
this.ws.send(`JOIN #${channel}`);
|
||||
}
|
||||
|
||||
this.ws.on('message', (data) => {
|
||||
const message = data.toString();
|
||||
this.parser(message);
|
||||
});
|
||||
|
||||
this.isConnected = true;
|
||||
};
|
||||
}
|
||||
|
||||
public async connect() {
|
||||
this.connectToTwitch();
|
||||
}
|
||||
|
||||
public addStreamer(streamerName: string): void {
|
||||
if (!this.isConnected || this.channels.includes(streamerName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
private parser() {
|
||||
this.ws?.on('message', (data) => {
|
||||
console.log(this.channels)
|
||||
let normalData = data.toString()
|
||||
let splitted = normalData.split(":")
|
||||
this.channels.push(streamerName);
|
||||
this.ws?.send(`JOIN #${streamerName}`);
|
||||
}
|
||||
|
||||
let metadata = splitted[1].split(' ')
|
||||
let message = splitted[2]
|
||||
|
||||
if(!MessageTypes.includes(metadata[1])) return;
|
||||
|
||||
let parsedMetadata: Metadata = {
|
||||
username: parseUsername(metadata[0]),
|
||||
messageType: metadata[1],
|
||||
channel: metadata[2].replace('#', ''),
|
||||
message: message
|
||||
}
|
||||
|
||||
this.createEmit(parsedMetadata)
|
||||
})
|
||||
public removeStreamer(streamerName: string): void {
|
||||
if (!this.isConnected) {
|
||||
return;
|
||||
}
|
||||
|
||||
private createEmit(data: Metadata) {
|
||||
this.emit(data.messageType, ...Object.values(data))
|
||||
const index = this.channels.indexOf(streamerName);
|
||||
|
||||
if (index >= 0) {
|
||||
this.channels.splice(index, 1);
|
||||
this.ws?.send(`PART #${streamerName}`);
|
||||
}
|
||||
|
||||
public async connect() {
|
||||
console.log('ss')
|
||||
this.ws = new WebSocket(this.url)
|
||||
this.isConnected = true
|
||||
|
||||
this.ws.onclose = () => {
|
||||
logger.info('Disconnected from twitch IRC'),
|
||||
logger.info(`Subscribed to channels ${this.channels}`)
|
||||
if(this.manualDisconnect) return
|
||||
const toEmit = {
|
||||
type: 'SERVERMSG',
|
||||
message: 'Disconnected'
|
||||
}
|
||||
this.emit(JSON.stringify(toEmit))
|
||||
|
||||
this.ws = null
|
||||
this.isConnected = false
|
||||
this.connect()
|
||||
}
|
||||
|
||||
this.ws.onopen = () => {
|
||||
if(this.ws) {
|
||||
this.ws.send('PASS none')
|
||||
this.ws.send('NICK justinfan333333333333')
|
||||
|
||||
for(let channel of this.channels) {
|
||||
this.ws.send(`JOIN #${channel}`)
|
||||
}
|
||||
|
||||
this.parser()
|
||||
return Promise.resolve()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public addStreamer(streamerName: string) {
|
||||
if(!this.isConnected) return;
|
||||
this.channels.push(streamerName)
|
||||
this.ws!.send(`JOIN #${streamerName}`)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
export const parseUsername = (rawUsername: String) => {
|
||||
const splitted = rawUsername.split('!')
|
||||
return splitted[0]
|
||||
return splitted[0].replace(':', '')
|
||||
}
|
Loading…
Add table
Reference in a new issue