Recurring Jobs
In most web apps, users send requests to the server and receive responses with some data. When the server responds quickly, the app feels responsive and smooth.
What if the server needs extra time to fully process the request? This might mean sending an email or making a slow HTTP request to an external API. In that case, it's a good idea to respond to the user as soon as possible and do the remaining work in the background.
Wasp supports background jobs that can help you with this:
- Jobs persist between server restarts,
- Jobs can be retried if they fail,
- Jobs can be delayed until a future time,
- Jobs can have a recurring schedule.
Using Jobs
Job Definition and Usage
Let's write an example Job that will print a message to the console and return a list of tasks from the database.
- Start by creating a Job declaration in your .wasp file:
job mySpecialJob {
  executor: PgBoss,
  perform: {
    fn: import { foo } from "@server/workers/bar.js"
  },
  entities: [Task],
}
- After declaring the Job, implement its worker function:
JavaScript:
export const foo = async ({ name }, context) => {
  console.log(`Hello ${name}!`)
  const tasks = await context.entities.Task.findMany({})
  return { tasks }
}
TypeScript:
import type { MySpecialJob } from '@wasp/jobs/mySpecialJob'
import type { Task } from '@wasp/entities'
type Input = { name: string; }
type Output = { tasks: Task[]; }
export const foo: MySpecialJob<Input, Output> = async ({ name }, context) => {
  console.log(`Hello ${name}!`)
  const tasks = await context.entities.Task.findMany({})
  return { tasks }
}
The worker function must be an async function. The function's return value represents the Job's result.
The worker function accepts two arguments:
- args: The data passed into the job when it's submitted.
- context: { entities }: The context object containing the entities you listed in the Job declaration.
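Because the worker is a plain async function of (args, context), it can be exercised without a running job executor. The sketch below assumes a simplified Task shape and a hand-written fake context; JobContext and fakeContext are hypothetical stand-ins for what Wasp injects, not part of the Wasp API.

```typescript
// Assumed Task shape, for illustration only.
type Task = { id: number; description: string };

// Hypothetical stand-in for the context object Wasp passes to the worker.
type JobContext = {
  entities: { Task: { findMany: (query: object) => Promise<Task[]> } };
};

// Same logic as the worker above, typed against the local stand-ins.
const foo = async ({ name }: { name: string }, context: JobContext) => {
  console.log(`Hello ${name}!`);
  const tasks = await context.entities.Task.findMany({});
  return { tasks };
};

// A fake context that returns canned rows instead of querying the database.
const fakeContext: JobContext = {
  entities: {
    Task: { findMany: async () => [{ id: 1, description: "Write docs" }] },
  },
};

// Call the worker directly, the same way the executor would.
foo({ name: "Johnny" }, fakeContext).then((result) => {
  console.log(result.tasks.length);
});
```

Calling the worker this way only checks its own logic; persistence, retries, and scheduling still come from the job executor.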
- After successfully defining the job, you can submit work to be done in your Operations or setupFn (or any other Node.js code):
import { mySpecialJob } from '@wasp/jobs/mySpecialJob.js'
// The argument must match what the worker function expects, here { name }.
const submittedJob = await mySpecialJob.submit({ name: "Johnny" })
// Or, if you'd prefer it to execute in the future, just add a .delay().
// It takes a number of seconds, a Date, or an ISO date string.
await mySpecialJob
  .delay(10)
  .submit({ name: "Johnny" })
And that's it. Your job will be executed by PgBoss as if you called foo({ name: "Johnny" }).
In our example, foo takes an argument, but passing arguments to jobs is not a requirement. It depends on how you've implemented your worker function.
Recurring Jobs
If you have work that needs to be done on some recurring basis, you can add a schedule to your job declaration:
job mySpecialJob {
  executor: PgBoss,
  perform: {
    fn: import { foo } from "@server/workers/bar.js"
  },
  schedule: {
    cron: "0 * * * *",
    args: {=json { "job": "args" } json=} // optional
  }
}
In this example, you don't need to invoke anything yourself. You can imagine foo({ job: "args" }) getting automatically scheduled and invoked for you every hour.
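To see why "0 * * * *" means "every hour", it can help to check a cron expression against a few timestamps. The following is a minimal sketch of a 5-field matcher written for illustration; it is not the parser pg-boss uses. The names fieldMatches and cronMatches are hypothetical, times are evaluated in UTC as an assumption, and it only handles `*`, numbers, lists, ranges, and steps. It also requires every field to match, which differs from standard cron when both day fields are restricted.

```typescript
// Returns true when `value` satisfies one cron field such as "*", "*/5", "1,15", or "10-20".
// `min` is the smallest legal value for the field (0 for minutes, 1 for day of month, ...).
function fieldMatches(field: string, value: number, min: number): boolean {
  return field.split(",").some((part) => {
    const [range, stepText] = part.split("/");
    const step = stepText === undefined ? 1 : Number(stepText);
    if (range === "*") return (value - min) % step === 0;
    const [startText, endText] = range.split("-");
    const start = Number(startText);
    const end = endText === undefined ? start : Number(endText);
    return value >= start && value <= end && (value - start) % step === 0;
  });
}

// Checks a 5-field expression (minute, hour, day of month, month, day of week) against a date.
function cronMatches(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) throw new Error("expected 5 cron fields");
  const values = [
    date.getUTCMinutes(),
    date.getUTCHours(),
    date.getUTCDate(),
    date.getUTCMonth() + 1,
    date.getUTCDay(),
  ];
  const mins = [0, 0, 1, 1, 0];
  return fields.every((field, i) => fieldMatches(field, values[i], mins[i]));
}

console.log(cronMatches("0 * * * *", new Date("2024-01-01T13:00:00Z"))); // true
console.log(cronMatches("0 * * * *", new Date("2024-01-01T13:30:00Z"))); // false
```

The expression matches only when the minute is 0, whatever the hour, day, or month, which is why the job runs once at the top of every hour.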
API Reference
Declaring Jobs
job mySpecialJob {
  executor: PgBoss,
  perform: {
    fn: import { foo } from "@server/workers/bar.js",
    executorOptions: {
      pgBoss: {=json { "retryLimit": 1 } json=}
    }
  },
  schedule: {
    cron: "*/5 * * * *",
    args: {=json { "foo": "bar" } json=},
    executorOptions: {
      pgBoss: {=json { "retryLimit": 0 } json=}
    }
  },
  entities: [Task],
}
The Job declaration has the following fields:
executor: JobExecutor (required)
Our jobs need job executors to handle the scheduling, monitoring, and execution.
PgBoss is currently our only job executor, and is recommended for low-volume production use cases. It requires your app.db.system to be PostgreSQL.
We have selected pg-boss as our first job executor to handle the low-volume, basic job queue workloads many web applications have. By using PostgreSQL (and SKIP LOCKED) as its storage and synchronization mechanism, it allows us to provide many job queue benefits without any additional infrastructure or complex management.
Keep in mind that pg-boss jobs run alongside your other server-side code, so they are not appropriate for CPU-heavy workloads. Additionally, some care is required if you modify scheduled jobs. Please see pg-boss details below for more information.
pg-boss details
pg-boss provides many useful features, which are described in its documentation.
When you add pg-boss to a Wasp project, it will automatically add a new schema to your database called pgboss with some internal tracking tables, including job and schedule. Most pg-boss tables have a name column that corresponds to your Job identifier. Additionally, these tables maintain arguments, states, return values, retry information, start and expiration times, and other metadata required by pg-boss.
If you need to customize the creation of the pg-boss instance, you can set an environment variable called PG_BOSS_NEW_OPTIONS to a stringified JSON object containing these initialization parameters. NOTE: Setting this overwrites all Wasp defaults, so you must include database connection information as well.
pg-boss considerations
- Wasp starts pg-boss alongside your web server's application, and both run at the same time. This means that jobs running via pg-boss and the rest of the server logic (like Operations) share the CPU, so you should avoid running CPU-intensive tasks via jobs.
- Wasp does not (yet) support independent, horizontal scaling of pg-boss-only applications, nor starting them as separate workers/processes/threads.
- The job name/identifier in your .wasp file is the same name that will be used in the name column of pg-boss tables. If you change a name that had a schedule associated with it, pg-boss will continue scheduling those jobs, but they will have no handlers associated and will thus become stale and expire. To resolve this, you can remove the applicable row from the schedule table in the pgboss schema of your database.
  - If you remove a schedule from a job, you will need to do the above as well.
- If you wish to deploy to Heroku, you need to set an additional environment variable called PG_BOSS_NEW_OPTIONS to {"connectionString":"<REGULAR_HEROKU_DATABASE_URL>","ssl":{"rejectUnauthorized":false}}. This is because pg-boss uses the pg extension, which does not seem to connect to Heroku over SSL by default, which Heroku requires. Additionally, Heroku uses a self-signed cert, so we must handle that as well.
  - https://devcenter.heroku.com/articles/connecting-heroku-postgres#connecting-in-node-js
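Since PG_BOSS_NEW_OPTIONS must be a stringified JSON object, generating it with JSON.stringify avoids quoting mistakes. This is a small sketch of how the Heroku value above could be produced; buildPgBossNewOptions is a hypothetical helper and the connection URL is a placeholder, not a real database.

```typescript
// Hypothetical helper: builds the string value for the PG_BOSS_NEW_OPTIONS variable.
function buildPgBossNewOptions(databaseUrl: string): string {
  return JSON.stringify({
    connectionString: databaseUrl,
    // Heroku uses a self-signed certificate, so certificate verification is relaxed.
    ssl: { rejectUnauthorized: false },
  });
}

// Placeholder URL for illustration only; use your regular Heroku database URL instead.
const pgBossNewOptions = buildPgBossNewOptions("postgres://user:password@host:5432/dbname");
console.log(pgBossNewOptions);
```

The printed string is what you would set as the environment variable's value. Remember that setting it overwrites all Wasp defaults, so the connection information has to be included.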
perform: dict (required)
- fn: ServerImport (required)
  An async function that performs the work. Since Wasp executes Jobs on the server, you must import it from @server.
  It receives the following arguments:
  - args: Input: The data passed to the job when it's submitted.
  - context: { entities: Entities }: The context object containing any declared entities.
  Here's an example of a perform.fn function:
  bar.js
  export const foo = async ({ name }, context) => {
    console.log(`Hello ${name}!`)
    const tasks = await context.entities.Task.findMany({})
    return { tasks }
  }
  bar.ts
  import type { MySpecialJob } from '@wasp/jobs/mySpecialJob'
  import type { Task } from '@wasp/entities'
  type Input = { name: string; }
  type Output = { tasks: Task[]; }
  // Destructure name from the args so the template string below can use it.
  export const foo: MySpecialJob<Input, Output> = async ({ name }, context) => {
    console.log(`Hello ${name}!`)
    const tasks = await context.entities.Task.findMany({})
    return { tasks }
  }
  Read more about type-safe jobs in the JavaScript API section.
- executorOptions: dict
  Executor-specific default options to use when submitting jobs. These are passed directly through, so you should consult the documentation for the job executor. They can be overridden during invocation with submit() or in a schedule.
  - pgBoss: JSON
    See the docs for pg-boss.
schedule: dict
- cron: string (required)
  A cron expression string in the 5-field format (minute, hour, day of month, month, day of week). See the pg-boss documentation for the rationale behind minute-level precision.
  If you need help building cron expressions, check out Crontab guru.
- args: JSON
  The arguments to pass to the perform.fn function when invoked.
- executorOptions: dict
  Executor-specific options to use when submitting jobs. These are passed directly through, so you should consult the documentation for the job executor. The perform.executorOptions are the default options, and schedule.executorOptions can override/extend those.
  - pgBoss: JSON
    See the docs for pg-boss.
entities: [Entity]
A list of entities you wish to use inside your Job (similar to Queries and Actions).
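The relationship between perform.executorOptions (defaults) and schedule.executorOptions (overrides) can be pictured as an object merge. The sketch below assumes a shallow merge where later values win; whether Wasp merges exactly this way is an assumption, and resolveOptions is a hypothetical name used only for illustration.

```typescript
type ExecutorOptions = Record<string, unknown>;

// Hypothetical helper: defaults come first, overrides replace or extend them.
function resolveOptions(
  performDefaults: ExecutorOptions,
  overrides: ExecutorOptions = {}
): ExecutorOptions {
  return { ...performDefaults, ...overrides };
}

// Values from the declaration above: perform sets retryLimit 1, schedule sets retryLimit 0.
const overridden = resolveOptions({ retryLimit: 1 }, { retryLimit: 0 });
console.log(overridden); // { retryLimit: 0 }

// An option that only the override defines extends the defaults instead of replacing one.
const extended = resolveOptions({ retryLimit: 1 }, { retryDelay: 30 });
console.log(extended); // { retryLimit: 1, retryDelay: 30 }
```

Under this reading, the scheduled runs in the declaration above would not be retried, while jobs submitted manually would keep the default retryLimit of 1 unless submit() overrides it.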
JavaScript API
Importing a Job:
someAction.js
import { mySpecialJob } from '@wasp/jobs/mySpecialJob.js'
someAction.ts
import { mySpecialJob, type MySpecialJob } from '@wasp/jobs/mySpecialJob.js'
Type-safe jobs
Wasp generates a generic type for each Job declaration, which you can use to type your perform.fn function. The type is named after the job declaration and is available in the @wasp/jobs/{jobName} module. In the example above, the type is MySpecialJob.
The type takes two type arguments:
- Input: The type of the args argument of the perform.fn function.
- Output: The type of the return value of the perform.fn function.
submit(jobArgs, executorOptions)
- jobArgs: Input
- executorOptions: object
Submits a Job to be executed by an executor, optionally passing in a JSON job argument that your job handler function receives, and executor-specific submit options.
const submittedJob = await mySpecialJob.submit({ job: "args" })
delay(startAfter)
- startAfter: int | string | Date (required)
Delays the invocation of the job handler. The delay can be one of:
- Integer: number of seconds to delay. [Default 0]
- String: ISO date string to run at.
- Date: Date to run at.
const submittedJob = await mySpecialJob
  .delay(10)
  .submit({ job: "args" }, { "retryLimit": 2 })
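The three accepted forms of startAfter all describe the earliest moment the job may start. The sketch below shows one way to picture that normalization; resolveStartAfter is a hypothetical helper written for illustration, since the real conversion is handled by the job executor.

```typescript
// Hypothetical helper, not part of Wasp: turns a delay() argument into the earliest start time.
function resolveStartAfter(startAfter: number | string | Date, now: Date = new Date()): Date {
  if (typeof startAfter === "number") {
    // Integers are interpreted as a number of seconds from now.
    return new Date(now.getTime() + startAfter * 1000);
  }
  if (typeof startAfter === "string") {
    // Strings are interpreted as ISO date strings.
    const parsed = new Date(startAfter);
    if (Number.isNaN(parsed.getTime())) {
      throw new Error(`Invalid ISO date string: ${startAfter}`);
    }
    return parsed;
  }
  // Date objects are used as-is.
  return startAfter;
}

const now = new Date("2024-01-01T00:00:00Z");
console.log(resolveStartAfter(10, now).toISOString()); // 2024-01-01T00:00:10.000Z
console.log(resolveStartAfter("2024-01-01T00:05:00Z", now).toISOString()); // 2024-01-01T00:05:00.000Z
```

So .delay(10) and .delay(new Date(Date.now() + 10_000)) describe roughly the same start time, expressed relatively and absolutely.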
Tracking
The return value of submit() is an instance of SubmittedJob, which has the following fields:
- jobId: The ID for the job in that executor.
- jobName: The name of the job you used in your .wasp file.
- executorName: The Symbol of the name of the job executor.
  - For pg-boss, you can import a Symbol from: import { PG_BOSS_EXECUTOR_NAME } from '@wasp/jobs/core/pgBoss/pgBossJob.js' if you wish to compare against executorName.
There are also some namespaced, job executor-specific objects.