15 Job Queues
(require koyo/job) | package: koyo-lib |
This module provides functionality for declaring and executing asynchronous jobs. The job queuing functionality is implemented on top of PostgreSQL so you don’t need an external message queue. Jobs are guaranteed to be executed at least once after being enqueued, assuming there are workers available to execute them.
> (define-system example [broker (db) make-broker] [db (make-database-factory (lambda () (postgresql-connect #:user "example" #:database "example")))] [worker (broker) (make-worker-factory)]) > > (system-start example-system) > (current-broker (system-ref example-system 'broker)) > > (define executed? (make-semaphore)) > ; Define a job:
> (define-job (say-hello name) (printf "hi ~a!~n" name) (semaphore-post executed?)) > ; Enqueue a job: > (say-hello "Bogdan") 31
> ; Wait a few moments for the job to be dequeued and executed... > (void (sync executed?)) hi Bogdan!
15.1 Jobs
parameter
(execute-jobs-synchronously? sync?) → void? sync? : boolean?
= #f
syntax
(define-job (id args) option ... body-e ...+)
args = arg-id ... | arg-id ... . rest-id option = #:queue queue-name | #:priority priority
queue-name : string?
priority : exact-nonnegative-integer?
The queue-name option controls which queue the jobs are enqueued to. If not supplied, it defaults to "default".
The priority option controls what the priority of each job is within the queue. Zero is the highest priority. If not supplied, this option defaults to 50.
When a job is executed synchronously, its result is always #f, otherwise its result is the id of the job in the koyo_jobs table.
syntax
(retry!)
syntax
(schedule-at when-expr job-expr)
when-expr : moment?
> (schedule-at (+minutes (now/moment) 5) (say-hello "Bogdan")) 32
15.2 Brokers
Job brokers handle the details of storing and retrieving jobs to and from the database. Each broker permanently leases a connection from the database pool to listen for notifications and then other connections are leased and put back into the pool as needed.
parameter
(current-broker broker) → void? broker : broker?
procedure
(make-broker db) → broker?
db : database?
15.3 Workers
Job workers dequeue and execute jobs.
procedure
((make-worker-factory [ #:queue queue #:pool-size pool-size #:middleware middleware]) broker) → worker? queue : string? = "default" pool-size : exact-positive-integer? = 8
middleware : (-> job-metadata? procedure? procedure?) = (lambda (meta proc) proc) broker : broker?
The #:pool-size argument controls the maximum number of concurrent jobs for the worker.
The #:middleware argument wraps every job procedure before it is applied to its arguments.
Changed in version 0.28 of package koyo-lib: Added the #:middleware argument.
struct
(struct job-metadata (id queue name attempts) #:extra-constructor-name make-job-metadata) id : exact-nonnegative-integer? queue : string? name : string? attempts : exact-nonnegative-integer?
Added in version 0.28 of package koyo-lib.
15.3.1 Broker Admin UI
contract
procedure
(make-broker-admin broker) → broker-admin/c
broker : broker?
Added in version 0.28 of package koyo-lib.