BackEnd

[라이브러리 학습] BullMQ

mingg123 2024. 6. 23. 22:34

학습 이유

 

- 최근 코드리뷰를 통해 bullMQ 라는 라이브러리에 대해 알게 되었는데, 내부 동작이 궁금하여 뜯어봄 

 

개념

  • Redis 기반의 큐 라이브러리
  • Node.js에서 사용
  • 참조: https://docs.bullmq.io/
  • BullMQ vs RabbitMQ 특징

    특징 BullMQ RabbitMQ
    언어 및 프레임워크 Node.js 및 JavaScript/TypeScript에서 주로 사용됨 여러 언어 지원 (Java, Python, Ruby, JavaScript 등)
    기반 기술 Redis AMQP (Advanced Message Queuing Protocol)
    설치 및 설정 Redis 설치 필요 RabbitMQ 서버 설치 필요
    메시지 처리 방식 FIFO, LIFO, 우선 순위 큐 지원 기본적으로 FIFO, 우선 순위 큐, 다양한 교환 및 큐 타입 지원
    확장성 Redis의 성능과 확장성에 따라 다름 RabbitMQ 클러스터링 및 페더레이션으로 높은 확장성 제공
    내결함성 Redis의 내결함성에 의존 RabbitMQ 클러스터링 및 하이 아베일러빌리티(HA) 큐 제공
    성능 고속 메모리 기반 처리 (Redis) 디스크 기반 처리 가능, 높은 스루풋 및 낮은 대기 시간 제공
    메시지 보관 주로 메모리 기반, 디스크 지속성 옵션 제공 메시지 디스크에 지속성 있게 저장 가능
    사용 사례 작업 큐, 백그라운드 작업 처리, 간단한 메시징 마이크로서비스 통신, 이벤트 스트리밍, 복잡한 메시징 요구 사항
    관리 도구 BullMQ UI (제한적 기능) RabbitMQ Management Plugin (강력한 관리 및 모니터링 기능)
    커뮤니티 및 지원 상대적으로 작은 커뮤니티, 공식 문서 및 GitHub 이슈 트래커 제공 대규모 커뮤니티, 다양한 문서, 상업적 지원 및 플러그인 제공
    프로토콜 Redis 프로토콜 AMQP, MQTT, STOMP 등 다양한 프로토콜 지원
    보안 Redis 보안 기능 사용 (TLS, 인증 등) TLS, 사용자 인증, 권한 관리, 가상 호스트 등을 통한 강력한 보안 기능 제공
    확장 및 플러그인 기본 제공 기능 중심, 제한된 확장성 다양한 플러그인 및 확장 기능 제공, 필요에 따라 커스터마이징 가능
    메시지 확인 및 재시도 작업 완료 시 ACK, 실패 시 재시도 기능 제공 ACK 및 NACK, 메시지 TTL, 재시도 정책 등 다양한 메시지 관리 기능 제공
    상태 및 모니터링 기본 모니터링 기능, Redis 모니터링 도구 활용 가능 RabbitMQ 관리 플러그인을 통한 상세한 모니터링 및 관리 기능 제공
    요약 
    • BullMQ는 Node.js 환경에서 작업 큐를 구현하는 데 적합하며, Redis 기반으로 높은 성능과 간단한 설정을 제공
    • RabbitMQ는 다양한 언어와 복잡한 메시징 요구 사항을 지원하며, 높은 확장성과 내결함성을 제공하는 완전한 메시징 솔루션
  •  Queue, Job, Worker 로 구성 (by GPT)
    const { Queue, Worker } = require('bullmq');
    const IORedis = require('ioredis');
    
    // Redis 연결 설정
    const connection = new IORedis();
    
    // 큐 생성
    const myQueue = new Queue('my-queue', {
      connection,
    });
    
    // 작업 추가 함수
    const addJob = async () => {
      await myQueue.add('my-job', { foo: 'bar' }, {
        priority: 1, // 우선순위 설정
        attempts: 3, // 재시도 횟수 설정
      });
    };
    
    // 작업 처리 로직 정의
    const worker = new Worker('my-queue', async job => {
      console.log(`Processing job ${job.id} with data ${JSON.stringify(job.data)}`);
      // 작업 처리 로직 작성
      // 예: job.data.foo 사용
    }, {
      connection,
    });
    
    // 작업 완료 이벤트 리스너 추가
    worker.on('completed', job => {
      console.log(`Job ${job.id} has completed!`);
    });
    
    // 작업 실패 이벤트 리스너 추가
    worker.on('failed', (job, err) => {
      console.error(`Job ${job.id} has failed with error ${err.message}`);
    });
    
    

 

프로젝트 구성

├── husky/ # git hooks 관리        
├── config/   
├── docs/   # 문서
├── python/ # py 로 작성된 테스트 코드 및 로직 개발중인듯  
├── test/   # ts 로 작성된 테스트 코드 
src/
├── classes/            
│   ├── queue.ts       # queue 관련 로직 
│   ├── job.ts         # job 관련 로직
│   ├── worker.ts      # worker 관련 로직 
└──        

 

 

내부 로직

 

  • Queue
    • 작업을 관리하는 기본 단위
    • Queue내 job을 추가하고, Worker로 Queue를 전달

 

queue.ts

   * Adds a new job to the queue.
   *
   * @param name - Name of the job to be added to the queue.
   * @param data - Arbitrary data to append to the job.
   * @param opts - Job options that affects how the job is going to be processed.
   *
  async add(
    name: NameType,
    data: DataType,
    opts?: JobsOptions,
  ): Promise<Job<DataType, ResultType, NameType>> {
    if (opts && opts.repeat) {
      if (opts.repeat.endDate) {
        if (+new Date(opts.repeat.endDate) < Date.now()) {
          throw new Error('End date must be greater than current timestamp');
        }
      }

      return (await this.repeat).addNextRepeatableJob<
        DataType,
        ResultType,
        NameType
      >(name, data, { ...this.jobsOpts, ...opts }, true);
    } else {
      const jobId = opts?.jobId;

      if (jobId == '0' || jobId?.startsWith('0:')) {
        throw new Error("JobId cannot be '0' or start with 0:");
      }

      const job = await this.Job.create<DataType, ResultType, NameType>(
        this as MinimalQueue,
        name,
        data,
        {
          ...this.jobsOpts,
          ...opts,
          jobId,
        },
      );
      this.emit('waiting', job);
      return job;
    }
  }

 

  • Job
    • queue 내 실행되는 작업의 단위

job.ts

 

  /**
   * Creates a new job and adds it to the queue.
   *
   * @param queue - the queue where to add the job.
   * @param name - the name of the job.
   * @param data - the payload of the job.
   * @param opts - the options bag for this job.
   * @returns
   */
  static async create<T = any, R = any, N extends string = string>(
    queue: MinimalQueue,
    name: N,
    data: T,
    opts?: JobsOptions,
  ): Promise<Job<T, R, N>> {
    const client = await queue.client;

    const job = new this<T, R, N>(queue, name, data, opts, opts && opts.jobId);

    job.id = await job.addJob(client, {
      parentKey: job.parentKey,
      parentDependenciesKey: job.parentKey
        ? `${job.parentKey}:dependencies`
        : '',
    });

    return job;
  }

 

 

 

 

  •  Worker
    • Queue에 추가된 작업을 수행하는 인스턴스

worker.ts

 

 async run() {
    if (!this.processFn) {
      throw new Error('No process function is defined.');
    }

    if (this.running) {
      throw new Error('Worker is already running.');
    }

    try {
      this.running = true;

      if (this.closing) {
        return;
      }

      await this.startStalledCheckTimer();

      const jobsInProgress = new Set<{ job: Job; ts: number }>();
      this.startLockExtenderTimer(jobsInProgress);

      const asyncFifoQueue = (this.asyncFifoQueue =
        new AsyncFifoQueue<void | Job<DataType, ResultType, NameType>>());

      let tokenPostfix = 0;

      const client = await this.client;
      const bclient = await this.blockingConnection.client;

      /**
       * This is the main loop in BullMQ. Its goals are to fetch jobs from the queue
       * as efficiently as possible, providing concurrency and minimal unnecessary calls
       * to Redis.
       */
      while (!this.closing) {
        let numTotal = asyncFifoQueue.numTotal();

        /**
         * This inner loop tries to fetch jobs concurrently, but if we are waiting for a job
         * to arrive at the queue we should not try to fetch more jobs (as it would be pointless)
         */
        while (
          !this.waiting &&
          numTotal < this.opts.concurrency &&
          (!this.limitUntil || numTotal == 0)
        ) {
          const token = `${this.id}:${tokenPostfix++}`;

          const fetchedJob = this.retryIfFailed<void | Job<
            DataType,
            ResultType,
            NameType
          >>(
            () => this._getNextJob(client, bclient, token, { block: true }),
            this.opts.runRetryDelay,
          );
          asyncFifoQueue.add(fetchedJob);

          numTotal = asyncFifoQueue.numTotal();

          if (this.waiting && numTotal > 1) {
            // We are waiting for jobs but we have others that we could start processing already
            break;
          }

          // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls
          // to Redis in high concurrency scenarios.
          const job = await fetchedJob;

          // No more jobs waiting but we have others that could start processing already
          if (!job && numTotal > 1) {
            break;
          }

          // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting
          // for processing this job.
          if (this.blockUntil) {
            break;
          }
        }

        // Since there can be undefined jobs in the queue (when a job fails or queue is empty)
        // we iterate until we find a job.
        let job: Job<DataType, ResultType, NameType> | void;
        do {
          job = await asyncFifoQueue.fetch();
        } while (!job && asyncFifoQueue.numQueued() > 0);

        if (job) {
          const token = job.token;
          asyncFifoQueue.add(
            this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
              () =>
                this.processJob(
                  <Job<DataType, ResultType, NameType>>job,
                  token,
                  () => asyncFifoQueue.numTotal() <= this.opts.concurrency,
                  jobsInProgress,
                ),
              this.opts.runRetryDelay,
            ),
          );
        }
      }

      this.running = false;
      return asyncFifoQueue.waitAll();
    } catch (error) {
      this.running = false;
      throw error;
    }
  }
  • QueEvents
  • FlowProducer

 

 

 

 

기타 코드 스타일

  • 형 변환
    • as {type} 말고 이런식으로 씀
    if (<number>result < 0) {
        throw this.finishedErrors({
          code: <number>result,
          parentKey: parentOpts.parentKey,
          command: 'addJob',
        });
      }
    
      return <string>result;