当前位置: 首页 > news >正文

5.实现简化版raft协议完成选举

1.设计

前面已经完成了netty的集成,接下来就是借助netty完成选举就行了。

针对选举,我们用到了VotRequestMessage、VotRespMessage、当节点下线时NodeOfflineMessage、NodeOnlineMessage、NodeOnlineRespMessage

1.1 节点详细的交互

1.2 对所有消息的处理使用策略模式

由于我们是Spring的应用,我可以借助Spring的容器完成对消息类型的选择

1.2.1 创建 IMessageService 接口

所有消息的处理都实现此接口

public interface IMessageService {byte getMessageType();void execute(ChannelHandlerContext ctx, DttaskMessage message);}

1.2.2 DttaskMessage

@Data
public class DttaskMessage {public static final byte COMMON_RESP = 0X00;public static final byte PING = 0X01;public static final byte PONG = 0X02;public static final byte VOTING = 0X03;public static final byte VOT_RESP = 0X04;public static final byte NODE_OFFLINE = 0X05;public static final byte NODE_ONLINE = 0X06;public static final byte NODE_ONLINE_RESP = 0X07;// 类型private byte type;// 消息实际信息private String info;public static DttaskMessage buildPingMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(PING);dttaskMessage.setInfo(JSON.toJSONString(new PingMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildPongMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(PONG);dttaskMessage.setInfo(JSON.toJSONString(new PongMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildCommonRespMessage(String message, boolean successFlag) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(COMMON_RESP);dttaskMessage.setInfo(JSON.toJSONString(new CommonRespMessage(message, successFlag)));return dttaskMessage;}public static DttaskMessage buildNodeOnlineRespMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(NODE_ONLINE_RESP);dttaskMessage.setInfo(JSON.toJSONString(new NodeOnlineRespMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildNodeOnlineMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(NODE_ONLINE);dttaskMessage.setInfo(JSON.toJSONString(new NodeOnlineMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildNodeOfflineMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(NODE_OFFLINE);dttaskMessage.setInfo(JSON.toJSONString(new VotRespMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildVotRespMessage(long serverId) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(VOT_RESP);dttaskMessage.setInfo(JSON.toJSONString(new VotRespMessage(serverId)));return dttaskMessage;}public static DttaskMessage buildVotRequestMessage(Long lastControllerServerId, long fromServerId, long serverId, int version) {DttaskMessage dttaskMessage = new DttaskMessage();dttaskMessage.setType(VOTING);dttaskMessage.setInfo(JSON.toJSONString(new VotRequestMessage(lastControllerServerId, fromServerId, serverId, version)));return dttaskMessage;}}

1.2.3 VotingMessageService

这里以VotingMessageService为例,它实现了IMessageService接口,完成投票信息的处理。其它的*MessageService也是同样的,这里就不一一举例了,可以在 com.swsm.dttask.server.service.message包下查看

@Slf4j
@Component
public class VotingMessageService implements IMessageService {@Overridepublic byte getMessageType() {return DttaskMessage.VOTING;}@Overridepublic void execute(ChannelHandlerContext ctx, DttaskMessage message) {Channel channel = ctx.channel();VotRequestMessage votRequestMessage = JSON.parseObject(message.getInfo(), VotRequestMessage.class);long fromServerId = votRequestMessage.getFromServerId();Long lastControllerServerId = votRequestMessage.getLastControllerServerId();ServerInfo.addOtherNode(fromServerId, channel);boolean addRes = ServerInfo.addVotResult(votRequestMessage.getVersion(), votRequestMessage.getServerId());if (!addRes) {log.info("丢弃以前版本的投票信息={}", votRequestMessage);return;}// 归票VotResult votResult = ServerInfo.getVotResult();Map<Long, Integer> votMap = votResult.getVotMap();for (Map.Entry<Long, Integer> entry : votMap.entrySet()) {long controllerServerId = entry.getKey();if (votMap.get(controllerServerId) >= ServerInfo.getVotMax()) {// 归票成功log.info("本节点={}是controller", controllerServerId);ServerInfo.setStatus(ServerStatus.RUNNING);Controller controller = ServerInfo.initController();for (Long otherServerId : ServerInfo.getOtherNodeIds()) {Channel otherNodeChannel = ServerInfo.getChannelByServerId(otherServerId);otherNodeChannel.writeAndFlush(DttaskMessage.buildVotRespMessage(controllerServerId));}return;}}}
}

1.2.4 MessageServiceManager

Spring托管的bean,里面会将所有实现了IMessageService接口的类都管理起来,并在消息到来时进行选择

@Slf4j
@Component
public class MessageServiceManager {@Autowired(required = false)private List<IMessageService> messageServices;private Map<Byte, IMessageService> messageServiceMap = new HashMap<>();@PostConstructpublic void init() {if (messageServices != null) {for (IMessageService messageService : messageServices) {messageServiceMap.put(messageService.getMessageType(), messageService);}}}public IMessageService chooseMessageService(byte messageType) {if (messageServiceMap.containsKey(messageType)) {return messageServiceMap.get(messageType);}return messageServiceMap.get(Byte.MIN_VALUE);}}

1.2.5 借助netty实现节点与节点直接的心跳

@Slf4j
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {  @Override  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  if (evt instanceof IdleStateEvent) {  IdleStateEvent event = (IdleStateEvent) evt;  if (event.state() == IdleState.READER_IDLE) {  log.warn("读取空闲...");} else if (event.state() == IdleState.WRITER_IDLE) {log.warn("写入空闲...");} else if (event.state() == IdleState.ALL_IDLE) {Long serverId = ServerInfo.getServerIdByChannelId(ctx.channel().id());log.warn("serverId={}与server通信读取或写入空闲...", serverId);if (serverId != null) {ctx.writeAndFlush(DttaskMessage.buildPingMessage(serverId));} else {ctx.close();}}}  }  
}

1.2.6 ServerInfo类完成所有信息的管理

ServerInfo类完成所有信息的管理,它里面会存储很多系统运行需要的数据如:当前节点信息(myNodeInfo)、Channel和节点的关系(nodeChannelMap、nodeChannelServerIdMap)、系统状态(status)、其它节点信息(otherNodeInfoMap);

ServerInfo还肩负着确定节点角色(Controller或Follower)以及初始化角色的任务

@Slf4j
public class ServerInfo {private ServerInfo() {}private static NioEventLoopGroup bossGroup;public static void setBossGroup(NioEventLoopGroup bg) {bossGroup = bg;}public static NioEventLoopGroup getBossGroup() {return bossGroup;}private static NioEventLoopGroup workerGroup;private static Channel serverChannel;private static Bootstrap connectOtherNodeBootStrap;private static ServerBootstrap bootstrapForClient;private static NodeInfo myNodeInfo;private static Map<Long, Channel> nodeChannelMap = new ConcurrentHashMap<>();private static Map<ChannelId, Long> nodeChannelServerIdMap = new ConcurrentHashMap<>();private static volatile ServerStatus status;private static Map<Long, NodeInfo> otherNodeInfoMap = new ConcurrentHashMap<>();private static VotResult votResult = new VotResult();private static Controller controller;private static Follower follower;public static void init() {RedisUtil redisUtil = BeanUseHelper.redisUtil();DttaskServerConfig dttaskServerConfig = BeanUseHelper.dttaskServerConfig();long localServerId = dttaskServerConfig.getServerId();ServerInfo.setStatus(ServerStatus.STARTING);Long controllerServerId = redisUtil.getLongValue(Constant.RedisConstants.DTTASK_CONTROLLER);if (controllerServerId == null) {log.info("当前启动状态为:未确定controller");initNodeInfoByConfig();Long minServerId = ServerInfo.getMinNodeId();if (minServerId == localServerId) {log.info("就当前一个节点:{},此节点就是controller", localServerId);ServerInfo.setStatus(ServerStatus.RUNNING);} else {log.info("有多个节点,节点状态应为VOTING");ServerInfo.setStatus(ServerStatus.VOTING);}} else {log.info("当前启动状态为:已确定controller");ServerInfo.refreshNodeInfoByRedis();InetSocketAddress address = dttaskServerConfig.getServerInfoMap().get(localServerId);ServerInfo.setMyNodeInfo(localServerId, address.getHostString(), address.getPort(), null);ServerInfo.setStatus(ServerStatus.IDENTIFYING);}}private static void initNodeInfoByConfig() {DttaskServerConfig dttaskServerConfig = BeanUseHelper.dttaskServerConfig();long localServerId = dttaskServerConfig.getServerId();Map<Long, InetSocketAddress> serverInfoMap = dttaskServerConfig.getServerInfoMap();for (Map.Entry<Long, InetSocketAddress> entry : serverInfoMap.entrySet()) {long id = entry.getKey();InetSocketAddress address = serverInfoMap.get(id);if (localServerId != id) {ServerInfo.addOtherNode(id, address.getHostString(), address.getPort());} else {ServerInfo.setMyNodeInfo(localServerId, address.getHostString(), address.getPort(), null);}}}public static Controller initController() {long localServerId = ServerInfo.getServerId();RedisUtil redisUtil = BeanUseHelper.redisUtil();log.info("初始化本节点={}controller信息...", ServerInfo.getServerId());ServerInfo.setRole(ServerRole.CONTROLLER);redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_CONTROLLER, localServerId);ServerInfo.setOtherNodeRole(localServerId);ServerInfo.refreshRedisNodeInfo();controller = Controller.getInstance();return controller;}public static Follower initFollower() {log.info("初始化本节点={}follower信息...", ServerInfo.getServerId());RedisUtil redisUtil = BeanUseHelper.redisUtil();Long controllerServerId = redisUtil.getLongValue(Constant.RedisConstants.DTTASK_CONTROLLER);if (controllerServerId == null) {log.error("init follower时,controller还没有确定...");throw new BusinessException("init follower时,controller还没有确定...");}ServerInfo.setRole(ServerRole.FOLLOWER);ServerInfo.setStatus(ServerStatus.RUNNING);ServerInfo.setOtherNodeRole(controllerServerId);follower = Follower.getInstance();return follower;}public static void setMyNodeInfo(long serverId, String ip, int port, ServerRole serverRole) {NodeInfo nodeInfo = new NodeInfo();nodeInfo.setServerId(serverId);nodeInfo.setIp(ip);nodeInfo.setPort(port);nodeInfo.setServerRole(serverRole);myNodeInfo = nodeInfo;}public static long getServerId() {return myNodeInfo.getServerId();}public static NodeInfo getMyNodeInfo() {return myNodeInfo;}public static Map<Long, NodeInfo> getOtherNodeInfoMap() {return otherNodeInfoMap;}public static int getVotMax() {return otherNodeInfoMap.size();}public static VotResult getVotResult() {return votResult;}public static synchronized void cacheChannelAnsServerIdRel(long serverId, Channel channel) {nodeChannelMap.put(serverId, channel);nodeChannelServerIdMap.put(channel.id(), serverId);}public static Channel getChannelByServerId(Long serverId) {return nodeChannelMap.get(serverId);}public static synchronized void removeChannel(ChannelId channelId) {Long serverId = nodeChannelServerIdMap.get(channelId);if (serverId != null) {log.info("删除和节点id={}的连接", serverId);nodeChannelServerIdMap.remove(channelId);nodeChannelMap.remove(serverId);otherNodeInfoMap.remove(serverId);}}public static synchronized void refreshRedisNodeInfo() {if (myNodeInfo != null && getRole() != null && getRole().isController()) {RedisUtil redisUtil = BeanUseHelper.redisUtil();Map<Long, NodeInfo> otherNodeInfoMap = ServerInfo.getOtherNodeInfoMap();List<NodeInfo> nodeInfoList = new ArrayList<>();nodeInfoList.addAll(otherNodeInfoMap.values());nodeInfoList.add(ServerInfo.getMyNodeInfo());log.info("controller刷新节点信息到redis:{}", nodeInfoList);redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_NODE_INFO, JSON.toJSONString(nodeInfoList));}}public static Long getServerIdByChannelId(ChannelId channelId) {return nodeChannelServerIdMap.get(channelId);}public static synchronized boolean addVotResult(int version, long chooseServerId) {Integer curVersion = votResult.getVersion();if (version < votResult.getVersion()) {log.info("版本={}已失效,当前的为:{}", version, curVersion);return false;}votResult.setVersion(version);if (votResult.getVotMap().containsKey(chooseServerId)) {votResult.getVotMap().put(chooseServerId, votResult.getVotMap().get(chooseServerId) + 1);} else {votResult.getVotMap().put(chooseServerId, 1);}return true;}public static void addOtherNode(long serverId, String ip, int port) {NodeInfo nodeInfo = new NodeInfo();nodeInfo.setServerId(serverId);nodeInfo.setIp(ip);nodeInfo.setPort(port);otherNodeInfoMap.put(serverId, nodeInfo);}public static void addOtherNode(long serverId, Channel channel) {addOtherNode(serverId, channel, null);}public static void addOtherNode(long serverId, Channel channel, ServerRole serverRole) {InetSocketAddress address = BeanUseHelper.dttaskServerConfig().getServerInfoMap().get(serverId);if (address == null) {throw new BusinessException(CharSequenceUtil.format("id={}的没有配置在文件中", serverId));}if (channel != null && ServerInfo.getServerIdByChannelId(channel.id()) == null) {ServerInfo.cacheChannelAnsServerIdRel(serverId, channel);}NodeInfo nodeInfo = new NodeInfo();nodeInfo.setServerId(serverId);nodeInfo.setIp(address.getHostString());nodeInfo.setPort(address.getPort());nodeInfo.setServerRole(serverRole);otherNodeInfoMap.put(serverId, nodeInfo);}public static NodeInfo getNodeInfo(long serverId) {return otherNodeInfoMap.get(serverId);}public static Set<Long> getOtherNodeIds() {return otherNodeInfoMap.keySet();}public static long getMinNodeId() {if (getOtherNodeIds().isEmpty()) {return ServerInfo.getServerId();}return Collections.min(getOtherNodeIds());}public static void setOtherNodeRole(long controllerServerId) {for (Map.Entry<Long, NodeInfo> entry : otherNodeInfoMap.entrySet()) {long serverId = entry.getKey();if (serverId == controllerServerId) {otherNodeInfoMap.get(serverId).setServerRole(ServerRole.CONTROLLER);} else {otherNodeInfoMap.get(serverId).setServerRole(ServerRole.FOLLOWER);}}}public static ServerRole getRole() {return myNodeInfo.getServerRole();}public static void setStatus(ServerStatus s) {status = s;}public static void setRole(ServerRole r) {myNodeInfo.setServerRole(r);}public static Set<Channel> getOtherNodeChannel(Long serverId) {Set<Channel> res = new HashSet<>();for (Map.Entry<Long, Channel> entry : nodeChannelMap.entrySet()) {long id = entry.getKey();if (!Objects.equals(serverId, id)) {res.add(nodeChannelMap.get(id));}}return res;}/*** 这个方法针对,本节点并没有和要断开节点有连接的* @param offlineServerId 掉线的节点id*/public static void removeNode(long offlineServerId) {log.info("删除掉线节点id={}", offlineServerId);otherNodeInfoMap.remove(offlineServerId);}public static void refreshNodeInfoByRedis() {RedisUtil redisUtil = BeanUseHelper.redisUtil();Object obj = redisUtil.getCacheObject(Constant.RedisConstants.DTTASK_NODE_INFO);if (obj != null) {List<NodeInfo> nodeInfoList = JSON.parseObject(obj.toString(),new TypeReference<List<NodeInfo>>() {}.getType());for (NodeInfo nodeInfo : nodeInfoList) {otherNodeInfoMap.put(nodeInfo.getServerId(), nodeInfo);}}}public static boolean isIdentifying() {return status.isIdentifying();}public static boolean isVoting() {return status.isVoting();}public static boolean isRunning() {return status.isRunning();}public static void setWorkerGroup(NioEventLoopGroup bg) {workerGroup = bg;}public static NioEventLoopGroup getWorkerGroup() {return workerGroup;}public static void setServerChannel(Channel ch) {serverChannel = ch;}public static Channel getServerChannel() {return serverChannel;}public static void setConnectOtherNodeBootStrap(Bootstrap bs) {connectOtherNodeBootStrap = bs;}public static Bootstrap getConnectOtherNodeBootStrap() {return connectOtherNodeBootStrap;}public static void setBootstrapForClient(ServerBootstrap sbs) {bootstrapForClient = sbs;}public static ServerBootstrap getBootstrapForClient() {return bootstrapForClient;}}

1.2.7 SpringInitRunner -- 增加选举的逻辑

SpringInitRunner前面只启动了netty等待连接,这里将完成选举,以及当就自己一个节点则就认为自己是controller

@Component
@Slf4j
public class SpringInitRunner implements CommandLineRunner {@Autowiredprivate DttaskServerConfig dttaskServerConfig;@Autowiredprivate NetworkService networkService;@Autowiredprivate MessageServiceManager messageServiceManager;@Autowiredprivate RedisUtil redisUtil;@PostConstructpublic void init() {initServerBootStrap();initConnectOtherNodeBootStrap();}private void initConnectOtherNodeBootStrap() {ServerInfo.setConnectOtherNodeBootStrap(new Bootstrap());ServerInfo.getConnectOtherNodeBootStrap().group(new NioEventLoopGroup(4)).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {socketChannel.pipeline().addLast(new DttaskMessageDecoder(MESSAGE_MAX_SIZE, MESSAGE_LENGTH_FILED_OFFSET, MESSAGE_LENGTH_FILED_LENGTH));socketChannel.pipeline().addLast(new IdleStateHandler(dttaskServerConfig.getReadIdleSecondTime(),dttaskServerConfig.getWriteIdleSecondTime(),dttaskServerConfig.getAllIdleSecondTime()));socketChannel.pipeline().addLast(new DttaskMessageEncoder());socketChannel.pipeline().addLast(new ServerClientChannelHandler(networkService, redisUtil, messageServiceManager));}});}private void initServerBootStrap() {ServerInfo.setBossGroup(new NioEventLoopGroup(4));ServerInfo.setWorkerGroup(new NioEventLoopGroup(8));ServerInfo.setBootstrapForClient(new ServerBootstrap());ServerInfo.getBootstrapForClient().group(ServerInfo.getBossGroup(), ServerInfo.getWorkerGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {socketChannel.pipeline().addLast(new DttaskMessageDecoder(MESSAGE_MAX_SIZE, MESSAGE_LENGTH_FILED_OFFSET, MESSAGE_LENGTH_FILED_LENGTH));socketChannel.pipeline().addLast(new DttaskMessageEncoder());IdleStateHandler idleStateHandler = new IdleStateHandler(dttaskServerConfig.getReadIdleSecondTime(), dttaskServerConfig.getWriteIdleSecondTime(), dttaskServerConfig.getAllIdleSecondTime());socketChannel.pipeline().addLast(idleStateHandler);socketChannel.pipeline().addLast(new HeartBeatServerHandler());socketChannel.pipeline().addLast(new ServerClientChannelHandler(networkService, redisUtil, messageServiceManager));}});}@Overridepublic void run(String... args) {log.info("spring启动完成,接下来启动 netty");ServerInfo.init();try {log.info("启动监听其它节点端请求的服务端...");ServerInfo.setServerChannel(ServerInfo.getBootstrapForClient().bind(dttaskServerConfig.listenerPort()).sync().channel());} catch (Exception e) {log.error("启动 监听其它节点请求的服务端出现异常", e);System.exit(-1);}try {log.info("连接controller或开始vote...");if (ServerInfo.isIdentifying()) {log.info("连接controller...");RedisUtil redisUtil = BeanUseHelper.redisUtil();long controllerServerId = redisUtil.getLongValue(Constant.RedisConstants.DTTASK_CONTROLLER);networkService.connectController(controllerServerId);} else if (ServerInfo.isVoting()){log.info("开始vote...");long minNodeId = ServerInfo.getMinNodeId();networkService.startVote(null, minNodeId);} else if (ServerInfo.isRunning()) {log.info("已确认本节点={}就是controller...", ServerInfo.getServerId());Controller controller = ServerInfo.initController();}}  catch (Exception e) {log.error("连接controller或开始vote出现异常", e);System.exit(-1);}log.info("netty 启动成功...");}@PreDestroypublic void shutdown() {if (ServerInfo.getOtherNodeIds().isEmpty()) {redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_CONTROLLER, null);redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_NODE_INFO, null);}try {ServerInfo.getServerChannel().close().sync();} catch (InterruptedException e) {log.error("dttask-server netty shutdown 出现异常", e);Thread.currentThread().interrupt();} finally {ServerInfo.getWorkerGroup().shutdownGracefully();ServerInfo.getBossGroup().shutdownGracefully();}}
}

1.2.8 ServerClientChannelHandler -- 完成投票等消息处理

前面ServerClientChannelHandler只是完成了基本框架,代码较少,现在要在这里添加处理每个消息的逻辑,放心,代码也不多,因为我们已经对消息处理进行了拆分,只要加入一个策略选择就可以。

这里不得不感叹一下:策略模式的功能,否则这里将会有一大堆if else。

当节点与节点的通信断开时,会触发channelInactive和exceptionCaught方法,我们需要在这里处理断开的业务逻辑,注意断开的业务逻辑需要判断断开的是Controller还是Follower,这里的处理逻辑不同。

@Slf4j
public class ServerClientChannelHandler extends SimpleChannelInboundHandler<DttaskMessage> {private NetworkService networkService;private MessageServiceManager messageServiceManager;private RedisUtil redisUtil;public ServerClientChannelHandler(NetworkService networkService, RedisUtil redisUtil, MessageServiceManager messageServiceManager) {super();this.networkService = networkService;this.redisUtil = redisUtil;this.messageServiceManager = messageServiceManager;}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("channelActive={}", ctx.channel().id());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, DttaskMessage message) throws Exception {log.info("收到客户端的请求:{}", message);messageServiceManager.chooseMessageService(message.getType()).execute(ctx, message);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.warn("channelInactive...");stopChannel(ctx, null);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.warn("exceptionCaught...", cause);stopChannel(ctx, cause);}private void stopChannel(ChannelHandlerContext ctx, Throwable cause) {Channel channel = ctx.channel();long localServerId = ServerInfo.getServerId();Long serverId = ServerInfo.getServerIdByChannelId(ctx.channel().id());if (serverId == null) {return;}if (cause != null) {log.error("nodeId={}与本节点id={}通信出现异常", serverId, localServerId, cause);} else {log.error("nodeId={}与本节点id={}通信失效", serverId, localServerId);}if (channel.isActive()) {channel.close();}// 判断下线的是follower 还是 controllerNodeInfo nodeInfo = ServerInfo.getNodeInfo(serverId);if (!nodeInfo.getServerRole().isController()) {log.info("下线的是follower,id={}", serverId);Set<Channel> otherNodeChannels = ServerInfo.getOtherNodeChannel(serverId);for (Channel otherNodeChannel : otherNodeChannels) {otherNodeChannel.writeAndFlush(DttaskMessage.buildNodeOfflineMessage(serverId));}ServerInfo.removeChannel(channel.id());ServerInfo.refreshRedisNodeInfo();} else {log.info("下线的是controller,id={}", serverId);redisUtil.setCacheObject(Constant.RedisConstants.DTTASK_CONTROLLER, null);ServerInfo.removeChannel(channel.id());long minNodeId = ServerInfo.getMinNodeId();if (minNodeId != localServerId) {// 重新选举networkService.startVote(serverId, minNodeId);} else {// 当前就只剩自己一个节点ServerInfo.setStatus(ServerStatus.RUNNING);Controller controller = ServerInfo.initController();}ServerInfo.refreshRedisNodeInfo();}}
}

2. 验证

2.1 建立3个节点的配置

注意:Server.port需要是不一样的

注意:节点的serverId也要和serverInfo匹配

2.2 idea建立针对3个配置的启动Service

2.3 依次启动验证

可以按照本文最起那面的 节点详细交互图的 步骤进行测试。

  • 确保redis,mysql都已ok
  • 依次启动3个节点,可以看到1为controller,2 3为follower,redis的key --- 完成选举

  • 下线3号节点

  • 下线1号节点

2号节点称为Controller

  • 上线1号节点、3号节点

所有节点启动完成,这时2是C、1 3是F

  • 停止2号节点

至此完成了所有验证

相关文章:

5.实现简化版raft协议完成选举

1.设计 前面已经完成了netty的集成&#xff0c;接下来就是借助netty完成选举就行了。 针对选举&#xff0c;我们用到了VotRequestMessage、VotRespMessage、当节点下线时NodeOfflineMessage、NodeOnlineMessage、NodeOnlineRespMessage 1.1 节点详细的交互 1.2 对所有消息的…...

服装管理系统 简单实现

服装管理系统 项目使用jsp servletmysql实现&#xff1b; 登陆注册 首页 首页显示服装信息 服装管理 1添加服装 2修改服装 3分页查询服装 4导出服装信息 5 导入服装信息 代码结构截图 百度网盘 链接&#xff1a;https://pan.baidu.com/s/1zfLHGMnrYd-JtnhzS5elYQ 提取码…...

深度学习项目实战:垃圾分类系统

简介&#xff1a; 今天开启深度学习另一板块。就是计算机视觉方向&#xff0c;这里主要讨论图像分类任务–垃圾分类系统。其实这个项目早在19年的时候&#xff0c;我就写好了一个版本了。之前使用的是python搭建深度学习网络&#xff0c;然后前后端交互的采用的是java spring …...

C#浅拷贝和深拷贝数据

目录 一、浅拷贝 二、深拷贝 一、浅拷贝 就是把原来的数据&#xff0c;复制一份&#xff0c;但是2份数据是共享地址的&#xff0c;修改第一份数据或者修改第二份数据&#xff0c;都会一起改变&#xff0c;这可能不是我们程序中需要的场景。 下面我们演示一下&#xff0c;首…...

【JVM】4.运行时数据区(程序计数器、虚拟机栈)

文章目录 4.JVM的运行时数据区4.1 程序计数器4.2 Java虚拟机栈4.3 虚拟机栈内存溢出 4.JVM的运行时数据区 4.1 程序计数器 程序计数器&#xff08;PC&#xff09;会记录着下一行字节码指令的地址。执行完当前指令后&#xff0c;PC刷新&#xff0c;JVM的执行引擎根据程序计数器…...

算法:程序员的数学读书笔记

目录 ​0的故事 ​一、按位计数法 二、不使用按位计数法的罗马数字 三、十进制转二进制​​​​​​​ ​四、0所起到的作用​​​​​​​ 逻辑 一、为何逻辑如此重要 二、兼顾完整性和排他性 三、逻辑 四、德摩根定律 五、真值表 六、文氏图 七、卡诺图 八、逻…...

机器学习算法---时间序列

类别内容导航机器学习机器学习算法应用场景与评价指标机器学习算法—分类机器学习算法—回归机器学习算法—聚类机器学习算法—异常检测机器学习算法—时间序列数据可视化数据可视化—折线图数据可视化—箱线图数据可视化—柱状图数据可视化—饼图、环形图、雷达图统计学检验箱…...

RK3568/RV1126/RV1109/RV1106 ISP调试方案

最近一直在做瑞芯微rv1126的开发&#xff0c;由于项目性质&#xff0c;与camera打的交道比较多&#xff0c;包括图像的采集&#xff0c;ISP处理&#xff0c;图像处理&#xff0c;H.264/H.265编解码等各个方面吧。学到了不少&#xff0c;在学习的过程中&#xff0c;也得到了不少…...

【TB作品】51单片机,语音出租车计价器

西交大题目 1.语音出租车计价器 一、功能要求: 1.具有可模拟出租车车轮转速传感器的硬件设计,可计量出租车所走的公 里数。 2.显示和语音播报里程、价格和等待红灯或堵车的计时价格: 3.具有等待计时功能 4.具有实时年月日显示和切换功能。 5.操作简单、界面友好。 二、设计建议…...

jmeter简单压测kafka

前言 这也是一个笔记&#xff0c;就是计划用jmeter做性能测试&#xff0c;但是这里是只要将数据放到kafka的topic里&#xff0c;后面查看下游业务处理能力。 一、方案 因为只要实现数据放到kafka&#xff0c;参考了下博友的方案&#xff0c;可行。 二、方案验证 详细过程就不…...

【漏洞复现】红帆OA iorepsavexml.aspx文件上传漏洞

漏洞描述 广州红帆科技深耕医疗行业20余年,专注医院行政管控,与企业微信、阿里钉钉全方位结合,推出web移动一体化办公解决方案——iOffice20(医微云)。提供行政办公、专业科室应用、决策辅助等信息化工具,采取平台化管理模式,取代医疗机构过往多系统分散式管理,实现医…...

04_Web框架之Django一

Web框架之Django一 学习目标和内容 1、能够描述Django的作用 2、能够使用Django创建应用 3、能够使用GET和POST请求方式进行传参 4、能够使用Django的函数式方法定义视图 5、能够进行Django的配置文件修改 6、能够基本使用Django的路由定义 一、Django相关介绍 1、什么是Djan…...

单机架构到分布式架构的演变

目录 1.单机架构 2.应用数据分离架构 3.应用服务集群架构 4.读写分离 / 主从分离架构 5.引入缓存 —— 冷热分离架构 6.垂直分库 7.业务拆分 —— 微服务 8.容器化引入——容器编排架构 总结 1.单机架构 初期&#xff0c;我们需要利用我们精干的技术团队&#xff0c;快…...

1.新入手的32位单片机资源和资料总览

前言&#xff1a; 学了将近1年的linux驱动和uboot&#xff0c;感觉反馈不足&#xff0c;主要是一直在学各种框架&#xff0c;而且也遇到了门槛&#xff0c;比如驱动部分&#xff0c;还不能随心所欲地编程&#xff0c;原因是有些外设的原理还不够深刻、有些复杂的底层驱动的代码…...

jmeter判断’响应断言‘两个变量对象是否相等

1、首先需要设置变量&#xff0c;json、正则、csv文件等变量 2、然后在响应断言中 ①JMeter Variable Name to use —— 输入一个变量&#xff0c;变量名即可 ② 模式匹配规则 ——相等 ③测试模式 ——输入引用的变量命${变量名} &#xff08;注意这里是需要添加一个测试模式…...

【Linux基础命令使用】

文章目录 一. 操作系统和文件及文件路径介绍二. 基础指令介绍三. 结束语 一. 操作系统和文件及文件路径介绍 什么是操作系统&#xff1f;操作系统是一款进行软硬件资源管理的软件为什么要进行软硬件资源管理&#xff1f;对上提供良好的稳定的运行服务----工具Linux指令和图形化…...

【JNA与C++基本使用示例】

JNA中java与C使用注意事项和代码示例 JNA关系映射表使用案列注意代码示例C代码java代码 JNA关系映射表 使用案列 注意 JNA只支持C方式的dll使用C的char* 作为返回值时&#xff0c;需要返回的变量为malloc分配的地址C的strlen函数只获得除/0以外的字符串长度 代码示例 C代码…...

HttpRunner接口自动化测试框架

简介 HttpRunner是一款面向 HTTP(S) 协议的通用测试框架&#xff0c;只需编写维护一份 YAML/JSON 脚本&#xff0c;即可实现自动化测试、性能测试、线上监控、持续集成等多种测试需求。 项目地址&#xff1a;GitHub - httprunner/httprunner: HttpRunner 是一个开源的 API/UI…...

云计算:Vmware 安装 FreeNAS

目录 一、实验 1.Vmware 安装 FreeNAS 2.配置Web界面 二、问题 1.iSCSI如何限定名称 2.LUN和LVM的区别 一、实验 1.Vmware 安装 FreeNAS &#xff08;1&#xff09;环境准备 VMware Workstation 17 FreeNAS相关安装部署镜像: 官网地址&#xff1a; https://download…...

数据库交付运维高级工程师-腾讯云TDSQL

数据库交付运维高级工程师-腾讯云TDSQL上机指导&#xff0c;付费指导&#xff0c;暂定99...

目标检测YOLO实战应用案例100讲-光伏电站热斑检测(续)

目录 2.5 图像重建方法实验及其结果分析 2.5.1 数据集与超参数 2.5.2 结果分析...

jmeter如何循环运行到csv文件最后一行后停止

1、首先在线程组中设置’循环次数‘–勾选永远 2、csv数据文件设置中设置&#xff1a; 遇到文件结束符再次循环?——改为&#xff1a;False 遇到文件结束符停止线程?——改为&#xff1a;True 3、再次运行就会根据文档的行数运行数据 &#xff08;如果需要在循环控制器中&…...

电路中的屏蔽罩作用及设计

1.1 屏蔽罩作用 1.1.1 屏蔽电子信号,防止外界的干扰或内部向外的辐射&#xff1a; 一般见于通信类电路PCB&#xff0c;主要一个无线通信产品上有的敏感器件、模拟、数字电路、DCDC电源电路&#xff0c;都需屏蔽隔离&#xff0c;是为了不影响其它电路&#xff0c;也有防止其它电…...

CodeBlocks定义异常:multiple definition of 和 first defined here

基于链表实现贪吃蛇案例时候&#xff0c;在CodeBlocks的CPP源文件定义函数和全局变量均报错 异常现象 在**自定义的cpp**文件定义全局变量、对象、函数等均出现重复定义和首次定义 multiple definition of Controller::showCopy() first defined here 异常解决方案 正确代码…...

RHEL7.5编译openssl1.1.1w源码包到rpm包

openssl1.1.1w下载地址 https://www.openssl.org/source/ 安装依赖包 yum -y install curl which make gcc perl perl-WWW-Curl rpm-build wget http://mirrors.aliyun.com/centos-vault/7.5.1804/os/x86_64/Packages/perl-WWW-Curl-4.15-13.el7.x86_64.rpm rpm -ivh pe…...

结构型设计模式(二)装饰器模式 适配器模式

装饰器模式 Decorator 1、什么是装饰器模式 装饰器模式允许通过将对象放入特殊的包装对象中来为原始对象添加新的行为。这种模式是一种结构型模式&#xff0c;因为它通过改变结构来改变被装饰对象的行为。它涉及到一组装饰器类&#xff0c;这些类用来包装具体组件。 2、为什…...

C#数据结构

C#数据结构 常见结构 1、集合 2、线性结构 3、树形结构 4、图形结构 Array/ArrayList/List 特点&#xff1a;内存上连续存储&#xff0c;节约空间&#xff0c;可以索引访问&#xff0c;读取快&#xff0c;增删慢 using System; namespace ArrayApplication {class MyAr…...

代码随想Day39 | 62.不同路径、63. 不同路径 II

62.不同路径 每次向右或者向下走两个选择&#xff0c;定义dp数组dp[i][j] 为到达索引ij的路径和&#xff0c;状态转移公式为 dp[i][j]dp[i-1][j]dp[i][j-1]&#xff0c;初始状态的第一行和第一列为1&#xff0c;从左上到右下开始遍历即可。详细代码如下&#xff1a; class Sol…...

Autosar通信实战系列07-Com模块要点及其配置介绍(二)

本文框架 前言1. ComGeneral配置2. ComConfig配置2.1 ComGwMapping2.2 ComIPdus2.3 ComIPduGroups2.4 ComIPduSignals2.5 ComIPduSignalGroups2.6 ComTimeBasis前言 在本系列笔者将结合工作中对通信实战部分的应用经验进一步介绍常用,包括但不限于通信各模块的开发教程,代码…...

DSP捕获输入简单笔记

之前使用stm32的大概原理是&#xff1a; 输入引脚输入一个脉冲&#xff0c;捕获1开始极性捕获&#xff0c;捕获的是从启动捕获功能开始计数&#xff0c;捕获的是当前的计数值&#xff1b; 例如一个脉冲&#xff0c;捕获1捕获上升沿&#xff0c;捕获2捕获下降沿&#xff1b;而两…...

多产品网站怎么做企业网站/河南企业网站建设

通过王涛完成的项目练习&#xff0c;发现了struts中的表单校验失败后重显的一个问题&#xff1a;假设ActionForm Bean中有一个整数类型的属性&#xff0c;如果将其类型定义为int&#xff0c;这个属性的默认值是0&#xff1b;在表单页面中用一个文本框来输入这个属性的值&#x…...

wordpress 产品模块/怎么建设自己的网站

1)jsp是简servlet编写的一种技术&#xff0c;它将Java代码&#xff08;一定是在服务器端执行&#xff0c;不是在客户端的浏览器那儿执行&#xff09;和html语句混在同一个文件中编写&#xff0c;只对网页中要动态产生的内容用Java代码来编写&#xff0c;而对固定不变的静态内容…...

如何做好网络营销管理/西安seo代运营

E此浏览器不支持画布C我一路 看 过 千山和万水C我的脚 踏 遍 天南和地北F 我都无所谓日晒或是风 吹F 鲜红的纯粹路边那朵蔷 薇C关掉了 手 机 管他谁是谁C不要去 理 会 是是与非非F 从不觉疲惫天亮走到天 黑F F C黄昏中的堡 垒 (多颓废)F G如果迎着风 就飞Em Am俯瞰这世界 有多美…...

南昌建站系统外包/今日小说搜索百度风云榜

最新的 Firefox 2 版本为 Firefox 2.0.0.14&#xff0c; Firefox 3 版本为 Firefox 3.0rc1。主流依然是 Firefox 2.0.0.14&#xff0c;但由于在不久的将来 Firefox 2 会升级到 Firefox 3&#xff0c;对于我们前端是好消息&#xff08;更好更优的功能&#xff09;&#xff0c;也…...

wordpress的文章如何备份/好用的搜索引擎

2019独角兽企业重金招聘Python工程师标准>>> 1 进入php源代码目录中的mbstring所在目录cd /usr/local/src/php-5.2.4/ext/mbstring/2 执行php安装后目录中的bin/phpize文件/usr/local/php/bin/phpize3 进入php源代码目录cd /usr/local/src/php-5.2.4/4 执行上述目录…...

中牟网站建设/网址域名注册

ESG使用指南&#xff1a;1.ESG操作文档网站&#xff1a;ESG有个网站&#xff0c;是专门的操作文档网站&#xff0c;因为ESG三个环境&#xff0c;流程各不一样。地址&#xff1a;http://10.20.12.90:20567/esg-help-doc/2.ESG管理平台网站&#xff0c;分别管理开发&#xff0c;测…...