From 2ab9284c851426eceb82026a7809246a6471122c Mon Sep 17 00:00:00 2001 From: jos3duardo Date: Mon, 11 Aug 2025 23:53:12 -0400 Subject: [PATCH] Refactor payment processors to improve concurrency and streamline payment execution --- .../processor/payment-default.processor.ts | 9 ++++++--- .../processor/payment-fallback.processor.ts | 9 ++++++--- .../payments/processor/payment.processor.ts | 17 +++-------------- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/src/modules/payments/processor/payment-default.processor.ts b/src/modules/payments/processor/payment-default.processor.ts index 9a6de21..3ba21e7 100644 --- a/src/modules/payments/processor/payment-default.processor.ts +++ b/src/modules/payments/processor/payment-default.processor.ts @@ -6,6 +6,7 @@ import { Repository } from 'typeorm'; import { ProcessorTypeEnum } from '../enumns/processor-type.enum'; import { PaymentStatusEnum } from '../enumns/payment-status.enum'; import { MakePaymentToProcessorService } from '../services/make-payment-to-processor.service'; +import { PaymentJobData } from '../../queue/queue.service'; @Injectable() export class PaymentDefaultProcessor { @@ -17,7 +18,7 @@ export class PaymentDefaultProcessor { private makePaymentToProcessorService: MakePaymentToProcessorService, ) {} - async execute(payment: Payment): Promise { + async execute(payment: PaymentJobData): Promise { const url = this.configService.get('paymentProcessors.defaultUrl'); const responseExists = await this.makePaymentToProcessorService.execute( @@ -26,8 +27,10 @@ export class PaymentDefaultProcessor { ); if (responseExists) { - await this.repository.update(payment.id, { - ...payment, + await this.repository.save({ + amount: payment.paymentData.amount, + correlationId: payment.paymentData.correlationId, + createdAt: payment.createdAt, paymentProcessor: ProcessorTypeEnum.DEFAULT, status: PaymentStatusEnum.SUCCESS, }); diff --git a/src/modules/payments/processor/payment-fallback.processor.ts b/src/modules/payments/processor/payment-fallback.processor.ts index 1e9f893..4fe3eb1 100644 --- a/src/modules/payments/processor/payment-fallback.processor.ts +++ b/src/modules/payments/processor/payment-fallback.processor.ts @@ -6,6 +6,7 @@ import { Repository } from 'typeorm'; import { ProcessorTypeEnum } from '../enumns/processor-type.enum'; import { PaymentStatusEnum } from '../enumns/payment-status.enum'; import { MakePaymentToProcessorService } from '../services/make-payment-to-processor.service'; +import { PaymentJobData } from '../../queue/queue.service'; @Injectable() export class PaymentFallbackProcessor { @@ -17,7 +18,7 @@ export class PaymentFallbackProcessor { private makePaymentToProcessorService: MakePaymentToProcessorService, ) {} - async execute(payment: Payment): Promise { + async execute(payment: PaymentJobData): Promise { const url = this.configService.get('paymentProcessors.fallbackUrl'); const responseExists = await this.makePaymentToProcessorService.execute( @@ -26,8 +27,10 @@ export class PaymentFallbackProcessor { ); if (responseExists) { - await this.repository.update(payment.id, { - ...payment, + await this.repository.save({ + amount: payment.paymentData.amount, + correlationId: payment.paymentData.correlationId, + createdAt: payment.createdAt, paymentProcessor: ProcessorTypeEnum.FALLBACK, status: PaymentStatusEnum.SUCCESS, }); diff --git a/src/modules/payments/processor/payment.processor.ts b/src/modules/payments/processor/payment.processor.ts index 4a7fe3e..b066452 100644 --- a/src/modules/payments/processor/payment.processor.ts +++ b/src/modules/payments/processor/payment.processor.ts @@ -5,7 +5,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { ProcessPaymentService } from '../services/process-payment.service'; import { PAYMENT_QUEUE } from '../../queue/constants/queue.constants'; -@Processor(PAYMENT_QUEUE) +@Processor(PAYMENT_QUEUE, { concurrency: 2 }) @Injectable() export class PaymentProcessor extends WorkerHost { private readonly logger = new Logger(PaymentProcessor.name); @@ -15,18 +15,7 @@ export class PaymentProcessor extends WorkerHost { } async process(job: Job) { - const { paymentId, retryCount = 0 } = job.data; - - try { - await this.processPaymentService.execute(paymentId); - - return { success: true, paymentId }; - } catch (error) { - this.logger.error( - `Failed to process payment ${paymentId}:`, - error.message, - ); - throw error; - } + const payment = job.data; + await this.processPaymentService.execute(payment); } }