三线城市做网站需求/培训总结
背景
由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题:
- 关闭连接NioEventLoop没有释放导致oom
- 设计思想是一个设备一个连接,而不是一个网关一个连接
- 连接断开后客户端无从感知
前两个问题解决方案参考上篇文章,最后一个问题虽然可以通过isConnect()方法获取到状态,但是连接断开后这个状态并没有更新,只能代码实现失败重连。
所以为了解决以上问题,我打算重新封装一个Modbus组件。
步骤
代码如下所示,目前只分享modbus-core相关的代码。
- modbus-core:实现设备读写指令的下发以及应答。
- modbus-app:实现通用的可灵活配置的modbus设备接入层,通过更新配置信息即可快速引入新设备,无需手写代码重启应用。
为了快速实现modbus组件封装,这里引入了Vertx框架(基于事件+异步)官网链接,而不是原生的Netty框架。
引入架包
<!-- 目前我这里引入最新的版本(4.4.4) -->
<dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>${vertx.version}</version></dependency>
工具类
ByteUtil
package com.bho.modbus.utils;import java.nio.ByteBuffer;public class ByteUtil {/*** 字节数组转字符串* @param bytes* @return*/public static String bytesToHexString(byte[] bytes) {StringBuffer sb = new StringBuffer(bytes.length);String sTemp;for (int i = 0; i < bytes.length; i++) {sTemp = Integer.toHexString(0xFF & bytes[i]);if (sTemp.length() < 2) {sb.append(0);}sb.append(sTemp.toUpperCase());}return sb.toString();}/*** int整型转字节数组* @param data* @param offset* @param len* @return*/public static byte[] intToBytes(int data, int offset, int len) {ByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(data);byte[] bytes = buffer.array();if (len - offset == 4) {return bytes;}byte[] dest = new byte[len];System.arraycopy(bytes, offset, dest, 0, len);return dest;}/*** 字节数组转int整型* @param bytes* @param offset* @param len* @return*/public static int bytesToInt(byte[] bytes, int offset, int len) {ByteBuffer buffer = ByteBuffer.allocate(4);for (int i = len; i < 4; i ++) {buffer.put((byte) 0x00);}for (int i = offset; i < offset + len; i++) {buffer.put(bytes[i]);}buffer.flip();return buffer.getInt();}}
Crc16
package com.bho.modbus.utils;public class Crc16 {/*** 获取CRC16校验码* @param arr_buff* @return*/public static byte[] getCrc16(byte[] arr_buff) {int len = arr_buff.length;// 预置 1 个 16 位的寄存器为十六进制FFFF, 称此寄存器为 CRC寄存器。int crc = 0xFFFF;int i, j;for (i = 0; i < len; i++) {// 把第一个 8 位二进制数据 与 16 位的 CRC寄存器的低 8 位相异或, 把结果放于 CRC寄存器crc = ((crc & 0xFF00) | (crc & 0x00FF) ^ (arr_buff[i] & 0xFF));for (j = 0; j < 8; j++) {// 把 CRC 寄存器的内容右移一位( 朝低位)用 0 填补最高位, 并检查右移后的移出位if ((crc & 0x0001) > 0) {// 如果移出位为 1, CRC寄存器与多项式A001进行异或crc = crc >> 1;crc = crc ^ 0xA001;} else// 如果移出位为 0,再次右移一位crc = crc >> 1;}}return intToBytes(crc);}private static byte[] intToBytes(int value) {byte[] src = new byte[2];src[1] = (byte) ((value >> 8) & 0xFF);src[0] = (byte) (value & 0xFF);return src;}}
实体类
ModbusMode
目前只实现了以下两种通信方式,可根据自己需求加入其它通信方式。
package com.bho.modbus.model;import com.bho.modbus.utils.ByteUtil;
import com.bho.modbus.utils.Crc16;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.log4j.Log4j2;import java.nio.ByteOrder;@Log4j2
public enum ModbusMode {/*** 【事务ID(2) + 协议标识(2) + 数据长度(2)】 + 从机地址(1) + 功能码(1) + 数据区(N)*/TCP,/*** 从机地址(1) + 功能码(1) + 数据区(N) + 【校验码(2)】**/RTU,;public ByteToMessageDecoder getDecoder() {if (this == ModbusMode.TCP) {return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 4,2, 0, 6, true);}if (this == ModbusMode.RTU){return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 2,1, 2, 0, true);}return null;}public byte[] readData(byte[] bytes) {int len = bytes.length;if (this == ModbusMode.RTU) {byte[] tempArr = new byte[len - 2];System.arraycopy(bytes, 0, tempArr, 0, tempArr.length);byte[] crc16 = Crc16.getCrc16(tempArr);if (crc16[0] != bytes[len -2] || crc16[1] != bytes[len - 1]) {log.error("Modbus receive illegal data:{}", ByteUtil.bytesToHexString(bytes));return null;}if (log.isDebugEnabled()) {log.debug("read data:{}", ByteUtil.bytesToHexString(tempArr));}return tempArr;}if (this == ModbusMode.TCP) {if (log.isDebugEnabled()) {log.debug("read data:{}", ByteUtil.bytesToHexString(bytes));}return bytes;}return null;}public byte[] writeData(byte[] bytes) {if (log.isDebugEnabled()) {log.debug("write data:{}",ByteUtil.bytesToHexString(bytes));}int len = bytes.length;if (this == ModbusMode.RTU) {byte[] crc16 = Crc16.getCrc16(bytes);byte[] tempArr = new byte[len + 2];System.arraycopy(bytes, 0, tempArr, 0, len);tempArr[len] = crc16[0];tempArr[len + 1] = crc16[1];return tempArr;}if (this == ModbusMode.TCP) {byte[] tempArr = new byte[len + 6];tempArr[1] = 0x01;byte[] lenBytes = ByteUtil.intToBytes(len, 2, 2);tempArr[4] = lenBytes[0];tempArr[5] = lenBytes[1];System.arraycopy(bytes, 0, tempArr, 6, len);return tempArr;}return null;}}
ModbusFunc
功能码
package com.bho.modbus.model;/*** Modbus常见功能码*/
public enum ModbusFunc {/*** 错误代码* 01:非法的功能码* 02:非法的寄存器地址* 03:非法的数据值* 04:从机故障*//*** 请求:* 功能代码:1字节 0x01* 起始地址:2字节 0x0000-0xffff* 线圈数量:2字节 0x0001-0x07d0(2000)** 正确响应:* 功能代码:1字节 0x01* 字节数:1字节 N(读线圈个数/8,余数不为0则加1)* 线圈状态:N字节** 错误响应:* 功能代码:1字节 0x81* 错误代码:1字节 0x01-0x04*/READ_COILS((byte)0x01),//读连续线圈状态READ_DISCRETE_COILS((byte)0x02),//读离散线圈状态 同上/*** 请求:* 功能代码:1字节 0x03* 起始地址:2字节 0x0000-0xffff* 寄存器数量:2字节 0x0001-0x007d(125)** 正确响应:* 功能代码:1字节 0x03* 字节数:1字节 2N(N为寄存器数量)* 寄存器数量:2N字节** 错误响应:* 功能代码:1字节 0x83* 错误代码:1字节 0x01-0x04*/READ_HOLDING_REGISTERS((byte)0x03),//读保持寄存器值READ_INPUT_REGISTERS((byte)0x04),//读输入寄存器值 同上/*** 请求:* 功能代码:1字节 0x05* 起始地址:2字节 0x0000-0xffff* 线圈状态:2字节 0x0000/0xff00** 正确响应:* 功能代码:1字节 0x05* 起始地址:2字节 0x0000-0xffff* 线圈状态:2字节 0x0000/0xff00** 错误响应:* 功能代码:1字节 0x85* 错误代码:1字节 0x01-0x04*/WRITE_SINGLE_COILS((byte)0x05),//写单个线圈/*** 请求:* 功能代码:1字节 0x06* 起始地址:2字节 0x0000-0xffff* 寄存器值:2字节 0x0000-0xffff** 正确响应:* 功能代码:1字节 0x06* 起始地址:2字节 0x0000-0xffff* 寄存器值:2字节 0x0000-0xffff** 错误响应:* 功能代码:1字节 0x86* 错误代码:1字节 0x01-0x04*/WRITE_SINGLE_HOLDING_REGISTERS((byte)0x06),//写单个保持寄存器/*** 请求:* 功能代码:1字节 0x10* 起始地址:2字节 0x0000-0xffff* 写入寄存器个数:2字节 0x0001-0x007b(123)* 写入字节数:1字节 2N(N为寄存器个数)* 寄存器值:2N字节 0x0000-0xffff** 正确响应:* 功能代码:1字节 0x10* 起始地址:2字节 0x0000-0xffff* 写入寄存器个数:2字节 0x0001-0x007b(123)** 错误响应:* 功能代码:1字节 0x90* 错误代码:1字节 0x01-0x04*/WRITE_MULTI_HOLDING_REGISTERS((byte)0x10),//写多个保持寄存器/*** 请求:* 功能代码:1字节 0x0F* 起始地址:2字节 0x0000-0xffff* 写入线圈个数:2字节 0x0001-0x07b0(1968)* 写入字节数:1字节 N(N为线圈个数/8,余数不为0则加1)* 线圈状态:N字节** 正确响应:* 功能代码:1字节 0x0F* 起始地址:2字节 0x0000-0xffff* 写入线圈个数:2字节 0x0001-0x07b0(1968)** 错误响应:* 功能代码:1字节 0x8F* 错误代码:1字节 0x01-0x04*/WRITE_MULTI_COILS((byte)0x0F),//写多个线圈;private byte func;ModbusFunc(byte func) {this.func = func;}public byte getFunc() {return func;}
}
ModbusParamConfig
下发指令参数配置信息
package com.bho.modbus.model;import lombok.Data;@Data
public class ModbusParamConfig {private RegisterType registerType;//寄存器类型private int registerAddress;//寄存器地址private String name;//指标名称private DataType dataType;//指标数据类型private int numberSplit;//(除)倍数public enum RegisterType {COIL,HOLDING_REGISTER,INPUT_REGISTER;}public enum DataType {BOOL,FLOAT,INT;}}
SendCmdTask
下发指令任务
package com.bho.modbus.model;import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.Data;import java.util.List;@Data
public class SendCmdTask {private List<ModbusParamConfig> paramConfigs;//参数列表private JSONObject reqParam;//请求参数 写数据必填private Boolean isWrite;//是否是写数据private Integer slaveId;//从机IDprivate Integer reqTimeout;//请求超时时间(秒)private Promise<JSONObject> promise;private Long timerId;public SendCmdTask(Vertx vertx, List<ModbusParamConfig> paramConfigs, JSONObject reqParam, Boolean isWrite, Integer slaveId, Integer reqTimeout) {this.paramConfigs = paramConfigs;this.reqParam = reqParam;this.isWrite = isWrite;this.slaveId = slaveId;this.reqTimeout = Math.max(reqTimeout, 5);Promise<JSONObject> promise = Promise.promise();this.promise = promise;this.timerId = vertx.setTimer(reqTimeout * 1000, hh -> promise.tryFail("Request timeout"));}
}
核心类
package com.bho.modbus.core;import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.SendCmdTask;
import com.bho.modbus.model.ModbusFunc;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.model.ModbusParamConfig;import com.bho.modbus.utils.ByteUtil;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import lombok.extern.log4j.Log4j2;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;@Log4j2
public class ModbusConnection {private String ip;//从机IPprivate Integer port;//从机端口private AtomicBoolean isAlive;//从机是否在线private ModbusMode mode;//通讯模式private NetSocket netSocket;//客户端连接private boolean isInitiativeClose;//是否是主动关闭连接private Long failRetryTimerId;//失败重试定时器IDprivate Integer failRetryIntervalSecond;//连接断开后重连间隔时间private Integer reqTimeoutSecond = 1;//请求超时时间private Long queueTimerId;//队列定时器private ConcurrentLinkedQueue<SendCmdTask> writeQueue;//写队列 优先写private ConcurrentLinkedQueue<SendCmdTask> readQueue;//读队列private Map<String, Promise<byte[]>> promiseMap;private Vertx vertx;public ModbusConnection(Vertx vertx, String ip, Integer port, Integer failRetryIntervalSecond, ModbusMode mode) {this.vertx = vertx;this.ip = ip;this.port = port;this.failRetryIntervalSecond = failRetryIntervalSecond;this.mode = mode;this.isAlive = new AtomicBoolean(false);this.writeQueue = new ConcurrentLinkedQueue<>();this.readQueue = new ConcurrentLinkedQueue<>();this.promiseMap = new ConcurrentHashMap<>();consumerTaskQueue(true);}/*** 建立连接* @return*/public Future<Boolean> connect(){NetClient netClient = vertx.createNetClient();return vertx.executeBlocking(b -> {netClient.connect(port, ip).onSuccess(socket -> {log.info("Modbus connect success, ip:{}, port:{}", ip, port);netSocket = socket;isAlive.set(true);b.tryComplete(true);NetSocketImpl netSocketImpl = (NetSocketImpl) socket;netSocketImpl.channelHandlerContext().pipeline().addFirst(mode.getDecoder());socket.handler(buf -> {byte[] bytes = mode.readData(buf.getBytes());if (bytes == null) {return;}int slaveId = ByteUtil.bytesToInt(bytes, 0, 1);int funcNo = ByteUtil.bytesToInt(bytes, 1, 1);int errFuncNo = funcNo - 128;String key = String.format("%s_%s", slaveId, funcNo);String errKey = String.format("%s_%s", slaveId, errFuncNo);if (promiseMap.containsKey(key)) {Promise<byte[]> promise = promiseMap.get(key);byte[] content = new byte[bytes.length - 2];System.arraycopy(bytes, 2, content, 0, content.length);promise.tryComplete(content);} else if (promiseMap.containsKey(errKey)) {Promise<byte[]> promise = promiseMap.get(errKey);int data = ByteUtil.bytesToInt(bytes, 2, 1);switch (data) {case 1:promise.tryFail("Illegal function code");break;case 2:promise.tryFail("Illegal register address");break;case 3:promise.tryFail("Illegal data value");break;case 4:promise.tryFail("Slave fault");break;}}});socket.closeHandler(h -> {if (!isInitiativeClose) {log.error("Modbus connect close, ip:{}, port:{}", ip, port);failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, hh -> connect());} else {log.info("Modbus connect close, ip:{}, port:{}", ip, port);}});}).onFailure(err -> {log.error("Modbus connect fail, ip:{}, port:{}, msg:{}", ip, port, err.getMessage());isAlive.set(false);b.fail(err.getMessage());failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, h -> connect());});});}/*** 是否在线* @return*/public boolean isActive() {return isAlive.get();}/*** 断开连接*/public void close() {isInitiativeClose = true;if (failRetryTimerId != null) {vertx.cancelTimer(failRetryTimerId);}if (queueTimerId != null) {vertx.cancelTimer(queueTimerId);}if (netSocket != null) {netSocket.close();}}/*** 下发读写任务(串行 优先写任务)* 若并行可直接调用executeTask执行任务,无需排队等候一个个消费任务* @param task 读写任务* @return*/public Promise<JSONObject> offerTask(SendCmdTask task) {if (task.getIsWrite()) {writeQueue.offer(task);} else {readQueue.offer(task);}return task.getPromise();}/*** 消费任务队列 500毫秒轮询一次 优先消费写任务* @param delayFlag*/private void consumerTaskQueue(boolean delayFlag){if(delayFlag){queueTimerId = vertx.setTimer(500,id->{consumerTaskQueue(false);});return;}if(writeQueue.isEmpty() && readQueue.isEmpty()){consumerTaskQueue(true);return;}if(!writeQueue.isEmpty()){SendCmdTask sendCmdTask = writeQueue.poll();sendCmdTask.getPromise().future().onComplete(h->{consumerTaskQueue(false);});executeTask(sendCmdTask);return;}if(!readQueue.isEmpty()){SendCmdTask sendCmdTask = readQueue.poll();sendCmdTask.getPromise().future().onComplete(h->{consumerTaskQueue(false);});executeTask(sendCmdTask);}}private Future<Void> executeTask(SendCmdTask sendCmdTask){vertx.cancelTimer(sendCmdTask.getTimerId());Future<JSONObject> future;if (sendCmdTask.getIsWrite()) {future = executeWrite(sendCmdTask.getReqParam(), sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());} else {future = executeQuery(sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());}return future.onSuccess(res -> sendCmdTask.getPromise().tryComplete(res)).onFailure(err -> sendCmdTask.getPromise().tryFail(err)).map(o -> null);}/*** 写数据* @param reqParam 下发参数* @param paramConfigs 参数配置列表* @param slaveId 从机ID* @return*/private Future<JSONObject> executeWrite(JSONObject reqParam, List<ModbusParamConfig> paramConfigs, Integer slaveId) {if (!isActive()) {return Future.failedFuture("Gateway offline");}boolean isMerge = isMergeSendCmd(paramConfigs);if (isMerge) {int registerAddress = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();Promise<byte[]> promise = Promise.promise();List<String> keyList = paramConfigs.stream().map(ModbusParamConfig::getName).collect(Collectors.toList());return vertx.executeBlocking(h -> {Buffer buffer = getWriteCmd(registerAddress, slaveId, reqParam, keyList, registerType, promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {h.complete(reqParam);}).onFailure(err -> {log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());h.tryFail(err.getMessage());});});}List<Future<Object>> futures = new ArrayList<>();Future blockingFuture = Future.succeededFuture();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();Promise<byte[]> promise = Promise.promise();blockingFuture = blockingFuture.compose(suc -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig),err -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig));futures.add(blockingFuture);}return commonReplyResult(futures, paramConfigs);}private Future<Object> singleExecuteWrite(int slaveId, JSONObject reqParam, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {return vertx.executeBlocking(h -> {Buffer buffer = getWriteCmd(paramConfig.getRegisterAddress(), slaveId, reqParam, Arrays.asList(paramConfig.getName()), registerType, promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {h.tryComplete(reqParam.get(paramConfig.getName()));}).onFailure(err -> {log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",ip, port, slaveId, paramConfig.getName(), err.getMessage());h.tryFail(err.getMessage());});});}/*** 读数据* @param paramConfigs 参数配置列表* @param slaveId 从机ID* @return*/private Future<JSONObject> executeQuery(List<ModbusParamConfig> paramConfigs, Integer slaveId) {if (!isActive()) {return Future.failedFuture("Gateway offline");}boolean isMerge = isMergeSendCmd(paramConfigs);if (isMerge) {int registerAddress = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();int num = paramConfigs.size();Promise<byte[]> promise = Promise.promise();Buffer buffer = getQueryCmd(registerAddress, num, slaveId, registerType, promise);return vertx.executeBlocking(h -> {netSocket.write(buffer);promise.future().onSuccess(buf -> {JSONObject jsonObject = new JSONObject();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);switch (registerType) {case COIL:Integer pow = Double.valueOf(Math.pow(2, i % 8)).intValue();jsonObject.put(paramConfig.getName(), (pow & buf[i / 8 + 1]) == pow);break;case INPUT_REGISTER:case HOLDING_REGISTER:jsonObject.put(paramConfig.getName(), getValue(ByteUtil.bytesToInt(buf, i * 2 + 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));break;}}h.complete(jsonObject);}).onFailure(err -> {log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());h.tryFail(err.getMessage());});});}List<Future<Object>> futures = new ArrayList<>();Future blockingFuture = Future.succeededFuture();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();Promise<byte[]> promise = Promise.promise();blockingFuture = blockingFuture.compose(suc -> singleExecuteQuery(slaveId, promise, registerType, paramConfig),err -> singleExecuteQuery(slaveId, promise, registerType, paramConfig));futures.add(blockingFuture);}return commonReplyResult(futures, paramConfigs);}private Future<Object> singleExecuteQuery(int slaveId, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {return vertx.executeBlocking(h -> {Buffer buffer = getQueryCmd(paramConfig.getRegisterAddress(), 1, slaveId, paramConfig.getRegisterType(), promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {switch (registerType) {case COIL:h.complete(Integer.valueOf(buf[1]) == 1);break;case INPUT_REGISTER:case HOLDING_REGISTER:h.complete(getValue(ByteUtil.bytesToInt(buf, 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));break;}}).onFailure(err -> {log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",ip, port, slaveId, paramConfig.getName(), err.getMessage());h.tryFail(err.getMessage());});});}/*** 如果所有参数寄存器类型一致并且地址连续 则合并成一条命令下发* @param paramConfigs* @return 是否可以合并下发命令*/private boolean isMergeSendCmd(List<ModbusParamConfig> paramConfigs) {if (paramConfigs.size() == 1) {return false;}int lastPos = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();for (int i = 1; i < paramConfigs.size(); i++) {int curPos = paramConfigs.get(i).getRegisterAddress();if (curPos - lastPos != 1) {return false;}ModbusParamConfig.RegisterType curRegisterType = paramConfigs.get(i).getRegisterType();if (registerType != curRegisterType) {return false;}lastPos = curPos;}return true;}/*** 获取查询数据命令* @param startPos 查询地址* @param num 查询数量* @param slaveId 从机ID* @param registerType 寄存器类型* @param promise* @return*/private Buffer getQueryCmd(int startPos, int num, int slaveId, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {byte[] bytes = new byte[6];bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];switch (registerType) {case COIL:bytes[1] = ModbusFunc.READ_COILS.getFunc();break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.READ_HOLDING_REGISTERS.getFunc();break;case INPUT_REGISTER:bytes[1] = ModbusFunc.READ_INPUT_REGISTERS.getFunc();break;}Integer func = ByteUtil.bytesToInt(bytes, 1, 1);String key = String.format("%s_%s", slaveId, func);byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);bytes[2] = startPosBytes[2];bytes[3] = startPosBytes[3];byte[] numBytes = ByteUtil.intToBytes(num, 0, 4);bytes[4] = numBytes[2];bytes[5] = numBytes[3];Buffer buffer = new BufferImpl();buffer.appendBytes(mode.writeData(bytes));promiseMap.put(key, promise);long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));promise.future().onComplete(res -> {promiseMap.remove(key);vertx.cancelTimer(timeId);});return buffer;}/*** 获取写数据命令* @param startPos 查询地址* @param slaveId 从机ID* @param reqParam 写参数* @param keys 参数列表* @param registerType 寄存器类型* @param promise* @return*/private Buffer getWriteCmd(int startPos, int slaveId, JSONObject reqParam,List<String> keys, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {int len = keys.size() == 1 ? 6 : (registerType == ModbusParamConfig.RegisterType.HOLDING_REGISTER ?7 + keys.size() * 2 : 7 + Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue());byte[] bytes = new byte[len];bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);bytes[2] = startPosBytes[2];bytes[3] = startPosBytes[3];if (keys.size() == 1) {switch (registerType) {case COIL:bytes[1] = ModbusFunc.WRITE_SINGLE_COILS.getFunc();boolean value = reqParam.getBoolean(keys.get(0));if (value) {bytes[4] = (byte) 0xFF;} else {bytes[4] = 0x00;}bytes[5] = 0x00;break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.WRITE_SINGLE_HOLDING_REGISTERS.getFunc();byte[] dataArr = ByteUtil.intToBytes(reqParam.getInteger(keys.get(0)), 2, 2);bytes[4] = dataArr[0];bytes[5] = dataArr[1];break;}} else {byte[] dataNum = ByteUtil.intToBytes(keys.size(), 2, 2);bytes[4] = dataNum[0];bytes[5] = dataNum[1];switch (registerType) {case COIL:bytes[1] = ModbusFunc.WRITE_MULTI_COILS.getFunc();int dataSize = Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue();bytes[6] = ByteUtil.intToBytes(dataSize, 3, 1)[0];for (int i = 0; i < dataSize; i += 2) {int sum = 0;int startIndex = i * 8;int endIndex = (i + 2) * 8;endIndex = endIndex > keys.size() ? keys.size() : endIndex;for (int j = startIndex; j < endIndex; j++) {sum += Double.valueOf(Math.pow(2, j)).intValue() * (reqParam.getBoolean(keys.get(j)) ? 1 : 0);}byte[] sumArr = ByteUtil.intToBytes(sum, 2, 2);if (i + 8 < keys.size()) {bytes[i + 7] = sumArr[0];bytes[i + 8] = sumArr[1];} else {bytes[i + 7] = sumArr[1];}}break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.WRITE_MULTI_HOLDING_REGISTERS.getFunc();bytes[6] = ByteUtil.intToBytes(keys.size() * 2, 3, 1)[0];for (int i = 0; i < keys.size(); i++) {String paramKey = keys.get(i);Integer value = reqParam.getInteger(paramKey);byte[] dataArr = ByteUtil.intToBytes(value, 2, 2);bytes[i * 2 + 7] = dataArr[0];bytes[i * 2 + 8] = dataArr[1];}break;}}Integer func = ByteUtil.bytesToInt(bytes, 1, 1);String key = String.format("%s_%s", slaveId, func);Buffer buffer = new BufferImpl();buffer.appendBytes(mode.writeData(bytes));promiseMap.put(key, promise);long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));promise.future().onComplete(res -> {promiseMap.remove(key);vertx.cancelTimer(timeId);});return buffer;}private Future<JSONObject> commonReplyResult(List<Future<Object >> futures, List<ModbusParamConfig> paramConfigs) {return vertx.executeBlocking(b -> {Future.join(futures).onComplete(h -> {JSONObject okJson = new JSONObject();JSONObject errJson = new JSONObject();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);Future<Object> objectFuture = futures.get(i);if (objectFuture.succeeded()) {okJson.put(paramConfig.getName(), objectFuture.result());} else {errJson.put(paramConfig.getName(), objectFuture.cause().getMessage());}}if (okJson.size() > 0) {b.tryComplete(okJson);} else {b.tryFail(errJson.getString(paramConfigs.get(0).getName()));}});});}private Object getValue(int value, int numberSplit, ModbusParamConfig.DataType dataType) {if (numberSplit == 1) {return value;}Float temp = value * 1f / numberSplit;switch (dataType) {case INT :return Math.round(temp);case FLOAT:return temp;}return temp;}}
测试
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.core.ModbusConnection;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.model.SendCmdTask;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;import java.util.List;@Log4j2
public class TestModbus {public static final String READ_DATA = "[" +" {" +" \"name\": \"a\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 504," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"b\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 505," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"c\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 506," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"d\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 507," +" \"dataType\": \"INT\"," +" \"numberSplit\": 1" +" }," +" {" +" \"name\": \"e\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 508," +" \"dataType\": \"INT\"," +" \"numberSplit\": 1" +" }]";private static final String WRITE_DATA = "[" +" {" +" \"name\": \"do0\"," +" \"registerType\": \"COIL\"," +" \"registerAddress\": 20," +" \"dataType\": \"BOOL\"," +" \"numberSplit\": 1" +" }" +" ,{" +" \"name\": \"do1\"," +" \"registerType\": \"COIL\"," +" \"registerAddress\": 21," +" \"dataType\": \"BOOL\"," +" \"numberSplit\": 1" +" }" +"]";public static void main(String[] args) {testReadData();
// testWriteData();;}private static void testWriteData() {Vertx vertx = Vertx.vertx();ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);Future<Boolean> connectFuture = connection.connect();JSONObject reqParam = new JSONObject();reqParam.put("do0", false);reqParam.put("do1", false);List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(WRITE_DATA, ModbusParamConfig.class);connectFuture.onComplete(con -> {if (connectFuture.succeeded()) {SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 21, 10);Promise<JSONObject> promise = connection.offerTask(task);promise.future().onSuccess(suc -> {log.info("read:"+suc);}).onFailure(err -> System.err.println(err.getMessage()));SendCmdTask task2 = new SendCmdTask(vertx, modbusParamConfigs, reqParam, true, 21, 10);Promise<JSONObject> promise2 = connection.offerTask(task2);promise2.future().onSuccess(suc -> {log.info("write:"+suc);}).onFailure(err -> System.err.println(err.getMessage()));} else {System.err.println("gateway offline");}});}private static void testReadData() {Vertx vertx = Vertx.vertx();ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);Future<Boolean> connectFuture = connection.connect();List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(READ_DATA, ModbusParamConfig.class);connectFuture.onComplete(con -> {if (connection.isActive()) {SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 2, 10);Promise<JSONObject> promise = connection.offerTask(task);promise.future().onSuccess(suc -> {log.info(suc);}).onFailure(err -> System.err.println(err.getMessage()));} else {System.err.println("gateway offline");}});}
}
运行结果如下:
其实这两个读写示例如果是一个网关可以共用一个Modbus连接。
modbus-app配置参数
格式如下:
{"readable": {"devType01": {"ReportData": [{"name" : "xxx","registerType" : "COIL","registerAddress" : 1,"dataType" : "BOOL","numberSplit" : 1}]},"devType02": {"ReportData": [{"name" : "a","registerType" : "HOLDING_REGISTER","registerAddress" : 1,"dataType" : "INT","numberSplit" : 1},{"name" : "b","registerType" : "HOLDING_REGISTER","registerAddress" : 2,"dataType" : "INT","numberSplit" : 10},{"name": "c","registerType": "","dataType": "FLOAT","mbScript": "(a*10000+b)/10"}]}},"writable": {"devType01": {"Control": [{"name": "operation","registerType": "COIL","registerAddress": 21,"dataType": "BOOL","numberSplit": 1}]}},"readDataPeriods": [{"period" : 60,"deviceTypes": ["devType01"]},{"period" : 600,"deviceTypes": ["devType02","devType03"]}]
}
具体怎么实现这边就不过多讲解了…
结束
不保证代码正确,我这边只是大概实现了一下,仅供参考。若有问题,请批评指出,我会虚心接受并积极修复问题。
相关文章:

Java实现Modbus读写数据
背景 由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题: 关闭连接NioEventLoop没有释放导致oom设计思想是一个设备一个连接,而不是一个网关一个连接连接断开后客户端无从感知 前两个问题解…...

C++11新特性⑤ | 仿函数与lambda表达式
目录 1、引言 2、仿函数 3、lambda表达式 3.1、lambda表达式的一般形式 3.2、返回类型说明 3.3、捕获列表的规则 3.4、可以捕获哪些变量 3.5、lambda表达式给编程带来的便利 VC常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...&a…...

解决websocket不定时出现1005错误
后台抛出异常如下: Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005 Caused by: java.lang.IllegalArgume…...

文章内容生成随机图像,并将这些图像上链
一、需求背景 在当前的互联网时代,信息越来越快速地传播,一篇好的文章不仅需要有吸引人的文字内容,还需要有精美的配图。但是,对于某些只有文字,而没有图片的文章,我们可以使用程序去生成随机的图片来作为文章的配图。 本文将详细介绍如何使用Java语言实现文章内容生成…...

l8-d9 UDP通信实现
一、函数接口扩展与UDP通信实现流程 1.write/read到send/recv 函数原型: ssize_t send(int sockfd, const void *buf, size_t len, int flags); ssize_t recv(int sockfd, void *buf, size_t len, int flags); 前三个参数同read/write一样; ssize_t rea…...

MongoDB复杂聚合查询与java中MongoTemplate的api对应
MongoDB聚合json脚本 db.getCollection("202303_refund").aggregate([{"$match": {"courseType": "常规班课","teacherRefundReasonCheck": true,"teacherId": {"$in": [7544]},"createTime"…...

WireShark抓包工具的安装
1.下载安装包 在官网或者电脑应用商城都可以下载 2.安装 打开安装包,点击next 点击next 选择UI界面,两种都装上 根据习惯选择 选择安装位置点击安装 开始安装安装成功...

审计智能合约的成本是多少?如何审计智能合约?
审计智能合约的成本是多少?如何审计智能合约? 智能合约安全审计在去中心化金融 (DeFi) 生态系统中非常普遍。如果您投资了一个区块链项目,您的决定可能部分基于智能合约代码审查的结果。 虽然大多数人都了解审计对网络安全的重要性ÿ…...

9.7 校招 内推 面经
绿泡*泡: neituijunsir 交流裙 ,内推/实习/校招汇总表格 1、校招 | Momenta 2024校招火热进行中!新增招聘岗位(内推) 校招 | Momenta 2024校招火热进行中!新增招聘岗位(内推) 2、…...

【网络编程】IO多路复用
IO多路复用是一种高效的I/O处理方式,它允许单个进程能够同时监视多个文件描述符(sockets、文件等),并在其中任何一个文件描述符准备好进行I/O操作时进行处理。它的核心在于使用少量的线程或进程来管理多个I/O操作,以提…...

MySQL与postgreSQL数据库的区别
MySQL 是一个流行的开源关系型数据库管理系统,具有以下优势: 开源和免费:MySQL 是一个开源软件,允许用户免费下载、使用和修改。它的免费版本(Community Edition)提供了广泛的功能,适用于大多数…...

单片机电子元器件-按键
电子元器件 按键上有 四个引脚 1 2 、 3 4 按下之后 导通 1 3 、 2 4 初始导通 通常按键开关为机械弹性开关,开关在闭合不会马上稳定的接通,会有一连串的抖动 抖动时间的长短有机械特性来决定的,一般为5ms 到10 ms 。 消抖的分类 硬件消…...

Nacos docker实现nacos高可用集群项目
目录 Nacos是什么? Nacos在公司里的运用是什么? 使用docker构建nacos容器高可用集群 实验规划图:编辑 1、拉取nacos镜像 2、创建docker网桥(实现集群内的机器的互联互通(所有的nacos和mysql)&#x…...

基于Dubbo实现服务的远程调用
目录 前言 RPC思想 为什么使用Dubbo Dubbo技术框架 编辑 调用关系流程 基础实现 A.提供统一业务Api B.编辑服务提供者Product B.a 添加依赖 B.b 添加Dubbo 配置(基于yaml配置文件) B.c 编写并暴露服务 C.编辑服务消费者 C.a 添加依赖 C.b 添加Dubbo配置 C.c 引用…...

Redis事务的理解
介绍 Redis通过MULTI、EXEC、WATCH等命令来实现事务功能。 事务提供了一种将多个命令请求打包,然后一次性、按照顺序地执行多个命令的机制,并且在事务执行期间,服务器不会因为其他客户端请求而中断事务的执行功能,他会将事务中的…...

PostgreSQL安装异常,服务无法启动导致创建服务器超时
win上安装pg后无法创建服务器,提示创建超时,发现服务列表里面pg15服务 并没有启动,启动服务器发现服务不了,截图忘记截了,复现不了,解决方法是 换个身份,然后继续启动,然后就可以在…...

汽车电子系统网络安全解决方案
声明 本文是学习GB-T 38628-2020 信息安全技术 汽车电子系统网络安全指南. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 汽车电子系统网络安全范围 本标准给出了汽车电子系统网络安全活动框架,以及在此框架下的汽车电子系统网络安全活动…...

切片机制和MR工作机制
切片机制 默认的切片大小和块大小一致,切片的个数决定了MapTask的个数。 数据倾斜问题:如果某个切片的大小太小,会浪费了MapTask申请的CPU资源。 如果剩余数据长度大于128*1.1, 就切片成2份,否则就不进行切分了。 InputFormat基…...

【postgresql 基础入门】基础架构和命名空间层次,查看数据库对象再也不迷路
postgresql 基础架构 专栏内容: postgresql内核源码分析手写数据库toadb并发编程 开源贡献: toadb开源库 个人主页:我的主页 管理社区:开源数据库 座右铭:天行健,君子以自强不息;地势坤&…...

是的,决定放弃算法去机器学习了
可是梦想啊!~她永存心间!!! 我啊~本是执着于这些算法的怪咖,梦想是icpc,ccpc~ 可是啊~ 在以后的科研和工作中,这些算法很多都是用不到的,学习算法更重要的目的是锻炼编程能力和分析…...

Python 03(循环语句)
Python03(循环语句) 文章目录 Python03(循环语句)一、while语句二、while实现猜数字三、while循环的嵌套while循环嵌套实例需求: 四、for循环1、什么 是for循环2、语法3、执行流程4、for循环的基本使用5、range()函数6…...

安科瑞铁塔基站能耗监控解决方案
安科瑞 华楠 1 背景概述 5G发展,基站先行。5G基站的选址建设,是保证5G信号覆盖的基础,因此5G基站建设是5G产业布局的一部分,也是5G成熟的基础。 2G、3G、4G均是低频段信号传输,宏基站几乎能应付所有的信号覆盖。但由…...

操作系统-线程复用
操作系统执行线程复用的过程涉及到线程调度和管理。线程复用是指操作系统能够有效地重用现有的线程来执行新的任务,而不必每次都创建新线程。这有助于减少线程创建和销毁的开销,提高系统性能。下面是操作系统如何执行线程复用的关键步骤: 线程…...

通达信自定义副图行业指标K线指标 HYZS_QD
行业指数:HY_INDEXC,NODRAW; DRAWKLINE(HY_INDEXH,HY_INDEXO,HY_INDEXL,HY_INDEXC); MA5:MA(HY_INDEXC,5),COLORWHITE; {MA10:MA(HY_INDEXC,10),COLORYELLOW,LINETHICK2}; DRAWTEXT_FIX(1,1,1,1,STRCAT(STRCAT(CON2STR(HY_INDEXADV,0),/),STRCAT(CON2STR(HY_INDEXDEC,0), ))),…...

MDK-Keil AC6 Compiler屏蔽特定警告
最近在使用STM32CubeMX生成MDK工程是,使用了 AC6 版本的编译器进行编译代码,然后发现了一些警告,但是在 AC5 版本下编译又正常。于是研究了下怎么屏蔽特定的警告,这里记录一下。 1. Keil AC6屏蔽特定警告 遇到的警告如下&#x…...

计算机网络的故事——了解Web及网络基础
了解Web及网络基础 文章目录 了解Web及网络基础一、使用 HTTP 协议访问 Web二、HTTP 的诞生三、网络基础 TCP/IP四、与 HTTP 关系密切的协议 : IP、TCP 和 DNS 一、使用 HTTP 协议访问 Web 根据Web浏览器指定的URL,从对应的服务器中获取文件资源,从而显…...

[系统安全] 五十三.DataCon竞赛 (2)2022年DataCon涉网分析之恶意样本IOC自动化提取详解
您可能之前看到过我写的类似文章,为什么还要重复撰写呢?只是想更好地帮助初学者了解病毒逆向分析和系统安全,更加成体系且不破坏之前的系列。因此,我重新开设了这个专栏,准备系统整理和深入学习系统安全、逆向分析和恶意代码检测,“系统安全”系列文章会更加聚焦,更加系…...

自动驾驶——估计预瞄轨迹YawRate
1.Introduction 在ADAS控制系统中,通常根据预瞄距离x去估计横向距离y,有如下关系: y a0 a1 x a2 * x^2 a3 * x^3 ,那么现在有个需求,希望根据上述x和y的关系,去估计规划预瞄轨迹yawRate 2.How to es…...

PMP证书考下来要多少费用?
PMP考试共有三项费用:分为考前费用、考后费用和续证费用。 第一项是考前费用: 1、培训费用,在英文报名时需要填写培训公司名称和35学时的培训证明。一般的培训公司收费不一,有些公司大概是三千元左右,而有些公司可能…...

C动态分配
动态分布与静态发布: 静态分配 1、 在程序编译或运行过程中,按事先规定大小分配内存空间的分配方式。int a [10] 2、 必须事先知道所需空间的大小。 3、 分配在栈区或全局变量区,一般以数组的形式。 4、 按计划分配。 动态分配 1、在程序运…...