PythonのAPSchedulerのお試しとDjangoやFlaskへの応用の可能性

Python

Pythonでタスクをスケジュールしたいときに使えるAPSchedulerについてご紹介します。

「Pythonのプログラムを定期実行やスケジュール実行したい」

Pythonで何か動くものを作るときにはこんな希望が湧いてくることもあります。

Pythonを定期実行する方法は色々とあると思いますが、以前個人的に作ったPythonのプログラムはUbuntuのcronで実行させていました。crontabでPythonコマンドを実行する形です。

でもスケジュールの日時をcronじゃなくてPythonの範疇で管理したいと考えるようになりました。

なにか方法はないかと探すとスケジューラーのパッケージ「APScheduler」を知り、さっそく試してみることにしました。

GitHub - agronholm/apscheduler: Task scheduling library for Python
Task scheduling library for Python. Contribute to agronholm/apscheduler development by creating an account on GitHub.

APSchedulerのお試し

今回は日時の設定を何かしらの設定ファイルに保存しておいて、それをPythonで読み込んでスケジュールの設定をしたいと考えました。

設定ファイルの形式は色々考えられますが、とりあえずはJSONにして試してみます。

パッケージのインストール

pip install apscheduler

schedule.json

スケジュールの設定を記載するJSONファイルです。

scheduleの下にsecondのデータを6つ保持しています。時刻の時分秒の秒数を10秒刻みで宣言しています。

ある時刻の○○分10秒のとき、ある時刻の○○分20秒のとき、という意味合いの「second」です。

{
"schedule" : [
    { "second" : 0 },
    { "second" : 10 },
    { "second" : 20 },
    { "second" : 30 },
    { "second" : 40 },
    { "second" : 50 }
]
}

schedule.py

スケジュールのお試しプログラム本体です。

スケジュールしたタイミング(例えばある時刻の○○分10秒のとき)に現在時刻を表示するプログラムです。

先程のschedule.jsonから設定を読み込み、それをscheduler.add_job(notice_time, 'cron', second=second_num)に渡しています。

6つのジョブが作られ、それらがバックグラウンドで実行されて、30秒後にすべてのジョブを削除して終了しています。

このプログラムではBackgroundSchedulerを使っていますのでバックグラウンド(他のプロセス)で実行されます。もしBlockingSchedulerを使うとブロッキングということで同一プロセスで処理されます。※ブロッキングにするとtime.sleep(30)が呼ばれません。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import os
import json
import tzlocal
import time
import pprint

# スケジュールで実行する関数
def notice_time():
    print(f"The time is : {datetime.now()}")

    
# スケジュール設定の読み込み
with open('./schedule.json', 'r') as f:
    time_data = json.load(f)

second_data_list = time_data.get("schedule")
print("スケジュールデータの一覧")
pprint.pprint(second_data_list)

# スケジューラー作成
scheduler = BackgroundScheduler(timezone=str(tzlocal.get_localzone()))
for second_data in second_data_list:
    second_num = second_data.get("second")
    # ジョブの追加
    job_data = scheduler.add_job(notice_time, 'cron', second=second_num)
    print(f"ジョブ追加 (ID) : {job_data.id}")

# ジョブ一覧取得
job_list = scheduler.get_jobs()
print("ジョブの一覧")
pprint.pprint(job_list)

# スケジューラー実行
scheduler.start()

# 30秒待ってからジョブを削除
time.sleep(30)

for job in job_list:
    job.remove()

print("END")

タイムゾーンの設定

BackgroundSchedulerを呼び出す際にtimezone=str(tzlocal.get_localzone())を渡していますが、これはタイムゾーンを指定しない場合に以下のワーニングが発生するためです。

PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html

実行結果

スケジュールデータの一覧
[{'second': 0},
 {'second': 10},
 {'second': 20},
 {'second': 30},
 {'second': 40},
 {'second': 50}]
ジョブ追加 (ID) : fe46bbe9897b4745863e78d124cc41c4
ジョブ追加 (ID) : fb5de21bbc6e425fac60780f5833738f
ジョブ追加 (ID) : d4f3ce2afa914c65958e691b8a5ad8d2
ジョブ追加 (ID) : 0922a605164d4d5095343d7e6f5891c6
ジョブ追加 (ID) : caa006193ab04bba9f51a791d53973c8
ジョブ追加 (ID) : 6eacad98fe254f2ca3274b5db0cca0fa
ジョブの一覧
[<Job (id=fe46bbe9897b4745863e78d124cc41c4 name=notice_time)>,
 <Job (id=fb5de21bbc6e425fac60780f5833738f name=notice_time)>,
 <Job (id=d4f3ce2afa914c65958e691b8a5ad8d2 name=notice_time)>,
 <Job (id=0922a605164d4d5095343d7e6f5891c6 name=notice_time)>,
 <Job (id=caa006193ab04bba9f51a791d53973c8 name=notice_time)>,
 <Job (id=6eacad98fe254f2ca3274b5db0cca0fa name=notice_time)>]
The time is : 2022-04-17 21:41:30.001972
The time is : 2022-04-17 21:41:40.002751
The time is : 2022-04-17 21:41:50.003512
END

DjangoやFlaskで使えるか?

さて、このAPSchedulerをDjangoやFlaskで呼び出して使えるか、ということについて考えてみます。

なお、現状自分では結論としては明確な答えは出せません

サクッとDjangoやFlaskのコードに入れ込んでスケジュールを組めたら簡単そうですが、本当にこれができるのか否か。

APSchedulerのドキュメントを見てみると、スレッドで動かしているようなことが書かれています。

User guide — APScheduler 3.10.4.post1 documentation

スレッドを使う場合、Djangoを動かす土台が開発用のrunserverコマンドで呼び出すサーバーなら動いたとしても、本番環境で使うであろうwsgiの環境では怪しそうです。

Djangoでスレッドを扱おうとしたときの問題点については先日書評を書いた『自走プログラマー Pythonの先輩が教えるプロジェクト開発のベストプラクティス120』でも取り上げられています。

現実的に安心な方法としてはCeleryなどの非同期タスクキューを使うと良いということでしょうか。

一方で、Djangoにも使えるよ、という情報としてDjango向けのAPSchedulerのラッパーモジュールがありました。これはAdminページからタスクの情報をみれるようですね。

django-apscheduler
APScheduler for Django

でもよく見てみると使い方に以下のように書かれています。

Use a custom Django management command to start a single scheduler in its own dedicated process (recommended – see the runapscheduler.py example below)

「Django のカスタム管理コマンドを使って、単一のスケジューラを専用のプロセスで起動する(推奨)」といった内容です。ということは、wsgiで動いているDjangoのプロセスの中でスケジュールをすると問題があるということでしょうか。例えばviews.pyの関数から呼び出したら不都合があるかもしれません。

また、他にもAPSchedulerのドキュメントを見るとAsyncIOSchedulerの記載があり、asyncioモジュールを使う際に適用するもののようです。(具体的にどう使うかといったことは理解していません)

(Django 3でasgiによって非同期viewが使えるようになったお話もありますので、このあたりも関連するのか勉強中)

“asyncio”は怖くない! Djangoでも非同期IO/並行処理の恩恵を受けられる
「DjangoCongress JP」は日本で開催されるDjango Webフレームワークのカンファレンスです。参加する全ての人がDjangoについて交流し、出会い、学び、楽しみ、深い理解を得ることを目的にしています。福田氏は、Django 3のASGI対応について発表しました。全2回。後半は、Django Async...

コメント

タイトルとURLをコピーしました