当前位置: 首页 > news >正文

openmetadata1.3.1 自定义连接器 开发教程

openmetadata自定义连接器开发教程

一、开发通用自定义连接器教程

官网教程链接:

1.https://docs.open-metadata.org/v1.3.x/connectors/custom-connectors

2.https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector

(一)创建服务类型自定义连接器类

参考文档:https://docs.open-metadata.org/v1.3.x/sdk/python/build-connector/source#for-consumers-of-openmetadata-ingestion-to-define-custom-connectors-in-their-own-package-with-same-namespace

1.创建自定义连接器

示例:my_csv_connector.py

"""
自定义Database Service 从 CSV 文件中提取元数据
"""
import csv
import tracebackfrom pydantic import BaseModel, ValidationError, validator
from pathlib import Path
from typing import Iterable, Optional, List, Dict, Anyfrom metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import Either
from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from metadata.ingestion.api.steps import Source, InvalidSourceException
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.database.customDatabaseConnection import (CustomDatabaseConnection,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.api.data.createDatabaseSchema import (CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.entity.services.databaseService import (DatabaseService,
)
from metadata.generated.schema.entity.data.table import (Column,
)
from metadata.generated.schema.metadataIngestion.workflow import (Source as WorkflowSource,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_loggerlogger = ingestion_logger()class InvalidCsvConnectorException(Exception):"""Sample data is not valid to be ingested"""class CsvModel(BaseModel):name: strcolumn_names: List[str]column_types: List[str]@validator("column_names", "column_types", pre=True)def str_to_list(cls, value):"""Suppose that the internal split is in ;"""return value.split(";")class CsvConnector(Source):"""Custom connector to ingest Database metadata.We'll suppose that we can read metadata from a CSVwith a custom database name from a business_unit connection option."""# 内置方法def __init__(self, config: WorkflowSource, metadata: OpenMetadata):self.config = configself.metadata = metadata# 获取配置信息self.service_connection = config.serviceConnection.__root__.configself.source_directory: str = (# 获取CSV文件路径self.service_connection.connectionOptions.__root__.get("source_directory"))if not self.source_directory:raise InvalidCsvConnectorException("未获取到source_directory配置信息")self.business_unit: str = (# 获取自定义的数据库名称self.service_connection.connectionOptions.__root__.get("business_unit"))if not self.business_unit:raise InvalidCsvConnectorException("未获取到business_unit配置信息")self.data: Optional[List[CsvModel]] = Nonesuper().__init__()# 内置函数@classmethoddef create(cls, config_dict: dict, metadata_config: OpenMetadataConnection) -> "CsvConnector":config: WorkflowSource = WorkflowSource.parse_obj(config_dict)connection: CustomDatabaseConnection = config.serviceConnection.__root__.configif not isinstance(connection, CustomDatabaseConnection):raise InvalidSourceException(f"Expected CustomDatabaseConnection, but got {connection}")return cls(config, metadata_config)# 静态方法:按行读取@staticmethoddef read_row_safe(row: Dict[str, Any]):try:return CsvModel.parse_obj(row)except ValidationError:logger.warning(f"Error parsing row {row}. Skipping it.")# 预处理:读取文件及数据def prepare(self):# Validate that the file existssource_data = Path(self.source_directory)if not source_data.exists():raise InvalidCsvConnectorException("Source Data path does not exist")try:with open(source_data, "r", encoding="utf-8") as file:reader = csv.DictReader(file)# 读取数据self.data = [self.read_row_safe(row) for row in reader]except Exception as exc:logger.error("Unknown error reading the source file")raise excdef yield_create_request_database_service(self):yield Either(# 串讲元数据读取服务right=self.metadata.get_create_service_from_source(entity=DatabaseService, config=self.config))# 业务原数据库名处理方法def yield_business_unit_db(self):# 选择我们刚刚创建的服务(如果不是UI)# 获取提取服务对象service_entity: DatabaseService = self.metadata.get_by_name(entity=DatabaseService, fqn=self.config.serviceName)yield Either(right=CreateDatabaseRequest(name=self.business_unit,service=service_entity.fullyQualifiedName,))# chems处理方法def yield_default_schema(self):# Pick up the service we just created (if not UI)database_entity: Database = self.metadata.get_by_name(entity=Database, fqn=f"{self.config.serviceName}.{self.business_unit}")yield Either(right=CreateDatabaseSchemaRequest(name="default",database=database_entity.fullyQualifiedName,))# 业务元数据处理方法def yield_data(self):"""Iterate over the data list to create tables"""database_schema: DatabaseSchema = self.metadata.get_by_name(entity=DatabaseSchema,fqn=f"{self.config.serviceName}.{self.business_unit}.default",)# 异常处理# 假设我们有一个要跟踪的故障# try:#     1/0# except Exception:#     yield Either(#         left=StackTraceError(#             name="My Error",#             error="Demoing one error",#             stackTrace=traceback.format_exc(),#         )#     )# 解析csv元数据信息(获取列名和类型)for row in self.data:yield Either(right=CreateTableRequest(name=row.name,databaseSchema=database_schema.fullyQualifiedName,columns=[Column(name=model_col[0],dataType=model_col[1],)for model_col in zip(row.column_names, row.column_types)],))# 迭代器:元数据迭代返回def _iter(self) -> Iterable[Entity]:# 数据库元数据信息存储yield from self.yield_create_request_database_service()# 业务源数据库yield from self.yield_business_unit_db()# 业务schemayield from self.yield_default_schema()# 业务数据yield from self.yield_data()# 测试数据库连接def test_connection(self) -> None:pass# 连接关闭def close(self):pass

(二)将自定义连接器方法打包编译进ingestion镜像

项目目录:

image-20240701153616535

Dockerfile:

FROM openmetadata/ingestion:1.3.1# Let's use the same workdir as the ingestion image
WORKDIR ingestion
USER airflow# Install our custom connector
COPY connector connector
COPY setup.py .
COPY sample.csv .
#COPY person_info.proto .
RUN pip install --no-deps .

编译服务镜像

docker build -t om-ingestion:build -f Dockerfile .

(三)部署新版ingestion服务()

docker-compose up -d

docker-compose-ingestion.yml

version: "3.9"
volumes:ingestion-volume-dag-airflow:ingestion-volume-dags:ingestion-volume-tmp:es-data:
services:  ingestion:container_name: om_ingestionimage: om-ingestion:buildenvironment:AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"AIRFLOW__CORE__EXECUTOR: LocalExecutorAIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_GENERATED_CONFIGS: "/opt/airflow/dag_generated_configs"DB_SCHEME: ${AIRFLOW_DB_SCHEME:-postgresql+psycopg2}DB_HOST: ${AIRFLOW_DB_HOST:-host.docker.internal}DB_PORT: ${AIRFLOW_DB_PORT:-5432}AIRFLOW_DB: ${AIRFLOW_DB:-airflow_db}DB_USER: ${AIRFLOW_DB_USER:-airflow_user}DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass}# extra connection-string properties for the database# EXAMPLE# require SSL (only for Postgres)# properties: "?sslmode=require"DB_PROPERTIES: ${AIRFLOW_DB_PROPERTIES:-}# To test the lineage backend# AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend# AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflowAIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://host.docker.internal:8585/apiAIRFLOW__LINEAGE__JWT_TOKEN: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MDk3MDkyNDMsImV4cCI6bnVsbH0.U7XIYZjJAmJ-p3WTy4rTGGSzUxZeNpjOsHzrWRz7n-zAl-GZvznZWMKX5nSX_KwRHAK3UYuO1UX2-ZbeZxdpzhyumycNFyWzwMs8G6iEGoaM6doGhqCgHileco8wcAoaTXKHTnwa80ddWHt4dqZmikP7cIhLg9etKAepQNQibefewHbaLOoCrFyo9BqFeZzNaVBo1rogNtslWaDO6Wnk_rx0jxRLTy57Thq7R7YS_nZd-JVfYf72BEFHJ_WDZym4k-dusV0PWGzMPYIXq3s1KbpPBt_tUSz4cUrXbLuI5-ZsOWIvUhsLeHJDU-35-RymylhMrQ92kZjsy7v2nl6apQentrypoint: /bin/bashcommand:- "/opt/airflow/ingestion_dependency.sh"expose:- 8080ports:- "8080:8080"networks:- app_net_ingestionvolumes:- ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs- ingestion-volume-dags:/opt/airflow/dags- ingestion-volume-tmp:/tmpnetworks:app_net_ingestion:ipam:driver: defaultconfig:- subnet: "172.16.240.0/24"

(四)根据服务类型选择对应类型的custom服务创建采集器测试

image-20240701160552070

点击保存添加元数据提取器:

image-20240701160623658

image-20240701160654370

二、开发内置连接器教程(Streamsets)

官网教程链接:https://docs.open-metadata.org/v1.3.x/developers/contribute/developing-a-new-connector

(一)定义连接器class类json模版(streamSetsConnection.json)

目录openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/streamSetsConnection.json

{"$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/streamSetsConnection.json","$schema": "http://json-schema.org/draft-07/schema#","title": "StreamSetsConnection","description": "StreamSets Metadata Pipeline Connection Config","type": "object","javaType": "org.openmetadata.schema.services.connections.pipeline.StreamSetsConnection","definitions": {"StreamSetsType": {"description": "Service type.","type": "string","enum": ["StreamSets"],"default": "StreamSets"},"basicAuthentication": {"title": "Username Authentication","description": "Login username","type":"object","properties": {"username": {"title": "Username","description": "StreamSets user to authenticate to the API.","type": "string"}},"additionalProperties": false}},"properties": {"type": {"title": "Service Type","description": "Service Type","$ref": "#/definitions/StreamSetsType","default": "StreamSets"},"hostPort": {"expose": true,"title": "Host And Port","description": "Pipeline Service Management/UI URI.","type": "string","format": "uri"},"streamSetsConfig": {"title": "StreamSets Credentials Configuration","description": "We support username authentication","oneOf": [{"$ref": "#/definitions/basicAuthentication"}]},"supportsMetadataExtraction": {"title": "Supports Metadata Extraction","$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"}},"additionalProperties": false,"required": ["hostPort", "streamSetsConfig"]
}

(二)开发采集器源码:

目录:ingestion/src/metadata/ingestion/source/pipeline/streamsets/*

image-20240701162822027

1.streamsets连接客户端(client.py)

import logging
import traceback
from typing import Any, Iterable, Optionalimport requests
from requests import HTTPError
from requests.auth import HTTPBasicAuth# 设置日志记录器
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)REQUESTS_TIMEOUT = 60 * 5def clean_uri(uri: str) -> str:"""清理URI,确保它以HTTP或HTTPS开头"""if not uri.startswith(("http://", "https://")):return "http://" + urireturn uriclass StreamSetsClient:"""在StreamSets Data Collector REST API之上的包装器"""def __init__(self,host_port: str,username: Optional[str] = None,password: Optional[str] = None,verify: bool = False,):self.api_endpoint = clean_uri(host_port) + "/rest"self.username = usernameself.password = passwordself.verify = verifyself.headers = {"Content-Type": "application/json"}def get(self, path: str) -> Optional[Any]:"""GET方法包装器"""try:res = requests.get(f"{self.api_endpoint}/{path}",verify=self.verify,headers=self.headers,timeout=REQUESTS_TIMEOUT,auth=HTTPBasicAuth(self.username, self.password),)res.raise_for_status()return res.json()except HTTPError as err:logger.warning(f"Connection error calling the StreamSets API - {err}")raise errexcept ValueError as err:logger.warning(f"Cannot pick up the JSON from API response - {err}")raise errexcept Exception as err:logger.warning(f"Unknown error calling StreamSets API - {err}")raise errdef list_pipelines(self) -> Iterable[dict]:"""List all pipelines"""try:return self.get("v1/pipelines")except Exception as err:logger.error(traceback.format_exc())raise errdef get_pipeline_details(self, pipeline_id: str) -> dict:"""Get a specific pipeline by ID"""return self.get(f"v1/pipeline/{pipeline_id}?rev=0&get=pipeline")def test_list_pipeline_detail(self) -> Iterable[dict]:"""Test API access for listing pipelines"""return self.list_pipelines()

2.连接器和测试连接器(connection.py)

"""
源连接处理程序
"""
from typing import Optionalfrom metadata.generated.schema.entity.automations.workflow import (Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (BasicAuthentication,StreamSetsConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClientdef get_connection(connection: StreamSetsConnection) -> StreamSetsClient:"""Create connection"""if isinstance(connection.streamSetsConfig, BasicAuthentication):return StreamSetsClient(host_port=connection.hostPort,username=connection.streamSetsConfig.username,password="95bd7977208bc935cac3656f4a9eea3a",verify=False,)def test_connection(metadata: OpenMetadata,client: StreamSetsClient,service_connection: StreamSetsConnection,automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:"""元数据工作流或自动化工作流期间测试连接。这可以作为一部分执行"""def custom_executor():list(client.list_pipelines())test_fn = {"GetPipelines": custom_executor}test_connection_steps(metadata=metadata,test_fn=test_fn,service_type=service_connection.type.value,automation_workflow=automation_workflow,)

3.元数据提取器(metadata.py)

"""
提取StreamSets 源的元数据 
"""
import traceback
from typing import Iterable, List, Optional, Anyfrom metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from pydantic import BaseModel, ValidationErrorfrom metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (StreamSetsConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_loggerlogger = ingestion_logger()class StagesDetails(BaseModel):instanceName: strlabel:strstageType: strstageName: strdescription: strinputLanes: List[str]outputLanes: List[str]downstream_task_names: set[str] = set()class StreamSetsPipelineDetails(BaseModel):"""Defines the necessary StreamSets information"""uuid: strpipelineId: strtitle: strname: strcreated: intcreator: strdescription: strclass StreamsetsSource(PipelineServiceSource):"""执行必要的方法,从 Airflow 的元数据数据库中提取管道元数据"""@classmethoddef create(cls, config_dict: dict, metadata: OpenMetadata):logger.info("create..........")config: WorkflowSource = WorkflowSource.parse_obj(config_dict)logger.info(f"WorkflowSource: {config}")connection: StreamSetsConnection = config.serviceConnection.__root__.configlogger.info(f"StreamSetsConnection: {connection}")if not isinstance(connection, StreamSetsConnection):raise InvalidSourceException(f"Expected StreamSetsConnection, but got {connection}")return cls(config, metadata)def yield_pipeline(self, pipeline_details: StreamSetsPipelineDetails) -> Iterable[Either[CreatePipelineRequest]]:logger.info("yield_pipeline.......")try:connection_url = Noneif self.service_connection.hostPort:connection_url = (f"{clean_uri(self.service_connection.hostPort)}/rest/v1/pipelines")logger.info(f"pipeline_details:{pipeline_details}")logger.info(f"connection_url:{connection_url}")pipeline_request = CreatePipelineRequest(name=pipeline_details.name,displayName=pipeline_details.title,sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",tasks=self._get_tasks_from_details(pipeline_details),service=self.context.pipeline_service,)yield Either(right=pipeline_request)self.register_record(pipeline_request=pipeline_request)except TypeError as err:self.context.task_names = set()yield Either(left=StackTraceError(name=pipeline_details.dag_id,error=(f"Error building DAG information from {pipeline_details}. There might be Airflow version"f" incompatibilities - {err}"),stackTrace=traceback.format_exc(),))except ValidationError as err:self.context.task_names = set()yield Either(left=StackTraceError(name=pipeline_details.dag_id,error=f"Error building pydantic model for {pipeline_details} - {err}",stackTrace=traceback.format_exc(),))except Exception as err:self.context.task_names = set()yield Either(left=StackTraceError(name=pipeline_details.dag_id,error=f"Wild error ingesting pipeline {pipeline_details} - {err}",stackTrace=traceback.format_exc(),))# 获取解析管道详情def _get_tasks_from_details(self, pipeline_details: StreamSetsPipelineDetails) -> Optional[List[Task]]:logger.info("_get_tasks_from_details.......")logger.info(f"StreamSetsPipelineDetails:{pipeline_details}")try:stages = self.get_stages_by_pipline(pipeline_details)return [Task(name=stage.instanceName,displayName=stage.label,sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",taskType=stage.stageType,description=stage.description,downstreamTasks=list(stage.downstream_task_names)if stage.downstream_task_nameselse [],)for stage in stages]except Exception as err:logger.debug(traceback.format_exc())logger.warning(f"Wild error encountered when trying to get tasks from Pipeline Details {pipeline_details} - {err}.")return Nonedef yield_pipeline_lineage_details(self, pipeline_details: StreamSetsPipelineDetails) -> Iterable[Either[AddLineageRequest]]:logger.info("yield_pipeline_lineage_details..........")"""将连接转换为管道实体:param pipeline_details: 来自  StreamSets的pipeline_details对象return:使用任务创建管道请求"""passdef get_pipelines_list(self) -> Optional[List[StreamSetsPipelineDetails]]:logger.info("get_pipelines_list..........")"""Get List of all pipelines"""if self.connection.list_pipelines() is not None:for list_pipeline in self.connection.list_pipelines():logger.info(f"pipeline:{list_pipeline}")try:yield StreamSetsPipelineDetails(uuid=list_pipeline.get("uuid"),pipelineId=list_pipeline.get("pipelineId"),title=list_pipeline.get("title"),name=list_pipeline.get("name"),created=list_pipeline.get("created"),creator=list_pipeline.get("creator"),description=list_pipeline.get("description"),)except (ValueError, KeyError, ValidationError) as err:logger.debug(traceback.format_exc())logger.warning(f"Cannot create StreamSetsPipelineDetails from {list_pipeline} - {err}")except Exception as err:logger.debug(traceback.format_exc())logger.warning(f"Wild error encountered when trying to get pipelines from Process Group {list_pipeline} - {err}.")else:return None# 获取上下关联关系def get_stages_lane(self, stages: Optional[List[StagesDetails]]) -> {}:logger.info("get_stages_lane......")input_lane_to_stage_map = {}for stage in stages:logger.info(f"stage_info:{stage}")for input_lane in stage.get("inputLanes", []):try:if input_lane_to_stage_map.get(input_lane) is None:input_lane_to_stage_map[input_lane] = set()input_lane_to_stage_map[input_lane].add(stage.get("instanceName"))else:input_lane_to_stage_map[input_lane].add(stage.get("instanceName"))except Exception as err:logger.debug(traceback.format_exc())logger.warning(f"Wild error encountered when trying to get stages from Pipeline Details {stages} - {err}.")logger.info(f"input_lane_to_stage_map:{input_lane_to_stage_map}")return input_lane_to_stage_mapdef get_stages_by_pipline(self, pipeline_details: StreamSetsPipelineDetails) -> Optional[List[StagesDetails]]:logger.info("get_stages_by_pipline")pipeline_detail = self.connection.get_pipeline_details(pipeline_details.pipelineId)stages = []if pipeline_detail.get("stages"):stages = pipeline_detail.get("stages")input_lane_to_stage_map = self.get_stages_lane(stages)for stage in stages:logger.info(f"stage:{stage}")try:input_lanes = stage.get("inputLanes", [])output_lanes = stage.get("outputLanes", [])downstream_stage_names = set()for output_lane in stage.get("outputLanes", []):if output_lane in input_lane_to_stage_map.keys():for down_stage in input_lane_to_stage_map.get(output_lane, []):downstream_stage_names.add(down_stage)yield StagesDetails(instanceName=stage.get("instanceName"),label=stage["uiInfo"].get("label"),stageType=stage["uiInfo"].get("stageType"),stageName=stage.get("stageName"),description=stage["uiInfo"].get("description"),inputLanes=input_lanes,outputLanes=output_lanes,downstream_task_names=downstream_stage_names)except (ValueError, KeyError, ValidationError) as err:logger.debug(traceback.format_exc())logger.warning(f"Cannot create StagesDetails from {stage} - {err}")except Exception as err:logger.debug(traceback.format_exc())logger.warning(f"Wild error encountered when trying to get pipelines from Process Group {stage} - {err}.")def get_pipeline_name(self, pipeline_details: StreamSetsPipelineDetails) -> str:return pipeline_details.namedef yield_pipeline_status(self, pipeline_details: StreamSetsPipelineDetails) -> Iterable[Either[OMetaPipelineStatus]]:pass

(三)修改前端ui源码,添加连接器对象

目录:openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts

/**  Copyright 2022 Collate.*  Licensed under the Apache License, Version 2.0 (the "License");*  you may not use this file except in compliance with the License.*  You may obtain a copy of the License at*  http://www.apache.org/licenses/LICENSE-2.0*  Unless required by applicable law or agreed to in writing, software*  distributed under the License is distributed on an "AS IS" BASIS,*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.*  See the License for the specific language governing permissions and*  limitations under the License.*/import { cloneDeep } from 'lodash';
import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
import { PipelineServiceType } from '../generated/entity/services/pipelineService';
import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json';
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
import customPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/customPipelineConnection.json';
import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
import databricksPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/databricksPipelineConnection.json';
import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domoPipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
import streamSetsConnection from '../jsons/connectionSchemas/connections/pipeline/streamSetsConnection.json';export const getPipelineConfig = (type: PipelineServiceType) => {let schema = {};const uiSchema = { ...COMMON_UI_SCHEMA };switch (type) {case PipelineServiceType.Airbyte: {schema = airbyteConnection;break;}case PipelineServiceType.Airflow: {schema = airflowConnection;break;}case PipelineServiceType.GluePipeline: {schema = gluePipelineConnection;break;}case PipelineServiceType.Fivetran: {schema = fivetranConnection;break;}case PipelineServiceType.Dagster: {schema = dagsterConnection;break;}case PipelineServiceType.Nifi: {schema = nifiConnection;break;}case PipelineServiceType.StreamSets: {schema = streamSetsConnection;break;}case PipelineServiceType.DomoPipeline: {schema = domoPipelineConnection;break;}case PipelineServiceType.CustomPipeline: {schema = customPipelineConnection;break;}case PipelineServiceType.DatabricksPipeline: {schema = databricksPipelineConnection;break;}case PipelineServiceType.Spline: {schema = splineConnection;break;}default:break;}return cloneDeep({ schema, uiSchema });
};

(四)前端ui源码,添加MD说明文档

路径:openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/StreamSets.md

# StreamSets
在本节中,我们将提供使用 StreamSets 连接器的指南和参考。## 要求
系统 支持 StreamSets 连接器的 1 种连接类型:
- **基本认证**:使用用户名对 StreamSets 进行登陆。您可以在 [docs](https://docs.open-metadata.org/connectors/pipeline/StreamSets) 中找到有关 StreamSets 连接器的详细信息。## 连接详细信息
$$section
### Host and Port $(id="hostPort")
管道服务管理 URI。这应指定为格式为"scheme://hostname:port"的 URI 字符串。例如,“http://localhost:8443”、“http://host.docker.internal:8443”。
$$$$section
### StreamSets Config $(id="StreamSetsConfig")
OpenMetadata 支持基本身份验证(用户名/密码身份验证。有关详细信息,请参阅要求部分。
$$$$section
### Username $(id="username")
用于连接到 StreamSets 的用户名。此用户应该能够向 StreamSets API 发送请求并访问“资源”终结点。
$$

(五)创建 Java ClassConverter(可选)

(六)构建dockefile重新构建镜像

server服务Dockerfile

# Build stage
FROM alpine:3.19 AS buildCOPY openmetadata-dist/target/openmetadata-*.tar.gz /
#COPY docker/openmetadata-start.sh /RUN mkdir -p /opt/openmetadata && \tar zxvf openmetadata-*.tar.gz -C /opt/openmetadata --strip-components 1 && \rm openmetadata-*.tar.gz# Final stage
FROM alpine:3.19EXPOSE 8585RUN adduser -D openmetadata && \apk update && \apk upgrade && \apk add --update --no-cache bash openjdk17-jre tzdata
ENV TZ=Asia/ShanghaiCOPY --chown=openmetadata:openmetadata --from=build /opt/openmetadata /opt/openmetadata
COPY --chmod=755 docker/openmetadata-start.sh /USER openmetadataWORKDIR /opt/openmetadata
ENTRYPOINT [ "/bin/bash" ]
CMD ["/openmetadata-start.sh"]

ingestion服务Dockerfile

路径:ingestion/Dockerfile

FROM apache/airflow:2.7.3-python3.10#FROM docker-compose-ingestion-ingestion:latest
USER root
RUN curl -sS https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl -sS https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
# Install Dependencies (listed in alphabetical order)
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get -qq update \&& apt-get -qq install -y \tzdata \alien \build-essential \default-libmysqlclient-dev \freetds-bin \freetds-dev \gcc \gnupg \libaio1 \libevent-dev \libffi-dev \libpq-dev \librdkafka-dev \libsasl2-dev \libsasl2-2 \libsasl2-modules \libsasl2-modules-gssapi-mit \libssl-dev \libxml2 \libkrb5-dev \openjdk-11-jre \openssl \postgresql \postgresql-contrib \tdsodbc \unixodbc \unixodbc-dev \unzip \vim \git \wget --no-install-recommends \# Accept MSSQL ODBC License&& ACCEPT_EULA=Y apt-get install -y msodbcsql18 \&& rm -rf /var/lib/apt/lists/*RUN if [[ $(uname -m) == "arm64" || $(uname -m) == "aarch64" ]]; \then \wget -q https://download.oracle.com/otn_software/linux/instantclient/191000/instantclient-basic-linux.arm64-19.10.0.0.0dbru.zip -O /oracle-instantclient.zip && \unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \else \wget -q https://download.oracle.com/otn_software/linux/instantclient/1917000/instantclient-basic-linux.x64-19.17.0.0.0dbru.zip -O /oracle-instantclient.zip && \unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \fiENV LD_LIBRARY_PATH=/instantclient# Security patches for base image
# monitor no fixed version for
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-LIBTASN16-3061097
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-MARIADB105-2940589
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-BIND9-3027852
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-EXPAT-3023031 we are already installed the latest
RUN echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/backports.list
RUN apt-get -qq update \&& apt-get -qq install -t bullseye-backports -y \curl \libpcre2-8-0 \postgresql-common \expat \bind9# Required for Starting Ingestion Container in Docker Compose
# Provide Execute Permissions to shell script
COPY --chown=airflow:0 --chmod=775 ingestion/ingestion_dependency.sh /opt/airflow
# Required for Ingesting Sample Data
COPY --chown=airflow:0 --chmod=775 ingestion /home/airflow/ingestionCOPY --chown=airflow:0 --chmod=775 openmetadata-airflow-apis /home/airflow/openmetadata-airflow-apis# Required for Airflow DAGs of Sample Data
#COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dagsUSER airflow
ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
ENV TZ=Asia/Shanghai# Disable pip cache dir
# https://pip.pypa.io/en/stable/topics/caching/#avoiding-caching
ENV PIP_NO_CACHE_DIR=1
# Make pip silent
ENV PIP_QUIET=1RUN pip install --upgrade pipWORKDIR /home/airflow/openmetadata-airflow-apis
RUN pip install "."WORKDIR /home/airflow/ingestion# 提供要安装的引入依赖项的参数。默认为全部提供要安装的引入依赖项的参数。默认为全部
ARG INGESTION_DEPENDENCY="all"
RUN pip install ".[${INGESTION_DEPENDENCY}]"# Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593
RUN echo "Image built for $(uname -m)"
RUN if [[ $(uname -m) != "aarch64" ]]; \then \pip install "ibm-db-sa~=0.4"; \fi# bump python-daemon for https://github.com/apache/airflow/pull/29916
RUN pip install "python-daemon>=3.0.0"
# remove all airflow providers except for docker and cncf kubernetes
RUN pip freeze | grep "apache-airflow-providers" | grep --invert-match -E "docker|http|cncf" | xargs pip uninstall -y
# Uninstalling psycopg2-binary and installing psycopg2 instead 
# because the psycopg2-binary generates a architecture specific error 
# while authenticating connection with the airflow, psycopg2 solves this error
RUN pip uninstall psycopg2-binary -y
RUN pip install psycopg2 mysqlclient==2.1.1
# Make required folders for openmetadata-airflow-apis
RUN mkdir -p /opt/airflow/dag_generated_configsEXPOSE 8080
# This is required as it's responsible to create airflow.cfg file
RUN airflow db init && rm -f /opt/airflow/airflow.db

(七)构建服务镜像

根目录下执行构建server:

docker build -t om-server:build -f docker/development/Dockerfile .

根目录下执行构建ingestion:

docker build -t om-ingestion:build -f ingestion/Dockerfile .

(八)部署新版服务

docker-compose -f docker/development/docker-compose-postgres.yml up -d

(九)访问服务,创建streamsets元数据采集

image-20240701165027755

image-20240701165054548

相关文章:

openmetadata1.3.1 自定义连接器 开发教程

openmetadata自定义连接器开发教程 一、开发通用自定义连接器教程 官网教程链接: 1.https://docs.open-metadata.org/v1.3.x/connectors/custom-connectors 2.https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector (一&…...

PostgreSQL 如何优化存储过程的执行效率?

文章目录 一、查询优化1. 正确使用索引2. 避免不必要的全表扫描3. 使用合适的连接方式4. 优化子查询 二、参数传递1. 避免传递大对象2. 参数类型匹配 三、减少数据量处理1. 限制返回结果集2. 提前筛选数据 四、优化逻辑结构1. 分解复杂的存储过程2. 避免过度使用游标 五、事务处…...

普中51单片机:数码管显示原理与实现详解(四)

文章目录 引言数码管的结构数码管的工作原理静态数码管电路图开发板IO连接图代码演示 动态数码管实现步骤数码管驱动方式电路图开发板IO连接图真值表代码演示1代码演示2代码演示3 引言 数码管(Seven-Segment Display)是一种常见的显示设备,广…...

web缓存代理服务器

一、web缓存代理 web代理的工作机制 代理服务器是一个位于客户端和原始(资源)服务器之间的服务器,为了从原始服务器取得内容,客户端向代理服务器发送一个请求,并指定目标原始服务器,然后代理服务器向原始…...

容器:queue(队列)

以下是关于queue容器的总结 1、构造函数&#xff1a;queue [queueName] 2、添加、删除元素: push() 、pop() 3、获取队头/队尾元素&#xff1a;front()、back() 4、获取栈的大小&#xff1a;size() 5、判断栈是否为空&#xff1a;empty() #include <iostream> #include …...

探索 WebKit 的后台同步新纪元:Web Periodic Background Synchronization 深度解析

探索 WebKit 的后台同步新纪元&#xff1a;Web Periodic Background Synchronization 深度解析 随着 Web 应用逐渐成为我们日常生活中不可或缺的一部分&#xff0c;用户对应用的响应速度和可靠性有了更高的期待。Web Periodic Background Synchronization API&#xff08;周期…...

ctfshow web入门 web338--web344

web338 原型链污染 comman.js module.exports {copy:copy };function copy(object1, object2){for (let key in object2) {if (key in object2 && key in object1) {copy(object1[key], object2[key])} else {object1[key] object2[key]}}}login.js var express …...

mupdf加载PDF显示中文乱码

现象 加载PDF显示乱码,提示非嵌入字体 non-embedded font using identity encoding调式 在pdf-font.c中加载字体 调试源码发现pdf文档的字体名字居然是GBK&#xff0c;估计又是哪个windows下写的pdf生成工具生成pdf 字体方法&#xff1a; static pdf_font_desc * load_cid…...

常用的限流工具Guava RateLimiter 或Redisson RRateLimiter

在分布式系统和高并发场景中&#xff0c;限流是一个非常常见且重要的需求。以下是一些常用的限流工具和库&#xff0c;包括它们的特点和使用场景&#xff1a; 1. Guava RateLimiter Google 的 Guava 库中的 RateLimiter 是一个简单且高效的限流工具&#xff0c;适用于单节点应…...

卷积神经网络(CNN)和循环神经网络(RNN) 的区别与联系

卷积神经网络&#xff08;CNN&#xff09;和循环神经网络&#xff08;RNN&#xff09;是两种广泛应用于深度学习的神经网络架构&#xff0c;它们在设计理念和应用领域上有显著区别&#xff0c;但也存在一些联系。 ### 卷积神经网络&#xff08;CNN&#xff09; #### 主要特点…...

Unity【入门】场景切换和游戏退出及准备

1、必备知识点场景切换和游戏退出 文章目录 1、必备知识点场景切换和游戏退出1、场景切换2、鼠标隐藏锁定相关3、随机数和自带委托4、模型资源的导入1、模型由什么构成2、Unity支持的模型格式3、如何指导美术同学导出模型4、学习阶段在哪里获取模型资源 2、小项目准备工作需求分…...

Python 函数递归

以下是一个使用递归计算阶乘的 Python 函数示例 &#xff1a; 应用场景&#xff1a; 1. 动态规划问题&#xff1a;在一些需要逐步求解子问题并利用其结果的动态规划场景中&#xff0c;递归可以帮助直观地表达问题的分解和求解过程。 2. 遍历具有递归结构的数据&#xff1a;如递…...

MyBatis(27)如何配置 MyBatis 实现打印可执行的 SQL 语句

在开发过程中&#xff0c;打印可执行的SQL语句对于调试和性能优化是非常有帮助的。MyBatis提供了几种方式来实现SQL语句的打印。 1. 使用日志框架 MyBatis可以通过配置其内部使用的日志框架&#xff08;如Log4j、Logback等&#xff09;来打印SQL语句。这是最常用的方法。 Lo…...

3.js - 裁剪平面(clipIntersection:交集、并集)

看图 代码 // ts-nocheck// 引入three.js import * as THREE from three// 导入轨道控制器 import { OrbitControls } from three/examples/jsm/controls/OrbitControls// 导入lil.gui import { GUI } from three/examples/jsm/libs/lil-gui.module.min.js// 导入tween import …...

在5G/6G应用中实现高性能放大器的建模挑战

来源&#xff1a;Modelling Challenges for Enabling High Performance Amplifiers in 5G/6G Applications {第28届“集成电路和系统的混合设计”(Mixed Design of Integrated Circuits and Systems)国际会议论文集&#xff0c;2021年6月24日至26日&#xff0c;波兰洛迪} 本文讨…...

Perl 数据类型

Perl 数据类型 Perl 是一种功能丰富的编程语言&#xff0c;广泛应用于系统管理、网络编程、GUI 开发等领域。在 Perl 中&#xff0c;数据类型是编程的基础&#xff0c;决定了变量存储信息的方式以及可以对这些信息执行的操作。本文将详细介绍 Perl 中的主要数据类型&#xff0…...

网络协议 -- IP、ICMP、TCP、UDP字段解析

网络协议报文解析及工具使用介绍 1. 以太网帧格式及各字段作用 -------------------------------- | Destination MAC Address (48 bits) | -------------------------------- | Source MAC Address (48 bits) …...

【工具】豆瓣自动回贴软件

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 相比于之前粗糙丑陋的黑命令框版本&#xff0c;这个版本新增了UI界面&#xff0c;从此可以不需要再挨个去翻配置文件了。 另外&#xff0c;升级了隐藏浏…...

初学Spring之动态代理模式

动态代理和静态代理角色一样 动态代理的代理类是动态生成的 动态代理分为两大类&#xff1a; 基于接口的动态代理&#xff08;JDK 动态代理&#xff09;、基于类的动态代理&#xff08;cglib&#xff09; 也可以用 Java 字节码实现&#xff08;Javassist&#xff09; Prox…...

Visual studio 2023下使用 installer projects 打包C#程序并创建 CustomAction 类

Visual studio 2023下使用 installer projects 打包C#程序并创建 CustomAction 类 1 安装Visual studio 20203,并安装插件1.1 下载并安装 Visual Studio1.2 步骤二:安装 installer projects 扩展插件2 创建安装项目2.1 创建Windows安装项目2.2 新建应用程序安装文件夹2.3 添加…...

vue学习笔记(购物车小案例)

用一个简单的购物车demo来回顾一下其中需要注意的细节。 先看一下最终效果 功能&#xff1a; &#xff08;1&#xff09;全选按钮和下面的商品项的选中状态同步&#xff0c;当下面的商品全部选中时&#xff0c;全选勾选&#xff0c;反之&#xff0c;则不勾选。 &#xff08…...

昇思25天学习打卡营第19天 | RNN实现情感分类

RNN实现情感分类 概述 情感分类是自然语言处理中的经典任务&#xff0c;是典型的分类问题。本节使用MindSpore实现一个基于RNN网络的情感分类模型&#xff0c;实现如下的效果&#xff1a; 输入: This film is terrible 正确标签: Negative 预测标签: Negative输入: This fil…...

【VUE基础】VUE3第三节—核心语法之ref标签、props

ref标签 作用&#xff1a;用于注册模板引用。 用在普通DOM标签上&#xff0c;获取的是DOM节点。 用在组件标签上&#xff0c;获取的是组件实例对象。 用在普通DOM标签上&#xff1a; <template><div class"person"><h1 ref"title1">…...

生物化学笔记:电阻抗基础+电化学阻抗谱EIS+电化学系统频率响应分析

视频教程地址 引言 方法介绍 稳定&#xff1a;撤去扰动会到原始状态&#xff0c;反之不稳定&#xff0c;还有近似稳定的 阻抗谱图形&#xff08;Nyquist和Bode图&#xff09; 阻抗谱图形是用于分析电化学系统和材料的工具&#xff0c;主要有两种类型&#xff1a;Nyquist图和B…...

SQL使用join查询方式找出没有分类的电影id以及名称

系列文章目录 文章目录 系列文章目录前言 前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站&#xff0c;这篇文章男女通用&#xff0c;看懂了就去分享给你的码吧。 描述 现有电影信息…...

对MsgPack与JSON进行序列化的效率比较

序列化是将对象转换为字节流的过程&#xff0c;以便在内存或磁盘上存储。常见的序列化方法包括MsgPack和JSON。以下将详细探讨MsgPack和JSON在序列化效率方面的差异。 1. MsgPack的效率&#xff1a; 优点&#xff1a; 高压缩率&#xff1a; MsgPack采用高效的二进制编码格式&…...

Unix\Linux 执行shell报错:“$‘\r‘: 未找到命令” 解决

linux执行脚本sh xxx.sh报错&#xff1a;$xxx\r: 未找到命令 原因&#xff1a;shell脚本在Windows编写导致的换行问题&#xff1a; Windows 的换行符号为 CRLF&#xff08;\r\n&#xff09;&#xff0c;而 Unix\Linux 为 LF&#xff08;\n&#xff09;。 缩写全称ASCII转义说…...

动态路由--RIP配置(思科cisco)

一、简介 RIP协议&#xff08;Routing Information Protocol&#xff0c;路由信息协议&#xff09;是一种基于距离矢量的动态路由选择协议。 在RIP协议中&#xff0c;如果路由器A和网络B直接相连&#xff0c;那么路由器A到网络B的距离被定义为1跳。若从路由器A出发到达网络B需要…...

python - 函数 / 字典 / 集合

一.函数 形参和实参&#xff1a; >>> def MyFirstFunction(name): 函数定义过程中的name是叫形参 ... print(传递进来的 name 叫做实参&#xff0c;因为Ta是具体的参数值&#xff01;) print前面要加缩进tab&#xff0c;否则会出错。 >>> MyFirstFun…...

connect to github中personal access token生成token方法

一、问题 执行git push时弹出以下提示框 二、解决方法 去github官网生成Token&#xff0c;步骤如下 选择要授予此 令牌token 的 范围 或 权限 要使用 token 从命令行访问仓库&#xff0c;请选择 repo 。 要使用 token 从命令行删除仓库&#xff0c;请选择 delete_repo 其他根…...

Appium启动APP时报错Security exception: Permission Denial

报错内容Security exception: Permission Denial: starting Intent 直接通过am命令尝试也是同样的报错 查阅资料了解到&#xff1a;android:exported | App quality | Android Developers exported属性默认false&#xff0c;所以android:exported"false"修改为t…...

ubuntu22 使用ufw防火墙

专栏总目录 一、安装 sudo apt update sudo apt install ufw 二、启动防火墙 &#xff08;一&#xff09;启动命令 sudo ufw enable &#xff08;二&#xff09;重启命令 sudo ufw reload 三、配置规则 #允许SSH连接 sudo ufw allow ssh #如果sshd服务端口指定到了8888&a…...

初识STM32:开发方式及环境

STM32的编程模型 假如使用C语言的方式写了一段程序&#xff0c;这段程序首先会被烧录到芯片当中&#xff08;Flash存储器中&#xff09;&#xff0c;Flash存储器中的程序会逐条的进入CPU里面去执行。 CPU相当于人的一个大脑&#xff0c;虽然能执行运算和执行指令&#xff0c;…...

详解Amivest 流动性比率

详解Amivest 流动性比率 Claude-3.5-Sonnet Poe Amivest流动性比率是一个衡量证券市场流动性的重要指标。这个比率主要用于评估在不对价格造成重大影响的情况下,市场能够吸收多少交易量。以下是对Amivest流动性比率的详细解释: 定义: Amivest流动性比率是交易额与绝对收益率的…...

pycharm小游戏制作

以下是一个使用 Python 和 PyGame库在 PyCharm中创建一个简单的小游戏&#xff08;贪吃蛇游戏&#xff09;的示例代码&#xff0c;希望对您有所帮助&#xff1a; import pygame import random# 基础设置 # 屏幕高度 SCREEN_HEIGHT 480 # 屏幕宽度 SCREEN_WIDTH 600 # 小方格…...

昇思11天

基于 MindSpore 实现 BERT 对话情绪识别 BERT模型概述 BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;是由Google于2018年开发并发布的一种新型语言模型。BERT在许多自然语言处理&#xff08;NLP&#xff09;任务中发挥着重要作用&am…...

AI绘画Stable Diffusion【图生图教程】:图片高清修复的三种方案详解,你一定能用上!(附资料)

大家好&#xff0c;我是画画的小强 今天给大家分享一下用AI绘画Stable Diffusion 进行 高清修复&#xff08;Hi-Res Fix&#xff09;&#xff0c;这是用于提升图像分辨率和细节的技术。在生成图像时&#xff0c;初始的低分辨率图像会通过放大算法和细节增强技术被转换为高分辨…...

适用于Mac和Windows的最佳iPhone恢复软件

本文将指导您选择一款出色的iPhone数据恢复软件来检索您的宝贵数据。 市场上有许多所谓的iPhone恢复程序。各种程序很难选择并选择其中之一。一旦您做出了错误的选择&#xff0c;您的数据就会有风险。 最好的iPhone数据恢复软件应包含以下功能。 1.安全可靠。 2.恢复成功率高…...

64.ThreadLocal造成的内存泄漏

内存泄漏 程序中已动态分配的堆内存,由于某种原因程序为释放和无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果。内存泄漏的堆积终将导致内存溢出。 内存溢出 没有足够的内存提供申请者使用。 ThreadLocal出现内存泄漏的真实原因 内存泄漏的发…...

深入刨析Redis存储技术设计艺术(二)

三、Redis主存储 3.1、存储相关结构体 redisServer:服务器 server.h struct redisServer { /* General */ pid_t pid; /* Main process pid. */ pthread_t main_thread_id; /* Main thread id */ char *configfile; /* Absolut…...

python读取写入txt文本文件

读取 txt 文件 def read_txt_file(file_path):"""读取文本文件的内容:param file_path: 文本文件的路径:return: 文件内容"""try:with open(file_path, r, encodingutf-8) as file:content file.read()return contentexcept FileNotFoundError…...

日期选取限制日期范围antdesign vue

限制选取的日期范围 效果图 <a-date-pickerv-model"dateTime"format"YYYY-MM-DD":disabled-date"disabledDate"valueFormat"YYYY-MM-DD"placeholder"请选择日期"allowClear />methods:{//回放日期选取范围限制&…...

【大模型】衡量巨兽:解读评估LLM性能的关键技术指标

衡量巨兽&#xff1a;解读评估LLM性能的关键技术指标 引言一、困惑度&#xff1a;语言模型的试金石1.1 定义与原理1.2 计算公式1.3 应用与意义 二、BLEU 分数&#xff1a;翻译质量的标尺2.1 定义与原理2.2 计算方法2.3 应用与意义 三、其他评估指标&#xff1a;综合考量下的多元…...

《优化接口设计的思路》系列:第2篇—小程序性能优化

优化Uniapp应用程序的性能可以从以下几个方面进行优化&#xff1a; 1.减少页面加载时间&#xff1a;避免页面过多和过大的组件&#xff0c;减少不必要的资源加载。可以使用懒加载的方式&#xff0c;根据用户的实际需求来加载页面和组件。 2.节流和防抖&#xff1a;对于频繁触发…...

prototype 和 __proto__的区别

prototype 和 __proto__ 在 JavaScript 中都与对象的原型链有关&#xff0c;但它们各自有不同的用途和含义。 prototype prototype 是函数对象的一个属性&#xff0c;它指向一个对象&#xff0c;这个对象包含了可以由特定类型的所有实例共享的属性和方法。当我们创建一个新的…...

网络中未授权访问漏洞(Rsync,PhpInfo)

Rsync未授权访问漏洞 Rsync未授权访问漏洞是指Rsync服务配置不当或存在漏洞&#xff0c;导致攻击者可以未经授权访问和操作Rsync服务。Rsync是一个用于文件同步和传输的开源工具&#xff0c;通常在Unix/Linux系统上使用。当Rsync服务未经正确配置时&#xff0c;攻击者可以利用…...

DataWhaleAI分子预测夏令营 学习笔记

AI分子预测夏令营学习笔记 一、直播概览 主持人介绍 姓名&#xff1a;徐翼萌角色&#xff1a;DataWhale助教活动目的&#xff1a;分享机器学习赛事经验&#xff0c;提升参赛者在分子预测领域的能力 嘉宾介绍 姓名&#xff1a;余老师背景&#xff1a;Data成员&#xff0c;腾…...

lnmp php7 安装ssh2扩展

安装ssh2扩展前必须安装libssh2包 下载地址: wget http://www.libssh2.org/download/libssh2-1.11.0.tar.gzwget http://pecl.php.net/get/ssh2-1.4.tgz &#xff08;这里要换成最新的版本&#xff09; 先安装 libssh2 再安装 SSH2: tar -zxvf libssh2-1.11.0.tar.gzcd libss…...

数据库概念题总结

1、 2、简述数据库设计过程中&#xff0c;每个设计阶段的任务 需求分析阶段&#xff1a;从现实业务中获取数据表单&#xff0c;报表等分析系统的数据特征&#xff0c;数据类型&#xff0c;数据约束描述系统的数据关系&#xff0c;数据处理要求建立系统的数据字典数据库设计…...

提升用户体验之requestAnimationFrame实现前端动画

1)requestAnimationFrame是什么? 1.MDN官方解释 2.解析这段话&#xff1a; 1、那么浏览器重绘是指什么呢&#xff1f; ——大多数电脑的显示器刷新频率是60Hz&#xff0c;1000ms/6016.66666667ms的时间刷新一次 2、重绘之前调用指定的回调函数更新动画&#xff1f; ——requ…...