需求背景
定时器的核心功能是能够周期性地触发回调函数,同时需要支持启动、停止以及状态检查等操作。在多线程或异步编程场景中,希望定时器能够:
- 支持异步操作,避免阻塞主线程;
- 单例化事件循环,节省资源;
- 优雅地管理定时器的生命周期;
- 提供简单的接口,易于使用。
为此,设计了一个 Timer
类,结合 asyncio
和 threading
,实现了一个高效的定时器。
完整代码
import asyncio
import threading
import time
import sys
class Timer:
_loop = None
_thread = None
_lock = threading.Lock()
_running_timers = 0
@classmethod
def _ensure_loop(cls):
with cls._lock:
if cls._loop is None or not cls._thread or not cls._thread.is_alive():
cls._loop = asyncio.new_event_loop()
cls._thread = threading.Thread(
target=cls._run_loop,
args=(cls._loop,),
daemon=True
)
cls._thread.start()
@classmethod
def _run_loop(cls, loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except Exception as e:
print(f"事件循环异常: {e}")
finally:
loop.close()
@classmethod
def _shutdown(cls):
with cls._lock:
if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
cls._loop.call_soon_threadsafe(cls._loop.stop)
# 不使用 join,因为守护线程会在主线程退出时自动结束
def __init__(self):
self.is_running = False
self._stop_event = asyncio.Event()
self._task = None
async def _timer_loop(self, interval, callback):
try:
while not self._stop_event.is_set():
await asyncio.sleep(interval)
if not self._stop_event.is_set():
await asyncio.get_event_loop().run_in_executor(None, callback)
except asyncio.CancelledError:
pass # 正常取消时忽略
except Exception as e:
print(f"定时器循环异常: {e}")
finally:
self.is_running = False
Timer._running_timers -= 1
Timer._shutdown()
def start(self, interval, callback):
if not self.is_running:
Timer._ensure_loop()
self.is_running = True
self._stop_event.clear()
self._task = asyncio.run_coroutine_threadsafe(
self._timer_loop(interval, callback),
Timer._loop
)
Timer._running_timers += 1
# print(f"定时器已启动,每{interval}秒执行一次")
def stop(self):
if self.is_running:
self._stop_event.set()
if self._task:
Timer._loop.call_soon_threadsafe(self._task.cancel)
self.is_running = False
# print("定时器已停止")
def is_active(self):
return self.is_running
# 使用示例
def callback1():
print(f"回调1触发: {time.strftime('%H:%M:%S')}")
def callback2():
print(f"回调2触发: {time.strftime('%H:%M:%S')}")
if __name__ == "__main__":
timer1 = Timer()
timer2 = Timer()
timer1.start(2, callback1)
timer2.start(3, callback2)
try:
time.sleep(100)
timer1.stop()
time.sleep(2)
timer2.stop()
except KeyboardInterrupt:
timer1.stop()
timer2.stop()
finally:
# 确保在程序退出时清理
Timer._shutdown()
1. 单例事件循环的实现
为了避免每个定时器都创建一个独立的事件循环,在 Timer
类中使用了类变量和类方法来管理全局唯一的事件循环:
class Timer:
_loop = None
_thread = None
_lock = threading.Lock()
_running_timers = 0
@classmethod
def _ensure_loop(cls):
with cls._lock:
if cls._loop is None or not cls._thread or not cls._thread.is_alive():
cls._loop = asyncio.new_event_loop()
cls._thread = threading.Thread(
target=cls._run_loop,
args=(cls._loop,),
daemon=True
)
cls._thread.start()
_loop
:存储全局的asyncio
事件循环。_thread
:将事件循环运行在一个独立的守护线程中,避免阻塞主线程。_lock
:线程锁,确保在多线程环境中创建事件循环时的线程安全。_ensure_loop
:在需要时创建或重用事件循环,确保只有一个全局循环。
守护线程(daemon=True
)的设计使得程序退出时无需显式关闭线程,简化了资源清理。
2. 事件循环的运行与关闭
事件循环的运行逻辑封装在 _run_loop
中:
@classmethod
def _run_loop(cls, loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except Exception as e:
print(f"事件循环异常: {e}")
finally:
loop.close()
run_forever
:让事件循环持续运行,直到被外部停止。- 异常处理:捕获可能的错误并打印,便于调试。
finally
:确保循环关闭时资源被正确释放。
关闭逻辑则由 _shutdown
方法控制:
@classmethod
def _shutdown(cls):
with cls._lock:
if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
cls._loop.call_soon_threadsafe(cls._loop.stop)
当所有定时器都停止时(_running_timers == 0
),事件循环会被安全停止。
3. 定时器核心逻辑
每个 Timer
实例负责管理一个独立的定时任务:
def __init__(self):
self.is_running = False
self._stop_event = asyncio.Event()
self._task = None
async def _timer_loop(self, interval, callback):
try:
while not self._stop_event.is_set():
await asyncio.sleep(interval)
if not self._stop_event.is_set():
await asyncio.get_event_loop().run_in_executor(None, callback)
except asyncio.CancelledError:
pass # 正常取消时忽略
finally:
self.is_running = False
Timer._running_timers -= 1
Timer._shutdown()
_stop_event
:一个asyncio.Event
对象,用于控制定时器的停止。_timer_loop
:异步协程,每隔interval
秒执行一次回调函数callback
。run_in_executor
:将回调函数运行在默认的线程池中,避免阻塞事件循环。
4. 启动与停止
启动和停止方法是用户的主要接口:
def start(self, interval, callback):
if not self.is_running:
Timer._ensure_loop()
self.is_running = True
self._stop_event.clear()
self._task = asyncio.run_coroutine_threadsafe(
self._timer_loop(interval, callback),
Timer._loop
)
Timer._running_timers += 1
def stop(self):
if self.is_running:
self._stop_event.set()
if self._task:
Timer._loop.call_soon_threadsafe(self._task.cancel)
self.is_running = False
start
:启动定时器,确保事件循环可用,并记录运行中的定时器数量。stop
:通过设置_stop_event
并取消任务来停止定时器。
5. 使用示例
以下是一个简单的使用示例:
def callback1():
print(f"回调1触发: {time.strftime('%H:%M:%S')}")
def callback2():
print(f"回调2触发: {time.strftime('%H:%M:%S')}")
timer1 = Timer()
timer2 = Timer()
timer1.start(2, callback1) # 每2秒触发一次
timer2.start(3, callback2) # 每3秒触发一次
time.sleep(10) # 运行10秒
timer1.stop()
timer2.stop()
输出可能如下:
回调1触发: 14:30:02
回调2触发: 14:30:03
回调1触发: 14:30:04
回调1触发: 14:30:06
回调2触发: 14:30:06
...
设计亮点
- 异步与多线程结合:通过
asyncio
和threading
,实现了非阻塞的定时器,适合高并发场景。 - 资源高效利用:全局唯一的事件循环避免了重复创建的开销。
- 优雅的生命周期管理:守护线程和自动关闭机制简化了资源清理。
- 线程安全:使用锁机制确保多线程环境下的稳定性。
适用场景
- 周期性任务调度,如数据刷新、状态检查。
- 后台服务中的定时监控。
- 游戏或实时应用中的计时器需求。
总结
这个异步定时器实现结合了 Python 的异步编程和多线程特性,提供了一个轻量、灵活的解决方案。无论是简单的脚本还是复杂的后台服务,它都能胜任。如果你需要一个可靠的定时器,不妨试试这个实现,或者根据需求进一步优化它!
以上就是使用Python实现一个优雅的异步定时器的详细内容,更多关于Python异步定时器的资料请关注
© 版权声明
本站资源来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!
THE END