当前位置: 首页 > news >正文

[华为北向网管NCE开发教程(6)消息订阅

1.作用

之前介绍的都是我们向网管NCE发起请求获取数据,消息订阅则反过来,是网管NCE系统给我们推送信息。其原理和MQ,JMS这些差不多,这里不过多累述。

2.场景

所支持订阅的场景有如下,以告警通知为例,当我们订阅告警通知以后,如果NCE网管有告警通知产生以后,就会给订阅的人发送一个通知(也就是实时告警推送)。那么我们就可以接收到如下的通知。

2024-06-06 00:09:30c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160140.0Z, X.733::EventType=securityAlarm, emsTime=20240605160142.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784751, isClearable=true, affectedTPList=21}
2024-06-06 00:09:36c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160147.0Z, X.733::EventType=securityAlarm, emsTime=20240605160149.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784752, isClearable=true, affectedTPList=21}
2024-06-06 00:09:43c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160155.0Z, X.733::EventType=securityAlarm, emsTime=20240605160156.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784753, isClearable=true, affectedTPList=21}
2024-06-06 00:09:50c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160202.0Z, X.733::EventType=securityAlarm, emsTime=20240605160203.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784755, isClearable=true, affectedTPList=21}
2024-06-06 00:10:01c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160213.0Z, X.733::EventType=securityAlarm, emsTime=20240605160214.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784756, isClearable=true, affectedTPList=21}

同理,如果我们订阅了文件传输状态通知,当存在文件传输完成的时候会收到如下通知,通知信息中包含了,文件传输完成后,文件的存储地址。

2024-06-06 10:15:26c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786334, fileName=pm/sdh/0605-0606/3145740.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
2024-06-06 10:15:39c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786335, fileName=pm/sdh/0605-0606/3145734.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
2024-06-06 10:15:42c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786336, fileName=pm/sdh/0605-0606/3145739.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
通知类型说明
NT_ALARM告警通知
NT_ALARM_UPDATED告警更新通知
NT_TCA性能越限告警通知
NT_OBJECT_CREATION对象创建通知
NT_OBJECT_DELETION对象删除通知
NT_ATTRIBUTE_VALUE_CHANGE属性改变通知
NT_STATE_CHANGE状态改变通知
NT_ROUTE_CHANGE路由改变通知
NT_PROTECTION_SWITCH保护倒换通知
NT_FILE_TRANSFER_STATUS文件传输状态通知
NT_EPROTECTION_SWITCH设备保护倒换通知事件
NT_ASON_RESOURCE_CHANGE智能资源改变通知
NT_PRBSTEST_STATUS伪随机码测试状态通知
NT_WDMPROTECTION_SWITCH波分保护倒换通知
NT_ATMPROTECTION_SWITCH ATM保护倒换通知
NT_RPRPROTECTION_SWITCH RPR保护组倒换通知事件格式
NT_IPPROTECTION_SWITCH Tunnel保护组倒换通知事件格式

3.如何开订阅(SpringBoot为例)

3.1登录NCE

3.1.1CorbaLoginReq

配置文件的登录参数如下

huawei: nce: login: corba:host: 127.0.0.1port: 12001userName: 111111passWord: 111111

配置文件参数注入Spring Bean

import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;import lombok.Data;@Data
@SpringBootConfiguration
@ConfigurationProperties(prefix = "huawei.nce.login.corba")
public class CorbaLoginReq {private String host;private String port;private String userName;private String passWord;
}

3.1.2CorbaLoginRes

登录返回参数

import org.omg.DynamicAny.DynAnyFactory;import lombok.Data;
import mtnm.tmforum.org.emsSession.EmsSession_I;@Data
public class CorbaLoginRes {private org.omg.CORBA.ORB orb;private org.omg.PortableServer.POA rootPOA ;private EmsSession_I emsSession;private DynAnyFactory dynAnyFactory;
}

3.1.3TANmsSession_IImpl

import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.session.Session_I;
/*** NmsSession_IPOA for EMS(NCE) invoking. * @author**/
public class TANmsSession_IImpl extends NmsSession_IPOA {public void eventLossCleared(String endTime) {log("TANmsSession_IImpl.eventLossCleared(String endTime) is invoked by EMS(NCE).");log("endTime:"+endTime);}public void eventLossOccurred(String startTime, String notificationId) {log("TANmsSession_IImpl.eventLossOccurred(String startTime, String notificationId) is invoked by EMS.");log("startTime:"+startTime+", notificationId:"+notificationId);}public Session_I associatedSession() {log("TANmsSession_IImpl.associatedSession() is invoked by EMS(NCE).");return null;}public void endSession() {log("TANmsSession_IImpl.endSession() is invoked by EMS(NCE).");}public void ping() {log("TANmsSession_IImpl.ping() is invoked by EMS(NCE).");}private static void log(String str){System.out.println(str);}
}

3.1.4BaseCorbaService

public interface BaseCorbaService {/*** @description:登录华为nce-corba* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年3月1日 下午4:19:59*/CorbaLoginRes login();/*** @description:清空登录* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年6月7日 下午3:24:02*/void clearLogin();
}
import java.util.Arrays;
import java.util.List;import org.omg.CosNaming.NameComponent;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynAnyFactoryHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import com.collect.sdh.module.corba.entity.CorbaLoginReq;
import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.entity.TANmsSession_IImpl;
import com.collect.sdh.module.corba.service.BaseCorbaService;import mtnm.tmforum.org.common.Common_IHolder;
import mtnm.tmforum.org.emsMgr.EMSMgr_I;
import mtnm.tmforum.org.emsMgr.EMSMgr_IHelper;
import mtnm.tmforum.org.emsSession.EmsSession_I;
import mtnm.tmforum.org.emsSession.EmsSession_IHolder;
import mtnm.tmforum.org.emsSession.EmsSession_IPackage.managerNames_THolder;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_I;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_IHelper;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_I;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_IHelper;
import mtnm.tmforum.org.equipment.EquipmentOrHolderIterator_IHolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolderList_THolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolder_T;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfoList_THolder;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfo_T;
import mtnm.tmforum.org.equipment.PhysicalLocationInfoList_THolder;
import mtnm.tmforum.org.equipment.PhysicalLocationInfo_T;
import mtnm.tmforum.org.globaldefs.NameAndStringValue_T;
import mtnm.tmforum.org.globaldefs.NamingAttributesIterator_IHolder;
import mtnm.tmforum.org.globaldefs.NamingAttributesList_THolder;
import mtnm.tmforum.org.globaldefs.ProcessingFailureException;
import mtnm.tmforum.org.managedElement.ManagedElementIterator_IHolder;
import mtnm.tmforum.org.managedElement.ManagedElementList_THolder;
import mtnm.tmforum.org.managedElement.ManagedElement_T;
import mtnm.tmforum.org.managedElement.ManagedElement_THolder;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_I;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_I;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetwork_T;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkIterator_IHolder;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkList_THolder;
import mtnm.tmforum.org.nmsSession.NmsSession_I;
import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.subnetworkConnection.CCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnectList_THolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnect_T;
import mtnm.tmforum.org.subnetworkConnection.Route_THolder;
import mtnm.tmforum.org.subnetworkConnection.SNCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnectionList_THolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_T;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointIterator_IHolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointList_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPoint_T;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkIterator_IHolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkList_THolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLink_T;@Service
public class BaseCorbaServiceImpl implements BaseCorbaService {@Autowiredprivate CorbaLoginReq loginReq;private CorbaLoginRes login;/*** @description:清空登录* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年6月7日 下午3:24:02*/@Overridepublic void clearLogin() {login = null;}/*** @description:登录华为nce-corba* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年3月1日 下午4:19:59*/@Overridepublic CorbaLoginRes login() {if(login != null) {/*本应该检测登录是否可用,如果可用,则返回登录信息,不可用则重新登录,(不知道是否可以使用emsSession.ping()来判断)但是没找到华为有这个接口,因此如果出现不可抗力因素导致登录无效,例如网络中断则通过com.collect.sdh.module.test.TestCorbaController.cleanLogin()清空登录*/	return login;}try {login = new CorbaLoginRes();String[] argv = new String[2];argv[0] = "-ORBInitRef";argv[1] = "NameService=corbaloc::" + loginReq.getHost() + ":" + loginReq.getPort() + "/NameService";org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(argv, null);org.omg.PortableServer.POA rootPOA = org.omg.PortableServer.POAHelper.narrow(orb.resolve_initial_references("RootPOA"));rootPOA.the_POAManager().activate();DynAnyFactory dynAnyFactory = DynAnyFactoryHelper.narrow(orb.resolve_initial_references("DynAnyFactory"));org.omg.CosNaming.NamingContextExt nc = org.omg.CosNaming.NamingContextExtHelper.narrow(orb.resolve_initial_references("NameService"));org.omg.CosNaming.NameComponent[] name;name = new NameComponent[5];name[0] = new NameComponent("TMF_MTNM", "Class");name[1] = new NameComponent("HUAWEI", "Vendor");name[2] = new NameComponent("Huawei/NCE", "EmsInstance");name[3] = new NameComponent("2.0", "Version");name[4] = new NameComponent("Huawei/NCE", "EmsSessionFactory_I");EmsSessionFactory_I emsSessionFactory = EmsSessionFactory_IHelper.narrow(nc.resolve(name));NmsSession_IPOA pNmsSessionServant = new TANmsSession_IImpl();NmsSession_I nmsSession = pNmsSessionServant._this(orb);EmsSession_IHolder emsSessionInterfaceHolder = new EmsSession_IHolder();emsSessionFactory.getEmsSession(loginReq.getUserName(), loginReq.getPassWord(), nmsSession, emsSessionInterfaceHolder);EmsSession_I emsSession = emsSessionInterfaceHolder.value;login.setDynAnyFactory(dynAnyFactory);login.setOrb(orb);login.setRootPOA(rootPOA);login.setEmsSession(emsSession);return login;} catch (Exception e) {e.printStackTrace();return null;}}
}

3.2定制通知

3.2.1ConsumerNotice

需要实现接口:org.omg.CosNotifyComm.StructuredPushConsumerPOA

import java.util.HashMap;
import java.util.Map;import org.omg.CosEventComm.Disconnected;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.StructuredPushConsumerPOA;
import org.springframework.util.ObjectUtils;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.AnyUtil;import lombok.extern.log4j.Log4j2;/*** @description:消费通知* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:57:26*/
@Log4j2
public class ConsumerNotice extends StructuredPushConsumerPOA{private CorbaLoginRes loginRes;public ConsumerNotice(CorbaLoginRes loginRes) {super();this.loginRes = loginRes;}private static Map<String, String> noticeTypes = new HashMap<>();static {noticeTypes.put("NT_ALARM", "告警通知");noticeTypes.put("NT_ALARM_UPDATED", "告警更新通知");noticeTypes.put("NT_TCA", "性能越限告警通知");noticeTypes.put("NT_OBJECT_CREATION", "对象创建通知");noticeTypes.put("NT_OBJECT_DELETION", "对象删除通知");noticeTypes.put("NT_ATTRIBUTE_VALUE_CHANGE", "属性改变通知");noticeTypes.put("NT_STATE_CHANGE", "状态改变通知");noticeTypes.put("NT_ROUTE_CHANGE", "路由改变通知");noticeTypes.put("NT_PROTECTION_SWITCH", "保护倒换通知");noticeTypes.put("NT_FILE_TRANSFER_STATUS", "文件传输状态通知");noticeTypes.put("NT_EPROTECTION_SWITCH", "设备保护倒换通知事件");noticeTypes.put("NT_ASON_RESOURCE_CHANGE", "智能资源改变通知");noticeTypes.put("NT_PRBSTEST_STATUS", "伪随机码测试状态通知");noticeTypes.put("NT_WDMPROTECTION_SWITCH", "波分保护倒换通知");noticeTypes.put("NT_ATMPROTECTION_SWITCH", "ATM保护倒换通知");noticeTypes.put("NT_RPRPROTECTION_SWITCH", "RPR保护组倒换通知事件格式");noticeTypes.put("NT_IPPROTECTION_SWITCH", "Tunnel保护组倒换通知事件格式");}@Overridepublic void disconnect_structured_push_consumer() {log.info("Consumer disconnect_structured_push_consumer");}@Overridepublic void push_structured_event(StructuredEvent event) throws Disconnected {String eventType = event.header.fixed_header.event_type.type_name;Map<String, Object> eventData = new HashMap<>(event.filterable_data.length);for (int i = 0; i < event.filterable_data.length; i++) {if (!ObjectUtils.isEmpty(event.filterable_data[i])) {eventData.put(event.filterable_data[i].name, AnyUtil.parseAny( event.filterable_data[i].value, loginRes.getDynAnyFactory()));}}log.info("收到事件通知:{}<{}>,通知参数:{}",eventType, noticeTypes.get(eventType), eventData);}@Overridepublic void offer_change(EventType[] arg0, EventType[] arg1) throws InvalidEventType {}}

3.2.2AnyUtil

用于解析返回的信息。

import org.omg.CORBA.Any;
import org.omg.CORBA.TCKind;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynArray;
import org.omg.DynamicAny.DynEnum;
import org.omg.DynamicAny.DynSequence;
import org.omg.DynamicAny.DynStruct;
import org.omg.DynamicAny.DynUnion;/*** @description:org.omg.DynamicAny格式化工具* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午11:33:17*/
public class AnyUtil {/*** @description:格式化数据* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午11:34:17*/public static String parseAny(Any any, DynAnyFactory factory){if( null==any ){return null;}StringBuilder result = new StringBuilder();try {switch (any.type().kind().value()) {case TCKind._tk_char:result.append(any.extract_char());break;case TCKind._tk_null:break;case TCKind._tk_boolean:result.append(any.extract_boolean());break;case TCKind._tk_short:result.append(any.extract_short());break;case TCKind._tk_long:result.append(any.extract_long());break;case TCKind._tk_double:result.append(any.extract_double());break;case TCKind._tk_float:result.append(any.extract_float());break;case TCKind._tk_octet:result.append(any.extract_octet());break;case TCKind._tk_ulong:result.append(any.extract_ulong());break;case TCKind._tk_string:result.append(any.extract_string());break;case TCKind._tk_enum:{DynEnum dynEnum = (DynEnum) factory.create_dyn_any(any);result.append(dynEnum.get_as_string());break;}case TCKind._tk_any:{any=factory.create_dyn_any(any).get_any();result.append(any);break;}case TCKind._tk_objref:{result.append(any.extract_Object());break;}case TCKind._tk_struct:case TCKind._tk_except:{DynStruct dynstruct = (DynStruct) factory.create_dyn_any(any);org.omg.DynamicAny.NameValuePair[] members = dynstruct.get_members();result.append("{");for (int i = 0; i < members.length; i++) {if(i>0){result.append(" ");}result.append(members[i].id).append(" ").append(parseAny(members[i].value, factory));}result.append("}");break;}case TCKind._tk_union:DynUnion dynunion = (DynUnion) factory.create_dyn_any(any);result.append(dynunion.member_name()).append(" ");result.append(parseAny(dynunion.member().to_any(), factory));break;case TCKind._tk_sequence:DynSequence dynseq = (DynSequence) factory.create_dyn_any(any);Any[] contents = dynseq.get_elements();result.append("{");for (int i = 0; i < contents.length; i++){result.append(parseAny(contents[i], factory));}result.append("}");break;case TCKind._tk_array:DynArray dynarray = (DynArray) factory.create_dyn_any(any);Any[] arrayContents = dynarray.get_elements();result.append("{");for (int i = 0; i < arrayContents.length; i++){result.append(parseAny(arrayContents[i], factory)).append("");}result.append("}");break;default:result.append(any.type().kind().value());}} catch (Exception ex) {ex.printStackTrace();}return new String(result.toString().getBytes(StandardCharsets.ISO_8859_1));}
}

3.3订阅通知

SubscribeNotice 实现 Runnable,即订阅的时候,另起一个线程来订阅。该线程负责订阅。

3.3.1SubscribeNotice

import org.omg.CORBA.IntHolder;
import org.omg.CORBA.Object;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.EventChannelHolder;
import org.omg.CosNotifyChannelAdmin.ProxySupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper;
import org.omg.CosNotifyComm.StructuredPushConsumerHelper;
import org.springframework.util.ObjectUtils;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.JsonUtils;import lombok.extern.log4j.Log4j2;/*** @description:订阅消费通知* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午9:33:18*/
@Log4j2
public class SubscribeNotice implements Runnable{/*** 登录corba成功后的参数*/private CorbaLoginRes loginRes;/*** 记录订阅通知的通道ID的存储文件地址*/private String poxyIdPath;public SubscribeNotice(CorbaLoginRes loginRes, String poxyIdPath) {super();this.loginRes = loginRes;this.poxyIdPath = poxyIdPath;}@Overridepublic void run() {try {//获取通道IntHolder poxyId = new IntHolder();poxyId.value = getPoxyId(poxyIdPath);EventChannelHolder eventChannel = new EventChannelHolder();loginRes.getEmsSession().getEventChannel(eventChannel);//ConsumerNotice extends StructuredPushConsumerPOA 为消费者ConsumerNotice consumerNotice = new ConsumerNotice(loginRes);ConsumerAdmin defaultConsumerAdmin = eventChannel.value.default_consumer_admin();//连接通道,如果发现通道已经打开,则先关闭之前的通道(已经打开的通道即使不可以,北向接口并未释放该接口的资源,但是会限制连接通道(数量 < 3))try {if (poxyId.value > 0){log.info("释放旧的消费通道:{}", poxyId.value);ProxySupplier oldSupplier = defaultConsumerAdmin.get_proxy_supplier(poxyId.value);assert (oldSupplier != null);StructuredProxyPushSupplier myOldPoxy = StructuredProxyPushSupplierHelper.narrow(oldSupplier);myOldPoxy.disconnect_structured_push_supplier();}}catch (Exception e) {e.printStackTrace();}ProxySupplier tmpSupplier = defaultConsumerAdmin.obtain_notification_push_supplier(ClientType.STRUCTURED_EVENT, poxyId);StructuredProxyPushSupplier proxyPushSupplier = StructuredProxyPushSupplierHelper.narrow(tmpSupplier);Object servant = loginRes.getRootPOA().servant_to_reference(consumerNotice);proxyPushSupplier.connect_structured_push_consumer(StructuredPushConsumerHelper.narrow(servant));savePoxyId(poxyIdPath, poxyId.value);log.info("保存此次的消费通道:{}", poxyId.value);loginRes.getOrb().run();} catch (Exception e) {e.printStackTrace();}}/*** @description:获取已经连接的消费通道ID* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:40:57*/public int getPoxyId(String path) {int poxyId = -1;//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库String str = JsonUtils.readStringFromSystemPath(path);if(!ObjectUtils.isEmpty(str)) {poxyId = Integer.parseInt(str);}return poxyId;}/*** @description:保存已经连接的消费通道ID* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:41:33*/public void savePoxyId(String path, int poxyId) {//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库JsonUtils.writeStringToSystemPath(path, String.valueOf(poxyId));}
}

3.3.2JsonUtils

为了保证代码完整性,如果你完全抄上面的代码,这里提供了代码需要的两个文件操作示例

public static String readStringFromSystemPath(String path) {String data = "";try {InputStream inputStream = new FileInputStream(path);byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);data = new String(bdata, StandardCharsets.UTF_8);} catch (FileNotFoundException e) {log.info("文件不存在,文件地址:{}", path);} catch (Exception e) {log.info("读取文件失败,文件地址:{},失败原因:{}", path,e.getMessage());} return data;}public static void writeStringToSystemPath(String filePath, String str) {Writer write = null;try {File file = new File(filePath);if(file.exists()) {file.delete();}if (!file.getParentFile().exists()) {file.getParentFile().mkdirs();}if(file.createNewFile()) {write = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8);write.write(str);write.flush();}} catch (Exception e) {e.printStackTrace();} finally {if(write !=null ) {try {write.close();} catch (IOException e) {e.printStackTrace();}}}}

3.4启动订阅

这里我们使用SpringBoot启动的时候启动订阅,即实现ApplicationRunner,然后使用线程池的单线程来启动上面我们编写的线程。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.service.BaseCorbaService;/*** @description:启动订阅corba的消费* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 下午4:18:16*/
@Component
public class SubscribeRunner implements ApplicationRunner  {@Value(value = "${file-save-path}")private String poxyIdPath;@Autowiredprivate BaseCorbaService baseCorbaService;@Overridepublic void run(ApplicationArguments args) throws Exception {poxyIdPath = poxyIdPath + "poxyId";CorbaLoginRes login = baseCorbaService.login();ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(new SubscribeNotice(login, poxyIdPath));}
}

4.效果展示

在这里插入图片描述

相关文章:

[华为北向网管NCE开发教程(6)消息订阅

1.作用 之前介绍的都是我们向网管NCE发起请求获取数据&#xff0c;消息订阅则反过来&#xff0c;是网管NCE系统给我们推送信息。其原理和MQ&#xff0c;JMS这些差不多&#xff0c;这里不过多累述。 2.场景 所支持订阅的场景有如下&#xff0c;以告警通知为例&#xff0c;当我…...

2024.6.15 英语六级 经验与复盘

文章目录 英语六级 经验与复盘2024年上半年六级考试(2024 6.8 - 6.15)前情提要&#xff1a;经验&#xff1a;作文&#xff1a;(30min)听力&#xff1a;(25min)SectionC(精细阅读) (30min)SectionB(段落匹配) (15min)SectionA(选词填空) (5min / 舍弃)翻译&#xff08;20min&…...

计算机专业的未来展望

身份角度一&#xff1a;一名曾经的计算机专业学生  作为一名曾经的计算机专业学生&#xff0c;我认为计算机相关专业仍然是一个值得考虑的选择。随着科技的飞速发展&#xff0c;计算机行业的需求只会越来越高&#xff0c;因此&#xff0c;无论是在就业前景还是个人发展方面&a…...

Shell变量的高级用法

在Shell编程中&#xff0c;变量的使用是至关重要的。初学者可能只使用最基本的变量赋值和调用&#xff0c;但Shell变量实际上有很多高级用法&#xff0c;可以极大地提升脚本的灵活性和效率。本文将介绍几种Shell变量的高级用法&#xff0c;帮助您更好地利用Shell脚本。 1. 参数…...

【Python/Pytorch - 网络模型】-- SVD算法

文章目录 文章目录 00 写在前面01 基于Pytorch版本的SVD算代码02 理论知识 00 写在前面 &#xff08;1&#xff09;矩阵的奇异值分解在最优化问题、特征值问题、最小二乘方问题、广义逆矩阵问题及统计学等方面都有重要应用&#xff1b; &#xff08;2&#xff09;应用&#…...

全光万兆时代来临:信而泰如何推动F5G-A(50PONFTTR)技术发展

技术背景 F5G-A&#xff08;Fifth Generation Fixed Network-Advanced&#xff0c;第五代固定网络接入&#xff09;是固定网络技术的一次重大升级&#xff0c;代表了光纤网络技术的最新发展。F5G-A旨在提供更高的带宽、更低的延迟、更可靠的连接以及更广泛的应用场景。 F5G-A六…...

港科夜闻 | 香港科大与香港科大(广州)合推红鸟跨校园学习计划,共享教学资源,促进港穗学生交流学习...

关注并星标 每周阅读港科夜闻 建立新视野 开启新思维 1、香港科大与香港科大(广州)合推“红鸟跨校园学习计划”&#xff0c;共享教学资源&#xff0c;促进港穗学生交流学习。香港科大与香港科大(广州)6月14日共同宣布推出“红鸟跨校园学习计划”&#xff0c;以进一步加强两校学…...

基于Wireshark实现对FTP的抓包分析

基于Wireshark实现对FTP的抓包分析 前言一、虚拟机Win10环境配置二、FileZilla客户端的安装配置下载FileZilla客户端安装FileZilla 三、FileZilla Server安装下载FileZilla Server安装 四、实现对FTP的抓包前置工作实现抓包完成抓包 前言 推荐一个网站给想要了解或者学习人工智…...

Vue54-浏览器的本地存储webStorage

一、本地存储localStorage的作用 二、本地存储的代码实现 2-1、存储数据 注意&#xff1a; localStorage是window上的函数&#xff0c;所以&#xff0c;可以把window.localStorage直接写成localStorage&#xff08;直接调用&#xff01;&#xff09; 默认调了p.toString()方…...

Linux下Shell脚本基础知识

主要参考视频&#xff1a; 这可能是B站讲的最好的Linux Shell脚本教程&#xff0c;3h打通Linux-shell全套教程&#xff0c;从入门到精通完整版_哔哩哔哩_bilibili 主要参考文档&#xff1a; Shell 教程 | 菜鸟教程 (runoob.com) Bash Shell教程 (yiibai.com) 先用视频入门&…...

爬虫初学篇——看完这些还怕自己入门不了?

初次学习爬虫&#xff0c;知识笔记小分享 学scrapy框架可看&#xff1a;孤寒者博主的【Python爬虫必备—&#xff1e;Scrapy框架快速入门篇——上】 目录&#x1f31f; 一、&#x1f349;基础知识二、&#x1f349;http协议&#xff1a;三、&#x1f349;解析网页(1) xpath的用…...

[数据集][目标检测]减速区域检测数据集VOC+YOLO格式1654张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;1654 标注数量(xml文件个数)&#xff1a;1654 标注数量(txt文件个数)&#xff1a;1654 标注…...

OpenGL3.3_C++_Windows(8)

材质&&漫反射&#xff0c;光照贴图 使用struct为材质建立结构体&#xff0c;以便方便管理漫反射贴图是物体的颜色值&#xff08;纹理&#xff09;&#xff08;通过 UV 坐标映射到渲染物体的表面&#xff09;&#xff0c;材质是物体的属性&#xff08;物体对光的交互&a…...

GPU的工作原理

location: Beijing 1. why is GPU CPU的存储单元和计算单元的互通过慢直接促进了GPU的发展 先介绍一个概念&#xff1a;FLOPS&#xff08;Floating Point Operations Per Second&#xff0c;浮点运算每秒&#xff09;是一个衡量其执行浮点运算的能力&#xff0c;可以作为计算…...

Linux常⽤服务器构建-samba

目录 1. 介绍 2. 安装 3. 配置 3.1 创建存放共享⽂件的路径 3.2 创建samba账户 4 重启samba 5. 访问共享⽂件 5.1 mac下访问⽅式 5.2 windows下访问⽅式 1. 介绍 Samba 是在 Linux 和 UNIX 系统上实现 SMB 协议的⼀个免费软件&#xff0c;能够完成在 windows 、 mac 操作系统…...

【Java】已解决java.lang.UnsupportedOperationException异常

文章目录 问题背景可能出错的原因错误代码示例正确代码示例注意事项 已解决java.lang.UnsupportedOperationException异常 在Java编程中&#xff0c;java.lang.UnsupportedOperationException是一个运行时异常&#xff0c;通常表示尝试执行一个不支持的操作。这种异常经常发生…...

在ubuntu中恢复误删除的文件

1、安装 TestDisk 在 Ubuntu 上&#xff0c;可以使用以下命令安装 TestDisk&#xff1a; sudo apt-get install testdisk2、查询你删除的文件所在那个分区 #查询分区 df -h #我这里是/dev/sda2 #也可以使用下面命令查看具体哪个分区 lsblk3、查询该分区是什么系统类型 sudo …...

Sklearn中逻辑回归建模

分类模型的评估 回归模型的评估方法&#xff0c;主要有均方误差MSE&#xff0c;R方得分等指标&#xff0c;在分类模型中&#xff0c;我们主要应用的是准确率这个评估指标&#xff0c;除此之外&#xff0c;常用的二分类模型的模型评估指标还有召回率&#xff08;Recall&#xff…...

【ARM】MDK出现报错error: A\L3903U的解决方法

【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 解决MDK出现报错error: A\L3903U这样类型的报错 2、 问题场景 电脑或者软件因为意外情况导致崩溃&#xff0c;无法正常关闭&#xff0c;强制电脑重启之后&#xff0c;打开工程去编译出现下面的报错信息&#xff08;…...

0018__字体的kerning是什么意思

泰山OFFICE技术讲座&#xff1a;字体的kerning是什么意思-CSDN博客 了解CSS属性font-kerning,font-smoothing,font-variant-CSDN博客...

LORA: LOW-RANK ADAPTATION OF LARGE LANGUAGE MODELS

文章汇总 总体来看像是一种带权重的残差&#xff0c;但解决的如何高效问题的事情。 相比模型的全微调&#xff0c;作者提出固定预训练模型参数不变&#xff0c;在原本权重矩阵旁路添加低秩矩阵的乘积作为可训练参数&#xff0c;用以模拟参数的变化量。 模型架构 h W 0 x △…...

cmake、make、makefile、ninga的关系

CMake是一种跨平台的构建系统&#xff0c;它用来管理软件的编译过程。CMake可以生成本地平台特定的构建文件&#xff0c;例如Makefile或者Microsoft Visual Studio项目文件&#xff0c;以便开发人员更轻松地在不同的平台上构建他们的项目。它的主要功能是配置和生成构建脚本&am…...

StarRocks详解

什么是StarRocks&#xff1f; StarRocks是新一代极速全场景MPP数据库&#xff08;高并发数据库&#xff09;。 StarRocks充分吸收关系型OLAP数据库和分布式存储系统在大数据时代的优秀研究成果。 1.可以在Spark和Flink里面处理数据&#xff0c;然后将处理完的数据写到StarRo…...

【C语言】进程间通信之管道pipe

进程间通信之管道pipe 一、进程间通信管道pipe()管道的读写行为 最后 一、进程间通信 管道pipe() 管道pipe也称为匿名管道&#xff0c;只有在有血缘关系的进程间进行通信。管道的本质就是一块内核缓冲区。 进程间通过管道的一端写&#xff0c;通过管道的另一端读。管道的读端…...

03.VisionMaster 机器视觉 位置修正 工具

VisionMaster 机器视觉 位置修正 工具 官方解释&#xff1a;位置修正是一个辅助定位、修正目标运动偏移、辅助精准定位的工具。可以根据模板匹配结果中的匹配点和匹配框角度建立位置偏移的基准&#xff0c;然后再根据特征匹配结果中的运行点和基准点的相对位置偏移实现ROI检测…...

Oracle 是否扼杀了开源 MySQL

Oracle 是否无意中扼杀了开源 MySQL Peter Zaitsev是一位俄罗斯软件工程师和企业家&#xff0c;曾在MySQL公司担任性能工程师。大约15年前&#xff0c;当甲骨文收购Sun公司并随后收购MySQL时&#xff0c;有很多关于甲骨文何时“杀死MySQL”的讨论。他曾为甲骨文进行辩护&#…...

机器学习归一化特征编码

特征缩放 因为对于大多数的机器学习算法和优化算法来说&#xff0c;将特征值缩放到相同区间可以使得获取性能更好的模型。就梯度下降算法而言&#xff0c;例如有两个不同的特征&#xff0c;第一个特征的取值范围为1——10&#xff0c;第二个特征的取值范围为1——10000。在梯度…...

抛光粉尘可爆性检测 打磨粉尘喷砂粉尘爆炸下限测试

抛光粉尘可爆性检测 抛光粉尘的可爆性检测是一种安全性能测试&#xff0c;用于确定加工过程中产生的粉尘在特定条件下是否会爆炸&#xff0c;从而对生产安全构成威胁。如果粉尘具有可爆性&#xff0c;那么在生产环境中就需要采取相应的防爆措施。粉尘爆炸的条件通常包括粉尘本身…...

python14 字典类型

字典类型 键值对方式&#xff0c;可变数据类型&#xff0c;所以有增删改功能 声明方式1 {} 大括号&#xff0c;示例 d {key1 : value1, key2 : value2, key3 : value3 ....} 声明方式2 使用内置函数 dict() 创建1)通过映射函数创建字典zip(list1,list2) 继承了序列的所有操作 …...

深入了解 .url文件中的 Prop3属性

在使用 Windows 操作系统时&#xff0c;我们经常会遇到以 .url 结尾的文件&#xff0c;它们通常被用来快速访问互联网上的特定网页。这些文件虽然看起来简单&#xff0c;但其中包含的 Prop3 属性却有其特殊的作用和意义。 1. Prop3 是什么&#xff1f; 在 .url 文件中&#x…...

苏州企业网站建设服务中心/东莞搜索排名提升

为什么80%的码农都做不了架构师&#xff1f;>>> Mybatis sql改写 近期有项目需要&#xff0c;需要把update转化成insert&#xff0c;网上搜索了下发现mybatis的拦截器可以实现该功能。 一、mybatis拦截器 实现一个拦截器拦截所有update方法 import java.sql.SQLExc…...

网站如何做吸引人的项目/汕头seo托管

书上的代码已完全不可参考&#xff0c;只好按知识点从网上查资料一个一个实例 了。 <!DOCTYPE html> <html> <head><title>ExtJs</title><meta http-equiv"Content-Type" content"text/html; charsetutf-8"/><link…...

wordpress评论无法/购物网站

在工作中经常看某一个项目的日志进行分析问题,一般都是日志文件最后多少行有需要的日志当然也需要实时查看日志的最后的多少行,那么这篇就把命令记录一下 临时准备了一个文件来演示 seq 20 > nginx.log1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 查看最后10行记…...

导柱导套网站建设/精准的搜索引擎优化

在 Windows系统下的 可执行文件的一种&#xff08;还有 NE、 LE&#xff09;&#xff0c;是 微软设计、 TIS&#xff08;Tool Interface Standard,工具接口标准&#xff09;委员会批准的一种可执行文件格式。PE的意思是Portable Executable&#xff08;可移植可执行&#xff09…...

搜索别人的网站是带logo的请问怎么做的/南京响应式网站建设

有 时当 使用 一个 包含 多 个 参 数 的 方 法 时 &#xff0c; 由 于参 数 过 多 会 导 致 可 读 性 严 重 下 降 &#xff0c; 如 &#xff1a; 有 时当 使用 一个 包含 多 个 参 数 的 方 法 时 &#xff0c; 由 于参 数 过 多 会 导 致 可 读 性 严 重 下 降 &#xff0c; …...

谷歌seo详细教学/福州seo排名公司

Properties类 Properties类介绍特点&#xff1a; 1、Hashtable的子类&#xff0c;map集合中的方法都可以用。 2、该集合没有泛型。键值都是字符串。 3、它是一个可以持久化的属性集。键值可以存储到集合中&#xff0c;也可以存储到持久化的设备(硬盘、U盘、光盘)上。键值的来源…...