APScheduler 入门到使用指定接口定时调用指南

内容纲要

APScheduler 入门到使用指定接口定时调用指南

1. APScheduler 简介

APScheduler(Advanced Python Scheduler)是一个轻量级的 Python 任务调度库。它允许你在指定的时间执行任务、以固定的间隔执行任务、或是根据复杂的规则执行任务。

2. 安装 APScheduler

首先,我们需要安装 APScheduler。你可以使用 pip 来安装:

pip install apscheduler

3. 基本使用

APScheduler 提供了多种调度器,其中最常用的是 BackgroundScheduler。下面是一个简单的例子:

from apscheduler.schedulers.background import BackgroundScheduler
import time

def my_job():
    print("Hello, this is a scheduled task!")

# 创建调度器
scheduler = BackgroundScheduler()
# 添加任务
scheduler.add_job(my_job, 'interval', seconds=10)
# 启动调度器
scheduler.start()

try:
    # 模拟应用保持运行
    while True:
        time.sleep(2)
except (KeyboardInterrupt, SystemExit):
    # 关闭调度器
    scheduler.shutdown()

上面的代码每隔 10 秒打印一次 "Hello, this is a scheduled task!"。

4. 使用指定接口的定时调用

假设我们需要每隔一段时间调用一个 REST API 接口。我们可以使用 Python 的 requests 库来实现这个功能。

4.1 安装 requests 库

pip install requests

4.2 定时调用接口的示例

下面的例子展示了如何每隔 1 分钟调用一次指定的 REST API 接口:

from apscheduler.schedulers.background import BackgroundScheduler
import requests
import time

def call_api():
    url = "https://api.example.com/data"
    response = requests.get(url)
    if response.status_code == 200:
        print("API call successful:", response.json())
    else:
        print("API call failed with status code:", response.status_code)

# 创建调度器
scheduler = BackgroundScheduler()
# 添加任务,设置每分钟调用一次
scheduler.add_job(call_api, 'interval', minutes=1)
# 启动调度器
scheduler.start()

try:
    # 模拟应用保持运行
    while True:
        time.sleep(2)
except (KeyboardInterrupt, SystemExit):
    # 关闭调度器
    scheduler.shutdown()

在这个例子中,call_api 函数每分钟调用一次指定的 REST API 接口,并输出接口返回的数据。

5. 高级使用

5.1 任务的其他触发器

除了按间隔执行任务外,APScheduler 还支持多种触发器,如 cron、date 等。下面是一些例子:

5.1.1 使用 cron 触发器

# 每天早上8点调用一次接口
scheduler.add_job(call_api, 'cron', hour=8, minute=0)

5.1.2 使用 date 触发器

from datetime import datetime

# 在指定时间调用接口
run_date = datetime(2023, 7, 19, 8, 0, 0)
scheduler.add_job(call_api, 'date', run_date=run_date)

5.2 任务持久化

为了在程序重启后仍能保持任务调度,我们可以使用 APScheduler 的持久化功能。以下是一个使用 SQLite 数据库保存任务的例子:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import requests
import time

def call_api():
    url = "https://api.example.com/data"
    response = requests.get(url)
    if response.status_code == 200:
        print("API call successful:", response.json())
    else:
        print("API call failed with status code:", response.status_code)

# 配置 SQLite Job Store
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

# 创建调度器
scheduler = BackgroundScheduler(jobstores=jobstores)
# 添加任务
scheduler.add_job(call_api, 'interval', minutes=1)
# 启动调度器
scheduler.start()

try:
    # 模拟应用保持运行
    while True:
        time.sleep(2)
except (KeyboardInterrupt, SystemExit):
    # 关闭调度器
    scheduler.shutdown()

6. 总结

APScheduler 是一个功能强大的任务调度库,支持多种触发器和持久化功能,非常适合在 Python 应用中进行定时任务调度。通过本文的介绍,你应该能够在自己的项目中使用 APScheduler 进行定时调用指定的接口。如果你有任何问题或需要进一步的帮助,请随时告诉我。

Leave a Comment

您的电子邮箱地址不会被公开。 必填项已用*标注

close
arrow_upward