etcd 的安装及使用
介绍
Etcd 是一个 golang 编写的分布式、高可用的一致性键值存储系统,用于配置共享和服务发现等。它使用 Raft 一致性算法来保持集群数据的一致性,且客户端通过长连接 watch 功能,能够及时收到数据变化通知,相较于 Zookeeper 框架更加轻量化。
安装
首先,需要在你的系统中安装 Etcd 。 Etcd 是一个分布式键值存储,通常用于服务发现和配置管理。以下是在 Linux 系统上安装 Etcd 的基本步骤:
- 安装 Etcd:
sudo apt-get install etcd
- 启动 Etcd 服务:
sudo systemctl start etcd
- 设置 Etcd 开机自启:
sudo systemctl enable etcd
配置文件
如果是单节点集群其实就可以不用进行配置,默认 etcd 的集群节点通信端口为 2380 ,客户端访问端口为 2379。若需要修改,则可以配置:/etc/default/etcd。
我们可以先查看 etcd 监听的地址,发现其默认监听的都是本地主机端口,要想让外部主机也能连接上 etcd 服务器,则可以修改 /etc/default/etcd 文件下的配置。

#用于客户端连接的 URL
ETCD_LISTEN_CLIENT_URLS="http://0.0.0.0:2379"
#用于客户端访问的公开,也就是提供服务的 URL,ip要填入主机的公网ip
ETCD_ADVERTISE_CLIENT_URLS="http://ip:2379"
重新启动 etcd 服务后再查看 etcd 服务器的监听情况
运行验证
etcdctl put mykey "this is awesome"
如果出现报错:
No help topic for 'put'
则 sudo vi /etc/profile 在末尾声明环境变量 ETCDCTL_API=3 以确定 etcd 版本。
export ETCDCTL_API=3
完毕后,加载配置文件,并重新执行测试指令
source /etc/profile
etcdctl put mykey "this is awesome"
搭建服务注册发现中心
使用 Etcd 作为服务注册发现中心,你需要定义服务的注册和发现逻辑。这通常涉及到以下几个操作:
- 服务注册:服务启动时,向 Etcd 注册自己的地址和端口。
- 服务发现:客户端通过 Etcd 获取服务的地址和端口,用于远程调用。
- 健康检查:服务定期向 Etcd 发送心跳,以维持其注册信息的有效性。
etcd 采用 golang 编写, v3 版本通信采用 grpc API ,即 (HTTP2+protobuf),官方只维护了 go 语言版本的 client 库,如果想使用 C/C++ 编写,则需要找到 C/C++ 非官方的 client 开发库:
etcd-cpp-apiv3
etcd-cpp-apiv3 是一个 etcd 的 C++ 版本客户端 API 。它依赖于 mipsasm, boost, protobuf, gRPC, cpprestsdk 等库。
etcd-cpp-apiv3 的 GitHub 地址是: https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3
依赖安装:
sudo apt-get install libboost-all-dev libssl-dev
sudo apt-get install libprotobuf-dev protobuf-compiler-grpc
sudo apt-get install libgrpc-dev libgrpc++-dev
sudo apt-get install libcpprest-dev
api 框架安装:
可以先创建一个 etcd 目录并进入该目录,然后在该目录下拉取文件并用 cmake 构建文件:
mkdir etcd && cd etcd
git clone https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/usr
make -j$(nproc) && sudo make install
客户端类与接口介绍
// pplx::task 并行库异步结果对象
// 阻塞方式 get(): 阻塞直到任务执行完成,并获取任务结果
// 非阻塞方式 wait(): 等待任务到达终止状态,然后返回任务状态
namespace etcd
{class Value{bool is_dir(); // 判断是否是一个目录std::string const &key(); // 键值对的 key 值std::string const &as_string(); // 键值对的 val 值int64_t lease(); // 用于创建租约的响应中,返回租约 ID};// etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端// 在通知客户端的时候,会返回改变前的数据和改变后的数据class Event{enum class EventType{PUT, // 键值对新增或数据发生改变DELETE_, // 键值对被删除INVALID,};enum EventType event_type();const Value &kv();const Value &prev_kv();};class Response{bool is_ok();std::string const &error_message();Value const &value(); // 当前的数值 或者 一个请求的处理结果Value const &prev_value(); // 之前的数值Value const &value(int index); //std::vector<Event> const &events(); // 触发的事件};class KeepAlive{KeepAlive(Client const &client, int ttl, int64_t lease_id = 0);// 返回租约 IDint64_t Lease();// 停止保活动作void Cancel();};class Client{// etcd_url: "http://127.0.0.1:2379"Client(std::string const &etcd_url, std::string const &load_balancer = "round_robin");// Put a new key-value pair 新增一个键值对pplx::task<Response> put(std::string const &key, std::string const &value);// 新增带有租约的键值对 (一定时间后,如果没有续租,数据自动删除)pplx::task<Response> put(std::string const &key, std::string const &value,const int64_t leaseId);// 获取一个指定 key 目录下的数据列表pplx::task<Response> ls(std::string const &key);// 创建并获取一个存活 ttl 时间的租约pplx::task<Response> leasegrant(int ttl);// 获取一个租约保活对象,其参数 ttl 表示租约有效时间pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);// 撤销一个指定的租约pplx::task<Response> leaserevoke(int64_t lease_id);// 数据锁pplx::task<Response> lock(std::string const &key);};class Watcher{Watcher(Client const &client,std::string const &key, // 要监控的键值对 keystd::function<void(Response)> callback, // 发生改变后的回调bool recursive = false); // 是否递归监控目录下的所有数据改变Watcher(std::string const &address,std::string const &key,std::function<void(Response)> callback,bool recursive = false);// 阻塞等待,直到监控任务被停止bool Wait();bool Cancel();};
}
使用案例
put.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <thread>
#include <iostream>
int main()
{// etcd服务器的ip地址std::string etcd_host = "http://127.0.0.1:2379";// 实例化客户端连接etcd服务器etcd::Client client(etcd_host);// 向etcd新增数据auto rsp1 = client.put("/service/user", "127.0.0.1:8080").get();if (rsp1.is_ok() == false){std::cout << "新增数据失败:" << rsp1.error_message() << std::endl;return -1;}// 新增一个有保活期限的数据// 1.获取租约保活对象,设定租期为3sauto keep_alive = client.leasekeepalive(3).get();// 2.获取租约idauto lease_id = keep_alive->Lease();// 3.新增数据auto rsp2 = client.put("/service/friend", "127.0.0.1:9090",lease_id).get();if (rsp2.is_ok() == false){std::cout << "新增数据失败:" << rsp2.error_message() << std::endl;return -1;}std::this_thread::sleep_for(std::chrono::seconds(10));return 0;
}
get.cc
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include <iostream>
void callback(const etcd::Response &rsp)
{if (rsp.is_ok() == false){std::cout << "收到一个错误的事件通知:" << rsp.error_message() << std::endl;return;}for (auto &ev : rsp.events()){if (ev.event_type() == etcd::Event::EventType::PUT){std::cout << "服务信息发生了改变:\n";std::cout << "当前值:" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;std::cout << "原先值:" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;}else if (ev.event_type() == etcd::Event::EventType::DELETE_){std::cout << "服务信息下线:\n";std::cout << "当前值:" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;std::cout << "原先值:" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;}}
}
int main()
{// etcd服务器的ip地址std::string etcd_host = "http://127.0.0.1:2379";// 实例化客户端连接etcd服务器etcd::Client client(etcd_host);// 从etcd获取指定数据auto rsp = client.ls("/service").get();if (rsp.is_ok() == false){std::cout << "获取数据失败:" << rsp.error_message() << std::endl;return -1;}int sz = rsp.keys().size();for (int i = 0; i < sz; i++)std::cout << rsp.value(i).as_string() << "可以提供" << rsp.key(i) << "服务" << std::endl;// 实例化一个键值对事件的监控对象auto watcher = etcd::Watcher(client, "/service", callback, true);watcher.Wait();return 0;
}
makefile
all:put get
put:put.ccg++ -o $@ $^ -std=c++17 -letcd-cpp-api -lcpprest
get:get.ccg++ -o $@ $^ -std=c++17 -letcd-cpp-api -lcpprest.PHONY:clean
clean:rm -f put get
封装服务发现与注册功能
在服务的注册与发现中,主要基于 etcd 所提供的可以设置有效时间的键值对存储来实现。
服务注册
主要是在 etcd 服务器上存储一个租期 ns 的保活键值对,表示所能提供指定服务的节点主机,比如 /service/user/instance-1 的 key ,且对应的 val 为提供服务的主机节点地址:
<key, val> -- < /service/user/instance-1, 127.0.0.1:9000>
- /service 是主目录,其下会有不同服务的键值对存储
- /user 是服务名称,表示该键值对是一个用户服务的节点
- /instance-1 是节点实例名称,提供用户服务可能会有很多节点,每个节点都应该有自己独立且唯一的实例名称
当这个键值对注册后,服务发现方可以基于目录进行键值对的发现。且一旦注册节点退出,保活失败,则 租约失效键值对被删除, etcd 会通知发现方数据的失效,进而实现服务下线通知的功能。
服务发现
服务发现分为两个过程:
- 刚启动客户端的时候,进行 ls 目录浏览,进行/service 路径下所有键值对的获取
- 对关心的服务进行 watcher 观测,一旦数值发生变化(新增/删除),收到通知进行节点的管理
如果 ls 的路径为 /service ,则会获取到 /service/user, /service/firend 等其路径下的所有能够提供服务的实例节点数据。如果 ls 的路径为 /service/user ,则会获取到 /service/user/instancd-1, /service/user/instance-2 等所有提供用户服务的实例节点数据。
客户端可以将发现的所有 < 实例 - 地址 > 管理起来,以便于进行节点的管理:
- 收到新增数据通知,则向本地管理添加新增的节点地址 -- 服务上线
- 收到删除数据通知,则从本地管理删除对应的节点地址 -- 服务下线
因为管理了所有的能够提供服务的节点主机的地址,因此当需要进行 rpc 调用的时候,则根据服务名称,获取一个能够提供服务的主机节点地址进行访问就可以了,而这里的获取策略,我们采用 RR 轮转策略。
封装思想
将 etcd 的操作全部封装起来,也不需要管理数据,只需要向外四个基础操作接口:
- 进行服务注册,也就是向 etcd 添加 <服务-主机地址>的数据
- 进行服务发现,获取当前所有能提供服务的信息
- 设置服务上线的处理回调接口
- 设置服务下线的处理回调接口
这样封装之后,外部的 rpc 调用模块,可以先获取所有的当前服务信息,建立通信连接进行 rpc 调用,也能在有新服务上线的时候新增连接,以及下线的时候移除连接。
#pragma once
#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <functional>
// 服务注册客户端
class Registry
{
public:using ptr = std::shared_ptr<Registry>;Registry(const std::string &host): _client(std::make_shared<etcd::Client>(host)),_keep_alive(_client->leasekeepalive(3).get()),_lease_id(_keep_alive->Lease()){}~Registry(){_keep_alive->Cancel();}bool registry(const std::string &key, const std::string &val){auto rsp = _client->put(key, val, _lease_id).get();if (rsp.is_ok() == false){std::cout << "服务注册失败:" << rsp.error_message() << std::endl;return false;}return true;}private:std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::KeepAlive> _keep_alive;uint64_t _lease_id; // 注意租约id的类型必须是uint64_t
};
// 服务发现客户端
class Discovery
{
public:using ptr = std::shared_ptr<Discovery>;using NotifyCallback = function<void(std::string, std::string)>;Discovery(const std::string &host, const std::string &basedir,const NotifyCallback &put_cb, const NotifyCallback &del_cb): _client(std::make_shared<etcd::Client>(host)),_put_cb(put_cb), _del_cb(del_cb){// 先进行服务发现,获取当前已有的数据auto rsp = _client->ls(basedir).get();if (rsp.is_ok() == false){std::cout << "获取服务信息失败:" << rsp.error_message() << std::endl;}int sz = rsp.keys().size();for (int i = 0; i < sz; i++){if (_put_cb)_put_cb(rsp.key(i), rsp.value(i).as_string());}// 然后进行事件监控,监控数据的变化并调用回调函数进行处理_watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir,std::bind(&Discovery::callback, this, std::placeholders::_1), true);}void wait(){_watcher->Wait();}~Discovery(){_watcher->Cancel();}private:void callback(const etcd::Response &rsp){if (rsp.is_ok() == false){std::cout << "收到一个错误的事件通知" << std::endl;return;}for (auto &ev : rsp.events()){if (ev.event_type() == etcd::Event::EventType::PUT){if (_put_cb)_put_cb(ev.kv().key(), ev.kv().as_string());cout << "新增服务" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;}else if (ev.event_type() == etcd::Event::EventType::DELETE_){if (_del_cb)_del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());cout << "下线服务" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;}}}private:NotifyCallback _put_cb;NotifyCallback _del_cb;std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::Watcher> _watcher;
};