【sylar-webserver】重构日志系统
文章目录
- 主要工作
- 流程图
- FiberCondition
- Buffer
- BufferManager
- LogEvent 序列化 & 反序列化
- Logger
- RotatingFileLogAppender
主要工作
- 实现 LogEvent 的序列化和反序列化。(使用序列化是为了更标准;如果先转成最终的日志格式再存储,确实会快一点,但对缓冲区的利用就不够充分了。)
- 优化,使用 LoggerBuild 建造者 实现和管理日志器;
- 双缓冲区设计,使用条件变量等同步技术实现日志异步处理器。支持定时检查缓冲区,也支持生产者缓冲区写不下时主动唤醒消费者;
- 使用 WorkerManager 管理多个调度器;
- 增加循环日志写入、按时间分片的 LogAppender(RotatingFileLogAppender);
- 支持上传下载和展示功能,支持备份重要日志;(待实现网络库后实现)
- 性能测试:2h4g 服务器下,异步日志器每秒输出 130MB 日志。
流程图
FiberCondition
仿照 std::condition_variable
存在的问题:notify_one 只是把 fiber 重新加入调度队列,而调度策略是先来先服务。
所以,存在日志时序性错误问题。(解决方法:后期修改调度策略,增加优先级。)⭐
/**
 * @brief Condition variable for fibers, modelled on std::condition_variable.
 *
 * Each waiter is stored together with the scheduler it was running on, so a
 * notification can hand the fiber back to that scheduler.
 */
class FiberCondition {
public:
    using MutexType = Spinlock;

    /// Reschedule the fiber at the head of the wait list, if there is one.
    void notify_one();

    /// Reschedule every fiber currently in the wait list.
    void notify_all();

    /**
     * @brief Register the calling fiber as a waiter, release @p lock, yield,
     *        and re-acquire @p lock once the fiber runs again.
     * @param lock Lock held by the caller on entry; held again on return.
     */
    void wait(MutexType::Lock& lock);

    /**
     * @brief Keep waiting until @p pred returns true.
     * @param lock Lock held by the caller; @p pred is evaluated while it is held.
     * @param pred Callable returning something convertible to bool.
     */
    template <typename Predicate>
    void wait(MutexType::Lock& lock, Predicate pred) {
        for (;;) {
            if (pred()) {
                return;
            }
            wait(lock);
        }
    }

private:
    /// Diagnostic helper; invoked from wait() after a waiter is queued.
    void printWaiters() const;

    MutexType m_mutex;  ///< Protects m_waiters.
    std::list<std::pair<Scheduler*, Fiber::ptr>> m_waiters;  ///< FIFO of (scheduler, fiber).
};
/**
 * Queue the current fiber as a waiter, then give up the caller's lock and
 * yield. When the fiber is resumed, the caller's lock is taken again before
 * returning. Must run inside a scheduler (asserted).
 */
void FiberCondition::wait(MutexType::Lock& lock) {
    SYLAR_ASSERT(Scheduler::GetThis());

    {
        // Internal mutex only covers the wait-list update.
        MutexType::Lock guard(m_mutex);
        m_waiters.emplace_back(Scheduler::GetThis(), Fiber::GetThis());
        printWaiters();
    }

    lock.unlock();
    Fiber::GetThis()->yield();
    lock.lock();
}

/**
 * Hand the oldest waiter (front of the list) back to the scheduler it was
 * recorded with. Does nothing when nobody is waiting.
 */
void FiberCondition::notify_one() {
    MutexType::Lock guard(m_mutex);
    if (m_waiters.empty()) {
        return;
    }

    auto head = m_waiters.front();
    m_waiters.pop_front();
    head.first->schedule(head.second);
}

/**
 * Hand every queued waiter back to its scheduler, in queue order, and then
 * empty the wait list.
 */
void FiberCondition::notify_all() {
    MutexType::Lock guard(m_mutex);
    for (auto it = m_waiters.begin(); it != m_waiters.end(); ++it) {
        it->first->schedule(it->second);
    }
    m_waiters.clear();
}
Buffer
/**
 * @brief Byte buffer with separate read and write positions, backed by a
 *        std::vector<char>.
 *
 * Only the declaration is visible here; the notes on individual members are
 * limited to what the signatures show. NOTE(review): confirm details against
 * the implementation file.
 */
class Buffer {
public:
    using ptr = std::shared_ptr<Buffer>;
    using MutexType = Spinlock;

    Buffer(size_t buffer_size);
    Buffer(size_t buffer_size, size_t threshold, size_t linear_growth);

    // --- writing ---------------------------------------------------------
    void push(const char* data, size_t len);
    void push(const std::string& str);
    size_t writeableSize();
    void moveWritePos(int len);

    // --- reading ---------------------------------------------------------
    const char* Begin() const;
    char* readBegin(int len);
    size_t readableSize() const;
    void moveReadPos(int len);
    bool isEmpty();

    // --- whole-buffer operations -------------------------------------------
    void swap(Buffer& buf);
    void Reset();

protected:
    // Presumably grows the storage so that `len` more bytes fit — TODO confirm.
    void ToBeEnough(size_t len);

private:
    MutexType m_mutex;
    size_t m_buffer_size;
    size_t m_threshold;
    size_t m_linear_growth;
    std::vector<char> m_buffer;
    size_t m_write_pos = 0;
    size_t m_read_pos = 0;
};
BufferManager
重点操作:
BufferManager::BufferManager(const functor& cb, // 消费者缓冲区写入的回调函数 ⭐AsyncType::Type asyncType,size_t buffer_size,size_t threshold,size_t linear_growth,size_t swap_time,IOManager* iom): m_stop(false),m_swap_status(false),m_asyncType(asyncType),m_buffer_productor(std::make_shared<Buffer>(buffer_size, threshold, linear_growth)),m_buffer_consumer(std::make_shared<Buffer>(buffer_size, threshold, linear_growth)),m_callback(cb),m_swap_time(swap_time)
{assert(iom != nullptr);iom->schedule(std::bind(&BufferManager::ThreadEntry, this));m_timer = iom->addTimer(m_swap_time, std::bind(&BufferManager::TimerThreadEntry, this), true);
}// 写入线程,生产者
void BufferManager::push(const char* data, size_t len) {MutexType::Lock lock(m_mutex);if(m_asyncType == AsyncType::ASYNC_SAFE){if (len > m_buffer_productor->writeableSize()) {SYLAR_LOG_DEBUG(g_logger) << "notify consumer";m_cond_consumer.notify_one();}m_cond_producer.wait(lock, [&](){return (m_stop || (len <= m_buffer_productor->writeableSize()));});}if(m_stop){throw std::runtime_error("BufferManager is stopped");}m_buffer_productor->push(data, len);
}// 使用Timer,按照频率访问缓冲区
// 如果生产者没有就退出
void BufferManager::TimerThreadEntry(){{MutexType::Lock lock(m_mutex);if ((!m_buffer_productor->isEmpty() && m_buffer_consumer->isEmpty()) || m_stop) {swap_buffers();if(m_asyncType == AsyncType::ASYNC_SAFE){m_cond_producer.notify_all();}}else{return;}}{MutexType::Lock lock(m_swap_mutex);m_callback(m_buffer_consumer);m_buffer_consumer->Reset();}
}void BufferManager::ThreadEntry() {while(true){{MutexType::Lock lock(m_mutex);SYLAR_LOG_DEBUG(g_logger) << "ThreadEntry started.";m_cond_consumer.wait(lock, [&](){return m_stop || (!m_buffer_productor->isEmpty() && m_buffer_consumer->isEmpty());});swap_buffers();if(m_asyncType == AsyncType::ASYNC_SAFE){m_cond_consumer.notify_all();}}{MutexType::Lock lock(m_swap_mutex);m_callback(m_buffer_consumer);m_buffer_consumer->Reset();if(m_stop && m_buffer_productor->isEmpty()) return;}}
}
LogEvent 序列化 & 反序列化
#pragma pack(push, 1)
struct LogMeta {uint64_t timestamp; // 时间戳uint32_t threadId; // 线程IDuint32_t fiberId; // 协程IDint32_t line; // 行号uint32_t elapse;LogLevel::Level level; // 日志级别uint16_t fileLen; // 文件名长度uint32_t threadNameLen;// 线程名长度uint32_t msgLen; // 消息内容长度
};
#pragma pack(pop)
Buffer::ptr LogEvent::serialize() const {LogMeta meta{ // ⭐.timestamp = m_time,.threadId = m_threadId,.fiberId = m_fiberId,.line = m_line,.elapse = m_elapse,.level = m_level,.fileLen = static_cast<uint16_t>(m_file.size()),.threadNameLen = static_cast<uint16_t>(m_threadName.size()),.msgLen = static_cast<uint32_t>(m_ss.str().size())};const size_t total_need = sizeof(LogMeta) + meta.fileLen + meta.threadNameLen + meta.msgLen;auto buffer = std::make_shared<Buffer>(total_need); // 使用 shared_ptr 管理内存// 序列化元数据buffer->push(reinterpret_cast<const char*>(&meta), sizeof(meta));// 序列化变长数据(包含终止符)buffer->push(m_file.c_str(), meta.fileLen);buffer->push(m_threadName.c_str(), meta.threadNameLen);buffer->push(m_ss.str().c_str(), meta.msgLen);return buffer; // 返回 shared_ptr}// 每次调用,解析一个LogEventstatic LogEvent::ptr LogEvent::deserialize(Buffer& buffer) {if(buffer.readableSize() < sizeof(LogMeta)) {return nullptr;}LogMeta meta;memcpy(&meta, buffer.Begin(), sizeof(LogMeta));const size_t total_need = sizeof(LogMeta) + meta.fileLen + meta.threadNameLen + meta.msgLen;if(buffer.readableSize() < total_need){return nullptr;}// 4. 提取各字段数据(使用临时指针操作)const char* data_ptr = buffer.Begin() + sizeof(LogMeta);// 文件名处理std::string file(data_ptr, meta.fileLen);data_ptr += meta.fileLen;// 线程名std::string thread_name(data_ptr, meta.threadNameLen);data_ptr += meta.threadNameLen;// 消息内容处理std::string message(data_ptr, meta.msgLen);// 5. 统一移动读指针(原子操作保证数据一致性)buffer.moveReadPos(total_need);// 6. 构建日志事件对象auto event = std::make_shared<LogEvent>(std::move(file),meta.line,meta.elapse,meta.threadId,std::move(thread_name),meta.fiberId,meta.timestamp,meta.level);event->getSS() << message;return event;}
Logger
重构时出现的问题:Logger 依赖 BufferManager,而 BufferManager 又依赖 IOManager 调度器。
简单说,就是全局静态变量的初始化顺序问题。
解决方法:Logger默认构造的时候,不提供BufferParams,就使用同步方式创建。
导入 yaml 配置后,重置 logger,再创建 异步日志器
Logger(const std::string name, LogLevel::Level level,std::vector<LogAppender::ptr>& appenders, const BufferParams& bufParams) :m_name(name), m_level(level), m_appenders(appenders.begin(), appenders.end()){if(bufParams.isValid()){m_bufMgr = std::make_shared<BufferManager>(std::bind(&Logger::realLog, this, std::placeholders::_1), bufParams);}else{m_bufMgr = nullptr;}}// 由 iom_log 写入真正的文件。void realLog(Buffer::ptr buffer) {MutexType::Lock lock(m_log_mutex); // 强制 只能 一个线程写入。if (!buffer) {std::cerr << "realLog: invalid buffer pointer" << std::endl;return;}std::vector<LogEvent::ptr> events;while (true) { // 解析 bufferLogEvent::ptr event = LogEvent::deserialize(*buffer);// 理论上 buffer 里是多个Event的数据,不存在处理失败。if (event) {events.push_back(event);} else {if (buffer->readableSize() == 0) { // 读完了break;} else {// 处理失败但数据未读完(说明发生严重错误)std::cout << "Log deserialization error, remaining data: " << buffer->readableSize() << std::endl;break;}}}auto self = shared_from_this();for (auto& appender : m_appenders) {appender->log(self, events);}}// 写入缓冲区// 多个线程的 写日志,写入缓存区void log(LogEvent::ptr event){if(event->getLevel() >= m_level){if(m_bufMgr != nullptr){// MutexType::Lock lock(m_mutex); 当协程阻塞,这个锁就一直没释放。搞半天,给我整的怀疑人生了。Buffer::ptr buf = event->serialize();m_bufMgr->push(buf);}else{// 如果没有配置iom,直接同步输出日志auto self = shared_from_this();for(auto& appender : m_appenders) {appender->log(self, event);}}}}
RotatingFileLogAppender
FileLogAppender 改为使用 C 标准库的 FILE 系列函数
支持功能:
- max_size,限制单个日志文件大小(按照时间片创建文件名)
- m_maxFile = 0,无限增加日志
- m_maxFile > 0,限制日志文件的个数。当超过,从第一个文件循环写入。
class RotatingFileLogAppender : public LogAppender{
public:typedef std::shared_ptr<RotatingFileLogAppender> ptr;RotatingFileLogAppender(const std::string& filename, LogLevel::Level level, LogFormatter::ptr formatter,size_t max_size,size_t max_file = 0, // 默认是无限增加FlushRule::Rule flush_rule = FlushRule::Rule::FFLUSH // 默认是普通日志 );~RotatingFileLogAppender(){if(m_curFile){fclose(m_curFile);m_curFile = NULL;}}std::string toYamlString();void log(std::shared_ptr<Logger> logger, LogEvent::ptr event) override;void log(std::shared_ptr<Logger> logger, std::vector<LogEvent::ptr> events) override; private:void initLogFile(size_t len = 0);/*** 判断是否写的下,如果写的下就 ss<<str,缓存* 如果写不写了,就把 ss 缓存一次性写入。重置ss */bool checkLogFile(const std::string& str);std::string createFilename();
private:std::string m_filename;FILE* m_curFile = NULL;std::vector<std::string> m_fileNames;size_t m_maxSize;size_t m_maxFile;FlushRule::Rule m_flushRule;size_t m_curFilePos = 0;size_t m_curFileIndex = 0;Buffer m_buffer;
};
void RotatingFileLogAppender::initLogFile(size_t len){if(m_curFile == NULL || (m_curFilePos + len) > m_maxSize){// 写不下了,保证日志的完整性,直接新建文件。if(m_curFile != NULL){fflush(m_curFile);fclose(m_curFile);if(m_maxFile == 0){// 无限增加日志文件m_curFileIndex++;}else{m_curFileIndex = (m_curFileIndex + 1) % m_maxFile;if(!m_fileNames[m_curFileIndex].empty()){ // 说明 循环到 已有的文件了。std::string newfilename = createFilename();if(rename(m_fileNames[m_curFileIndex].c_str(), newfilename.c_str()) != 0){ // 文件 改新名字perror("rename failed");} m_fileNames[m_curFileIndex] = newfilename;m_curFile = fopen(newfilename.c_str(), "r+b");fseek(m_curFile, 0, SEEK_SET); // 从头 覆盖,不考虑 日志文件名了。默认最后一个文件可能会存在过往的日志信息。m_curFilePos = 0;return;}}}std::string filename = createFilename();m_fileNames[m_curFileIndex] = filename;m_curFile = fopen(filename.c_str(), "ab");if(m_curFile==NULL){std::cout <<__FILE__<<__LINE__<<"open file failed"<< std::endl;perror(NULL);}m_curFilePos = 0;return;}
}std::string RotatingFileLogAppender::createFilename() {time_t now = time(nullptr);struct tm tm;localtime_r(&now, &tm);char time_buf[64];strftime(time_buf, sizeof(time_buf), "%Y%m%d_%H%M%S", &tm);return m_filename + "_" + time_buf + "_" + std::to_string(m_curFileIndex) + ".log";
}void RotatingFileLogAppender::log(std::shared_ptr<Logger> logger, LogEvent::ptr event){MutexType::Lock lock(m_mutex);if(event->getLevel() >= m_level){std::string data = m_formatter->format(logger , event);initLogFile(data.size());fwrite(data.c_str(), 1, data.size() , m_curFile);if(ferror(m_curFile)){std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;perror(NULL);}m_curFilePos += data.size();if(m_flushRule == FlushRule::Rule::FFLUSH){if(fflush(m_curFile)==EOF){ // 刚好最后一个日志把文件写满了。std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;perror(NULL);}}else if(m_flushRule == FlushRule::Rule::FSYNC){fflush(m_curFile);fsync(fileno(m_curFile));}}
}bool RotatingFileLogAppender::checkLogFile(const std::string& data){if(m_curFile == NULL || (m_curFilePos + data.size()) > m_maxSize){// 写不下了,保证日志的完整性,直接新建文件。if(m_curFile != NULL){// 把 ss 缓存一次性写入fwrite(m_buffer.Begin(), 1, m_buffer.readableSize(), m_curFile);m_buffer.Reset();// 判断错误信息if(ferror(m_curFile)){std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;perror(NULL);}if(m_flushRule == FlushRule::Rule::FFLUSH){if(fflush(m_curFile)==EOF){ // 刚好最后一个日志把文件写满了。std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;perror(NULL);}}else if(m_flushRule == FlushRule::Rule::FSYNC){fflush(m_curFile);fsync(fileno(m_curFile));}fclose(m_curFile);if(m_maxFile == 0){// 无限增加日志文件m_curFileIndex++;}else{m_curFileIndex = (m_curFileIndex + 1) % m_maxFile;if(!m_fileNames[m_curFileIndex].empty()){ // 说明 循环到 已有的文件了。std::string newfilename = createFilename();if(rename(m_fileNames[m_curFileIndex].c_str(), newfilename.c_str()) != 0){ // 文件 改新名字std::cout << "rename failed" << std::endl;perror(NULL);}m_fileNames[m_curFileIndex] = newfilename;m_curFile = fopen(newfilename.c_str(), "r+b");if (m_curFile == NULL) {std::cout << __FILE__ << __LINE__ << "open file failed" << std::endl;perror(NULL);}// 从头 覆盖,不考虑 日志文件名了。默认最后一个文件可能会存在过往的日志信息。fseek(m_curFile, 0, SEEK_SET); m_curFilePos = 0;// 模拟写入文件,实际上写入缓存。m_buffer.push(data);m_curFilePos += data.size();return true;}}}// m_curFile为空,创建新文件 std::string filename = createFilename();if(m_maxFile > 0){m_fileNames[m_curFileIndex] = filename; // 只有限制最大文件数,记录文件名}m_curFile = fopen(filename.c_str(), "ab");if(m_curFile==NULL){std::cout <<__FILE__<<__LINE__<<"open file failed"<< std::endl;perror(NULL);}m_curFilePos = 0;}// 模拟写入文件,实际上写入缓存。m_buffer.push(data);m_curFilePos += data.size();return false;
}void RotatingFileLogAppender::log(std::shared_ptr<Logger> logger, std::vector<LogEvent::ptr> events){// 这个时候,m_curFilePos 转变成对 m_buffer 写入数据的 pos 长度。 ⭐MutexType::Lock lock(m_mutex);for(auto& event : events){if(event->getLevel() >= m_level){std::string data = m_formatter->format(logger , event);checkLogFile(data);}}// 最后再次,把缓存里的写入if(m_buffer.readableSize() > 0 && m_curFile != NULL){fwrite(m_buffer.Begin(), 1, m_buffer.readableSize(), m_curFile);m_buffer.Reset();// 判断错误信息if(ferror(m_curFile)){std::cout <<__FILE__<<__LINE__<<"write log file failed"<< std::endl;perror(NULL);}if(m_flushRule == FlushRule::Rule::FFLUSH){if(fflush(m_curFile)==EOF){ // 刚好最后一个日志把文件写满了。std::cout <<__FILE__<<__LINE__<<"fflush file failed"<< std::endl;perror(NULL);}}else if(m_flushRule == FlushRule::Rule::FSYNC){fflush(m_curFile);fsync(fileno(m_curFile));}}
}
剩下的就是:
works:多个调度器的管理;
对Config监听函数的修改,补充BufferParams
分离works.yml和log.yml,分别导入。提前导入works.yml 保证 调度器创建完成。
详见 代码 https://github.com/star-cs/webserver