programing

Python Script가 매 시간 정확하게 실행되도록 예약

i4 2023. 5. 11. 21:06
반응형

Python Script가 매 시간 정확하게 실행되도록 예약

질문하기 전에 Cron Jobs와 Task Scheduler가 마지막 옵션이 될 것입니다. 이 스크립트는 Windows와 Linux에서 사용될 것입니다. 최종 사용자에게 이 작업을 완료하도록 맡기는 것보다 코드화된 방법이 더 좋습니다.

작업을 예약하는 데 사용할 수 있는 Python용 라이브러리가 있습니까?한 시간에 한 번씩 기능을 실행해야 하지만 스크립트를 실행하고 .sleep을 사용하면 스크립트 및/또는 기능 실행이 지연되어 "한 시간에 한 번"이 전날과 다른 시간에 실행됩니다.

Cron Job을 사용하거나 작업 스케줄러를 사용하여 예약하지 않고 특정 시간(두 번 이상)에 실행되도록 기능을 예약하는 가장 좋은 방법은 무엇입니까?

또는 이것이 불가능하다면, 당신의 의견도 듣고 싶습니다.

AP Scheduler는 제 요구에 딱 맞습니다.

버전 < 3.0

import datetime
import time
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()

def job_function():
    print("Hello World")
    print(datetime.datetime.now())
    time.sleep(20)

# Schedules job_function to be run once each minute
sched.add_cron_job(job_function,  minute='0-59')

아웃:

>Hello World
>2014-03-28 09:44:00.016.492
>Hello World
>2014-03-28 09:45:00.0.14110

버전 > 3.0

(아래 Animesh Pandey의 답변에서)

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

sched.configure(options_from_ini_file)
sched.start()

이것이 도움이 될 수 있습니다: 고급 파이썬 스케줄러

다음은 문서의 작은 코드입니다.

from apscheduler.schedulers.blocking import BlockingScheduler

def some_job():
    print "Decorated job"

scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()

매시 10분마다 무언가를 실행하는 것.

from datetime import datetime, timedelta

while 1:
    print 'Run something..'

    dt = datetime.now() + timedelta(hours=1)
    dt = dt.replace(minute=10)

    while datetime.now() < dt:
        time.sleep(1)

위해서apscheduler< 3.0, Unknown의 답변 참조.

위해서apscheduler> 3.0

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

sched.configure(options_from_ini_file)
sched.start()

업데이트:

apscheduler 문서화

은 것은이를 위한 입니다.apscheduler-3.3.1Python 3.6.2.

"""
Following configurations are set for the scheduler:

 - a MongoDBJobStore named “mongo”
 - an SQLAlchemyJobStore named “default” (using SQLite)
 - a ThreadPoolExecutor named “default”, with a worker count of 20
 - a ProcessPoolExecutor named “processpool”, with a worker count of 5
 - UTC as the scheduler’s timezone
 - coalescing turned off for new jobs by default
 - a default maximum instance limit of 3 for new jobs
"""

from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

"""
Method 1:
"""
jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

"""
Method 2 (ini format):
"""
gconfig = {
    'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
}

sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format


@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')


@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')


sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()

sched_method2.configure(gconfig=gconfig)
sched_method2.start()

제가 제안할 수 있는 가장 간단한 옵션은 스케줄 라이브러리를 사용하는 것입니다.

질문에서 "한 시간에 한 번씩 함수를 실행해야 합니다."라는 코드는 매우 간단합니다.

    import schedule

    def thing_you_wanna_do():
        ...
        ...
        return


    schedule.every().hour.do(thing_you_wanna_do)

    while True:
        schedule.run_pending()

당신은 또한 하루 중 특정한 시간에 어떤 것을 하는 방법에 대한 몇 가지 예를 물었습니다:

    import schedule


    def thing_you_wanna_do():
        ...
        ...
        return


    schedule.every().day.at("10:30").do(thing_you_wanna_do)
    schedule.every().monday.do(thing_you_wanna_do)
    schedule.every().wednesday.at("13:15").do(thing_you_wanna_do)
    # If you would like some randomness / variation you could also do something like this
    schedule.every(1).to(2).hours.do(thing_you_wanna_do)

    while True:
        schedule.run_pending()

사용되는 코드의 90%는 스케줄 라이브러리의 예제 코드입니다.즐거운 일정 되세요!

스크립트를 매 시간 15분마다 실행합니다.예를 들어, 15분마다 업데이트되는 15분 주가 견적을 받으려고 합니다.

while True:
    print("Update data:", datetime.now())
    sleep = 15 - datetime.now().minute % 15
    if sleep == 15:
        run_strategy()
        time.sleep(sleep * 60)
    else:
        time.sleep(sleep * 60)
   #For scheduling task execution
import schedule
import time

def job():
    print("I'm working...")

schedule.every(1).minutes.do(job)
#schedule.every().hour.do(job)
#schedule.every().day.at("10:30").do(job)
#schedule.every(5).to(10).minutes.do(job)
#schedule.every().monday.do(job)
#schedule.every().wednesday.at("13:15").do(job)
#schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

Python 표준 라이브러리는 이 작업에 대한 예약스레드를 제공합니다.그러나 이는 스케줄러 스크립트가 OS에 실행을 맡기는 대신 항상 실행된다는 것을 의미합니다. OS가 원하는 대로 실행될 수도 있고 그렇지 않을 수도 있습니다.

"Version < 3.0"이라는 Sunshine Kitty가 게시한 버전에서 apscheduler 2.1.2를 지정해야 할 수도 있습니다. 실수로 2.7을 설치할 때 버전 3을 설치했기 때문에 다음을 수행했습니다.

pip uninstall apscheduler
pip install apscheduler==2.1.2

그 이후로 제대로 작동했습니다.도움이 되길 바랍니다.

clock.py

from apscheduler.schedulers.blocking import BlockingScheduler
import pytz

sched = BlockingScheduler(timezone=pytz.timezone('Africa/Lagos'))

@sched.scheduled_job('cron', day_of_week='mon-sun', hour=22)
def scheduled_job():
    print('This job is run every week at 10pm.')
    #your job here


sched.start()

Proc 파일

clock: python clock.py

요구 사항들.txt

APScheduler==3.0.0

배포 후 마지막 단계는 클럭 프로세스를 확장하는 것입니다.이는 단일 프로세스이므로 이러한 프로세스 중 하나 이상을 확장할 필요가 없습니다.두 개를 실행하면 작업이 중복됩니다.

$ heroku ps:scale clock=1

출처: https://devcenter.heroku.com/articles/clock-processes-python

아마도 로켓리가 당신의 필요에 맞을지도 모릅니다.강력한 스케줄러로 사용하기 쉽고, 많은 스케줄링 옵션이 내장되어 있으며, 다음과 같이 쉽게 확장이 용이합니다.

from rocketry import Rocketry
from rocketry.conds import daily, every, after_success

app = Rocketry()

@app.task(every("1 hour 30 minutes"))
def do_things():
    ...    

@app.task(daily.between("12:00", "17:00"))
def do_daily_afternoon():
    ...

@app.task(daily & after_success(do_things))
def do_daily_after_task():
    ...

if __name__ == "__main__":
    app.run()

하지만 훨씬 더 많은 것을 가지고 있습니다.

  • 문자열 기반 스케줄링 구문
  • 논리적 문(AND, OR, NOT)
  • 많은 기본 제공 예약 옵션
  • 사용자 지정이 용이함(사용자 지정 조건, 매개 변수 등)
  • 병렬화(별도의 스레드 또는 프로세스에서 실행)
  • 파라메타라이제이션(실행 순서 및 입출력)
  • 지속성: 원하는 위치에 로그 저장
  • 런타임에 스케줄러 수정(즉, 그 위에 API 빌드)

링크:

고지 사항:내가 저자야

이미 @lukik 솔루션을 가지고 있을 수 있지만, 예약을 제거하려면 다음을 사용해야 합니다.

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

또는

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

명시적 작업 ID를 사용해야 하는 경우

자세한 내용은 https://apscheduler.readthedocs.io/en/stable/userguide.html#removing-jobs 을 참조하십시오.

스케줄러가 매초마다 프로그램을 실행해야 한다는 것을 알게 되었습니다.만약 온라인 서버를 사용한다면 비용이 많이 들 것입니다.그래서 저는 다음과 같습니다.

매분 5초마다 실행되며, 대기시간을 초 단위로 재계산하여 몇 시간 단위로 변경할 수 있습니다.

import time
import datetime
Initiating = True
print(datetime.datetime.now())
while True:
    if Initiating == True:
        print("Initiate")
        print( datetime.datetime.now())
        time.sleep(60 - time.time() % 60+5)
        Initiating = False
    else:
        time.sleep(60)
        print("working")
        print(datetime.datetime.now())

이 방법은 상대적인 델타와 날짜 시간, 매 시간 모듈로 부울 검사를 사용하여 저에게 효과가 있었습니다.시작한 시간부터 매 시간마다 실행됩니다.

import time
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

 #Track next run outside loop and update the next run time within the loop   
    nxt_run=datetime.now()

#because while loops evaluate at microseconds we basically need to use a boolean evaluation to track when it should run next
    while True:
        cnow = datetime.now() #track the current time
        time.sleep(1) #good to have so cpu doesn't spike
        if (cnow.hour % 1 == 0 and cnow >= nxt_run):
           print(f"start @{cnow}: next run @{nxt_run}")
           nxt_run=cnow+relativedelta(hours=1) #add an hour to the next run
        else:
           print(f"next run @{nxt_run}")

한 가지 옵션은 정기적으로 파이썬 스크립트를 실행하는 C/C++ 래퍼를 작성하는 것입니다.최종 사용자는 백그라운드에서 실행되는 C/C++ 실행 파일을 실행하고 주기적으로 파이썬 스크립트를 실행합니다.C/C++를 모르거나 이 100% 파이썬을 유지하려면 이 방법이 최선의 해결책이 아닐 수 있습니다.그러나 사람들이 실행 파일을 클릭하는 데 익숙하기 때문에 가장 사용하기 쉬운 접근 방식인 것 같습니다.이 모든 것은 파이썬이 최종 사용자의 컴퓨터에 설치되어 있다고 가정합니다.

또 다른 옵션은 cron 작업/작업 스케줄러를 사용하지만 최종 사용자가 이 작업을 수행할 필요가 없도록 설치 프로그램에 스크립트로 저장하는 것입니다.

언급URL : https://stackoverflow.com/questions/22715086/scheduling-python-script-to-run-every-hour-accurately

반응형