diff --git a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts index c87342013..b0df92f25 100644 --- a/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts +++ b/src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts @@ -99,6 +99,7 @@ import makeWASocket, { Chat, ConnectionState, Contact, + decryptPollVote, delay, DisconnectReason, downloadContentFromMessage, @@ -113,6 +114,7 @@ import makeWASocket, { isJidGroup, isJidNewsletter, isPnUser, + jidNormalizedUser, makeCacheableSignalKeyStore, MessageUpsertType, MessageUserReceiptUpdate, @@ -133,6 +135,7 @@ import { Label } from 'baileys/lib/Types/Label'; import { LabelAssociation } from 'baileys/lib/Types/LabelAssociation'; import { spawn } from 'child_process'; import { isArray, isBase64, isURL } from 'class-validator'; +import { createHash } from 'crypto'; import EventEmitter2 from 'eventemitter2'; import ffmpeg from 'fluent-ffmpeg'; import FormData from 'form-data'; @@ -247,6 +250,7 @@ export class BaileysStartupService extends ChannelStartupService { private readonly userDevicesCache: CacheStore = new NodeCache({ stdTTL: 300000, useClones: false }); private endSession = false; private logBaileys = this.configService.get('LOG').BAILEYS; + private eventProcessingQueue: Promise = Promise.resolve(); // Cache TTL constants (in seconds) private readonly MESSAGE_CACHE_TTL_SECONDS = 5 * 60; // 5 minutes - avoid duplicate message processing @@ -1121,6 +1125,11 @@ export class BaileysStartupService extends ChannelStartupService { ); await this.sendDataWebhook(Events.MESSAGES_EDITED, editedMessage); + + if (received.key?.id && editedMessage.key?.id) { + await this.baileysCache.set(`protocol_${received.key.id}`, editedMessage.key.id, 60 * 60 * 24); + } + const oldMessage = await this.getMessage(editedMessage.key, true); if ((oldMessage as any)?.id) { const editedMessageTimestamp = Long.isLong(received?.messageTimestamp) @@ -1148,12 +1157,7 @@ export class BaileysStartupService extends ChannelStartupService { } } - if ( - (type !== 'notify' && type !== 'append') || - editedMessage || - received.message?.pollUpdateMessage || - !received?.message - ) { + if ((type !== 'notify' && type !== 'append') || editedMessage || !received?.message) { continue; } @@ -1193,6 +1197,107 @@ export class BaileysStartupService extends ChannelStartupService { const messageRaw = this.prepareMessage(received); + if (messageRaw.messageType === 'pollUpdateMessage') { + const pollCreationKey = messageRaw.message.pollUpdateMessage.pollCreationMessageKey; + const pollMessage = (await this.getMessage(pollCreationKey, true)) as proto.IWebMessageInfo; + const pollMessageSecret = (await this.getMessage(pollCreationKey)) as any; + + if (pollMessage) { + const pollOptions = + (pollMessage.message as any).pollCreationMessage?.options || + (pollMessage.message as any).pollCreationMessageV3?.options || + []; + const pollVote = messageRaw.message.pollUpdateMessage.vote; + + const voterJid = received.key.fromMe + ? this.instance.wuid + : received.key.participant || received.key.remoteJid; + + let pollEncKey = pollMessageSecret?.messageContextInfo?.messageSecret; + + let successfulVoterJid = voterJid; + + if (typeof pollEncKey === 'string') { + pollEncKey = Buffer.from(pollEncKey, 'base64'); + } else if (pollEncKey?.type === 'Buffer' && Array.isArray(pollEncKey.data)) { + pollEncKey = Buffer.from(pollEncKey.data); + } + + if (Buffer.isBuffer(pollEncKey) && pollEncKey.length === 44) { + pollEncKey = Buffer.from(pollEncKey.toString('utf8'), 'base64'); + } + + if (pollVote.encPayload && pollEncKey) { + const creatorCandidates = [ + this.instance.wuid, + this.client.user?.lid, + pollMessage.key.participant, + (pollMessage.key as any).participantAlt, + pollMessage.key.remoteJid, + ]; + + const key = received.key as any; + const voterCandidates = [ + this.instance.wuid, + this.client.user?.lid, + key.participant, + key.participantAlt, + key.remoteJidAlt, + key.remoteJid, + ]; + + const uniqueCreators = [ + ...new Set(creatorCandidates.filter(Boolean).map((id) => jidNormalizedUser(id))), + ]; + const uniqueVoters = [...new Set(voterCandidates.filter(Boolean).map((id) => jidNormalizedUser(id)))]; + + let decryptedVote; + + for (const creator of uniqueCreators) { + for (const voter of uniqueVoters) { + try { + decryptedVote = decryptPollVote(pollVote, { + pollCreatorJid: creator, + pollMsgId: pollMessage.key.id, + pollEncKey, + voterJid: voter, + } as any); + if (decryptedVote) { + successfulVoterJid = voter; + break; + } + } catch { + // Continue trying + } + } + if (decryptedVote) break; + } + + if (decryptedVote) { + Object.assign(pollVote, decryptedVote); + } + } + + const selectedOptions = pollVote?.selectedOptions || []; + + const selectedOptionNames = pollOptions + .filter((option) => { + const hash = createHash('sha256').update(option.optionName).digest(); + return selectedOptions.some((selected) => Buffer.compare(selected, hash) === 0); + }) + .map((option) => option.optionName); + + messageRaw.message.pollUpdateMessage.vote.selectedOptions = selectedOptionNames; + + const pollUpdates = pollOptions.map((option) => ({ + name: option.optionName, + voters: selectedOptionNames.includes(option.optionName) ? [successfulVoterJid] : [], + })); + + messageRaw.pollUpdates = pollUpdates; + } + } + const isMedia = received?.message?.imageMessage || received?.message?.videoMessage || @@ -1242,7 +1347,9 @@ export class BaileysStartupService extends ChannelStartupService { } if (this.configService.get('DATABASE').SAVE_DATA.NEW_MESSAGE) { - const msg = await this.prismaRepository.message.create({ data: messageRaw }); + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { pollUpdates, ...messageData } = messageRaw; + const msg = await this.prismaRepository.message.create({ data: messageData }); const { remoteJid } = received.key; const timestamp = msg.messageTimestamp; @@ -1447,18 +1554,26 @@ export class BaileysStartupService extends ChannelStartupService { continue; } - if (update.message !== null && update.status === undefined) continue; - const updateKey = `${this.instance.id}_${key.id}_${update.status}`; const cached = await this.baileysCache.get(updateKey); - if (cached) { + const secondsSinceEpoch = Math.floor(Date.now() / 1000); + console.log('CACHE:', { cached, updateKey, messageTimestamp: update.messageTimestamp, secondsSinceEpoch }); + + if ( + (update.messageTimestamp && update.messageTimestamp === cached) || + (!update.messageTimestamp && secondsSinceEpoch === cached) + ) { this.logger.info(`Update Message duplicated ignored [avoid deadlock]: ${updateKey}`); continue; } - await this.baileysCache.set(updateKey, true, 30 * 60); + if (update.messageTimestamp) { + await this.baileysCache.set(updateKey, update.messageTimestamp, 30 * 60); + } else { + await this.baileysCache.set(updateKey, secondsSinceEpoch, 30 * 60); + } if (status[update.status] === 'READ' && key.fromMe) { if (this.configService.get('CHATWOOT').ENABLED && this.localChatwoot?.enabled) { @@ -1489,19 +1604,32 @@ export class BaileysStartupService extends ChannelStartupService { remoteJid: key?.remoteJid, fromMe: key.fromMe, participant: key?.participant, - status: status[update.status] ?? 'DELETED', + status: status[update.status] ?? 'SERVER_ACK', pollUpdates, instanceId: this.instanceId, }; + if (update.message) { + message.message = update.message; + } + let findMessage: any; const configDatabaseData = this.configService.get('DATABASE').SAVE_DATA; if (configDatabaseData.HISTORIC || configDatabaseData.NEW_MESSAGE) { // Use raw SQL to avoid JSON path issues + const protocolMapKey = `protocol_${key.id}`; + const originalMessageId = (await this.baileysCache.get(protocolMapKey)) as string; + + if (originalMessageId) { + message.keyId = originalMessageId; + } + + const searchId = originalMessageId || key.id; + const messages = (await this.prismaRepository.$queryRaw` SELECT * FROM "Message" WHERE "instanceId" = ${this.instanceId} - AND "key"->>'id' = ${key.id} + AND "key"->>'id' = ${searchId} LIMIT 1 `) as any[]; findMessage = messages[0] || null; @@ -1514,7 +1642,7 @@ export class BaileysStartupService extends ChannelStartupService { } if (update.message === null && update.status === undefined) { - this.sendDataWebhook(Events.MESSAGES_DELETE, key); + this.sendDataWebhook(Events.MESSAGES_DELETE, { ...key, status: 'DELETED' }); if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) await this.prismaRepository.messageUpdate.create({ data: message }); @@ -1562,8 +1690,11 @@ export class BaileysStartupService extends ChannelStartupService { this.sendDataWebhook(Events.MESSAGES_UPDATE, message); - if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) - await this.prismaRepository.messageUpdate.create({ data: message }); + if (this.configService.get('DATABASE').SAVE_DATA.MESSAGE_UPDATE) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { message: _msg, ...messageData } = message; + await this.prismaRepository.messageUpdate.create({ data: messageData }); + } const existingChat = await this.prismaRepository.chat.findFirst({ where: { instanceId: this.instanceId, remoteJid: message.remoteJid }, @@ -1614,9 +1745,9 @@ export class BaileysStartupService extends ChannelStartupService { // This enables LID to phoneNumber conversion without breaking existing webhook consumers // Helper to normalize participantId as phone number - const normalizePhoneNumber = (id: string): string => { + const normalizePhoneNumber = (id: string | null | undefined): string => { // Remove @lid, @s.whatsapp.net suffixes and extract just the number part - return id.split('@')[0]; + return String(id || '').split('@')[0]; }; try { @@ -1732,135 +1863,141 @@ export class BaileysStartupService extends ChannelStartupService { private eventHandler() { this.client.ev.process(async (events) => { - if (!this.endSession) { - const database = this.configService.get('DATABASE'); - const settings = await this.findSettings(); + this.eventProcessingQueue = this.eventProcessingQueue.then(async () => { + try { + if (!this.endSession) { + const database = this.configService.get('DATABASE'); + const settings = await this.findSettings(); - if (events.call) { - const call = events.call[0]; + if (events.call) { + const call = events.call[0]; - if (settings?.rejectCall && call.status == 'offer') { - this.client.rejectCall(call.id, call.from); - } + if (settings?.rejectCall && call.status == 'offer') { + this.client.rejectCall(call.id, call.from); + } - if (settings?.msgCall?.trim().length > 0 && call.status == 'offer') { - if (call.from.endsWith('@lid')) { - call.from = await this.client.signalRepository.lidMapping.getPNForLID(call.from as string); - } - const msg = await this.client.sendMessage(call.from, { text: settings.msgCall }); + if (settings?.msgCall?.trim().length > 0 && call.status == 'offer') { + if (call.from.endsWith('@lid')) { + call.from = await this.client.signalRepository.lidMapping.getPNForLID(call.from as string); + } + const msg = await this.client.sendMessage(call.from, { text: settings.msgCall }); - this.client.ev.emit('messages.upsert', { messages: [msg], type: 'notify' }); - } + this.client.ev.emit('messages.upsert', { messages: [msg], type: 'notify' }); + } - this.sendDataWebhook(Events.CALL, call); - } + this.sendDataWebhook(Events.CALL, call); + } - if (events['connection.update']) { - this.connectionUpdate(events['connection.update']); - } + if (events['connection.update']) { + this.connectionUpdate(events['connection.update']); + } - if (events['creds.update']) { - this.instance.authState.saveCreds(); - } + if (events['creds.update']) { + this.instance.authState.saveCreds(); + } - if (events['messaging-history.set']) { - const payload = events['messaging-history.set']; - this.messageHandle['messaging-history.set'](payload); - } + if (events['messaging-history.set']) { + const payload = events['messaging-history.set']; + await this.messageHandle['messaging-history.set'](payload); + } - if (events['messages.upsert']) { - const payload = events['messages.upsert']; + if (events['messages.upsert']) { + const payload = events['messages.upsert']; - this.messageProcessor.processMessage(payload, settings); - // this.messageHandle['messages.upsert'](payload, settings); - } + // this.messageProcessor.processMessage(payload, settings); + await this.messageHandle['messages.upsert'](payload, settings); + } - if (events['messages.update']) { - const payload = events['messages.update']; - this.messageHandle['messages.update'](payload, settings); - } + if (events['messages.update']) { + const payload = events['messages.update']; + await this.messageHandle['messages.update'](payload, settings); + } - if (events['message-receipt.update']) { - const payload = events['message-receipt.update'] as MessageUserReceiptUpdate[]; - const remotesJidMap: Record = {}; + if (events['message-receipt.update']) { + const payload = events['message-receipt.update'] as MessageUserReceiptUpdate[]; + const remotesJidMap: Record = {}; - for (const event of payload) { - if (typeof event.key.remoteJid === 'string' && typeof event.receipt.readTimestamp === 'number') { - remotesJidMap[event.key.remoteJid] = event.receipt.readTimestamp; - } - } + for (const event of payload) { + if (typeof event.key.remoteJid === 'string' && typeof event.receipt.readTimestamp === 'number') { + remotesJidMap[event.key.remoteJid] = event.receipt.readTimestamp; + } + } - await Promise.all( - Object.keys(remotesJidMap).map(async (remoteJid) => - this.updateMessagesReadedByTimestamp(remoteJid, remotesJidMap[remoteJid]), - ), - ); - } + await Promise.all( + Object.keys(remotesJidMap).map(async (remoteJid) => + this.updateMessagesReadedByTimestamp(remoteJid, remotesJidMap[remoteJid]), + ), + ); + } - if (events['presence.update']) { - const payload = events['presence.update']; + if (events['presence.update']) { + const payload = events['presence.update']; - if (settings?.groupsIgnore && payload.id.includes('@g.us')) { - return; - } + if (settings?.groupsIgnore && payload.id.includes('@g.us')) { + return; + } - this.sendDataWebhook(Events.PRESENCE_UPDATE, payload); - } + this.sendDataWebhook(Events.PRESENCE_UPDATE, payload); + } - if (!settings?.groupsIgnore) { - if (events['groups.upsert']) { - const payload = events['groups.upsert']; - this.groupHandler['groups.upsert'](payload); - } + if (!settings?.groupsIgnore) { + if (events['groups.upsert']) { + const payload = events['groups.upsert']; + this.groupHandler['groups.upsert'](payload); + } - if (events['groups.update']) { - const payload = events['groups.update']; - this.groupHandler['groups.update'](payload); - } + if (events['groups.update']) { + const payload = events['groups.update']; + this.groupHandler['groups.update'](payload); + } - if (events['group-participants.update']) { - const payload = events['group-participants.update'] as any; - this.groupHandler['group-participants.update'](payload); - } - } + if (events['group-participants.update']) { + const payload = events['group-participants.update'] as any; + this.groupHandler['group-participants.update'](payload); + } + } - if (events['chats.upsert']) { - const payload = events['chats.upsert']; - this.chatHandle['chats.upsert'](payload); - } + if (events['chats.upsert']) { + const payload = events['chats.upsert']; + this.chatHandle['chats.upsert'](payload); + } - if (events['chats.update']) { - const payload = events['chats.update']; - this.chatHandle['chats.update'](payload); - } + if (events['chats.update']) { + const payload = events['chats.update']; + this.chatHandle['chats.update'](payload); + } - if (events['chats.delete']) { - const payload = events['chats.delete']; - this.chatHandle['chats.delete'](payload); - } + if (events['chats.delete']) { + const payload = events['chats.delete']; + this.chatHandle['chats.delete'](payload); + } - if (events['contacts.upsert']) { - const payload = events['contacts.upsert']; - this.contactHandle['contacts.upsert'](payload); - } + if (events['contacts.upsert']) { + const payload = events['contacts.upsert']; + this.contactHandle['contacts.upsert'](payload); + } - if (events['contacts.update']) { - const payload = events['contacts.update']; - this.contactHandle['contacts.update'](payload); - } + if (events['contacts.update']) { + const payload = events['contacts.update']; + this.contactHandle['contacts.update'](payload); + } - if (events[Events.LABELS_ASSOCIATION]) { - const payload = events[Events.LABELS_ASSOCIATION]; - this.labelHandle[Events.LABELS_ASSOCIATION](payload, database); - return; - } + if (events[Events.LABELS_ASSOCIATION]) { + const payload = events[Events.LABELS_ASSOCIATION]; + this.labelHandle[Events.LABELS_ASSOCIATION](payload, database); + return; + } - if (events[Events.LABELS_EDIT]) { - const payload = events[Events.LABELS_EDIT]; - this.labelHandle[Events.LABELS_EDIT](payload); - return; + if (events[Events.LABELS_EDIT]) { + const payload = events[Events.LABELS_EDIT]; + this.labelHandle[Events.LABELS_EDIT](payload); + return; + } + } + } catch (error) { + this.logger.error(error); } - } + }); }); } diff --git a/src/api/integrations/chatbot/base-chatbot.service.ts b/src/api/integrations/chatbot/base-chatbot.service.ts index 11f71b17e..064a2a973 100644 --- a/src/api/integrations/chatbot/base-chatbot.service.ts +++ b/src/api/integrations/chatbot/base-chatbot.service.ts @@ -211,7 +211,7 @@ export abstract class BaseChatbotService { try { if (mediaType === 'audio') { await instance.audioWhatsapp({ - number: remoteJid.split('@')[0], + number: remoteJid, delay: (settings as any)?.delayMessage || 1000, audio: url, caption: altText, @@ -219,7 +219,7 @@ export abstract class BaseChatbotService { } else { await instance.mediaMessage( { - number: remoteJid.split('@')[0], + number: remoteJid, delay: (settings as any)?.delayMessage || 1000, mediatype: mediaType, media: url, @@ -290,7 +290,7 @@ export abstract class BaseChatbotService { setTimeout(async () => { await instance.textMessage( { - number: remoteJid.split('@')[0], + number: remoteJid, delay: settings?.delayMessage || 1000, text: message, linkPreview, diff --git a/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts b/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts index 3b156c312..906fff188 100644 --- a/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts +++ b/src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts @@ -346,6 +346,16 @@ export class ChatwootService { return contact; } catch (error) { + if ((error.status === 422 || error.response?.status === 422) && jid) { + this.logger.warn(`Contact with identifier ${jid} creation failed (422). Checking if it already exists...`); + const existingContact = await this.findContactByIdentifier(instance, jid); + if (existingContact) { + const contactId = existingContact.id; + await this.addLabelToContact(this.provider.nameInbox, contactId); + return existingContact; + } + } + this.logger.error('Error creating contact'); console.log(error); return null; @@ -415,6 +425,55 @@ export class ChatwootService { } } + public async findContactByIdentifier(instance: InstanceDto, identifier: string) { + const client = await this.clientCw(instance); + + if (!client) { + this.logger.warn('client not found'); + return null; + } + + // Direct search by query (q) - most common way to search by identifier/email/phone + const contact = (await (client as any).get('contacts/search', { + params: { + q: identifier, + sort: 'name', + }, + })) as any; + + if (contact && contact.data && contact.data.payload && contact.data.payload.length > 0) { + return contact.data.payload[0]; + } + + // Fallback for older API versions or different response structures + if (contact && contact.payload && contact.payload.length > 0) { + return contact.payload[0]; + } + + // Try search by attribute + const contactByAttr = (await (client as any).post('contacts/filter', { + payload: [ + { + attribute_key: 'identifier', + filter_operator: 'equal_to', + values: [identifier], + query_operator: null, + }, + ], + })) as any; + + if (contactByAttr && contactByAttr.payload && contactByAttr.payload.length > 0) { + return contactByAttr.payload[0]; + } + + // Check inside data property if using axios interceptors wrapper + if (contactByAttr && contactByAttr.data && contactByAttr.data.payload && contactByAttr.data.payload.length > 0) { + return contactByAttr.data.payload[0]; + } + + return null; + } + public async findContact(instance: InstanceDto, phoneNumber: string) { const client = await this.clientCw(instance); @@ -1574,7 +1633,11 @@ export class ChatwootService { this.logger.verbose(`Update result: ${result} rows affected`); if (this.isImportHistoryAvailable()) { - chatwootImport.updateMessageSourceID(chatwootMessageIds.messageId, key.id); + try { + await chatwootImport.updateMessageSourceID(chatwootMessageIds.messageId, key.id); + } catch (error) { + this.logger.error(`Error updating Chatwoot message source ID: ${error}`); + } } } @@ -2024,7 +2087,7 @@ export class ChatwootService { if (body.key.remoteJid.includes('@g.us')) { const participantName = body.pushName; const rawPhoneNumber = - body.key.addressingMode === 'lid' && !body.key.fromMe + body.key.addressingMode === 'lid' && !body.key.fromMe && body.key.participantAlt ? body.key.participantAlt.split('@')[0].split(':')[0] : body.key.participant.split('@')[0].split(':')[0]; const formattedPhoneNumber = parsePhoneNumberFromString(`+${rawPhoneNumber}`).formatInternational(); @@ -2206,7 +2269,7 @@ export class ChatwootService { if (body.key.remoteJid.includes('@g.us')) { const participantName = body.pushName; const rawPhoneNumber = - body.key.addressingMode === 'lid' && !body.key.fromMe + body.key.addressingMode === 'lid' && !body.key.fromMe && body.key.participantAlt ? body.key.participantAlt.split('@')[0].split(':')[0] : body.key.participant.split('@')[0].split(':')[0]; const formattedPhoneNumber = parsePhoneNumberFromString(`+${rawPhoneNumber}`).formatInternational(); @@ -2464,7 +2527,13 @@ export class ChatwootService { } } - public getNumberFromRemoteJid(remoteJid: string) { + public normalizeJidIdentifier(remoteJid: string) { + if (!remoteJid) { + return ''; + } + if (remoteJid.includes('@lid')) { + return remoteJid; + } return remoteJid.replace(/:\d+/, '').split('@')[0]; } diff --git a/src/api/integrations/chatbot/typebot/services/typebot.service.ts b/src/api/integrations/chatbot/typebot/services/typebot.service.ts index 683203675..03712bfdb 100644 --- a/src/api/integrations/chatbot/typebot/services/typebot.service.ts +++ b/src/api/integrations/chatbot/typebot/services/typebot.service.ts @@ -327,7 +327,7 @@ export class TypebotService extends BaseChatbotService { if (message.type === 'image') { await instance.mediaMessage( { - number: session.remoteJid.split('@')[0], + number: session.remoteJid, delay: settings?.delayMessage || 1000, mediatype: 'image', media: message.content.url, @@ -342,7 +342,7 @@ export class TypebotService extends BaseChatbotService { if (message.type === 'video') { await instance.mediaMessage( { - number: session.remoteJid.split('@')[0], + number: session.remoteJid, delay: settings?.delayMessage || 1000, mediatype: 'video', media: message.content.url, @@ -357,7 +357,7 @@ export class TypebotService extends BaseChatbotService { if (message.type === 'audio') { await instance.audioWhatsapp( { - number: session.remoteJid.split('@')[0], + number: session.remoteJid, delay: settings?.delayMessage || 1000, encoding: true, audio: message.content.url, @@ -441,7 +441,7 @@ export class TypebotService extends BaseChatbotService { */ private async processListMessage(instance: any, formattedText: string, remoteJid: string) { const listJson = { - number: remoteJid.split('@')[0], + number: remoteJid, title: '', description: '', buttonText: '', @@ -490,7 +490,7 @@ export class TypebotService extends BaseChatbotService { */ private async processButtonMessage(instance: any, formattedText: string, remoteJid: string) { const buttonJson = { - number: remoteJid.split('@')[0], + number: remoteJid, thumbnailUrl: undefined, title: '', description: '',