• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    Python使用signal定时结束AsyncIOScheduler任务的问题

    在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题

    1. 在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)
    2. 如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了)

    原调试代码如下:

    from datetime import datetime, timedelta
    
    import aiohttp
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    async def get(session):
        url = 'https://httpbin.org/get?a=1'
        async with session.get(url) as res:
            print('get', res.status)
            return await res.text()
    
    async def post(session):
        url = 'https://httpbin.org/post?b=2'
        async with session.post(url) as res:
            print('post', res.status)
            return await res.text()
    async def main():
        async with aiohttp.ClientSession() as session:
            await get(session)
            await post(session)
    
    if __name__ == '__main__':
        jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
        scheduler = AsyncIOScheduler(jobstores=jobstores)
        for i in range(10):  # 添加10个任务
            job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
        scheduler.start()

    Google后发现AsyncIOScheduler的使用需要在scheduler启动后,需要自己调用asyncio.get_event_loop().run_forever()来启动协程任务。
    但是一旦run_forever()则就会阻塞至死。除非有KeyboardInterrupt, SystemExit等异常或者强杀来停止其运行。
    此时想到使用Python的signal来定时发送信号,修改后程序如下,可以正常延迟停止(感觉有点像模拟Go的defer)。

    # -*- coding: utf-8 -*-
    """
    @Time : 2021/7/23
    @Auth : hanzhichao
    @Desc:
    """
    from datetime import datetime, timedelta
    import signal
    import asyncio
    
    import aiohttp
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    
    async def get(session):
        url = 'https://httpbin.org/get?a=1'
        async with session.get(url) as res:
            print('get', res.status)
            return await res.text()
    
    async def post(session):
        url = 'https://httpbin.org/post?b=2'
        async with session.post(url) as res:
            print('post', res.status)
            return await res.text()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            await get(session)
            await post(session)
    
    if __name__ == '__main__':
        jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
        scheduler = AsyncIOScheduler(jobstores=jobstores)
        for i in range(10):  # 添加10个任务
            job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
        scheduler.start()
        signal.alarm(20)  # 20秒后终止程序
        asyncio.get_event_loop().run_forever()  # 永远运行

    到此这篇关于Python使用signal定时结束AsyncIOScheduler任务的文章就介绍到这了,更多相关Python定时结束AsyncIOScheduler任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    您可能感兴趣的文章:
    • 将Python脚本打包成MACOSAPP程序过程
    • [项目布局配置]Nosql与PythonWeb-Flask框架组合
    • Python连接Postgres/Mysql/Mongo数据库基本操作大全
    • python生成可执行exe控制Microsip自动填写号码并拨打功能
    • Python之os模块案例详解
    上一篇:opencv用VS2013调试时用Image Watch插件查看图片
    下一篇:pytho多张图片的无损拼接的实现示例
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    Python使用signal定时结束AsyncIOScheduler任务的问题 Python,使用,signal,定时,结束,