Files
Exchange/server/api/amqp.ts
2025-11-07 22:24:40 +01:00

113 lines
3.5 KiB
TypeScript
Executable File

'use strict'
// This script fetches queued messages from RabbitMQ and delivers these to SMTP
import * as nodemailer from 'nodemailer'
import * as amqp from 'amqp'
const queueHost = process.env.AMQP_URL
const queueName = 'outgoing'
const smtpHost = {
host: process.env.NODEMAILER_HOST,
port: process.env.NODEMAILER_PORT,
// NB! Must be pooled connection, otherwise 'idle' is never fired and nothing gets sent
pool: true,
auth: {
user: process.env.NODEMAILER_USER,
pass: process.env.NODEMAILER_PASS
},
tls: {
// testserver uses self signed certificate, so we need to lax a bit
rejectUnauthorized: false
},
logger: false
}
// array of prefetched messages waiting for delivery
const waiting = []
// Create a SMTP transporter object
const transporter = nodemailer.createTransport(smtpHost, {
// default message fields
from: process.env.SENDER_ADDRESS
})
// Create connection to RabbitMQ
const queueConnection = amqp.createConnection({
url: queueHost
})
queueConnection.on('error', function (e) {
console.log('Error from amqp: ', e)
})
queueConnection.on('ready', function (err) {
console.log('AMPQ server is running on port 5672.')
queueConnection.queue(queueName, { durable: true }, function (q) {
q.bind('#')
q.subscribe({
ack: true, // do not fetch next messages until previous are acked
prefetchCount: 10 // prefetch 10 messages
}, function (message, headers, deliveryInfo, ack) {
// check if the message object is even valid
if (!message || !message.to) {
console.log('Invalid message, skipping')
// reject, do not requeue
return ack.reject()
}
// push to cache
waiting.push({
message: message,
deliveryTag: deliveryInfo.deliveryTag.toString('hex'),
ack: ack
})
// try to flush cached messages by sending these to SMTP
flushWaitingMessages()
})
})
})
// Whenever transporter gets into idling, try to send some mail
transporter.on('idle', flushWaitingMessages)
// Flushes cached messages to nodemailer for delivery
function flushWaitingMessages () {
// actual send function
var send = function (data) {
// sendMail does not immediatelly send, instead it tries to allocate a free connection to SMTP server
// and if fails, then pushes the message into internal queue. As we only prefetch 10 messages
// then the internal queue can never grow into something too large. At most there will be 5 messages
// idling in the queue (another 5 are being currently sent by the default number of 5 connections)
transporter.sendMail(data.message, function (err, info) {
if (err) {
console.log('Message failed (%s): %s', data.deliveryTag, err.message)
// reject and requeue on error (wait 1 sec. before requeueing)
// NB! If the failure is permanent then this approach results in an
// infinite loop since failing message is never removed from the queue
setTimeout(function () {
data.ack.reject(true)
}, 1000)
return
}
console.log('Message delivered (%s): %s', data.deliveryTag, info.response)
data.ack.acknowledge()
})
}
// send cached messages if transporter is idling
while (transporter.isIdle() && waiting.length) {
send(waiting.shift())
}
}
export const publishQueueConnection = (mailOptions) => {
console.log('publishQueueConnection')
queueConnection.publish(queueName, mailOptions, (err) => {
if (err) {
console.log(err)
}
})
}