Flink协调器Coordinator及自定义Operator
Flink协调器Coordinator及自定义Operator
最近的项目开发过程中,使用到了Flink中的协调器以及自定义算子相关的内容,本篇文章主要介绍Flink中的协调器是什么,如何用,以及协调器与算子间的交互。
协调器Coordinator
Flink中的协调器是用来协调运行时的算子,运行在JobManager中,通过事件的方式与算子通信。例如Source和Sink算子中的协调器是用来发现和分配工作或者聚合和提交元数据。
线程模型
所有协调器方法都由作业管理器的主线程(邮箱线程)调用。这意味着这些方法在任何情况下都不得执行阻塞操作(如 I/ O 或等待锁或或Futures)。这很有可能使整个 JobManager 瘫痪。
因此,涉及更复杂操作的协调器应生成线程来处理 I/ O 工作。上 OperatorCoordinator. Context 的方法可以安全地从另一个线程调用,而不是从调用协调器方法的线程调用。
一致性
与调度程序的视图相比,协调器对任务执行的视图高度简化,但允许与在并行子任务上运行的操作员进行一致的交互。具体而言,保证严格按顺序调用以下方法:
- executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway):在子任务就绪的时候调用一次。SubtaskGateway是用来与子任务交互的网关。这是与子任务尝试交互的开始。
executionAttemptFailed(int, int, Throwable):在尝试失败或取消后立即调用每个子任务。此时,应停止与子任务尝试的交互。 - subtaskReset(int, long) 或 resetToCheckpoint(long, byte[]):一旦调度程序确定了要还原的检查点,这些方法就会通知协调器。前一种方法在发生区域故障/ 恢复(可能影响子任务的子集)时调用,后一种方法在全局故障/ 恢复的情况下调用。此方法应用于确定要恢复的操作,因为它会告诉要回退到哪个检查点。协调器实现需要恢复自还原的检查点以来与相关任务的交互。只有在子任务的所有尝试被调用后 executionAttemptFailed(int, int, Throwable) ,才会调用它。
- executionAttemptReady(int, int, OperatorCoordinator. SubtaskGateway):在恢复的任务(新尝试)准备就绪后再次调用。这晚于 subtaskReset(int, long),因为在这些方法之间,会计划和部署新的尝试。
接口方法说明
实现自定义的协调器需要实现OperatorCoordinator接口方法,各方法说明如下所示:
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {// ------------------------------------------------------------------------/*** 启动协调器,启动时调用一次当前方法在所有方法之前* 此方法抛出的异常都会导致当前作业失败*/void start() throws Exception;/*** 释放协调器时调用当前方法,此方法应当释放持有的资源* 此方法抛出的异常不会导致作业失败*/@Overridevoid close() throws Exception;// ------------------------------------------------------------------------/*** 处理来自并行算子实例的事件* 此方法抛出的异常会导致作业失败并恢复*/void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)throws Exception;// ------------------------------------------------------------------------/*** 为协调器做checkpoint,将当前协调器中的状态序列化到checkpoint中,执行成功需要调用CompletableFuture的complete方法,失败需要调用CompletableFuture的completeExceptionally方法*/void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)throws Exception;/*** We override the method here to remove the checked exception. Please check the Java docs of* {@link CheckpointListener#notifyCheckpointComplete(long)} for more detail semantic of the* method.*/@Overridevoid notifyCheckpointComplete(long checkpointId);/*** We override the method here to remove the checked exception. Please check the Java docs of* {@link CheckpointListener#notifyCheckpointAborted(long)} for more detail semantic of the* method.*/@Overridedefault void notifyCheckpointAborted(long checkpointId) {}/*** 从checkpoint重置当前的协调器*/void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception;// ------------------------------------------------------------------------/*** 子任务重置时调用此方法*/void subtaskReset(int subtask, long checkpointId);/*** 子任务失败时调用此方法 */void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason);/*** 子任务就绪时调用此方法*/void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);
}
算子Operator
Flink中执行计算任务的算子,像使用DataStream API时调用的map、flatmap、process传入的自定义函数最终都会封装为一个一个的算子。使用UDF已经能够满足大多数的开发场景,但涉及到与协调器打交道时需要自定义算子,自定义算子相对比较好简单,具体可以参考org.apache.flink.streaming.api.operators.KeyedProcessOperator的实现。
自定义算子需要实现AbstractStreamOperator和OneInputStreamOperator接口方法
实现定时器功能,需要实现Triggerable接口方法
实现处理协调器的事件功能,需要实现OperatorEventHandler接口方法
示例
自定义算子
这里实现一个自定义的算子,用来处理KeyedStream的数据,它能够接受来自协调器的事件,并且能够给协调器发送事件。
MyKeyedProcessOperator实现代码如下:
package com.examples.operator;import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 自定义的KeyedProcessOperator* @author shirukai*/
public class MyKeyedProcessOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>implements OneInputStreamOperator<IN, OUT>,Triggerable<KEY, VoidNamespace>,OperatorEventHandler {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessOperator.class);private transient TimestampedCollector<OUT> collector;private transient TimerService timerService;private final OperatorEventGateway operatorEventGateway;public MyKeyedProcessOperator(ProcessingTimeService processingTimeService, OperatorEventGateway operatorEventGateway) {this.processingTimeService = processingTimeService;this.operatorEventGateway = operatorEventGateway;}@Overridepublic void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);InternalTimerService<VoidNamespace> internalTimerService =getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);timerService = new SimpleTimerService(internalTimerService);}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {LOG.info("processElement: {}", element);collector.setTimestamp(element);// 注册事件时间定时器timerService.registerEventTimeTimer(element.getTimestamp() + 10);// 注册处理时间定时器timerService.registerProcessingTimeTimer(element.getTimestamp() + 100);// 给协调器发送消息operatorEventGateway.sendEventToCoordinator(new MyEvent("hello,I'm from operator"));// 不做任何处理直接发送到下游collector.collect((OUT) element.getValue());}@Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {LOG.info("onEventTime: {}", timer);}@Overridepublic void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {LOG.info("onProcessingTime: {}", timer);}@Overridepublic void handleOperatorEvent(OperatorEvent evt) {LOG.info("handleOperatorEvent: {}", evt);}
}
算子工厂类MyKeyedProcessOperatorFactory:
package com.examples.operator;import com.examples.coordinator.MyKeyedProcessCoordinatorProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;/*** 自定义算子工厂类* @author shirukai*/
public class MyKeyedProcessOperatorFactory<IN> extends AbstractStreamOperatorFactory<IN>implements OneInputStreamOperatorFactory<IN, IN>,CoordinatedOperatorFactory<IN>,ProcessingTimeServiceAware {@Overridepublic OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {return new MyKeyedProcessCoordinatorProvider(operatorName, operatorID);}@Overridepublic <T extends StreamOperator<IN>> T createStreamOperator(StreamOperatorParameters<IN> parameters) {final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();final OperatorEventGateway gateway =parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);try {final MyKeyedProcessOperator<?, IN, IN> operator = new MyKeyedProcessOperator<>(processingTimeService, gateway);operator.setup(parameters.getContainingTask(),parameters.getStreamConfig(),parameters.getOutput());parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, operator);return (T) operator;} catch (Exception e) {throw new IllegalStateException("Cannot create operator for "+ parameters.getStreamConfig().getOperatorName(),e);}}@Overridepublic Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {return MyKeyedProcessOperator.class;}
}
自定义协调器
协调器执行器线程工厂类CoordinatorExecutorThreadFactory,当前类可以通用,用来创建协调器线程。
package com.examples.coordinator;import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FatalExitExceptionHandler;import javax.annotation.Nullable;
import java.util.concurrent.ThreadFactory;/*** A thread factory class that provides some helper methods.*/
public class CoordinatorExecutorThreadFactoryimplements ThreadFactory, Thread.UncaughtExceptionHandler {private final String coordinatorThreadName;private final ClassLoader classLoader;private final Thread.UncaughtExceptionHandler errorHandler;@Nullableprivate Thread thread;// TODO discuss if we should fail the job(JM may restart the job later) or directly kill JM// process// Currently we choose to directly kill JM processCoordinatorExecutorThreadFactory(final String coordinatorThreadName, final ClassLoader contextClassLoader) {this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);}@VisibleForTestingCoordinatorExecutorThreadFactory(final String coordinatorThreadName,final ClassLoader contextClassLoader,final Thread.UncaughtExceptionHandler errorHandler) {this.coordinatorThreadName = coordinatorThreadName;this.classLoader = contextClassLoader;this.errorHandler = errorHandler;}@Overridepublic synchronized Thread newThread(Runnable r) {thread = new Thread(r, coordinatorThreadName);thread.setContextClassLoader(classLoader);thread.setUncaughtExceptionHandler(this);return thread;}@Overridepublic synchronized void uncaughtException(Thread t, Throwable e) {errorHandler.uncaughtException(t, e);}public String getCoordinatorThreadName() {return coordinatorThreadName;}boolean isCurrentThreadCoordinatorThread() {return Thread.currentThread() == thread;}
}
协调器上下文CoordinatorContext,当前类可以通用。
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.examples.coordinator;import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;/*** A context class for the {@link OperatorCoordinator}.** <p>The context serves a few purposes:** <ul>* <li>Thread model enforcement - The context ensures that all the manipulations to the* coordinator state are handled by the same thread.* </ul>*/
@Internal
public class CoordinatorContext implements AutoCloseable {private static final Logger LOG =LoggerFactory.getLogger(CoordinatorContext.class);private final ScheduledExecutorService coordinatorExecutor;private final ScheduledExecutorService workerExecutor;private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;private final OperatorCoordinator.Context operatorCoordinatorContext;private final Map<Integer, OperatorCoordinator.SubtaskGateway> subtaskGateways;public CoordinatorContext(CoordinatorExecutorThreadFactory coordinatorThreadFactory,OperatorCoordinator.Context operatorCoordinatorContext) {this(Executors.newScheduledThreadPool(1, coordinatorThreadFactory),Executors.newScheduledThreadPool(1,new ExecutorThreadFactory(coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")),coordinatorThreadFactory,operatorCoordinatorContext);}public CoordinatorContext(ScheduledExecutorService coordinatorExecutor,ScheduledExecutorService workerExecutor,CoordinatorExecutorThreadFactory coordinatorThreadFactory,OperatorCoordinator.Context operatorCoordinatorContext) {this.coordinatorExecutor = coordinatorExecutor;this.workerExecutor = workerExecutor;this.coordinatorThreadFactory = coordinatorThreadFactory;this.operatorCoordinatorContext = operatorCoordinatorContext;this.subtaskGateways = new HashMap<>(operatorCoordinatorContext.currentParallelism());}@Overridepublic void close() throws InterruptedException {// Close quietly so the closing sequence will be executed completely.ComponentClosingUtils.shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE));ComponentClosingUtils.shutdownExecutorForcefully(coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));}public void runInCoordinatorThread(Runnable runnable) {coordinatorExecutor.execute(runnable);}// --------- Package private methods for the DynamicCepOperatorCoordinator ------------ClassLoader getUserCodeClassloader() {return this.operatorCoordinatorContext.getUserCodeClassloader();}void subtaskReady(OperatorCoordinator.SubtaskGateway gateway) {final int subtask = gateway.getSubtask();if (subtaskGateways.get(subtask) == null) {subtaskGateways.put(subtask, gateway);} else {throw new IllegalStateException("Already have a subtask gateway for " + subtask);}}void subtaskNotReady(int subtaskIndex) {subtaskGateways.put(subtaskIndex, null);}Set<Integer> getSubtasks() {return subtaskGateways.keySet();}public void sendEventToOperator(int subtaskId, OperatorEvent event) {callInCoordinatorThread(() -> {final OperatorCoordinator.SubtaskGateway gateway =subtaskGateways.get(subtaskId);if (gateway == null) {LOG.warn(String.format("Subtask %d is not ready yet to receive events.",subtaskId));} else {gateway.sendEvent(event);}return null;},String.format("Failed to send event %s to subtask %d", event, subtaskId));}/*** Fail the job with the given cause.** @param cause the cause of the job failure.*/void failJob(Throwable cause) {operatorCoordinatorContext.failJob(cause);}// ---------------- private helper methods -----------------/*** A helper method that delegates the callable to the coordinator thread if the current thread* is not the coordinator thread, otherwise call the callable right away.** @param callable the callable to delegate.*/private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {// Ensure the split assignment is done by the coordinator executor.if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()&& !coordinatorExecutor.isShutdown()) {try {final Callable<V> guardedCallable =() -> {try {return callable.call();} catch (Throwable t) {LOG.error("Uncaught Exception in Coordinator Executor",t);ExceptionUtils.rethrowException(t);return null;}};return coordinatorExecutor.submit(guardedCallable).get();} catch (InterruptedException | ExecutionException e) {throw new FlinkRuntimeException(errorMessage, e);}}try {return callable.call();} catch (Throwable t) {LOG.error("Uncaught Exception in Source Coordinator Executor", t);throw new FlinkRuntimeException(errorMessage, t);}}
}
自定义协调器
package com.examples.coordinator;import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;/*** 自定义协调器* 需要实现 OperatorCoordinator 接口* @author shirukai*/
public class MyKeyedProcessCoordinator implements OperatorCoordinator {private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessCoordinator.class);/*** The name of the operator this RuleDistributorCoordinator is associated with.*/private final String operatorName;private final CoordinatorContext context;private boolean started;public MyKeyedProcessCoordinator(String operatorName, CoordinatorContext context) {this.operatorName = operatorName;this.context = context;}@Overridepublic void start() throws Exception {LOG.info("Starting Coordinator for {}: {}.",this.getClass().getSimpleName(),operatorName);// we mark this as started first, so that we can later distinguish the cases where 'start()'// wasn't called and where 'start()' failed.started = true;runInEventLoop(() -> {LOG.info("Coordinator started.");},"do something for coordinator.");}@Overridepublic void close() throws Exception {}@Overridepublic void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {LOG.info("Received event {} from operator {}.", event, subtask);}@Overridepublic void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {}@Overridepublic void notifyCheckpointComplete(long checkpointId) {}@Overridepublic void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {}@Overridepublic void subtaskReset(int subtask, long checkpointId) {LOG.info("Recovering subtask {} to checkpoint {} for operator {} to checkpoint.",subtask,checkpointId,operatorName);runInEventLoop(() -> {},"making event gateway to subtask %d available",subtask);}@Overridepublic void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {runInEventLoop(() -> {LOG.info("Removing itself after failure for subtask {} of operator {}.",subtask,operatorName);context.subtaskNotReady(subtask);},"handling subtask %d failure",subtask);}@Overridepublic void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {assert subtask == gateway.getSubtask();LOG.debug("Subtask {} of operator {} is ready.", subtask, operatorName);runInEventLoop(() -> {context.subtaskReady(gateway);sendEventToOperator(new MyEvent("hello,I'm from coordinator"));},"making event gateway to subtask %d available",subtask);}private void sendEventToOperator(OperatorEvent event) {for (Integer subtask : context.getSubtasks()) {try {context.sendEventToOperator(subtask, event);} catch (Exception e) {LOG.error("Failed to send OperatorEvent to operator {}",operatorName,e);context.failJob(e);return;}}}private void runInEventLoop(final ThrowingRunnable<Throwable> action,final String actionName,final Object... actionNameFormatParameters) {ensureStarted();context.runInCoordinatorThread(() -> {try {action.run();} catch (Throwable t) {// If we have a JVM critical error, promote it immediately, there is a good// chance the logging or job failing will not succeed any moreExceptionUtils.rethrowIfFatalErrorOrOOM(t);final String actionString =String.format(actionName, actionNameFormatParameters);LOG.error("Uncaught exception in the coordinator for {} while {}. Triggering job failover.",operatorName,actionString,t);context.failJob(t);}});}private void ensureStarted() {if (!started) {throw new IllegalStateException("The coordinator has not started yet.");}}
}
自定义协调器提供器
package com.examples.coordinator;import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;/*** 自定义协调器的提供者** @author shirukai*/
public class MyKeyedProcessCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {private static final long serialVersionUID = 1L;private final String operatorName;public MyKeyedProcessCoordinatorProvider(String operatorName, OperatorID operatorID) {super(operatorID);this.operatorName = operatorName;}@Overrideprotected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception {final String coordinatorThreadName = " MyKeyedProcessCoordinator-" + operatorName;CoordinatorExecutorThreadFactory coordinatorThreadFactory =new CoordinatorExecutorThreadFactory(coordinatorThreadName, context.getUserCodeClassloader());CoordinatorContext coordinatorContext =new CoordinatorContext(coordinatorThreadFactory, context);return new MyKeyedProcessCoordinator(operatorName, coordinatorContext);}
}
执行测试
package com.examples;import com.examples.operator.MyKeyedProcessOperatorFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author shirukai*/
public class CoordinatorExamples {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<MyData> source = env.fromElements(new MyData(1, 1.0), new MyData(2, 2.0), new MyData(1, 3.0));MyKeyedProcessOperatorFactory<MyData> operatorFactory = new MyKeyedProcessOperatorFactory<>();source.keyBy((KeySelector<MyData, Integer>) MyData::getId).transform("MyKeyedProcess", TypeInformation.of(MyData.class), operatorFactory).print();env.execute();}public static class MyData {private Integer id;private Double value;public MyData(Integer id, Double value) {this.id = id;this.value = value;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public Double getValue() {return value;}public void setValue(Double value) {this.value = value;}@Overridepublic String toString() {return "MyData{" +"id=" + id +", value=" + value +'}';}}
}
相关文章:

Flink协调器Coordinator及自定义Operator
Flink协调器Coordinator及自定义Operator 最近的项目开发过程中,使用到了Flink中的协调器以及自定义算子相关的内容,本篇文章主要介绍Flink中的协调器是什么,如何用,以及协调器与算子间的交互。 协调器Coordinator Flink中的协调…...

C调用C++中的类
文章目录 测试代码 测试代码 在C语言中调用C类,需要遵循几个步骤: 在C代码中,确保C类的函数是extern “C”,这样可以防止名称修饰(name mangling)。 使用头文件声明C类的公共接口,并且为这个…...

NFTScan 正式上线 Sei NFTScan 浏览器和 NFT API 数据服务
2024 年 6 月 12 号,NFTScan 团队正式对外发布了 Sei NFTScan 浏览器,将为 Sei 生态的 NFT 开发者和用户提供简洁高效的 NFT 数据搜索查询服务。NFTScan 作为全球领先的 NFT 数据基础设施服务商,Sei 是继 Bitcoin、Ethereum、BNBChain、Polyg…...

2024年高考:计算机相关专业前景分析与选择建议
2024年高考结束,面对计算机专业是否仍具有吸引力的讨论,本文将从行业趋势、就业市场、个人兴趣与能力、专业选择建议等多个角度进行深入分析,以帮助考生和家长做出明智的决策。 文章目录 一、行业趋势与就业市场1. 计算机行业的发展与变革2. …...

SQL聚合函数---汇总数据
此篇文章内容均来自与mysql必知必会教材,后期有衍生会继续更新、补充知识体系结构 文章目录 SQL聚集函数表:AGV()count()根据需求可以进行组合处理 max()min()max()、min()、avg()组…...

webpack5新特性
webpack5新特性 持久化缓存资源模块moduleIds & chunkIds的优化更智能的tree shakingnodeJs的polyfill脚本被移除支持生成e6/es2015的代码SplitChunk和模块大小Module Federation 持久化缓存 缓存生成的webpack模块和chunk,来改善构建速度cache 会在开发模式被设置成 ty…...

java单体服务自定义锁名称工具类
需求: 操作员能够对自己权限下的用户数据进行数据填充,但是不同操作员之间可能会有重复的用户数据,为了避免操作员覆盖数据或者重复操作数据,应该在操作用户数据时加锁,要求加的这一把锁必须是细粒度的锁,…...

整理好了!2024年最常见 20 道并发编程面试题(四)
上一篇地址:整理好了!2024年最常见 20 道并发编程面试题(三)-CSDN博客 七、请解释什么是条件变量(Condition Variable)以及它的用途。 条件变量是一种同步机制,用于在多线程编程中协调线程间的…...

持续交付一
一、 你的项目依赖的 jQuery 版本是 1.0.0 ,Bootstrap 依赖的版本是 1.1.0,而 Chosen 依赖的版本是 1.2.0,看上去都是小版本不一致,一开始并没有发现任何问题,但是如果到后期发现不兼容,可能就为时已晚了。…...

基于 Python 解析 XML 文件并将数据存储到 MongoDB 数据库
1. 问题背景 在软件开发中,我们经常需要处理各种格式的数据。XML 是一种常用的数据交换格式,它可以存储和传输结构化数据。很多网站会提供 XML 格式的数据接口,以便其他系统可以方便地获取数据。 我们有这样一个需求:我们需要从…...

Interview preparation--案例加密后数据的模糊查询
加密数据的模糊查询实现方案 我们知道加密后的数据对模糊查询不是很友好,本篇就针对加密数据模糊查询这个问题来展开讲一讲实现的思路,希望对大家有所启发。为了数据安全我们在开发过程中经常会对重要的数据进行加密存储,常见的有࿱…...

一个简单的R语言数据分析案例
在R语言中,数据分析可以涵盖广泛的领域,包括描述性统计、探索性数据分析、假设检验、数据可视化、机器学习等。以下是一个简单的R语言数据分析案例,该案例将涵盖数据导入、数据清洗、描述性统计、数据可视化以及一个简单的预测模型。 案例&a…...

springCloudAlibaba之分布式事务组件---seata
Seata Sea学习分布式事务Seata二阶段提交协议AT模式TCC模式 Seata服务搭建Seata Server(事务协调者TC)环境搭建seata服务搭建-db数据源seata服务搭建-nacos启动seata服务 分布式事务代码搭建-client端搭建接入微服务应用 Sea学习 事务:事务是…...

无公网IP与服务器完成企业微信网页应用开发远程调试详细流程
文章目录 前言1. Windows安装Cpolar2. 创建Cpolar域名3. 创建企业微信应用4. 定义回调本地接口5. 回调和可信域名接口校验6. 设置固定Cpolar域名7. 使用固定域名校验 前言 本文主要介绍如何在企业微信开发者中心通过使用内网穿透工具提供的公网域名成功验证回调本地接口服务! …...

CSS 字体颜色渐变
CSS 字体颜色渐变 css 代码: 注意:background: linear-gradient(属性),属性可以调整方向 例如:to bottom 上下结构,to right 左右结构font-family: DIN, DIN;font-weight: normal;font-size: 22px;color:…...

【机器学习】基于CTC模型的语音转换可编辑文本研究
1.引言 1.1语音识别技术的研究背景 1.1.1.语音识别技术的需求 语音识别技术的研究和发展,对于提升人类与机器的交互方式具有深远的影响。首先,它极大地提高了工作效率和便利性。通过语音指令控制设备,用户可以更快捷地完成任务,…...

数据结构错题答案汇总
王道学习 第一章 绪论 1.1 3.A 数据的逻辑结构是从面向实际问题的角度出发的,只采用抽象表达方式,独立于存储结构,数据的存储方式有多种不同的选择;而数据的存储结构是逻辑结构在计算机上的映射,它不能独立于逻辑结构而存在。数…...

搞AI?中小企业拿什么和大厂拼?
近期,苹果发布M4芯片,号称“比当今任何AI PC的任何神经引擎都强!”紧随其后微软携“CopilotPCs”的概念加入AI PC激战。截至目前,包括联想、惠普、华为等多家主流PC厂商在内,已经至少推出了超50款AI PC产品。 AI重塑行…...

光伏电站阵列式冲击波声压光伏驱鸟器
光伏电站内鸟群的聚集可不是一件好事,鸟类排泄物,因其粘度大、具有腐蚀性的特点,一旦堆积在太阳能板上,会严重影响光伏电站的发电效率。长期积累的鸟粪不仅难以清洗,还可能引发组件的热斑效应,严重时甚至可…...

Webrtc支持FFMPEG硬解码之解码实现(三)
前言 此系列文章分分为三篇, Webrtc支持FFMPEG硬解码之Intel(一)-CSDN博客 Webrtc支持FFMPEG硬解码之NVIDA(二)-CSDN博客 Webrtc支持FFMPEG硬解码之解码实现(三)-CSDN博客 AMD硬解目前还没找到可用解码器,欢迎留言交流 环境 Windows平台 VS2019 <...

RIP协议
RIP基本概念 RIP(Routing Information Protocol)是一种基于距离矢量的路由协议,用于在自治系统(AS)内的网关之间交换路由信息。RIP 是一种相对简单且广泛使用的内部网关协议(IGP),适…...

计算机视觉与深度学习实战,Python为工具,基于光流场的车流量计数应用
一、引言 随着科技的飞速发展,计算机视觉和深度学习技术在现代社会中的应用越来越广泛。其中,车流量计数作为智能交通系统的重要组成部分,对于城市交通管理和规划具有重要意义。本文旨在探讨以Python为工具,基于光流场的车流量计数应用,为智能交通系统的发展提供技术支撑。…...

插入排序(排序算法)
文章目录 插入排序详细代码 插入排序 插入排序,类似于扑克牌的玩法一样,在有序的数组中,扫描无序的数组,逐一的将元素插入到有序的数组中。 实现细节: 从第一个元素开始,该元素可以认为已经被排序取出下…...

【附带源码】机械臂MoveIt2极简教程(六)、第三个demo -机械臂的避障规划
系列文章目录 【附带源码】机械臂MoveIt2极简教程(一)、moveit2安装 【附带源码】机械臂MoveIt2极简教程(二)、move_group交互 【附带源码】机械臂MoveIt2极简教程(三)、URDF/SRDF介绍 【附带源码】机械臂MoveIt2极简教程(四)、第一个入门demo 【附带源码】机械臂Move…...

innovus:route secondary pg pin
我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 innovus route secondary pg pin分以下几步: #设置pg net连接 globalNetConnect VDD_AON -type pgpin -pin VNW #设置ndr rule,具体绕线层次跟signal绕…...

btstack协议栈实战篇--LE Peripheral - Test Pairing Methods
btstack协议栈---总目录_bt stack是什么-CSDN博客 目录 1.Main Application Setup 2.Packet Handler 3.btstack_main 4.log信息 首先先理解一下,ble中的central,Peripheral,master和slave的理解? 以下是对这些概念的理解: “Central”(中心设备):与“Maste…...

git下载项目登录账号或密码填写错误不弹出登录框
错误描述 登录账号或密码填写错误不弹出登录框 二、解决办法 控制面板\用户帐户\凭据管理器 找到对应的登录地址进行更新或者删除 再次拉取或者更新就会提示输入登录信息...

平移矩阵中的数学思考
《webgl编程指南》中,“平移矩阵”中相关值的得出 是基于“矩阵和向量相乘所得的等式”与“向量表达式”组成一个方程组 xaxbyczd xxTx 书中说,根据上面的方程组,可以很容易得出 a1、b0、c0、dTx 0、问题来了! 我也确实可以看…...

【机器学习】Qwen2大模型原理、训练及推理部署实战
目录 一、引言 二、模型简介 2.1 Qwen2 模型概述 2.2 Qwen2 模型架构 三、训练与推理 3.1 Qwen2 模型训练 3.2 Qwen2 模型推理 四、总结 一、引言 刚刚写完【机器学习】Qwen1.5-14B-Chat大模型训练与推理实战 ,阿里Qwen就推出了Qwen2&#x…...

JetLinks开源物联网平台社区版部署教程
1.上github搜素jetlinks 2.找到源代码,并且下载到本地。 3.项目下载完成之后,还需要另外下载三个核心依赖模块。在github找到jetlinks。 4.点击进去下载,下载完成之后,你会发现里面有三个文件夹是空白的,先不用理会&am…...