This commit is contained in:
68
backend/src/queues/email-queue.ts
Normal file
68
backend/src/queues/email-queue.ts
Normal file
@@ -0,0 +1,68 @@
|
||||
import { MonkeyQueue } from "./monkey-queue";
|
||||
|
||||
const QUEUE_NAME = "email-tasks";
|
||||
|
||||
export type EmailType = "verify" | "resetPassword";
|
||||
|
||||
export type EmailTask<M extends EmailType> = {
|
||||
type: M;
|
||||
email: string;
|
||||
ctx: EmailTaskContexts[M];
|
||||
};
|
||||
|
||||
export type EmailTaskContexts = {
|
||||
verify: {
|
||||
name: string;
|
||||
verificationLink: string;
|
||||
};
|
||||
resetPassword: {
|
||||
name: string;
|
||||
passwordResetLink: string;
|
||||
};
|
||||
};
|
||||
|
||||
function buildTask(
|
||||
taskName: EmailType,
|
||||
email: string,
|
||||
taskContext: EmailTaskContexts[EmailType],
|
||||
): EmailTask<EmailType> {
|
||||
return {
|
||||
type: taskName,
|
||||
email: email,
|
||||
ctx: taskContext,
|
||||
};
|
||||
}
|
||||
|
||||
class EmailQueue extends MonkeyQueue<EmailTask<EmailType>> {
|
||||
async sendVerificationEmail(
|
||||
email: string,
|
||||
name: string,
|
||||
verificationLink: string,
|
||||
): Promise<void> {
|
||||
const taskName = "verify";
|
||||
const task = buildTask(taskName, email, { name, verificationLink });
|
||||
await this.add(taskName, task);
|
||||
}
|
||||
|
||||
async sendForgotPasswordEmail(
|
||||
email: string,
|
||||
name: string,
|
||||
passwordResetLink: string,
|
||||
): Promise<void> {
|
||||
const taskName = "resetPassword";
|
||||
const task = buildTask(taskName, email, { name, passwordResetLink });
|
||||
await this.add(taskName, task);
|
||||
}
|
||||
}
|
||||
|
||||
export default new EmailQueue(QUEUE_NAME, {
|
||||
defaultJobOptions: {
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
attempts: 1,
|
||||
backoff: {
|
||||
type: "exponential",
|
||||
delay: 2000,
|
||||
},
|
||||
},
|
||||
});
|
||||
124
backend/src/queues/george-queue.ts
Normal file
124
backend/src/queues/george-queue.ts
Normal file
@@ -0,0 +1,124 @@
|
||||
import { LeaderboardEntry } from "@monkeytype/schemas/leaderboards";
|
||||
import { MonkeyQueue } from "./monkey-queue";
|
||||
|
||||
const QUEUE_NAME = "george-tasks";
|
||||
|
||||
type GeorgeTask = {
|
||||
name: string;
|
||||
args: unknown[];
|
||||
};
|
||||
|
||||
function buildGeorgeTask(taskName: string, taskArgs: unknown[]): GeorgeTask {
|
||||
return {
|
||||
name: taskName,
|
||||
args: taskArgs,
|
||||
};
|
||||
}
|
||||
|
||||
class GeorgeQueue extends MonkeyQueue<GeorgeTask> {
|
||||
async sendReleaseAnnouncement(releaseName: string): Promise<void> {
|
||||
const taskName = "sendReleaseAnnouncement";
|
||||
const sendReleaseAnnouncementTask = buildGeorgeTask(taskName, [
|
||||
releaseName,
|
||||
]);
|
||||
await this.add(taskName, sendReleaseAnnouncementTask);
|
||||
}
|
||||
|
||||
async updateDiscordRole(discordId: string, wpm: number): Promise<void> {
|
||||
const taskName = "updateRole";
|
||||
const updateDiscordRoleTask = buildGeorgeTask(taskName, [discordId, wpm]);
|
||||
await this.add(taskName, updateDiscordRoleTask);
|
||||
}
|
||||
|
||||
async linkDiscord(
|
||||
discordId: string,
|
||||
uid: string,
|
||||
lbOptOut: boolean,
|
||||
): Promise<void> {
|
||||
const taskName = "linkDiscord";
|
||||
const linkDiscordTask = buildGeorgeTask(taskName, [
|
||||
discordId,
|
||||
uid,
|
||||
lbOptOut,
|
||||
]);
|
||||
await this.add(taskName, linkDiscordTask);
|
||||
}
|
||||
|
||||
async unlinkDiscord(discordId: string, uid: string): Promise<void> {
|
||||
const taskName = "unlinkDiscord";
|
||||
const unlinkDiscordTask = buildGeorgeTask(taskName, [discordId, uid]);
|
||||
await this.add(taskName, unlinkDiscordTask);
|
||||
}
|
||||
|
||||
async awardChallenge(
|
||||
discordId: string,
|
||||
challengeName: string,
|
||||
): Promise<void> {
|
||||
const taskName = "awardChallenge";
|
||||
const awardChallengeTask = buildGeorgeTask(taskName, [
|
||||
discordId,
|
||||
challengeName,
|
||||
]);
|
||||
await this.add(taskName, awardChallengeTask);
|
||||
}
|
||||
|
||||
async userBanned(discordId: string, banned: boolean): Promise<void> {
|
||||
const taskName = "userBanned";
|
||||
const userBannedTask = buildGeorgeTask(taskName, [discordId, banned]);
|
||||
await this.add(taskName, userBannedTask);
|
||||
}
|
||||
|
||||
async announceLeaderboardUpdate(
|
||||
newRecords: Omit<LeaderboardEntry, "_id">[],
|
||||
leaderboardId: string,
|
||||
): Promise<void> {
|
||||
const taskName = "announceLeaderboardUpdate";
|
||||
|
||||
const leaderboardUpdateTasks = newRecords.map((record) => {
|
||||
const taskData = buildGeorgeTask(taskName, [
|
||||
record.discordId ?? record.name,
|
||||
record.rank,
|
||||
leaderboardId,
|
||||
record.wpm,
|
||||
record.raw,
|
||||
record.acc,
|
||||
record.consistency,
|
||||
]);
|
||||
|
||||
return {
|
||||
name: taskName,
|
||||
data: taskData,
|
||||
};
|
||||
});
|
||||
|
||||
await this.addBulk(leaderboardUpdateTasks);
|
||||
}
|
||||
|
||||
async announceDailyLeaderboardTopResults(
|
||||
leaderboardId: string,
|
||||
leaderboardTimestamp: number,
|
||||
topResults: LeaderboardEntry[],
|
||||
): Promise<void> {
|
||||
const taskName = "announceDailyLeaderboardTopResults";
|
||||
|
||||
const dailyLeaderboardTopResultsTask = buildGeorgeTask(taskName, [
|
||||
leaderboardId,
|
||||
leaderboardTimestamp,
|
||||
topResults,
|
||||
]);
|
||||
|
||||
await this.add(taskName, dailyLeaderboardTopResultsTask);
|
||||
}
|
||||
}
|
||||
|
||||
export default new GeorgeQueue(QUEUE_NAME, {
|
||||
defaultJobOptions: {
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
attempts: 3,
|
||||
backoff: {
|
||||
type: "exponential",
|
||||
delay: 2000,
|
||||
},
|
||||
},
|
||||
});
|
||||
5
backend/src/queues/index.ts
Normal file
5
backend/src/queues/index.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
import LaterQueue from "./later-queue";
|
||||
import GeorgeQueue from "./george-queue";
|
||||
import EmailQueue from "./email-queue";
|
||||
|
||||
export default [GeorgeQueue, LaterQueue, EmailQueue];
|
||||
121
backend/src/queues/later-queue.ts
Normal file
121
backend/src/queues/later-queue.ts
Normal file
@@ -0,0 +1,121 @@
|
||||
import LRUCache from "lru-cache";
|
||||
import Logger from "../utils/logger";
|
||||
import { MonkeyQueue } from "./monkey-queue";
|
||||
import { ValidModeRule } from "@monkeytype/schemas/configuration";
|
||||
import {
|
||||
getCurrentDayTimestamp,
|
||||
getCurrentWeekTimestamp,
|
||||
} from "@monkeytype/util/date-and-time";
|
||||
|
||||
const QUEUE_NAME = "later";
|
||||
|
||||
export type LaterTaskType =
|
||||
| "daily-leaderboard-results"
|
||||
| "weekly-xp-leaderboard-results";
|
||||
|
||||
export type LaterTask<T extends LaterTaskType> = {
|
||||
taskName: LaterTaskType;
|
||||
ctx: LaterTaskContexts[T];
|
||||
};
|
||||
|
||||
export type LaterTaskContexts = {
|
||||
"daily-leaderboard-results": {
|
||||
yesterdayTimestamp: number;
|
||||
modeRule: ValidModeRule;
|
||||
};
|
||||
"weekly-xp-leaderboard-results": {
|
||||
lastWeekTimestamp: number;
|
||||
};
|
||||
};
|
||||
|
||||
const ONE_MINUTE_IN_MILLISECONDS = 1000 * 60;
|
||||
const ONE_DAY_IN_MILLISECONDS = 1000 * 60 * 60 * 24;
|
||||
|
||||
class LaterQueue extends MonkeyQueue<LaterTask<LaterTaskType>> {
|
||||
private scheduledJobCache = new LRUCache<string, boolean>({
|
||||
max: 100,
|
||||
});
|
||||
|
||||
private async scheduleTask(
|
||||
taskName: string,
|
||||
task: LaterTask<LaterTaskType>,
|
||||
jobId: string,
|
||||
delay: number,
|
||||
): Promise<void> {
|
||||
await this.add(taskName, task, {
|
||||
delay,
|
||||
jobId, // Prevent duplicate jobs
|
||||
backoff: 60 * ONE_MINUTE_IN_MILLISECONDS, // Try again every hour on failure
|
||||
attempts: 23,
|
||||
});
|
||||
|
||||
this.scheduledJobCache.set(jobId, true);
|
||||
|
||||
Logger.info(
|
||||
`Scheduled ${task.taskName} for ${new Date(Date.now() + delay)}`,
|
||||
);
|
||||
}
|
||||
|
||||
async scheduleForNextWeek(
|
||||
taskName: LaterTaskType,
|
||||
taskId: string,
|
||||
): Promise<void> {
|
||||
const currentWeekTimestamp = getCurrentWeekTimestamp();
|
||||
const jobId = `${taskName}:${currentWeekTimestamp}:${taskId}`;
|
||||
|
||||
if (this.scheduledJobCache.has(jobId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const task: LaterTask<LaterTaskType> = {
|
||||
taskName,
|
||||
ctx: {
|
||||
lastWeekTimestamp: currentWeekTimestamp,
|
||||
},
|
||||
};
|
||||
|
||||
const delay =
|
||||
currentWeekTimestamp +
|
||||
7 * ONE_DAY_IN_MILLISECONDS -
|
||||
Date.now() +
|
||||
ONE_MINUTE_IN_MILLISECONDS;
|
||||
|
||||
await this.scheduleTask("todo-next-week", task, jobId, delay);
|
||||
}
|
||||
|
||||
async scheduleForTomorrow(
|
||||
taskName: LaterTaskType,
|
||||
taskId: string,
|
||||
modeRule: ValidModeRule,
|
||||
): Promise<void> {
|
||||
const currentDayTimestamp = getCurrentDayTimestamp();
|
||||
const jobId = `${taskName}:${currentDayTimestamp}:${taskId}`;
|
||||
|
||||
if (this.scheduledJobCache.has(jobId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const task: LaterTask<LaterTaskType> = {
|
||||
taskName,
|
||||
ctx: {
|
||||
modeRule,
|
||||
yesterdayTimestamp: currentDayTimestamp,
|
||||
},
|
||||
};
|
||||
|
||||
const delay =
|
||||
currentDayTimestamp +
|
||||
ONE_DAY_IN_MILLISECONDS -
|
||||
Date.now() +
|
||||
ONE_MINUTE_IN_MILLISECONDS;
|
||||
|
||||
await this.scheduleTask("todo-tomorrow", task, jobId, delay);
|
||||
}
|
||||
}
|
||||
|
||||
export default new LaterQueue(QUEUE_NAME, {
|
||||
defaultJobOptions: {
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
});
|
||||
62
backend/src/queues/monkey-queue.ts
Normal file
62
backend/src/queues/monkey-queue.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import IORedis from "ioredis";
|
||||
import {
|
||||
type BulkJobOptions,
|
||||
type ConnectionOptions,
|
||||
type JobsOptions,
|
||||
Queue,
|
||||
type QueueOptions,
|
||||
QueueScheduler,
|
||||
} from "bullmq";
|
||||
|
||||
export class MonkeyQueue<T> {
|
||||
private jobQueue: Queue | undefined;
|
||||
private _queueScheduler: QueueScheduler;
|
||||
public readonly queueName: string;
|
||||
private queueOpts: QueueOptions;
|
||||
|
||||
constructor(queueName: string, queueOpts: QueueOptions) {
|
||||
this.queueName = queueName;
|
||||
this.queueOpts = queueOpts;
|
||||
}
|
||||
|
||||
init(redisConnection?: IORedis.Redis): void {
|
||||
if (this.jobQueue !== undefined || !redisConnection) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.jobQueue = new Queue(this.queueName, {
|
||||
...this.queueOpts,
|
||||
connection: redisConnection as ConnectionOptions,
|
||||
});
|
||||
|
||||
this._queueScheduler = new QueueScheduler(this.queueName, {
|
||||
connection: redisConnection as ConnectionOptions,
|
||||
});
|
||||
}
|
||||
|
||||
async add(taskName: string, task: T, jobOpts?: JobsOptions): Promise<void> {
|
||||
if (this.jobQueue === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.jobQueue.add(taskName, task, jobOpts);
|
||||
}
|
||||
|
||||
async getJobCounts(): Promise<Record<string, number>> {
|
||||
if (this.jobQueue === undefined) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return await this.jobQueue.getJobCounts();
|
||||
}
|
||||
|
||||
async addBulk(
|
||||
tasks: { name: string; data: T; opts?: BulkJobOptions }[],
|
||||
): Promise<void> {
|
||||
if (this.jobQueue === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.jobQueue.addBulk(tasks);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user