Skip to main content

Background jobs

BackgroundJobsPlugin adds a durable background-job system to AdminForth. Jobs are stored in your data store (via a resource), executed by registered handlers, and automatically resumed after server restarts.

Setup

First, install the plugin:

pnpm i @adminforth/background-jobs

and create a resource for jobs:

./resources/jobs.ts
import AdminForth, { AdminForthDataTypes } from 'adminforth';
import type { AdminForthResourceInput, AdminUser } from 'adminforth';
import { randomUUID } from 'crypto';
import BackgroundJobsPlugin from '@adminforth/background-jobs';

/**
 * Access rule used by the jobs resource: resolves to `true` only when the
 * database record of the current admin user has the `superadmin` role.
 */
async function allowedForSuperAdmin({ adminUser }: { adminUser: AdminUser }): Promise<boolean> {
  const { role } = adminUser.dbUser;
  return role === 'superadmin';
}

// Builds the `showIn` setting that hides a column from the edit and create
// forms. A new object is returned on every call so that columns never share
// one instance.
const hiddenInForms = () => ({ edit: false, create: false });

// Every status a job can have; each string serves as both enum label and value.
const JOB_STATUSES = ['IN_PROGRESS', 'DONE', 'DONE_WITH_ERRORS', 'CANCELLED'];

// Resource definition for the `jobs` table with BackgroundJobsPlugin attached.
export default {
  dataSource: 'maindb',
  table: 'jobs',
  resourceId: 'jobs',
  label: 'Jobs',
  options: {
    // Editing and deleting jobs is restricted by the superadmin check
    allowedActions: {
      edit: allowedForSuperAdmin,
      delete: allowedForSuperAdmin,
    },
  },
  columns: [
    {
      name: 'id',
      primaryKey: true,
      type: AdminForthDataTypes.STRING,
      // The id is a random UUID generated when the record is created
      fillOnCreate: ({ initialRecord, adminUser }) => randomUUID(),
      showIn: hiddenInForms(),
    },
    { name: 'name', type: AdminForthDataTypes.STRING },
    {
      name: 'created_at',
      type: AdminForthDataTypes.DATETIME,
      showIn: hiddenInForms(),
      // Creation time is stored as an ISO-8601 string
      fillOnCreate: ({ initialRecord, adminUser }) => new Date().toISOString(),
    },
    {
      name: 'finished_at',
      type: AdminForthDataTypes.DATETIME,
      showIn: hiddenInForms(),
    },
    {
      name: 'started_by',
      type: AdminForthDataTypes.STRING,
      // Points to a record of the `adminuser` resource
      foreignResource: {
        resourceId: 'adminuser',
        searchableFields: ["id", "email"],
      },
    },
    { name: 'state', type: AdminForthDataTypes.JSON },
    { name: 'progress', type: AdminForthDataTypes.STRING },
    {
      name: 'status',
      type: AdminForthDataTypes.STRING,
      enum: JOB_STATUSES.map((status) => ({ label: status, value: status })),
    },
    { name: 'job_handler_name', type: AdminForthDataTypes.STRING },
  ],
  plugins: [
    // Tells the plugin which column of this resource plays which role
    new BackgroundJobsPlugin({
      createdAtField: 'created_at',
      finishedAtField: 'finished_at',
      startedByField: 'started_by',
      stateField: 'state',
      progressField: 'progress',
      statusField: 'status',
      nameField: 'name',
      jobHandlerField: 'job_handler_name',
    }),
  ],
} as AdminForthResourceInput;

Then add the table schema:

// Table backing the `jobs` resource; column names match the field names
// passed to BackgroundJobsPlugin in ./resources/jobs.ts.
model jobs {
  id               String    @id
  created_at       DateTime
  finished_at      DateTime? // nullable
  started_by       String    // foreign key to the `adminuser` resource
  name             String
  state            String?   // declared as JSON in the resource definition
  progress         String
  status           String    // IN_PROGRESS | DONE | DONE_WITH_ERRORS | CANCELLED
  job_handler_name String
}

and create a migration.

Usage

The plugin saves tasks and keeps executing them even after a server restart, so you should register job task handlers at the start of the AdminForth application.

./index.ts
  import BackgroundJobsPlugin from '@adminforth/background-jobs';
import jobs_resource from './resources/jobs.js';

...

resources: [

...

jobs_resource,

...

],

...

admin.express.serve(app);

// After the databases are discovered, create the initial superadmin account
// if the `adminuser` resource does not contain any records yet.
admin.discoverDatabases().then(async () => {
  const existingUsers = await admin.resource('adminuser').count();
  if (existingUsers !== 0) {
    return;
  }

  await admin.resource('adminuser').create({
    email: 'adminforth',
    password_hash: await AdminForth.Utils.generatePasswordHash('adminforth'),
    role: 'superadmin',
  });
});

const backgroundJobsPlugin = admin.getPluginByClassName<BackgroundJobsPlugin>('BackgroundJobsPlugin');

backgroundJobsPlugin.registerTaskHandler({
  // Name that jobs use to refer to this handler
  jobHandlerName: 'example_job_handler',
  // Limit of tasks that are running in parallel
  parallelLimit: 1,
  // Function executed for a task of the job
  handler: async ({ jobId, setTaskStateField, getTaskStateField }) => {
    const initialState = await getTaskStateField();
    console.log('State of the task at the beginning of the job handler:', initialState);

    // Wait three seconds before updating the state
    await new Promise(resolve => setTimeout(resolve, 3000));

    const { step } = initialState;
    await setTaskStateField({ [step]: `Step ${step} completed` });

    const finalState = await getTaskStateField();
    console.log('State of the task after setting the new state in the job handler:', finalState);
  },
});

...

After registering a handler, you can create a job. For example:

./index.ts

...

if (fileURLToPath(import.meta.url) === path.resolve(process.argv[1])) {
const app = express();
app.use(express.json());

// POST endpoint, wrapped in admin.express.authorize, that starts an example
// job with six tasks and replies with the id of the created job.
app.post(`${ADMIN_BASE_URL}/api/create-job/`,
  admin.express.authorize(
    async (req: any, res: any) => {
      const backgroundJobsPlugin = admin.getPluginByClassName<BackgroundJobsPlugin>('BackgroundJobsPlugin');
      if (!backgroundJobsPlugin) {
        res.status(404).json({ error: 'BackgroundJobsPlugin not found' });
        return;
      }
      const jobId = await backgroundJobsPlugin.startNewJob(
        'Example Job', // job name
        req.adminUser, // admin user
        [
          { state: { step: 1 } },
          { state: { step: 2 } },
          { state: { step: 3 } },
          { state: { step: 4 } },
          { state: { step: 5 } },
          { state: { step: 6 } },
        ], // initial tasks, each with its own initial state
        'example_job_handler', // job handler name
      );
      // Return the job id so that the caller can use it, for example to open
      // the job info popup on the frontend.
      res.json({ ok: true, message: 'Job started', jobId });
    }
  ),
);

...

Custom job state renderer

There may be cases when you need to display the state of job tasks. For this, you can register a custom component.

./custom/JobCustomComponent.vue
<!-- Shows a button that reloads the job tasks and prints the loaded tasks below it -->
<template>
  <div class="w-[1000px] h-[500px] bg-gray-100 rounded-lg p-4 flex flex-col items-center justify-center ">
    <Button class="h-10" @click="loadTasks">
      Get Job Tasks
    </Button>
    {{ tasks }}
  </div>
</template>


<script setup lang="ts">
import { Button, JsonViewer } from '@/afcl';
import { onMounted, onUnmounted, ref } from 'vue';
import websocket from '@/websocket';
import type { AdminForthComponentDeclarationFull } from 'adminforth';


// Tasks of the job that have been loaded so far; each entry holds the task's state and status
const tasks = ref<{state: Record<string, any>, status: string}[]>([]);


// Props read by this component: `getJobTasks(limit, offset)` resolves to a list of tasks,
// and `job` describes the job whose tasks are displayed
const props = defineProps<{
meta: any;
getJobTasks: (limit?: number, offset?: number) => Promise<{state: Record<string, any>, status: string}[]>;
job: {
id: string;
name: string;
status: 'IN_PROGRESS' | 'DONE' | 'DONE_WITH_ERRORS' | 'CANCELLED';
progress: number; // 0 to 100
createdAt: Date;
finishedAt: Date;
customComponent?: AdminForthComponentDeclarationFull;
};
}>();

// Requests tasks of the job with limit 10 and offset 0, stores them in `tasks` and logs them.
async function loadTasks() {
  const loaded = await props.getJobTasks(10, 0);
  tasks.value = loaded;
  console.log('Loaded tasks for job:', tasks.value);
}


// On mount: start loading the tasks and subscribe to task updates of this job.
onMounted(async () => {
  loadTasks();
  websocket.subscribe(`/background-jobs-task-update/${props.job.id}`, (data: { taskIndex: number, status?: string, state?: Record<string, any> }) => {
    console.log('Received WebSocket message for job:', data.status);

    // An update can refer to a task that is not in the local list, e.g. when
    // the initial load has not finished yet or the task index lies beyond the
    // loaded tasks. Skip such updates instead of writing to `undefined`.
    const task = tasks.value[data.taskIndex];
    if (!task) {
      return;
    }

    if (data.state) {
      task.state = data.state;
    }
    if (data.status) {
      task.status = data.status;
    }
  });
});

// On unmount: drop the subscription to task updates of this job.
onUnmounted(() => {
  const jobId = props.job.id;
  console.log('Unsubscribing from WebSocket for job:', jobId);
  websocket.unsubscribe(`/background-jobs-task-update/${jobId}`);
});


</script>

Now register this component explicitly:

./index.ts
export const admin = new AdminForth({
baseUrl: ADMIN_BASE_URL,
auth: {
usersResourceId: 'adminuser',
usernameField: 'email',
passwordHashField: 'password_hash',
rememberMeDuration: '30d',
loginBackgroundImage: 'https://images.unsplash.com/photo-1534239697798-120952b76f2b?q=80&w=3389&auto=format&fit=crop&ixlib=rb-4.0.3&ixid=M3wxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8fA%3D%3D',
loginBackgroundPosition: '1/2',
loginPromptHTML: async () => {
const adminforthUserExists = await admin.resource("adminuser").count(Filters.EQ('email', 'adminforth')) > 0;
if (adminforthUserExists) {
return "Please use <b>adminforth</b> as username and <b>adminforth</b> as password"
}
},
},

componentsToExplicitRegister: [
{
file: '@@/JobCustomComponent.vue',
meta: {
label: 'Job Custom Component',
}
}
],
...

Finally, register this component alongside the job task handler:

./index.ts
  ...

const backgroundJobsPlugin = admin.getPluginByClassName<BackgroundJobsPlugin>('BackgroundJobsPlugin');

// Task handler for jobs started with the 'example_job_handler' handler name
backgroundJobsPlugin.registerTaskHandler({
  jobHandlerName: 'example_job_handler',
  parallelLimit: 1, // parallel tasks limit
  handler: async ({ setTaskStateField, getTaskStateField }) => {
    const initialState = await getTaskStateField();
    console.log('State of the task at the beginning of the job handler:', initialState);

    // Wait three seconds before updating the state
    await new Promise(resolve => setTimeout(resolve, 3000));

    const { step } = initialState;
    await setTaskStateField({ [step]: `Step ${step} completed` });

    const finalState = await getTaskStateField();
    console.log('State of the task after setting the new state in the job handler:', finalState);
  },
});

// Custom component for the job details, registered under the same handler name
backgroundJobsPlugin.registerTaskDetailsComponent({
  jobHandlerName: 'example_job_handler',
  component: { file: '@@/JobCustomComponent.vue' },
});


Frontend API

Job info popup

If you want to open the job info popup immediately, you should return the job id from your API that creates the job:

For example:

  ...

// Call the plugin endpoint that creates the job
const res = await callAdminForthApi({
  path: `/plugin/${props.meta.pluginInstanceId}/translate-selected-to-languages`,
  method: 'POST',
  body: {
    selectedIds: listOfIds,
    // Only the languages whose checkbox is checked
    selectedLanguages: Object.keys(checkedLanguages.value)
      .filter(lang => checkedLanguages.value[lang]),
  },
  silentError: true,
});

// Open the job info popup when the call succeeded and returned a job id
if (res.ok && res.jobId) {
  window.OpenJobInfoPopup(res.jobId);
}

Backend API

The plugin provides some handy methods that can be used in different situations:

// Sets key:value in the job state stored in the DB
setJobField(jobId: string, key: string, value: any)
// Gets a single field from the job state stored in the DB
getJobField(jobId: string, key: string)
// Gets the job state from the DB
getJobState(jobId: string)
/**
*
* Executes the given function atomically. If you have many tasks that can
* update the state, prefer this method to avoid cases where invalid data
* is written to the state.
*
**/
updateJobFieldsAtomically(jobId: string, updateFunction: () => Promise<void>)

// For example: accumulate a token counter in the job state
backgroundJobsPlugin.updateJobFieldsAtomically(jobId, async () => {
  // Do all set / get fields in this function to make the state update atomic,
  // so there are no conflicts when 2 tasks in parallel do get before set.
  // Don't do long awaits in this callback, since it has an exclusive lock.

  // Start from 0 when the field has no value yet; adding a number to
  // null/undefined would otherwise store a wrong total (NaN for undefined).
  // NOTE(review): assumes getJobField yields null/undefined for a missing key — confirm.
  const usedSoFar = (await backgroundJobsPlugin.getJobField(jobId, 'totalUsedTokens')) ?? 0;
  await backgroundJobsPlugin.setJobField(jobId, 'totalUsedTokens', usedSoFar + promptCost);
})