forwardspam_cyprus_off/bot/scheduler.py
2024-09-14 23:27:11 +03:00

38 lines
1.4 KiB
Python

import asyncio
import time
from typing import Callable, Awaitable, Coroutine
class AsyncScheduler:
"""
Запускает, останавливает ассинхронные задачаи в заданное время
"""
def __init__(self):
self.tasks = []
async def task_runner(self, task: Callable, run_time: str):
"""
Запускает задачу, которая выполняет каждый день в заданный run_time
:param task: await Функция
:param run_time: строка "HH:MM"
"""
while True:
current_time = time.strftime("%H:%M", time.localtime())
if current_time == run_time:
await task()
await asyncio.sleep(60)
async def add_task(self, task, run_time):
"""Добавление новой задачи в определенное время"""
task_instance = asyncio.create_task(self.task_runner(task, run_time))
self.tasks.append((task, run_time, task_instance))
async def remove_task(self, task, run_time) -> bool:
"""Удаление сущесвтующей задачи по функции и времени"""
for t, r_t, task_instance in self.tasks:
if t == task and r_t == run_time:
task_instance.cancel()
self.tasks.remove((t, r_t, task_instance))
return True
return False