
CPU Loading and Task Loading Visualization Tool

The tool was developed with the help of an LLM.


It uses the ftrace scheduler trace events (sched_switch and sched_stat_runtime) to compute CPU loading and per-task loading statistics and to visualize the results.

Project Name

Ftrace Log Analyzer

Project Overview

This project is a Python script for analyzing Ftrace logs: it parses an Ftrace log file, computes per-thread and per-CPU usage statistics, and generates an HTML report containing CPU load curves and a thread usage table.

Features

  • Log parsing: parses the Ftrace log file and extracts the key fields, such as thread name, process ID, CPU number, timestamp, and event type.
  • Statistics: accumulates each thread's running time on each CPU, as well as each CPU's total active time and idle time.
  • Visual report: generates an HTML report with CPU load curves and a thread usage table, giving a direct view of how the CPUs are used.

Dependencies

The script requires the following Python libraries:

  • numpy
  • bokeh (for the Bokeh version, listing 1 below)
  • plotly and pandas (only for the Plotly version, listing 2 below)

Install them with: pip install numpy bokeh (for the Plotly version: pip install numpy plotly pandas)

Ftrace Events Used

The tool relies on the following two Ftrace events:

  • sched_switch: emitted at every context switch; it records which task was switched out and which task was switched in. Parsing this event lets the analyzer track how long each thread runs on each CPU and how long each CPU sits idle.
  • sched_stat_runtime: reports a task's accumulated runtime. This event supplements the per-thread statistics with each thread's execution time (see the parsing sketch after this list).
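
The details field of these two events, as printed by trace-cmd report, typically looks like the illustrative lines below (the exact layout can vary across kernel and trace-cmd versions). A minimal, self-contained sketch of pulling out the fields the analyzer cares about, using hypothetical sample payloads:

import re

# Illustrative event payloads (hypothetical values; real trace-cmd report output may differ slightly).
switch_details = "bash:1234 [120] S ==> swapper/0:0 [120]"
runtime_details = "comm=bash pid=1234 runtime=1500000 [ns] vruntime=987654321 [ns]"

# sched_switch details: "<prev_comm>:<prev_pid> ... ==> <next_comm>:<next_pid>"
m = re.match(r'(?P<prev_comm>.+):(?P<prev_pid>\d+)\s.*==>\s+(?P<next_comm>.+):(?P<next_pid>\d+)', switch_details)
print(m.group('prev_comm'), '->', m.group('next_comm'))  # bash -> swapper/0

# sched_stat_runtime details: runtime is reported in nanoseconds
runtime_s = int(re.search(r'runtime=(\d+)', runtime_details).group(1)) / 1e9
print(f"{runtime_s:.6f} s")  # 0.001500 s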

Capturing an Ftrace Log with trace-cmd

trace-cmd is a command-line tool for controlling the kernel's Ftrace facility. The following steps show how to capture the log this tool needs:

1. Install trace-cmd

If trace-cmd is not installed yet, install it with your package manager. On Ubuntu, for example: sudo apt-get install trace-cmd

2. Enable the trace events

The script needs the sched_switch and sched_stat_runtime events; enable and record them with: sudo trace-cmd record -e sched_switch -e sched_stat_runtime

3. Start recording

Once the command above is running, trace-cmd records the selected events. Let the system run for a while so that enough data is collected.

4. Stop recording

When enough data has been collected, press Ctrl + C to stop. trace-cmd saves the recording to a file, trace.dat by default.

5. Convert the log to text

The script expects a text log, so convert trace.dat to text with: sudo trace-cmd report -i trace.dat > ftrace.log
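
If you want to script the capture-and-convert steps, the sketch below wraps the same commands with Python's subprocess module. It is only an illustration: it assumes trace-cmd is installed, that it runs with sufficient privileges (e.g. under sudo), and it uses sleep 10 merely as a placeholder workload that bounds the recording time.

import subprocess

# Record sched_switch/sched_stat_runtime while the placeholder workload (sleep 10) runs;
# trace-cmd record stops when the command exits and writes trace.dat.
subprocess.run(["trace-cmd", "record",
                "-e", "sched_switch", "-e", "sched_stat_runtime",
                "sleep", "10"], check=True)

# Convert the binary trace.dat into the text form the analyzer expects.
with open("ftrace.log", "w") as out:
    subprocess.run(["trace-cmd", "report", "-i", "trace.dat"], stdout=out, check=True)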

Usage

  1. Make sure the required dependencies are installed.
  2. Capture an Ftrace log with trace-cmd and save it as a text file (e.g. ftrace.log).
  3. Run the script: python ftrace_analyzer.py <ftrace.log>, where <ftrace.log> is the path of the Ftrace log file to analyze (a programmatic alternative is sketched after this list).
  4. When the script finishes, it writes an HTML report named report.html to the current directory; open it to view the results.
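
The analysis can also be driven from Python instead of the command line. A minimal sketch, assuming the script is saved as ftrace_analyzer.py next to the calling code:

# Minimal sketch: call the analyzer programmatically.
from ftrace_analyzer import analyze_ftrace

analyze_ftrace("ftrace.log")  # parses the log and writes report.html to the current directory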

Code Structure

  • Type definitions: type aliases such as Timestamp, CPUID, and PID make the meaning of variables explicit.
  • Log parsing module: the FtraceParser class parses each line of the Ftrace log into structured data.
  • Event processing module: the EventProcessor class handles the sched_switch and sched_stat_runtime events and accumulates the per-thread and per-CPU statistics.
  • Report generation module: the ReportGenerator class produces the HTML report with the CPU load curves and the thread usage table.
  • Main flow: the analyze_ftrace function drives the analysis; it reads the log file, invokes the parser and the event handlers, and finally generates the report.

Code Optimization

In ReportGenerator.generate_html_report, a sliding-window algorithm is used to compute the CPU load curves, which makes the curves smoother and finer grained.
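
The idea is sketched below in isolation: a fixed-width window is centred on each sample point, and the load at that point is the fraction of the window covered by active (non-idle) periods. The periods in this sketch are made up for illustration; the script applies the same calculation to the (start, end, is_active) periods it records per CPU.

import numpy as np

# Made-up (start, end, is_active) periods for one CPU, for illustration only.
periods = [(0.0, 0.4, True), (0.4, 1.1, False), (1.1, 2.0, True)]
start_ts, end_ts = 0.0, 2.0
window_size = 1.0  # seconds of trace considered around each sample point
interval = 0.1     # spacing between sample points

for t in np.arange(start_ts, end_ts, interval):
    lo, hi = t - window_size / 2, t + window_size / 2
    if lo < start_ts or hi > end_ts:
        continue  # skip windows that would extend past the trace boundaries
    # Sum the overlap between the window and every active period.
    active = sum(min(e, hi) - max(s, lo)
                 for (s, e, is_active) in periods
                 if is_active and s < hi and e > lo)
    print(f"t={t:.1f}s load={active / window_size * 100:5.1f}%")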

Notes

  • Make sure the input Ftrace log has the expected text format; otherwise parsing may fail.
  • The report file report.html overwrites any existing file with the same name in the current directory.
  • Capturing logs with trace-cmd requires root privileges; make sure you have sufficient permissions to run the commands.

Contributing and Feedback

If you find a problem or have a suggestion for improvement, feel free to open an issue or submit a pull request.

1. Bokeh version

import re
import datetime
import numpy as np
import logging
from collections import defaultdict
from typing import Dict, Tuple, Optional, List

from bokeh.plotting import figure, output_file, save
from bokeh.models import ColumnDataSource, DataTable, TableColumn, HoverTool
from bokeh.layouts import column

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# ----------------------
# Type definitions
# ----------------------
Timestamp = float  # Timestamp (seconds)
CPUID = int        # CPU identifier
PID = int          # Process ID


class ThreadStats:
    """Per-thread statistics."""

    def __init__(self):
        self.name: str = ""  # Thread name
        self.cpu_time: Dict[CPUID, float] = defaultdict(float)  # Accumulated time per CPU
        self.total_time: float = 0.0  # Total running time


class CPUStats:
    """Per-CPU statistics."""

    def __init__(self):
        self.total_active: float = 0.0  # Total active time (non-swapper threads)
        self.total_idle: float = 0.0    # Total idle time (swapper thread)
        self.time_periods: List[Tuple[float, float, bool]] = []  # Recorded periods (start, end, is_active)


class GlobalState:
    """Global state container."""

    def __init__(self):
        self.threads: Dict[Tuple[str, PID], ThreadStats] = defaultdict(ThreadStats)  # (thread name, PID) -> thread stats
        self.cpus: Dict[CPUID, CPUStats] = defaultdict(CPUStats)  # CPU -> stats
        self.current_pid: Dict[CPUID, Tuple[Tuple[str, PID], Timestamp]] = {}  # per CPU: currently running (thread name, PID) and start time
        self.pid_to_name: Dict[PID, str] = {}  # PID -> thread name


# ----------------------
# Log parsing module
# ----------------------
class FtraceParser:
    """Ftrace log parser."""

    LOG_REGEX = re.compile(
        r'^(?P<comm>[^-]+)-(?P<pid>\d+)\s+\[(?P<cpu>\d+)\]\s+'
        r'(?P<timestamp>\d+\.\d+):\s+'
        r'(?P<event>\w+):\s+(?P<details>.*)$'
    )

    @classmethod
    def parse_line(cls, line: str) -> Optional[dict]:
        """Parse a single log line into structured data."""
        start_time = datetime.datetime.now()
        match = cls.LOG_REGEX.match(line.strip())
        if not match:
            logger.debug(f"Unparsable line: {line.strip()}")
            return None
        parsed_data = {
            'comm': match.group('comm'),
            'pid': int(match.group('pid')),
            'cpu': int(match.group('cpu')),
            'timestamp': float(match.group('timestamp')),
            'event': match.group('event'),
            'details': match.group('details')
        }
        duration = (datetime.datetime.now() - start_time).total_seconds() * 1000
        logger.debug(f"Parsed: {parsed_data}, took {duration:.2f}ms")
        return parsed_data


# ----------------------
# Event processing module
# ----------------------
class EventProcessor:
    """Event processor."""

    SWAPPER_NAMES = ('swapper', 'swapper/')  # Name prefixes of the idle (swapper) thread

    @classmethod
    def _is_swapper(cls, comm: str, pid: int) -> bool:
        """Return True if this is the swapper (idle) thread."""
        return pid == 0 or comm.startswith(cls.SWAPPER_NAMES)

    @classmethod
    def handle_sched_switch(cls, state: GlobalState, entry: dict):
        """Handle a context-switch event."""
        start_time = datetime.datetime.now()
        details = entry['details']
        match = re.match(
            r'(?P<prev_comm>[^:]+(?::[^:]+)*):(?P<prev_pid>\d+).*?==>\s+'
            r'(?P<next_comm>[^:]+(?::[^:]+)*):(?P<next_pid>\d+)',
            details
        )
        if not match:
            logger.warning(f"Unparsable sched_switch event: {details}")
            return
        logger.debug(
            f"Handling sched_switch: CPU={entry['cpu']}, "
            f"from {match.group('prev_comm')}:{match.group('prev_pid')} "
            f"to {match.group('next_comm')}:{match.group('next_pid')}"
        )
        cpu = entry['cpu']
        ts = entry['timestamp']

        # prev thread info
        prev_comm = match.group('prev_comm')
        prev_pid = int(match.group('prev_pid'))
        prev_key = (prev_comm, prev_pid)

        # next thread info
        next_comm = match.group('next_comm')
        next_pid = int(match.group('next_pid'))
        next_key = (next_comm, next_pid)

        # Close the time slice of the thread that was running on this CPU
        if cpu in state.current_pid:
            (running_comm, running_pid), start_ts = state.current_pid[cpu]
            running_key = (running_comm, running_pid)
            duration = ts - start_ts
            prev_running_comm = running_comm

            # Record the time period
            is_active = not cls._is_swapper(prev_running_comm, running_pid)
            state.cpus[cpu].time_periods.append((start_ts, ts, is_active))

            # Account the time according to the thread type
            if cls._is_swapper(prev_running_comm, running_pid):
                state.cpus[cpu].total_idle += duration
                state.threads[running_key].cpu_time[cpu] += duration
                state.threads[running_key].total_time += duration
            else:
                state.cpus[cpu].total_active += duration
                state.threads[running_key].cpu_time[cpu] += duration
                state.threads[running_key].total_time += duration

        # Record the newly running thread
        state.current_pid[cpu] = (next_key, ts)
        state.pid_to_name[prev_pid] = prev_comm
        state.pid_to_name[next_pid] = next_comm
        state.threads[prev_key].name = prev_comm
        state.threads[next_key].name = next_comm

    @classmethod
    def handle_sched_stat_runtime(cls, state: GlobalState, entry: dict):
        """Handle a runtime accounting event."""
        start_time = datetime.datetime.now()
        details = entry['details']
        comm = re.search(r'comm=([^\s]+)', details).group(1)
        pid = int(re.search(r'pid=(\d+)', details).group(1))
        runtime_ns = int(re.search(r'runtime=(\d+)', details).group(1))
        cpu = entry['cpu']
        runtime = runtime_ns / 1e9  # Convert to seconds
        key = (comm, pid)
        logger.debug(f"Handling sched_stat_runtime: CPU={cpu}, {comm}:{pid}, runtime={runtime:.6f}s")
        if cls._is_swapper(comm, pid):
            state.cpus[cpu].total_idle += runtime
            state.threads[key].cpu_time[cpu] += runtime
            state.threads[key].total_time += runtime
            state.threads[key].name = comm
            state.pid_to_name[pid] = comm
        else:
            state.cpus[cpu].total_active += runtime
            state.threads[key].total_time += runtime
            state.threads[key].name = comm
            state.pid_to_name[pid] = comm


# ----------------------
# Report generation module
# ----------------------
class ReportGenerator:
    """Statistics report generator."""

    @staticmethod
    def generate_html_report(state: GlobalState, start_ts: float, end_ts: float, output_filename: str = "report.html"):
        """Generate the HTML visualization report."""
        report_start = datetime.datetime.now()
        logger.info(f"start generate report, duration:{start_ts:.2f}s-{end_ts:.2f}s, output file: {output_filename}")

        # Build the CPU load curve data
        window_size = 1.0  # Sliding window size (seconds)
        interval = 0.1     # Sampling interval (seconds)
        time_points = np.arange(start_ts, end_ts, interval)
        cpu_load_data = defaultdict(list)
        logger.info(f"start analysis cpu loading, cpu number: {len(state.cpus)}")
        for cpu in state.cpus:
            periods = state.cpus[cpu].time_periods
            for t in time_points:
                window_start = t - window_size / 2
                window_end = t + window_size / 2
                if window_start < start_ts or window_end > end_ts:
                    continue
                total_active_time = 0.0
                for (p_start, p_end, is_active) in periods:
                    if p_end <= window_start or p_start >= window_end:
                        continue
                    overlap_start = max(p_start, window_start)
                    overlap_end = min(p_end, window_end)
                    if is_active:
                        total_active_time += overlap_end - overlap_start
                load_pct = (total_active_time / window_size) * 100 if window_size > 0 else 0
                cpu_load_data[cpu].append((t - start_ts, load_pct))

        logger.info(f"create chart...")
        # Create the chart
        output_file(output_filename)
        p = figure(
            title="CPU Load Percentage Over Time",
            x_axis_label='Time (s from start)',
            y_axis_label='Load (%)',
            width=1200,
            height=400,
            tools="pan,wheel_zoom,box_zoom,reset"
        )
        colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728',
                  '#9467bd', '#8c564b', '#e377c2', '#7f7f7f']

        # Add one line per CPU
        for idx, cpu in enumerate(sorted(cpu_load_data.keys())):
            data = cpu_load_data[cpu]
            if not data:
                continue
            times, loads = zip(*data)
            source = ColumnDataSource(data={
                'time': times,
                'load': loads,
                'cpu': [f'CPU {cpu}'] * len(times)
            })
            p.line('time', 'load',
                   source=source,
                   legend_label=f'CPU {cpu}',
                   line_width=1.5,
                   color=colors[idx % len(colors)])

        # Configure the hover tool
        hover = HoverTool(tooltips=[
            ('CPU', '@cpu'),
            ('Time', '@time{0.2f} s'),
            ('Load', '@load{0.2f}%'),
        ])
        p.add_tools(hover)
        p.legend.location = "top_left"
        p.legend.click_policy = "hide"

        # Build the thread table data
        cpus = sorted(state.cpus.keys())
        thread_data = []
        for (name, pid), stats in state.threads.items():
            total_time = stats.total_time
            total_percent = total_time / (end_ts - start_ts) * 100 if (end_ts - start_ts) > 0 else 0
            row = {'PID': pid, 'Name': name, 'Total (%)': f"{total_percent:.2f}%"}
            for cpu in cpus:
                cpu_time = stats.cpu_time.get(cpu, 0.0)
                cpu_percent = cpu_time / (end_ts - start_ts) * 100 if (end_ts - start_ts) > 0 else 0
                row[f'CPU {cpu}'] = f"{cpu_percent:.2f}%"
            thread_data.append(row)

        # Convert thread_data into column-oriented form
        column_data = defaultdict(list)
        for row in thread_data:
            for key, value in row.items():
                column_data[key].append(value)

        # Create the data table
        columns = [
            TableColumn(field="PID", title="PID", width=80),
            TableColumn(field="Name", title="Thread Name", width=200),
            TableColumn(field="Total (%)", title="Total Load (%)", width=100),
        ]
        for cpu in cpus:
            columns.append(TableColumn(field=f'CPU {cpu}', title=f'CPU {cpu} (%)', width=100))
        source = ColumnDataSource(column_data)
        data_table = DataTable(
            source=source,
            columns=columns,
            width=1200,
            height=400,
            editable=False,
            index_position=None,
            autosize_mode="none"
        )

        # Combine the layout and save
        layout = column(p, data_table)
        save(layout)
        report_duration = (datetime.datetime.now() - report_start).total_seconds()
        logger.info(f"end report, {output_filename}, consume duration: {report_duration:.2f}s")


# ----------------------
# Main flow
# ----------------------
def analyze_ftrace(log_path: str):
    """Main analysis flow."""
    state = GlobalState()
    first_ts = None
    last_ts = 0.0
    parse_start = datetime.datetime.now()
    logger.info("start parse ftrace log...")
    with open(log_path) as f:
        line_count = sum(1 for _ in f)
        logger.info(f"ftrace log file contains {line_count} lines")
        f.seek(0)
        for line in f:
            entry = FtraceParser.parse_line(line)
            if not entry:
                continue
            ts = entry['timestamp']
            if first_ts is None:
                first_ts = ts
            last_ts = ts
            if entry['event'] == 'sched_switch':
                EventProcessor.handle_sched_switch(state, entry)
            elif entry['event'] == 'sched_stat_runtime':
                EventProcessor.handle_sched_stat_runtime(state, entry)

    # Close the time slices that are still open at the end of the trace
    for cpu in state.current_pid:
        (running_comm, running_pid), start_ts = state.current_pid[cpu]
        running_key = (running_comm, running_pid)
        comm = running_comm
        is_active = not EventProcessor._is_swapper(comm, running_pid)
        state.cpus[cpu].time_periods.append((start_ts, last_ts, is_active))
        duration = last_ts - start_ts
        if is_active:
            state.cpus[cpu].total_active += duration
            state.threads[running_key].cpu_time[cpu] += duration
            state.threads[running_key].total_time += duration
        else:
            state.cpus[cpu].total_idle += duration
            state.threads[running_key].cpu_time[cpu] += duration
            state.threads[running_key].total_time += duration

    duration = (datetime.datetime.now() - parse_start).total_seconds()
    logger.info(f"end parse ftrace log, duration:{duration:.2f}s")

    # Generate the report
    if first_ts is not None:
        ReportGenerator.generate_html_report(state, first_ts, last_ts)


if __name__ == "__main__":
    import sys
    if len(sys.argv) != 2:
        print("Usage: python ftrace_analyzer.py <ftrace.log>")
        sys.exit(1)
    analyze_ftrace(sys.argv[1])

2. Plotly version

import re
import datetime
import numpy as np
import logging
from collections import defaultdict
from typing import Dict, Tuple, Optional, List
import plotly.graph_objects as go
import pandas as pd

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# ----------------------
# Type definitions
# ----------------------
Timestamp = float  # Timestamp (seconds)
CPUID = int        # CPU identifier
PID = int          # Process ID


class ThreadStats:
    """Per-thread statistics."""

    def __init__(self):
        self.name: str = ""  # Thread name
        self.cpu_time: Dict[CPUID, float] = defaultdict(float)  # Accumulated time per CPU
        self.total_time: float = 0.0  # Total running time


class CPUStats:
    """Per-CPU statistics."""

    def __init__(self):
        self.total_active: float = 0.0  # Total active time (non-swapper threads)
        self.total_idle: float = 0.0    # Total idle time (swapper thread)
        self.time_periods: List[Tuple[float, float, bool]] = []  # Recorded periods (start, end, is_active)


class GlobalState:
    """Global state container."""

    def __init__(self):
        self.threads: Dict[Tuple[str, PID], ThreadStats] = defaultdict(ThreadStats)  # (thread name, PID) -> thread stats
        self.cpus: Dict[CPUID, CPUStats] = defaultdict(CPUStats)  # CPU -> stats
        self.current_pid: Dict[CPUID, Tuple[Tuple[str, PID], Timestamp]] = {}  # per CPU: currently running (thread name, PID) and start time
        self.pid_to_name: Dict[PID, str] = {}  # PID -> thread name


# ----------------------
# Log parsing module
# ----------------------
class FtraceParser:
    """Ftrace log parser."""

    LOG_REGEX = re.compile(
        r'^(?P<comm>[^-]+)-(?P<pid>\d+)\s+\[(?P<cpu>\d+)\]\s+'
        r'(?P<timestamp>\d+\.\d+):\s+'
        r'(?P<event>\w+):\s+(?P<details>.*)$'
    )

    @classmethod
    def parse_line(cls, line: str) -> Optional[dict]:
        """Parse a single log line into structured data."""
        start_time = datetime.datetime.now()
        match = cls.LOG_REGEX.match(line.strip())
        if not match:
            logger.debug(f"Unparsable line: {line.strip()}")
            return None
        parsed_data = {
            'comm': match.group('comm'),
            'pid': int(match.group('pid')),
            'cpu': int(match.group('cpu')),
            'timestamp': float(match.group('timestamp')),
            'event': match.group('event'),
            'details': match.group('details')
        }
        duration = (datetime.datetime.now() - start_time).total_seconds() * 1000
        logger.debug(f"Parsed: {parsed_data}, took {duration:.2f}ms")
        return parsed_data


# ----------------------
# Event processing module
# ----------------------
class EventProcessor:
    """Event processor."""

    SWAPPER_NAMES = ('swapper', 'swapper/')  # Name prefixes of the idle (swapper) thread

    @classmethod
    def _is_swapper(cls, comm: str, pid: int) -> bool:
        """Return True if this is the swapper (idle) thread."""
        return pid == 0 or comm.startswith(cls.SWAPPER_NAMES)

    @classmethod
    def handle_sched_switch(cls, state: GlobalState, entry: dict):
        """Handle a context-switch event."""
        start_time = datetime.datetime.now()
        details = entry['details']
        match = re.match(
            r'(?P<prev_comm>[^:]+(?::[^:]+)*):(?P<prev_pid>\d+).*?==>\s+'
            r'(?P<next_comm>[^:]+(?::[^:]+)*):(?P<next_pid>\d+)',
            details
        )
        if not match:
            logger.warning(f"Unparsable sched_switch event: {details}")
            return
        logger.debug(
            f"Handling sched_switch: CPU={entry['cpu']}, "
            f"from {match.group('prev_comm')}:{match.group('prev_pid')} "
            f"to {match.group('next_comm')}:{match.group('next_pid')}"
        )
        cpu = entry['cpu']
        ts = entry['timestamp']

        # prev thread info
        prev_comm = match.group('prev_comm')
        prev_pid = int(match.group('prev_pid'))
        prev_key = (prev_comm, prev_pid)

        # next thread info
        next_comm = match.group('next_comm')
        next_pid = int(match.group('next_pid'))
        next_key = (next_comm, next_pid)

        # Close the time slice of the thread that was running on this CPU
        if cpu in state.current_pid:
            (running_comm, running_pid), start_ts = state.current_pid[cpu]
            running_key = (running_comm, running_pid)
            duration = ts - start_ts
            prev_running_comm = running_comm

            # Record the time period
            is_active = not cls._is_swapper(prev_running_comm, running_pid)
            state.cpus[cpu].time_periods.append((start_ts, ts, is_active))

            # Account the time according to the thread type
            if cls._is_swapper(prev_running_comm, running_pid):
                state.cpus[cpu].total_idle += duration
                state.threads[running_key].cpu_time[cpu] += duration
                state.threads[running_key].total_time += duration
            else:
                state.cpus[cpu].total_active += duration
                state.threads[running_key].cpu_time[cpu] += duration
                state.threads[running_key].total_time += duration

        # Record the newly running thread
        state.current_pid[cpu] = (next_key, ts)
        state.pid_to_name[prev_pid] = prev_comm
        state.pid_to_name[next_pid] = next_comm
        state.threads[prev_key].name = prev_comm
        state.threads[next_key].name = next_comm

    @classmethod
    def handle_sched_stat_runtime(cls, state: GlobalState, entry: dict):
        """Handle a runtime accounting event."""
        start_time = datetime.datetime.now()
        details = entry['details']
        comm = re.search(r'comm=([^\s]+)', details).group(1)
        pid = int(re.search(r'pid=(\d+)', details).group(1))
        runtime_ns = int(re.search(r'runtime=(\d+)', details).group(1))
        cpu = entry['cpu']
        runtime = runtime_ns / 1e9  # Convert to seconds
        key = (comm, pid)
        logger.debug(f"Handling sched_stat_runtime: CPU={cpu}, {comm}:{pid}, runtime={runtime:.6f}s")
        if cls._is_swapper(comm, pid):
            state.cpus[cpu].total_idle += runtime
            state.threads[key].cpu_time[cpu] += runtime
            state.threads[key].total_time += runtime
            state.threads[key].name = comm
            state.pid_to_name[pid] = comm
        else:
            state.cpus[cpu].total_active += runtime
            state.threads[key].total_time += runtime
            state.threads[key].name = comm
            state.pid_to_name[pid] = comm


# ----------------------
# Report generation module
# ----------------------
class ReportGenerator:
    """Statistics report generator."""

    @staticmethod
    def generate_html_report(state: GlobalState, start_ts: float, end_ts: float, output_filename: str = "report.html"):
        """Generate the HTML visualization report."""
        report_start = datetime.datetime.now()
        logger.info(f"start generate report, duration:{start_ts:.2f}s-{end_ts:.2f}s, output file: {output_filename}")

        # Build the CPU load curve data
        window_size = 1.0  # Sliding window size (seconds)
        interval = 0.1     # Sampling interval (seconds)
        time_points = np.arange(start_ts, end_ts, interval)
        cpu_load_data = defaultdict(list)
        logger.info(f"start analysis cpu loading, cpu number: {len(state.cpus)}")
        for cpu in state.cpus:
            periods = state.cpus[cpu].time_periods
            for t in time_points:
                window_start = t - window_size / 2
                window_end = t + window_size / 2
                if window_start < start_ts or window_end > end_ts:
                    continue
                total_active_time = 0.0
                for (p_start, p_end, is_active) in periods:
                    if p_end <= window_start or p_start >= window_end:
                        continue
                    overlap_start = max(p_start, window_start)
                    overlap_end = min(p_end, window_end)
                    if is_active:
                        total_active_time += overlap_end - overlap_start
                load_pct = (total_active_time / window_size) * 100 if window_size > 0 else 0
                cpu_load_data[cpu].append((t - start_ts, load_pct))

        logger.info(f"create chart...")
        # Create the chart
        fig = go.Figure()
        colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728',
                  '#9467bd', '#8c564b', '#e377c2', '#7f7f7f']

        # Add one line per CPU
        for idx, cpu in enumerate(sorted(cpu_load_data.keys())):
            data = cpu_load_data[cpu]
            if not data:
                continue
            times, loads = zip(*data)
            fig.add_trace(go.Scatter(
                x=list(times),
                y=list(loads),
                mode='lines',
                name=f'CPU {cpu}',
                line=dict(color=colors[idx % len(colors)]),
                hovertemplate='CPU: %{name}<br>Time: %{x:.2f} s<br>Load: %{y:.2f}%'
            ))

        fig.update_layout(
            title="CPU Load Percentage Over Time",
            xaxis_title='Time (s from start)',
            yaxis_title='Load (%)',
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=1.02,
                xanchor="right",
                x=1
            ),
            hovermode='x unified'
        )

        # Build the thread table data
        cpus = sorted(state.cpus.keys())
        thread_data = []
        for (name, pid), stats in state.threads.items():
            total_time = stats.total_time
            total_percent = total_time / (end_ts - start_ts) * 100 if (end_ts - start_ts) > 0 else 0
            row = {'PID': pid, 'Name': name, 'Total (%)': f"{total_percent:.2f}%"}
            for cpu in cpus:
                cpu_time = stats.cpu_time.get(cpu, 0.0)
                cpu_percent = cpu_time / (end_ts - start_ts) * 100 if (end_ts - start_ts) > 0 else 0
                row[f'CPU {cpu}'] = f"{cpu_percent:.2f}%"
            thread_data.append(row)

        # Create the data table
        df = pd.DataFrame(thread_data)
        table = go.Table(
            header=dict(
                values=list(df.columns),
                fill_color='paleturquoise',
                align='left'
            ),
            cells=dict(
                values=[df[col] for col in df.columns],
                fill_color='lavender',
                align='left'
            )
        )

        # Create the table figure
        fig2 = go.Figure(data=[table])
        fig2.update_layout(title="Thread Load Statistics")

        # Combine the layouts and save
        from plotly.subplots import make_subplots
        combined_fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=("CPU Load Percentage Over Time", "Thread Load Statistics"),
            specs=[[{'type': 'xy'}], [{'type': 'domain'}]]
        )
        for trace in fig.data:
            combined_fig.add_trace(trace, row=1, col=1)
        for trace in fig2.data:
            combined_fig.add_trace(trace, row=2, col=1)
        combined_fig.update_layout(height=800, width=1200)
        combined_fig.write_html(output_filename)

        report_duration = (datetime.datetime.now() - report_start).total_seconds()
        logger.info(f"end report, {output_filename}, consume duration: {report_duration:.2f}s")


# ----------------------
# Main flow
# ----------------------
def analyze_ftrace(log_path: str):
    """Main analysis flow."""
    state = GlobalState()
    first_ts = None
    last_ts = 0.0
    parse_start = datetime.datetime.now()
    logger.info("start parse ftrace log...")
    with open(log_path) as f:
        line_count = sum(1 for _ in f)
        logger.info(f"ftrace log file contains {line_count} lines")
        f.seek(0)
        for line in f:
            entry = FtraceParser.parse_line(line)
            if not entry:
                continue
            ts = entry['timestamp']
            if first_ts is None:
                first_ts = ts
            last_ts = ts
            if entry['event'] == 'sched_switch':
                EventProcessor.handle_sched_switch(state, entry)
            elif entry['event'] == 'sched_stat_runtime':
                EventProcessor.handle_sched_stat_runtime(state, entry)

    # Close the time slices that are still open at the end of the trace
    for cpu in state.current_pid:
        (running_comm, running_pid), start_ts = state.current_pid[cpu]
        running_key = (running_comm, running_pid)
        comm = running_comm
        is_active = not EventProcessor._is_swapper(comm, running_pid)
        state.cpus[cpu].time_periods.append((start_ts, last_ts, is_active))
        duration = last_ts - start_ts
        if is_active:
            state.cpus[cpu].total_active += duration
            state.threads[running_key].cpu_time[cpu] += duration
            state.threads[running_key].total_time += duration
        else:
            state.cpus[cpu].total_idle += duration
            state.threads[running_key].cpu_time[cpu] += duration
            state.threads[running_key].total_time += duration

    duration = (datetime.datetime.now() - parse_start).total_seconds()
    logger.info(f"end parse ftrace log, duration:{duration:.2f}s")

    # Generate the report
    if first_ts is not None:
        ReportGenerator.generate_html_report(state, first_ts, last_ts)


if __name__ == "__main__":
    import sys
    if len(sys.argv) != 2:
        print("Usage: python ftrace_analyzer.py <ftrace.log>")
        sys.exit(1)
    analyze_ftrace(sys.argv[1])
