14 Job Queues
On this page:
14.1 Jobs
execute-jobs-synchronously?
job?
define-job
retry!
schedule-at
14.2 Brokers
current-broker
broker?
make-broker
14.3 Workers
worker?
make-worker-factory
job-metadata

14 Job Queues🔗

 (require koyo/job) package: koyo-lib

This module provides functionality for declaring and executing asynchronous jobs. The job queuing functionality is implemented on top of PostgreSQL so you don’t need an external message queue. Jobs are guaranteed to be executed at least once after being enqueued.

> (define-system example
   [broker (db) make-broker]
   [db (make-database-factory
        (lambda ()
          (postgresql-connect #:user "example"
                              #:database "example")))]
   [worker (broker) (make-worker-factory)])
>
> (system-start example-system)
> (current-broker (system-ref example-system 'broker))
>
> (define executed? (make-semaphore))
>
; Define a job:
> (define-job (say-hello name)
    (printf "hi ~a!~n" name)
    (semaphore-post executed?))
>
; Enqueue a job:
> (say-hello "Bogdan")

31

>
; Wait a few moments for the job to be dequeued and executed...
> (void (sync executed?))

hi Bogdan!

14.1 Jobs🔗

parameter

(execute-jobs-synchronously?)  boolean?

(execute-jobs-synchronously? sync?)  void?
  sync? : boolean?
 = #f
A parameter that controls whether or not jobs should be executed synchronously when applied. This comes in handy when you want to test jobs at the REPL.

procedure

(job? v)  boolean?

  v : any/c
Returns #t when v is a job.

syntax

(define-job (id args)
  option ...
  body-e ...+)
 
args = arg-id ...
  | arg-id ... . rest-id
     
option = #:queue queue-name
  | #:priority priority
 
  queue-name : string?
  priority : exact-nonnegative-integer?
Binds a function to id that enqueues an asynchronous job when applied. The arguments passed to each job must be prop:serializable. When the job is picked up by a worker, its arguments are deserialized and its body is executed.

The queue-name option controls which queue the jobs are enqueued to. If not supplied, it defaults to "default".

The priority option controls what the priority of each job is within the queue. Zero is the highest priority. If not supplied, this option defaults to 50.

When a job is executed synchronously, its result is always #f, otherwise its result is the id of the job in the koyo_jobs table.

syntax

(retry!)

Immediately terminates the current job and re-enqueues it so that it may be executed again at a later time. Using this form outside the body of a define-job form is a syntax error.

syntax

(schedule-at when-expr job-expr)

 
  when-expr : moment?
Wraps job-expr so that it will be executed some time after the timestamp represented by when-expr.

> (schedule-at
   (+minutes (now/moment) 5)
   (say-hello "Bogdan"))

32

14.2 Brokers🔗

Job brokers handle the details of storing and retrieving jobs to and from the database. Each broker permanently leases a connection from the database pool to listen for notifications and then other connections are leased and put back into the pool as needed.

parameter

(current-broker)  broker?

(current-broker broker)  void?
  broker : broker?
Holds the current broker that is used to enqueue jobs.

procedure

(broker? v)  boolean?

  v : any/c
Returns #t when v is a job broker.

procedure

(make-broker db)  broker?

  db : database?
Creates a broker component that enqueues its jobs in db. The db parameter must refer to a PostgreSQL database.

14.3 Workers🔗

Job workers dequeue and execute jobs.

procedure

(worker? v)  boolean?

  v : any/c
Returns #t when v is a job worker.

procedure

((make-worker-factory [#:queue queue    
  #:pool-size pool-size    
  #:middleware middleware])    
  broker)  worker?
  queue : string? = "default"
  pool-size : exact-positive-integer? = 8
  middleware : (-> job-metadata? procedure?)
   = (lambda (meta proc) proc)
  broker : broker?
Generates a function that, when supplied a job broker, produces a job worker that dequeues and executes jobs published on the queue identified by queue.

The #:pool-size argument controls the maximum number of concurrent jobs for the worker.

The #:middleware argument wraps every job procedure before it is applied to its arguments.

Changed in version 0.24 of package koyo-lib: Added the #:middleware argument.

struct

(struct job-metadata (id queue name attempts)
    #:extra-constructor-name make-job-metadata)
  id : exact-nonnegative-integer?
  queue : string?
  name : string?
  attempts : exact-nonnegative-integer?
Information about a job that’s about to be run. This struct may be extended in the future to contain additional information.

Added in version 0.24 of package koyo-lib.