Python Higher-Order Functions: Advanced Logging Techniques
**Seven advanced optimizations** for logging decorators, combining production best practices with the needs of deep debugging:
---
### 1. **Smart Dynamic Sampling Decorator**
Tackles log flooding from high-frequency functions by automatically adjusting logging verbosity based on the observed error rate.
```python
from collections import defaultdict
from functools import wraps
import logging
import time

class AdaptiveLogger:
    def __init__(self, base_interval=1.0, error_threshold=3):
        # Per-function call and error counters
        self.func_stats = defaultdict(lambda: {'total': 0, 'errors': 0})
        self.base_interval = base_interval
        self.error_threshold = error_threshold

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            stats = self.func_stats[func.__name__]
            stats['total'] += 1
            # Switch to detailed logging once the error count exceeds the threshold
            if stats['errors'] > self.error_threshold:
                log_level = logging.DEBUG
            else:
                log_level = logging.INFO
            # Sampling control: log at most once per base_interval seconds
            if time.time() - stats.get('last_log', 0) > self.base_interval:
                logging.log(log_level, f"Sampling call to {func.__name__}")
                stats['last_log'] = time.time()
            try:
                return func(*args, **kwargs)
            except Exception:
                stats['errors'] += 1
                raise
        return wrapper

# Usage example
adaptive_logger = AdaptiveLogger()

@adaptive_logger
def high_frequency_api():
    pass
```
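One caveat: with Python's default root-logger level (WARNING), neither the INFO nor the DEBUG messages above will appear anywhere. For local testing, a minimal configuration:

```python
import logging
logging.basicConfig(level=logging.DEBUG)  # show DEBUG and above on stderr
```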
---
### 2. **End-to-End Call Tracing Decorator**
Traces call relationships across functions and generates a visual call tree.
```python
import threading
from functools import wraps
from graphviz import Digraph

class TraceVisualizer:
    _local = threading.local()  # per-thread call stack

    def __init__(self, output_file="call_graph"):
        self.graph = Digraph(comment='Call Graph')
        self.output_file = output_file

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not hasattr(self._local, 'call_stack'):
                self._local.call_stack = []
            parent = self._local.call_stack[-1] if self._local.call_stack else None
            # Unique node id per call; id() of the argument objects keeps repeat calls distinct
            node_id = f"{func.__name__}_{id(args)}_{id(kwargs)}"
            self.graph.node(node_id, func.__name__)
            if parent:
                self.graph.edge(parent, node_id)
            self._local.call_stack.append(node_id)
            try:
                result = func(*args, **kwargs)
            finally:
                self._local.call_stack.pop()
                # Render once the outermost call has returned
                if not self._local.call_stack:
                    self.graph.render(self.output_file, cleanup=True)
            return result
        return wrapper

# Usage example
tracer = TraceVisualizer()

@tracer
def a(): b()

@tracer
def b(): c()

@tracer
def c(): pass

a()  # writes call_graph.pdf
```
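Note that the `graphviz` Python package is only a binding: rendering requires the Graphviz system binaries (`dot`) to be installed and on the `PATH`. Passing `format="png"` to `render()` produces a PNG instead of the default PDF.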
---
### 3. **Memory Timeline Decorator**
Records the memory-usage curve over the function's lifetime (requires `psutil` and `matplotlib`).
```python
import threading
import time
from functools import wraps

import matplotlib.pyplot as plt
import numpy as np
import psutil

def memory_timeline(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        process = psutil.Process()
        timeline = []

        def record_memory():
            # The stop flag lives on the thread object; current_thread()
            # inside this function is the monitor thread itself
            while getattr(threading.current_thread(), "do_monitor", True):
                timeline.append(process.memory_info().rss / 1024**2)  # MB
                time.sleep(0.01)

        monitor_thread = threading.Thread(target=record_memory)
        monitor_thread.start()
        try:
            result = func(*args, **kwargs)
        finally:
            monitor_thread.do_monitor = False
            monitor_thread.join()
            plt.plot(timeline)
            plt.title(f"Memory Usage of {func.__name__}")
            plt.ylabel("MB")
            plt.show()
        return result
    return wrapper

# Usage example
@memory_timeline
def process_large_data():
    data = [np.random.rand(1000, 1000) for _ in range(10)]
    time.sleep(1)
```
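`plt.show()` blocks and requires a display; on a headless server or in CI, swap it for `plt.savefig(f"{func.__name__}_memory.png")` to write the curve to disk instead.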
---
### 4. **AI Exception Diagnosis Decorator**
Hooks in a large language model to analyze error causes automatically (requires the OpenAI API).
```python
import logging
import traceback
from functools import wraps

import openai  # legacy SDK interface (openai < 1.0)

def ai_diagnosis(api_key):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                error_info = f"""
                Error Type: {type(e).__name__}
                Message: {str(e)}
                Args: {args}
                Kwargs: {kwargs}
                Traceback: {traceback.format_exc()}
                """
                openai.api_key = api_key  # authenticate with the provided key
                response = openai.ChatCompletion.create(
                    model="gpt-4",
                    messages=[{
                        "role": "user",
                        "content": f"Analyze the cause of this Python error and suggest a fix:\n{error_info}"
                    }]
                )
                logging.error("AI diagnosis:\n" + response.choices[0].message.content)
                raise
        return wrapper
    return decorator

# Usage example
@ai_diagnosis(api_key="sk-...")
def buggy_function():
    pass
```
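The snippet above targets the legacy `openai` SDK; `openai.ChatCompletion` was removed in version 1.0. A minimal sketch of the same call against the current client-based interface:

```python
from openai import OpenAI

error_info = "..."  # the error summary assembled in the wrapper above
client = OpenAI(api_key="sk-...")  # or rely on the OPENAI_API_KEY env var
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user",
               "content": f"Analyze the cause of this Python error and suggest a fix:\n{error_info}"}],
)
print(response.choices[0].message.content)
```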
---
### 5. **Live Dashboard Decorator**
Pushes runtime metrics to a web dashboard (requires `websockets` and `psutil`).
```python
import json
import time
from datetime import datetime
from functools import wraps

import psutil
from websockets.sync.client import connect  # websockets >= 11

def live_dashboard(websocket_url):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            mem_before = psutil.Process().memory_info().rss
            try:
                result = func(*args, **kwargs)
                status = "success"
            except Exception as e:
                status = f"error: {str(e)}"
                raise
            finally:
                # Send one metrics record per call, success or failure
                data = {
                    "function": func.__name__,
                    "duration": time.time() - start_time,
                    "memory_delta": psutil.Process().memory_info().rss - mem_before,
                    "status": status,
                    "timestamp": datetime.now().isoformat()
                }
                with connect(websocket_url) as ws:
                    ws.send(json.dumps(data))
            return result
        return wrapper
    return decorator

# Usage example (needs a frontend to display the data)
@live_dashboard("ws://localhost:8000/dashboard")
def monitored_task():
    pass
```
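For quick experiments without a full frontend, the same `websockets` package ships a synchronous server that can act as the receiving endpoint. A minimal sketch, assuming `websockets >= 12` (the handler just prints each event):

```python
from websockets.sync.server import serve

def handler(ws):
    for message in ws:  # iterate over incoming messages until the client disconnects
        print("dashboard event:", message)

with serve(handler, "localhost", 8000) as server:
    server.serve_forever()
```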
---
### 6. **Version-Aware Logging Decorator**
Automatically records the code version and a hash of the input data.
```python
import hashlib
import logging
import subprocess
from functools import wraps

def version_aware(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Current Git commit, if available
        try:
            git_version = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()
        except (subprocess.CalledProcessError, FileNotFoundError):
            git_version = "unknown"
        # Hash of the input arguments; note str() only yields a stable
        # digest for simple types with deterministic reprs
        input_hash = hashlib.sha256()
        for arg in args:
            input_hash.update(str(arg).encode())
        for k, v in kwargs.items():
            input_hash.update(f"{k}={v}".encode())
        logging.info(f"🔖 CODE VERSION: {git_version[:8]}")
        logging.info(f"🔑 INPUT HASH: {input_hash.hexdigest()[:16]}")
        return func(*args, **kwargs)
    return wrapper

# Usage example
@version_aware
def versioned_processing(data):
    pass
```
---
### 7. **Quantized Timing Decorator**
Surfaces hidden timing anomalies (well suited to high-frequency trading and similar scenarios).
```python
import logging
import random
import time
from functools import wraps

import numpy as np
from scipy import stats

def quantum_time_monitor(window_size=100):
    def decorator(func):
        history = []  # sliding window of recent call durations (ns)

        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter_ns()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter_ns() - start
            history.append(elapsed)
            if len(history) > window_size:
                history.pop(0)
            # Flag outliers in the timing distribution; wait for a few
            # samples so the z-scores are meaningful
            if len(history) >= 10:
                z_scores = np.abs(stats.zscore(history))
                if np.any(z_scores > 3):
                    logging.warning(f"⏱️ Timing anomaly! current {elapsed/1e6:.2f}ms, "
                                    f"mean {np.mean(history)/1e6:.2f}ms ± {np.std(history)/1e6:.2f}ms")
            return result
        return wrapper
    return decorator

# Usage example
@quantum_time_monitor()
def low_latency_trade():
    time.sleep(0.001 + random.random() * 0.0001)
```
---
### **Best-Practice Combinations**
When stacking decorators, keep in mind that they apply bottom-up; see the sketch after these examples.
1. **Development**:
```python
@debug_tracer()    # detailed error tracing
@memory_timeline   # memory profiling
@trace_execution   # argument tracing
def critical_function():
    pass
```
2. **Production**:
```python
@adaptive_logger          # adaptive sampling
@version_aware            # version tracking
@quantum_time_monitor()   # performance monitoring
def production_api():
    pass
```
3. **Troubleshooting**:
```python
@live_dashboard("ws://monitor.example.com")  # live monitoring
@ai_diagnosis(API_KEY)                       # AI diagnosis
def troubleshooting():
    pass
```
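As noted above, stacked decorators apply bottom-up: the one closest to `def` wraps the bare function, and each one above it wraps the result. The production stack, for example, expands to:

```python
# Equivalent to the stacked form in example 2 above
production_api = adaptive_logger(version_aware(quantum_time_monitor()(production_api)))
```

So `quantum_time_monitor` times the bare function, while `adaptive_logger` sees (and samples) every call from the outside.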
---
These techniques can be combined freely to fit your needs. A good starting point is **end-to-end tracing + smart dynamic sampling** as the base monitoring layer, with advanced features such as AI diagnosis added incrementally. The key is to set up a unified log-analysis platform (such as ELK or Grafana) to aggregate the data all of these decorators produce.
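Since the goal is aggregation in a platform like ELK or Grafana, it helps if the decorators emit machine-parseable records. A minimal sketch of a JSON log formatter (the field names here are illustrative, not a fixed schema):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    # Render each record as one JSON line, which Logstash/Loki can ingest directly
    def format(self, record):
        return json.dumps({
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)
```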