pg_cron 是 PostgreSQL 数据库的一个作业调度器插件,通过该插件我们可以定时的执行一些特殊的任务。它遵循 Linux crontab 的配置语法,您可以通过在线工具 crontab.guru 来验证您的 cron 表达式。目前,pg_cron 仅支持 PostgreSQL 10 及其之后的版本。本文将简要介绍 pg_cron 的安装及其使用。
准备 在开始之前,我们需要安装 pg_cron 插件,这里采用源码编译的方式安装。
1 2 3 $ git clone https://github.com/citusdata/pg_cron.git $ cd pg_cron$ make && make install
在执行上面的命令之前,您需要配置好 PostgreSQL 的环境变量,以便 make
执行的时候可以找到 pg_config
命令来获取相关的信息(包括编译选项、安装路径等)。
接着,我们需要修改 PostgreSQL 的配置参数,并重启数据库,最后在创建 pg_cron 插件,您可以使用如下的命令来完成这些工作。
1 2 3 $ psql postgres - c 'ALTER SYSTEM SET shared_preload_libraries TO pg_cron;' $ pg_ctl restart $ psql postgres - c 'CREATE EXTENSION pg_cron;'
当我们成功执行上述命令之后,pg_cron 将创建一个 cron
的模式,并在该模式下创建如下函数:
alter_job - 修改 job 信息1 2 3 4 5 6 7 void cron.alter_job( job_id bigint , schedule text DEFAULT NULL ::text, command text DEFAULT NULL ::text, database text DEFAULT NULL ::text, username text DEFAULT NULL ::text, active boolean DEFAULT NULL ::boolean );
job_cache_invalidate - 清除 job 缓存信息触发器1 trigger cron.job_cache_invalidate();
schedule - 创建 job 任务,支持匿名 job 和命名 job 两种类型1 2 bigint cron.schedule(schedule text, command text);bigint cron.schedule(job_name text, schedule text, command text);
schedule_in_database - 为其他数据库创建 job1 2 3 4 5 6 7 bigint cron.schedule_in_database( job_name text, schedule text, command text, database text, username text DEFAULT NULL ::text, active boolean DEFAULT true );
unschedule - 删除 job 任务,支持以 job id 和 job 名字两种方式来删除1 2 boolean cron.unschedule(job_id bigint );boolean cron.unschedule(job_name name);
以及 job
和 job_run_details
表。
job - job 信息表,记录当前创建的所有 job 信息
job_run_details - job 运行信息表,用于记录 job 运行后的状态及其相关信息
测试 我们创建一个 mylog
表来记录日志。
1 CREATE TABLE mylog(create_date timestamp , info text);
随后创建一个函数来向 mylog
中插入数据。
1 2 3 4 5 6 CREATE OR REPLACE FUNCTION insert_log_job() RETURNS voidAS $body$BEGIN INSERT INTO mylog VALUES (now(), 'hello world' ); END ;$body$ LANGUAGE plpgsql;
接下来我们创建便可以创建 job 了,这里我们将创建一个匿名 job 和一个命令 job。
1 2 3 4 SELECT cron.schedule( '*/1 * * * *' , $$ BEGIN SELECT insert_log_job() END ; $$ );
上面的命令创建的 job 存在语法问题,我们可以通过 alter_job
来对其进行修改。
1 select cron.alter_job(1 , '*/1 * * * *' , 'SELECT insert_log_job();' );
现在我们可以在 job_run_details
中看到 job 的详细信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 postgres= # select * from cron.job_run_details \gx - [ RECORD 1 ]jobid | 1 runid | 1 job_pid | 1392744 database | postgres username | px command | BEGIN SELECT insert_log_job() END ; status | failed return_message | ERROR: syntax error at or near "SELECT" + | LINE 1 : BEGIN SELECT insert_log_job() END ; + | ^ + | start_time | 2021 -11 -18 23 :08 :00.018493 + 08 end_time | 2021 -11 -18 23 :08 :00.02138 + 08 - [ RECORD 2 ]jobid | 1 runid | 2 job_pid | 1392892 database | postgres username | px command | BEGIN SELECT insert_log_job() END ; status | failed return_message | ERROR: syntax error at or near "SELECT" + | LINE 1 : BEGIN SELECT insert_log_job() END ; + | ^ + | start_time | 2021 -11 -18 23 :09 :00.018235 + 08 end_time | 2021 -11 -18 23 :09 :00.020252 + 08 - [ RECORD 3 ]jobid | 1 runid | 3 job_pid | 1393038 database | postgres username | px command | BEGIN SELECT insert_log_job() END ; status | failed return_message | ERROR: syntax error at or near "SELECT" + | LINE 1 : BEGIN SELECT insert_log_job() END ; + | ^ + | start_time | 2021 -11 -18 23 :10 :00.02294 + 08 end_time | 2021 -11 -18 23 :10 :00.02633 + 08 - [ RECORD 4 ]jobid | 1 runid | 4 job_pid | 1393183 database | postgres username | px command | SELECT insert_log_job(); status | succeeded return_message | 1 row start_time | 2021 -11 -18 23 :11 :00.011896 + 08 end_time | 2021 -11 -18 23 :11 :00.016711 + 08
同时,我们也可以看到在 mylog
表中插入了一条新的记录。
1 2 3 4 5 postgres= # SELECT * FROM mylog; create_date | info 2021 -11 -18 23 :11 :00.011996 | hello world (1 row )
随后,我们修改一下其执行的周期,将 1 分钟执行一次改为 10 分钟执行一次(避免过多的日志)。
1 2 3 4 5 postgres= # SELECT cron.alter_job(1 , '*/10 * * * *' ); alter_job (1 row )
现在,我们再来创建一个命名的 job。
1 2 3 4 5 6 7 8 9 10 11 12 postgres= # SELECT cron.schedule('job1' , '*/5 * * * *' , 'SELECT insert_log_job()' ); schedule 2 (1 row ) postgres= # select * from cron.job; jobid | schedule | command | nodename | nodeport | database | username | active | jobname 1 | * / 10 * * * * | SELECT insert_log_job(); | localhost | 5432 | postgres | px | t | 2 | * / 5 * * * * | SELECT insert_log_job() | localhost | 5432 | postgres | px | t | job1 (2 rows )
等待 job 执行后,我们可以查看 job_run_details
来验证 job 的执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 postgres= # SELECT * FROM cron.job_run_details \gx ... - [ RECORD 4 ]jobid | 1 runid | 4 job_pid | 1393183 database | postgres username | px command | SELECT insert_log_job(); status | succeeded return_message | 1 row start_time | 2021 -11 -18 23 :11 :00.011896 + 08 end_time | 2021 -11 -18 23 :11 :00.016711 + 08 - [ RECORD 5 ]jobid | 2 runid | 5 job_pid | 1394493 database | postgres username | px command | SELECT insert_log_job() status | succeeded return_message | 1 row start_time | 2021 -11 -18 23 :20 :00.021353 + 08 end_time | 2021 -11 -18 23 :20 :00.033443 + 08 - [ RECORD 6 ]jobid | 1 runid | 6 job_pid | 1394494 database | postgres username | px command | SELECT insert_log_job(); status | succeeded return_message | 1 row start_time | 2021 -11 -18 23 :20 :00.025503 + 08 end_time | 2021 -11 -18 23 :20 :00.039575 + 08
最后,我们来看看如何删除 job。pg_cron 提供了两种方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 postgres= # SELECT cron.unschedule(1 ); unschedule t (1 row ) postgres= # SELECT * FROM cron.job; jobid | schedule | command | nodename | nodeport | database | username | active | jobname 2 | * / 5 * * * * | SELECT insert_log_job() | localhost | 5432 | postgres | px | t | job1 (1 row ) postgres= # SELECT cron.unschedule('job1' ); unschedule t (1 row ) postgres= # SELECT * FROM cron.job; jobid | schedule | command | nodename | nodeport | database | username | active | jobname (0 rows )
进阶 在了解了 pg_cron 的基本使用之后,我们来看看其他的一些使用,我们首先创建一个用户和数据库,并切换到该数据库。
1 2 3 4 5 6 postgres= # CREATE USER japin WITH ENCRYPTED PASSWORD 'japin' ; CREATE ROLEpostgres= # CREATE DATABASE testdb OWNER japin; CREATE DATABASEpostgres= # \c testdb japin You are now connected to database "testdb" as user "japin".
随后在 testdb
中新建表和函数。
1 2 3 4 5 6 7 8 CREATE TABLE log (info text, at timestmap);CREATE OR REPLACE FUNCTION testdb_insert_log_job() RETURNS voidAS $body$BEGIN INSERT INTO log VALUES ('hello, testdb' , now()); END ;$body$ LANGUAGE plpgsql;
接着我们切回到超级用户并执行下面的语句,它将为 testdb
创建一个 job。
1 2 3 4 5 6 7 SELECT cron.schedule_in_database( 'testdb_job1' , '*/1 * * * *' , 'SELECT testdb_insert_log_job()' , 'testdb' , 'japin' , true );
注意: pg_cron 是通过 libpq 连接到目标数据库上去执行任务的,因此我们指定的用户需要在目标数据库上有足够的权限执行 job 指定的任务。因此您可能需要配置 pg_hba.conf
文件以及 ~/.pgpass
文件中提供连接信息。
我们同样可以通过 cron.job
表查看 job 的信息,以及 cron.job_run_details
查看运行状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 postgres= # SELECT * FROM cron.job; jobid | schedule | command | nodename | nodeport | database | username | active | jobname 4 | * / 1 * * * * | SELECT testdb_insert_log_job() | localhost | 5432 | testdb | japin | t | testdb_job1 (1 row ) postgres= # SELECT * FROM cron.job_run_details \gx ... - [ RECORD 8 ]jobid | 3 runid | 8 job_pid | 1397538 database | testdb username | japin command | SELECT testdb_insert_log_job() status | succeeded return_message | 1 row start_time | 2021 -11 -18 23 :41 :00.028916 + 08 end_time | 2021 -11 -18 23 :41 :00.03695 + 08 - [ RECORD 9 ]jobid | 3 runid | 9 job_pid | 1397689 database | testdb username | japin command | SELECT testdb_insert_log_job() status | succeeded return_message | 1 row start_time | 2021 -11 -18 23 :42 :00.028686 + 08 end_time | 2021 -11 -18 23 :42 :00.033203 + 08
最后我们查看 log
表来验证 job 是否执行。
1 2 3 4 5 6 testdb= > SELECT * FROM log; info | at hello, testdb | 2021 -11 -18 23 :40 :00.022578 hello, testdb | 2021 -11 -18 23 :41 :00.029197 (2 rows )
最后,还需要主要的是,pg_cron 的定时时间是基于 GMT 的,因此,您可能需要换算时区。
参考 [1] https://github.com/citusdata/pg_cron
笑林广记 - 武弁夜巡
一武弁夜巡。 有犯夜者,自称书生会课归迟。 武弁曰:“既是书生,且考你一考。” 生请题,武弁思之不得,喝曰:“造化了你,今夜幸而没有题目。”