otimizado para funcionar sem limites de cpu e ram
This commit is contained in:
parent
e2c7dd647d
commit
a929d89fb3
@ -16,10 +16,10 @@ import { Payment } from '../payments/entities/payment.entity';
|
|||||||
database: configService.get('database.database'),
|
database: configService.get('database.database'),
|
||||||
entities: [Payment],
|
entities: [Payment],
|
||||||
extra: {
|
extra: {
|
||||||
max: 5,
|
max: 10,
|
||||||
// connectionTimeoutMillis: 3000,
|
// connectionTimeoutMillis: 3000,
|
||||||
},
|
},
|
||||||
synchronize: false, // Apenas para desenvolvimento
|
synchronize: false,
|
||||||
}),
|
}),
|
||||||
inject: [ConfigService],
|
inject: [ConfigService],
|
||||||
}),
|
}),
|
||||||
|
|||||||
@ -20,20 +20,6 @@ export class PaymentDefaultProcessor {
|
|||||||
async execute(payment: CreatePaymentDto): Promise<boolean> {
|
async execute(payment: CreatePaymentDto): Promise<boolean> {
|
||||||
const url = this.configService.get('paymentProcessors.defaultUrl');
|
const url = this.configService.get('paymentProcessors.defaultUrl');
|
||||||
|
|
||||||
const responseExists = await this.makePaymentToProcessorService.execute(
|
return await this.makePaymentToProcessorService.execute(payment, url);
|
||||||
payment,
|
|
||||||
url,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!responseExists) return false;
|
|
||||||
await this.repository.query(
|
|
||||||
`
|
|
||||||
INSERT INTO payments (correlation_id, amount, payment_processor)
|
|
||||||
VALUES ($1, $2, $3)
|
|
||||||
ON CONFLICT (correlation_id) DO NOTHING
|
|
||||||
`,
|
|
||||||
[payment.correlationId, payment.amount, ProcessorTypeEnum.DEFAULT],
|
|
||||||
);
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,13 +15,6 @@ export class PaymentFallbackProcessor {
|
|||||||
async execute(payment: CreatePaymentDto): Promise<boolean> {
|
async execute(payment: CreatePaymentDto): Promise<boolean> {
|
||||||
const url = this.configService.get('paymentProcessors.fallbackUrl');
|
const url = this.configService.get('paymentProcessors.fallbackUrl');
|
||||||
|
|
||||||
const responseExists = await this.makePaymentToProcessorService.execute(
|
return await this.makePaymentToProcessorService.execute(payment, url);
|
||||||
payment,
|
|
||||||
url,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!responseExists) return false;
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,7 +8,7 @@ import { Payment } from '../entities/payment.entity';
|
|||||||
import { Repository } from 'typeorm';
|
import { Repository } from 'typeorm';
|
||||||
import { CreatePaymentDto } from '../dto/create-payment.dto';
|
import { CreatePaymentDto } from '../dto/create-payment.dto';
|
||||||
|
|
||||||
@Processor(PAYMENT_QUEUE, { concurrency: 20 })
|
@Processor(PAYMENT_QUEUE, { concurrency: 10 })
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class PaymentProcessor extends WorkerHost {
|
export class PaymentProcessor extends WorkerHost {
|
||||||
private readonly logger = new Logger(PaymentProcessor.name);
|
private readonly logger = new Logger(PaymentProcessor.name);
|
||||||
@ -22,18 +22,6 @@ export class PaymentProcessor extends WorkerHost {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async process(job: Job<CreatePaymentDto>) {
|
async process(job: Job<CreatePaymentDto>) {
|
||||||
const payment = job.data;
|
await this.processPaymentService.execute(job.data);
|
||||||
const exists = await this.paymentRepository.findOne({
|
|
||||||
where: { correlationId: payment.correlationId },
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!exists) {
|
|
||||||
try {
|
|
||||||
await this.processPaymentService.execute(payment);
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error(`Error processing payment: ${error.message}`);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,7 +23,7 @@ export class MakePaymentToProcessorService {
|
|||||||
|
|
||||||
const response = await firstValueFrom(
|
const response = await firstValueFrom(
|
||||||
this.httpService.post(`${url}/payments`, paymentData, {
|
this.httpService.post(`${url}/payments`, paymentData, {
|
||||||
timeout: 2500,
|
timeout: 3000,
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
return response.status === 200;
|
return response.status === 200;
|
||||||
|
|||||||
@ -13,43 +13,63 @@ export class ProcessPaymentService {
|
|||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@InjectRepository(Payment) private readonly repository: Repository<Payment>,
|
@InjectRepository(Payment) private readonly repository: Repository<Payment>,
|
||||||
|
|
||||||
private paymentDefaultProcessor: PaymentDefaultProcessor,
|
private paymentDefaultProcessor: PaymentDefaultProcessor,
|
||||||
private paymentFallbackProcessor: PaymentFallbackProcessor,
|
private paymentFallbackProcessor: PaymentFallbackProcessor,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async execute(job: CreatePaymentDto): Promise<void> {
|
async execute(job: CreatePaymentDto): Promise<void> {
|
||||||
let result = false;
|
const existingPayment = await this.repository.findOne({
|
||||||
|
where: { correlationId: job.correlationId },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (existingPayment) return;
|
||||||
|
|
||||||
|
let defaultSuccess = false;
|
||||||
try {
|
try {
|
||||||
result = await this.paymentDefaultProcessor.execute(job);
|
defaultSuccess = await this.paymentDefaultProcessor.execute(job);
|
||||||
|
|
||||||
|
if (defaultSuccess) {
|
||||||
|
await this.savePayment(job, ProcessorTypeEnum.DEFAULT);
|
||||||
|
return;
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error.response?.status === 422) {
|
if (error.response?.status === 422) {
|
||||||
await this.repository.query(
|
await this.savePayment(job, ProcessorTypeEnum.DEFAULT);
|
||||||
`
|
return;
|
||||||
INSERT INTO payments (correlation_id, amount, payment_processor)
|
|
||||||
VALUES ($1, $2, $3)
|
|
||||||
ON CONFLICT (correlation_id) DO NOTHING
|
|
||||||
`,
|
|
||||||
[job.correlationId, job.amount, ProcessorTypeEnum.DEFAULT],
|
|
||||||
);
|
|
||||||
this.logger.warn(
|
|
||||||
`Pagamento já realizado para correlationId: ${job.correlationId}`,
|
|
||||||
);
|
|
||||||
return; // Não lança exceção, job é considerado processado
|
|
||||||
}
|
}
|
||||||
this.logger.warn(`Default processor failed: ${error.message}`);
|
this.logger.warn(`Default processor failed: ${error.message}`);
|
||||||
}
|
}
|
||||||
if (!result) {
|
|
||||||
await this.paymentFallbackProcessor.execute(job);
|
let fallbackSuccess = false;
|
||||||
await this.repository.query(
|
try {
|
||||||
`
|
fallbackSuccess = await this.paymentFallbackProcessor.execute(job);
|
||||||
INSERT INTO payments (correlation_id, amount, payment_processor)
|
|
||||||
VALUES ($1, $2, $3)
|
if (fallbackSuccess) {
|
||||||
ON CONFLICT (correlation_id) DO NOTHING
|
await this.savePayment(job, ProcessorTypeEnum.FALLBACK);
|
||||||
`,
|
return;
|
||||||
[job.correlationId, job.amount, ProcessorTypeEnum.FALLBACK],
|
}
|
||||||
);
|
} catch (error) {
|
||||||
|
if (error.response?.status === 422) {
|
||||||
|
await this.savePayment(job, ProcessorTypeEnum.FALLBACK);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Error(`Payment processing failed for ${job.correlationId}`);
|
||||||
}
|
}
|
||||||
throw new Error('Payment processing failed');
|
}
|
||||||
|
|
||||||
|
private async savePayment(
|
||||||
|
job: CreatePaymentDto,
|
||||||
|
processor: ProcessorTypeEnum,
|
||||||
|
): Promise<void> {
|
||||||
|
await this.repository.query(
|
||||||
|
`
|
||||||
|
INSERT INTO payments (correlation_id, amount, payment_processor)
|
||||||
|
VALUES ($1, $2, $3)
|
||||||
|
ON CONFLICT (correlation_id) DO NOTHING
|
||||||
|
`,
|
||||||
|
[job.correlationId, job.amount, processor],
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import { PAYMENT_QUEUE } from './constants/queue.constants';
|
|||||||
connection: {
|
connection: {
|
||||||
host: 'redis',
|
host: 'redis',
|
||||||
port: 6379,
|
port: 6379,
|
||||||
|
enableReadyCheck: false,
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
BullModule.registerQueue({
|
BullModule.registerQueue({
|
||||||
|
|||||||
@ -11,13 +11,13 @@ export class QueueService {
|
|||||||
async addPaymentJob(data: CreatePaymentDto): Promise<void> {
|
async addPaymentJob(data: CreatePaymentDto): Promise<void> {
|
||||||
await this.paymentQueue.add(PAYMENT_QUEUE, data, {
|
await this.paymentQueue.add(PAYMENT_QUEUE, data, {
|
||||||
jobId: data.correlationId,
|
jobId: data.correlationId,
|
||||||
attempts: 60,
|
attempts: 100,
|
||||||
backoff: {
|
backoff: {
|
||||||
type: 'exponential',
|
type: 'exponential',
|
||||||
delay: 3000,
|
delay: 100,
|
||||||
},
|
},
|
||||||
removeOnComplete: 100,
|
removeOnComplete: true,
|
||||||
removeOnFail: 50,
|
removeOnFail: 101,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user