Python一些可能用的到的函数系列124 GlobalFunc
说明
GlobalFunc是算网的下一代核心数据处理基础。
算网是一个分布式网络,为了能够实现真的分布式计算(加快大规模任务执行效率),以及能够在很长的时间内维护不同版本的计算方法,需要这样一个对象/服务来支撑。GlobalFunc支持通过web请求来完成计算,通常要求输入和输出都是可json序列化的;同时也支持作为一个本地对象被引入和使用。
另外一个核心点是智能化。智能化的一个前提是代理化,而不是直接调用。GlobalFunc允许使用参数化字符串来声明调用,这样每一次的操作是可以被其他算法/程序解读的。
内容
本次讨论4个主题:存储形态、基本操作、使用场景以及服务化。
1 存储形态
git项目
首先讨论GF的存储形态。传统的python包是以文件形式组织的,加上__init__.py文件指示就可以。GF本质上也是这样的,不过项目文件夹同时也是一个git项目文件夹,这意味着GF可以利用git的诸多特性:
- 1 分支。通过分支可以针对不同的项目或者探索进行微调,甚至可以支持未来由agent来进行微调实验。
- 2 比较(暂未使用)。git可以具备代码比较的能力,方便查探差异。
当前的GF可以视为一个文件夹,下面有若干子文件夹。每个子文件夹是一个独立的包,下面有若干py文件,每个文件的文件名和函数名是一致的。
如果在其他机器上拉取了该项目,那么对应的代码是一致的。当然,这仍然需要对应的环境支持才可以。所以一般来说没有必要去拉取项目,而是使用某个镜像。
除了文件形态,GF采用数据库存储,这种数据存储又是由Redis和Mongo两部分构成的。长期的持久化由Mongo完成,而零碎的取数由Redis完成。
RedisOrMongo本来的设计是为了通用的,大量的,不同程序间的数据存储服务的。为了区别不同程序的缓存,ROM约定了k(键)的命名规范,这个对于区分不同的变量非常有用。
如下,按三段命名法,k由三部分构成。总体上三段分别对应,项目、子项目、变量。或者从数据库上理解,三段分别是数据库、数据表和记录。

本次的存储:
- 1 tier1: sp_GlobalFunc 这个是GF的空间
- 2 tier2: Base_master 通过模块名+分支构成了第二级名称
- 3 tier2: 函数名
基于数据库的存储带来了很多便利:
- 1 首先,数据库方式允许了更智能的搜索。很多时候重复建设的根源就是在于找不到过去开发的结果。
一个有趣的实操是和chatgpt结合起来:将函数写好后,发给gpt解读,形成文本。
import os
def popen(some_cmd):with os.popen(some_cmd) as f:res = f.read()return res
def popen1(some_cmd):with os.popen(some_cmd) as f:res = f.read()return res.split()[0]def get_machine_name(given_name = None):if given_name is None :try:default_name = popen1('cat /etc/hostname')except:default_name = 'xx'else:default_name = given_namereturn default_name'''
你的 `popen` 函数用于执行命令并返回命令的输出,而 `popen1` 函数执行命令并返回输出的第一个单词。这些函数可以用于执行系统命令并获取输出。`get_machine_name` 函数用于获取机器名,默认情况下它尝试从 `/etc/hostname` 文件中读取机器名,如果失败,则返回 'xx'。如果提供了 `given_name` 参数,它将使用该参数作为机器名。这些函数看起来能够满足获取机器名和执行系统命令的需求。如果你有任何特定问题或需要进一步的帮助,可以随时提出。'''
因为这些文本已经全部在数据库中了,因此很方便进一步将其处理为向量,或者直接使用es进行搜索。
这样的本质是帮助使用者面对信息过载,其实也就是在推荐。泛推荐类算法将是我2024年的一个主要方向。
2 基本操作对象封装
以下讨论关于GF的基本操作。
例如查看当前分支、扫描文件变动、简单git提交、包的初始化、存储函数信息、生成函数、列出包的所有函数。
测试应用是参数化调用。
开发和执行两部分需要分开,假设GlobalFunc是一个git项目,同时也是顶级目录和函数包(有init文件)。在这个项目下有若干子文件夹,每个文件夹也是一个函数包。
开发时要切入到GlobalFunc目录下,而执行时则保持与GlobalFunc平级。
2.1 开发
对象将一些功能整合到一起
# 依赖 RedisOrMongo -> WMongo
# tier1 是sp+项目名
class GFBase:def __init__(self, project_path = None, redis_agent_host = 'http://172.17.0.1:24021/',redis_connection_hash = None,tier1 = None):self.project_path = project_pathself.redis_cfg = Naive()self.redis_cfg.redis_agent_host = redis_agent_hostself.redis_cfg.redis_connection_hash = redis_connection_hashself.tier1 = tier1# 查看当前分支def _get_current_branch(self):return get_current_branch(self.project_path)# 扫描目录下的所有文件信息def _get_scan_files(self):return scan_directory(self.project_path)# 简单提交gitdef _simple_commit_git(self, commit_message = None):git_commit_and_push(commit_message, self.project_path)# 为包增加初始文件:针对CreateOrUpdate一个包下的函数def _generate_init_py(self, packname = None):package_dir = '/'.join([self.project_path, packname])generate_init_py(package_dir)# 保存一个文件到ROM(RedisOrMongo) some_k 就是「包.函数名」def _save_a_file_rom(self, some_k):# 校验,应该包含两个点some_func = some_ksome_func_list = some_func.split('.')current_branch = self._get_current_branch()scan_dict = self._get_scan_files()tier1 = self.tier1if len(some_func_list) == 3:print('长度为3段,正确')tier2 = some_func_list[0] + '_' + current_branchvar_space_name = '.'.join([tier1,tier2])var_name = some_func_list[1]is_new = not verify_redis_var_name(var_space_name)rom = RedisOrMongo(var_space_name, self.redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065', is_new = is_new)# 持久化存储 # some_func_jstr = json.dumps(scan_dict[some_func]) # 不必压缩rom.setx(var_name, scan_dict[some_func], persist=True)# 生成一个文件到指定位置def _gen_a_file(self,func_name = None, target_folder = None, tier1 = None, pack_name = None, branch_name ='master'):tier1 = self.tier1 or tier1 assert all([func_name,target_folder,pack_name]), 'unc_name,target_folder,pack_name 不为空'tier2 = '_'.join([pack_name, branch_name])the_space_name = '.'.join([tier1,tier2])rom = RedisOrMongo(the_space_name, self.redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065') # 获取 meta,data : data就是代码字符the_data = rom.getx(func_name)target_folder1 = '/'.join([target_folder, pack_name])os.makedirs(target_folder1, exist_ok=True)filename = the_data['meta']['name']filedata = the_data['data']create_file(target_folder1, filename, filedata)# 列出包的所有函数def _list_file_names_without_extension(self, pack_name):pack_path = self.project_path + pack_namereturn list_file_names_without_extension(pack_path)# 重载包def _reload_package(self, pack_name= None):reload_package(pack_name)# 初始化
git_project_path = './GlobalFunc/' # 切到GlobalFunc下执行
tier1 = 'sp_GlobalFunc' # 对应rom的命名规范
redis_agent_host = 'http://公网IP:24021/' # 需要通过微服务执行
gfbase = GFBase(project_path = git_project_path,redis_agent_host = redis_agent_host,tier1 = tier1 )
来看具体的应用和功能代码
2.1.1 查看当前分支
通过subprocess 调用git 命令查看分支,这与具体的应用相关。目前都是master。
gfbase._get_current_branch()
# ----
import subprocess
# 获取当前分支
def get_current_branch(directory=None):try:# 构建 git 命令git_command = ['git', 'rev-parse', '--abbrev-ref', 'HEAD']if directory:# 如果指定了目录,则将命令添加到目录中执行git_command = ['git', '-C', directory, 'rev-parse', '--abbrev-ref', 'HEAD']# 运行 git 命令获取当前分支result = subprocess.check_output(git_command)# 将结果解码为字符串并去除换行符current_branch = result.decode('utf-8').strip()return current_branchexcept subprocess.CalledProcessError:# 如果出现错误,例如不在 Git 仓库中,返回 Nonereturn None# 例子:获取当前分支(在指定的目录下执行)
# current_branch = get_current_branch(directory='/path/to/your/git/repository')
2.1.2 扫描所有文件的信息
对项目下的所有py文件进行扫描,提取元数据,并将代码保存为数据。这个操作是向数据提交函数的前提。
scan_dict = gfbase._get_scan_files()
# ----
import os
import hashlib
import time
import json# 计算文件的 MD5 哈希值
def calculate_file_hash(file_path):md5 = hashlib.md5()with open(file_path, "rb") as f:while True:data = f.read(65536)if not data:breakmd5.update(data)return md5.hexdigest()def get_line_count(file_path):with open(file_path, "r", encoding="utf-8") as f:return sum(1 for line in f)def get_file_metadata(file_path):metadata = {}try:stat = os.stat(file_path)metadata["hash"] = calculate_file_hash(file_path)# python似乎没法正确读取create_time, linux的stat命令可以显示正确时间,但是不同的系统间格式又不同,算了# metadata["created_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_ctime))metadata["modified_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_mtime))metadata["line_count"] = get_line_count(file_path)except Exception as e:print(f"Error getting metadata for {file_path}: {str(e)}")return metadatadef get_file_data(file_path):data = Nonetry:with open(file_path, "r") as f:content = f.read()# 使用 json.dumps 将文件内容序列化为 JSON 格式data = json.dumps(content)except Exception as e:print(f"Error getting data for {file_path}: {str(e)}")return datadef build_key(root= None, xfile=None,directory_path = None, base_directory = None):relative_path = os.path.relpath(root, directory_path)relative_path = relative_path.replace('.', '') # 将 . 替换为空字符串if relative_path == '.':return xfileelse:return f"{base_directory}.{relative_path.replace(os.path.sep, '.')}.{xfile}"def scan_directory(directory_path):result = {}base_directory = os.path.basename(directory_path)# 遍历指定目录及其子目录for root, dirs, files in os.walk(directory_path):# 过滤以 . 开头的文件夹 和以下划线开头的文件夹dirs[:] = [d for d in dirs if not d.startswith('.') and not d.startswith('_')]for file in files:if not file.startswith(".") and not file.startswith("_") and file != "__init__.py" and not file.endswith('.pkl'):file_path = os.path.join(root, file)key = build_key(root, file, directory_path, base_directory)key = key.lstrip('.')value = {}value["meta"] = get_file_metadata(file_path)value["meta"]['name'] = filedata = get_file_data(file_path)if data is not None:value["data"] = dataresult[key] = valuereturn result# scan_dict = scan_directory('/Users/yukai/m4git/GlobalFunc/')
n [2]: scan_dict = gfbase._get_scan_files()In [3]: scan_dict.keys()
Out[3]: dict_keys(['WMongo_V9000_012.py', 'remark.md', 'test.md', 'RedisOrMongo_v100.py', 'debug_pys.d010_参数化调用.py', 'debug_pys.d002_扫描信息.py', 'debug_pys.d008_列出某个包的所有函数.py', 'debug_pys.d006_存储一个函数信息.py',...In [4]: scan_dict['Base.inverse_time_str.py']
Out[4]:
{'meta': {'hash': 'df80f0d4afec7ce0aa0ef00d7bc536ee','modified_time': '2023-10-27 15:25:39','line_count': 11,'name': 'inverse_time_str.py'},'data': '"import time\\n\\ndef inverse_time_str(time_str = None, bias_hours = 0):\\n ts = time.mktime (time.strptime(time_str,\'%Y-%m-%d %H:%M:%S\')) + bias_hours* 3600\\n return ts \\n\\n\'\'\'\\n\\u4f60\\u7684 inverse_time_str \\u51fd\\u6570\\u7528\\u4e8e\\u5c06\\u65f6\\u95f4\\u5b57\\u7b26\\u4e32\\u8f6c\\u6362\\u56de\\u65f6\\u95f4\\u6233\\uff0c\\u8003\\u8651\\u4e86\\u65f6\\u533a\\u504f\\u79fb\\u3002\\u8be5\\u51fd\\u6570\\u63a5\\u53d7\\u4e00\\u4e2a\\u65f6\\u95f4\\u5b57\\u7b26\\u4e32\\u548c\\u504f\\u79fb\\u5c0f\\u65f6\\u6570\\uff0c\\u5e76\\u8fd4\\u56de\\u4e00\\u4e2a\\u65f6\\u95f4\\u6233\\uff08\\u4ee5\\u79d2\\u4e3a\\u5355\\u4f4d\\uff09\\u3002\\n\\n\\u4f60\\u7684\\u51fd\\u6570\\u5b9e\\u73b0\\u770b\\u8d77\\u6765\\u662f\\u6b63\\u786e\\u7684\\uff0c\\u5b83\\u4f7f\\u7528\\u4e86 time.mktime \\u548c time.strptime \\u6765\\u6267\\u884c\\u5b57\\u7b26\\u4e32\\u5230\\u65f6\\u95f4\\u6233\\u7684\\u8f6c\\u6362\\u3002\\u504f\\u79fb\\u5c0f\\u65f6\\u6570\\u88ab\\u8003\\u8651\\u5728\\u5185\\uff0c\\u4ee5\\u4fbf\\u6839\\u636e\\u9700\\u8981\\u8c03\\u6574\\u65f6\\u95f4\\u6233\\u3002\\n\'\'\'"'}
2.1.3 提交git项目
当开发作为git项目时,需要进行提交。这个在本地分发或者是在分支开发时有用。
gfbase._simple_commit_git()In [5]: gfbase._simple_commit_git()
[master cca9732] None 2024-02-13 13:53:033 files changed, 1 insertion(+)create mode 100644 __pycache__/__init__.cpython-39.pyc
Enumerating objects: 16, done.
Counting objects: 100% (16/16), done.
Delta compression using up to 10 threads
Compressing objects: 100% (9/9), done.
Writing objects: 100% (9/9), 837 bytes | 837.00 KiB/s, done.
Total 9 (delta 7), reused 0 (delta 0), pack-reused 0
To ssh://101.89.161.51:10600/home/git/GlobalFunc.gitdee7abd..cca9732 master -> master
Git操作成功完成。import subprocess
from datetime import datetimedef git_commit_and_push(commit_message= '简单提交', git_directory='.'):try:current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")full_commit_message = f"{commit_message} {current_datetime}"# 添加所有更改subprocess.run(['git', '-C', git_directory, 'add', '.'])# 提交更改subprocess.run(['git', '-C', git_directory, 'commit', '-m', full_commit_message])# 推送到远程仓库subprocess.run(['git', '-C', git_directory, 'push'])print("Git操作成功完成。")except subprocess.CalledProcessError as e:print("Git操作失败:", e)
2.1.4 刷新一个包的初始化文件
在增加了函数之后,这几乎是一定要执行的。增加函数 -> 刷新包初始化文件 -> git 提交,这是一个固定流程。
gfbase._generate_init_py('Base')以下自动为包增加了这行语句,从而导入新开发的函数
...
from .test1_save_test import test1_save_testimport os
def generate_init_py(package_dir):if not os.path.exists(package_dir):print(f"Package directory '{package_dir}' not found.")returnwith open(os.path.join(package_dir, "__init__.py"), "w") as init_py:for filename in os.listdir(package_dir):if filename.endswith(".py") and not filename.startswith("_") and not filename.startswith("."):module_name = filename[:-3] # Remove the .py extensioninit_py.write(f"from .{module_name} import {module_name}\n")
2.1.5 尝试存储一个函数
除了以文件方式增加持久化的方式之外,还需要将函数持久化到数据库。
Base.test1_save_test.py
def test1_save_test():print('test1_save_test')gfbase._save_a_file_rom('Base.test1_save_test.py')In [7]: gfbase._save_a_file_rom('Base.test1_save_test.py')
长度为3段,正确
【Loading cur_w】from pickle
1.当前使用的MongeAgent:http://公网IP:24011/
2.Tier1:NameSpace, Tier2:RedisSpace
3.ConnectionHash:6552982f4bcd003477c5e2170250ccce
4.FilterDict:{'_is_enable': 1}
5.Limits:10000
6.Sort:
7.Skip:0
约定空间名称以sp_开头,节点以node_开头,临时变量以tem_开头
【提供节点与子图的元数据记录】
【Loading cur_w】from pickle
【Loading cur_w】from pickle
ok sp_GlobalFunc.Base_master 存在,可以使用
description
2.1.6 读取并生成一个函数 - 需要伴随一个包的初始化文件
这个功能的应用场景是在某个新的位置,重现(可能是部分)包函数。
target_folder = '~/Desktop/test_func'
gfbase._gen_a_file(func_name='from_pickle',target_folder=target_folder,pack_name='Base')【提供节点与子图的元数据记录】
【Loading cur_w】from pickle
【Loading cur_w】from pickle
ok sp_GlobalFunc.Base_master 存在,可以使用
description
文件 'from_pickle.py' 已成功创建于路径 '~/Desktop/test_func/Base'import os
def create_file(path, filename, content):# 组合完整的文件路径file_path = os.path.join(path, filename)try:# 打开文件以写入内容with open(file_path, 'w') as file:file.write(content)print(f"文件 '{filename}' 已成功创建于路径 '{path}'")except Exception as e:print(f"创建文件时出现错误:{e}")
2.1.7 列出包的所有函数
通过扫描文件夹下文件列表的方式列出所有函数。
gfbase._list_file_names_without_extension('Base')
{'Naive','WMongo','__init__','clear_digits','color_print','cols2s','create_folder','flat_dict',
...import os
def list_file_names_without_extension(directory):return set(os.path.splitext(file)[0] for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file)))
2.1.8 重载一个包
在调试时可以更新然后重载
gfbase._reload_package('Base')
成功重新加载模块: Base
import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)
2.2 执行
需要切换到GlobalFunc的同级目录执行
2.2.1 位置参数调用
这种方式将逐渐被弃用。位置参数主要是方便,但是在函数复杂时并不方便。另外,既然采用间接方式调用,那么以关键字参数显然更加清晰。
import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)# 用于GlobalFunc参数化获取函数 func_name like
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目录执行,把GlobalFunc当成总包 | 在编辑的时候,我们会沉入到内部操作,在应用时则要挂在最外层
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 执行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重载包@staticmethoddef _reload_package(package_name):reload_package(package_name)gfgo = GFGo()
以下执行一个函数,将一个数据 'a’存储为pickle, 文件名为‘a_data.pkl’
import GlobalFunc as funcs
pack_func = 'Base.to_pickle'
args1 = ['a', 'a_data', './']
gfgo._exe_func_args(global_func =funcs,pack_func= pack_func, args = args1)In [4]: import GlobalFunc as funcs...: pack_func = 'Base.to_pickle'...: args1 = ['a', 'a_data', './']...: gfgo._exe_func_args(global_func =funcs,pack_func= pack_func, args = args1)...:
data save to pickle: ./a_data.pkl
2.2.2 关键字参数调用
这是未来的主流(规范):约定GlobalFunc函数的参数都以关键字参数
有一个工程是逐步将以前用位置参数的函数全部进行改造。
原函数
import pickle
def to_pickle(data, file_name, path='./'):output = open(path+file_name+'.pkl', 'wb')pickle.dump(data, output)output.close()print('data save to pickle: ', path+file_name+'.pkl')
改造后
import pickle
def to_pickle(data = None, file_name = None, path='./'):output = open(path+file_name+'.pkl', 'wb')pickle.dump(data, output)output.close()print('data save to pickle: ', path+file_name+'.pkl')
重新调用
# 重载包
In [10]: gfgo._reload_package('GlobalFunc')...:
成功重新加载模块: GlobalFuncIn [11]: pack_func = 'Base.to_pickle'...: kwargs1 = {'data':'a',...: 'file_name':'a_data',...: 'path':'./'}
In [12]: gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs1)
data save to pickle: ./a_data.pkl
将这个函数也刷新到数据库,重新切回gfbase
gfbase._save_a_file_rom('Base.to_pickle.py')
到这里,基础功能就算告一段落。
3 场景及操作对象封装
先只讨论一种场景,就是镜像场景。
镜像场景是指使用docker容器开发,完成后封装为镜像使用。由于镜像是连同对应的开发环境一并封装的,开箱即用,所以是一个更成熟的方式。
简单来说,就是打开一个容器,将GlobalFunc文件夹拷贝进去,然后将操作对象,例如GFBase或者GFGo拷贝进去,就可以使用了。
源镜像(registry.cn-hangzhou.aliyuncs.com/andy08008/pytorch_jupyter:v205)是我之前维护的一个常用镜像,镜像装有gpu版 pytorch1.x,并安装了一些其他有用的包。
现在基于源镜像构建,目标是myregistry.domain.com:24052/worker.andy.globalfunc:v101
3.1 建立新容器
docker run -it registry.cn-hangzhou.aliyuncs.com/andy08008/pytorch_jupyter:v205 bash
3.2 拷贝GlobalFunc包
在算网机上先执行拉取git pull,更新GlobalFunc项目
注意m7.24065.pkl mymeta.pkl两个数据库连接文件要根据不同的情况刷新一下。
在宿主机执行
docker cp /opt/GlobalFunc df5a340570ab:/workspace/
此时
root@df5a340570ab:/workspace# ls
funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e.so GlobalFunc test.ipynb
3.3 拷贝GFGo对象
这里只考虑执行的情况
import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)# 用于GlobalFunc参数化获取函数 func_name like
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目录执行,把GlobalFunc当成总包 | 在编辑的时候,我们会沉入到内部操作,在应用时则要挂在最外层
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 执行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重载包@staticmethoddef _reload_package(package_name):reload_package(package_name)
3.4 提交保存
docker commit df5a340570ab myregistry.domain.com:24052/worker.andy.globalfunc:v101
docker push myregistry.domain.com:24052/worker.andy.globalfunc:v101
小插曲
自建的docker 镜像仓库需要使用证书到期了,因此要重新刷新一下。
制作一个10年的证书
openssl req \-newkey rsa:4096 -nodes -sha256 -keyout /home/certs/domain.key \-addext "subjectAltName = DNS:myregistry.domain.com" \-x509 -days 3650 -out /home/certs/domain.crt
制作好证书后进行分发和授信。
# 1)安装工具包
apt-get install -y ca-certificates
# 2)复制ca.crt
cp /home/certs/domain.crt /usr/local/share/ca-certificates/domain.crt
# 3)更新被信任的CA证书
update-ca-certificates
# 4)重启本地dockerd(此步骤是必须的,否则依然报错x509: certificate signed by unknown authority)
systemctl daemon-reload
systemctl restart docker
要注意的是,镜像仓库的服务由于要存储镜像,需要较大的空间,因此项目的启动位置会存放在存储空间较大的位置。
3.5 调用执行
启动容器
docker run -it --rm myregistry.domain.com:24052/worker.andy.globalfunc:v101 bash
执行命令
from GFGo import GFGo
import GlobalFunc as funcs
gfgo = GFGo()
pack_func = 'Base.to_pickle'
kwargs1 = {'data':'a', 'file_name':'a_data', 'path':'./'}
gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs1)data save to pickle: ./a_data.pkl
4 服务化
GlobalFunc有两种使用方式:本地模式和API模式,其中本地模式也就是普通的包调用模式,而API模式是本次要讨论的模式。
由于每个GlobalFunc的容器都会有资源限制,且由于计算资源的分布式化,未来其容器(分身)将会呈现随机化的态势。
GlobalFunc本身是为了提供无差别的函数服务,所以端口将采取分配一个地址段的方式,而不是按照规则指定。端口区间采用26000~27000。
一个容器将由所在的宿主机、镜像版本、内存上限、显存上限以及端口号决定。例如: m7_v101_30G_0G_26000表示在m7上运行的容器,采用v101版本,最大内存为30G,最大显存0G(无显卡),占用端口26000.
服务会分为两部分,服务端与客户端。
服务仅接受和返回json字符串,客户端完成格式转换。
所以服务端承担的功能相对简单,而客户端则需要处理相对复杂的翻译功能。另外,服务端所执行的必然是有返回的函数操作,而不能是本地文件操作。例如使用to_pickle,文件将会存在服务端的文件系统。
服务端采用tornado搭建,一方面其格式非常简单,另一方面效率非常高。
安装了tornado包之后,只需要两个文件就可以启动server了。
文件1: server_funcs.py
过去,我会把服务需要的函数放在这里,现在的话,理论上可以不需要,因为所有的依赖函数都会在GlobalFunc包中。
不过从设计的角度,我认为仍然应该保留这个文件(即使是空的)。因为未来有可能有一些服务的配置,以及一些固定的数据库连接可以放在这里。这样会比较便于管理,避免反复建立数据库连接。
文件2: server.py
在当前容器中,必然存在GFGo.py和GlobalFunc, 在server.py中引入他们。同时也存在,即使是空的server_funcs.py。
from server_funcs import *
from GFGo import GFGo
import GlobalFunc as funcs
一个启动示例
from server_funcs import *
from GFGo import GFGo
import GlobalFunc as funcs import tornado.httpserver # http服务器
import tornado.ioloop # ?
import tornado.options # 指定服务端口和路径解析
import tornado.web # web模块
from tornado.options import define, options
import os.path # 获取和生成template文件路径app_list = []IndexHandler_path = r'/'
class IndexHandler(tornado.web.RequestHandler):def get(self):self.write('【GET】This is Website for Internal API System')self.write('Please Refer to API document')print('Get got a request test')# print(buffer_dict)def post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)print(some_dict)msg_dict = {}msg_dict['info'] = '【POST】This is Website for Internal API System'msg_dict['input_dict'] = some_dictself.write(json.dumps(msg_dict))print('Post got a request test')
IndexHandler_tuple = (IndexHandler_path,IndexHandler)
app_list.append(IndexHandler_tuple)if __name__ == '__main__':#tornado.options.parse_command_line()apps = tornado.web.Application(app_list, **settings)http_server = tornado.httpserver.HTTPServer(apps)define('port', default=8000, help='run on the given port', type=int)http_server.listen(options.port)# 单核# 多核打开注释# 0 是全部核# http_server.start(num_processes=10) # tornado将按照cpu核数来fork进程# ---启动tornado.ioloop.IOLoop.instance().start()
启动服务
python3 server.py
本地调用
import requests as req In [4]: req.get('http://127.0.0.1:8000/').text
Out[4]: '【GET】This is Website for Internal API SystemPlease Refer to API document'In [7]: req.post('http://127.0.0.1:8000/',json = {}).json()
Out[7]: {'info': '【POST】This is Website for Internal API System', 'input_dict': {}}
参数化调用一个函数
函数用于将时间字符串转为时间戳(在统一时间轴之下,已经不再需要用这个函数)
import timedef inverse_time_str(time_str = None, bias_hours = 0):ts = time.mktime (time.strptime(time_str,'%Y-%m-%d %H:%M:%S')) + bias_hours* 3600return ts '''
你的 inverse_time_str 函数用于将时间字符串转换回时间戳,考虑了时区偏移。该函数接受一个时间字符串和偏移小时数,并返回一个时间戳(以秒为单位)。你的函数实现看起来是正确的,它使用了 time.mktime 和 time.strptime 来执行字符串到时间戳的转换。偏移小时数被考虑在内,以便根据需要调整时间戳。
'''
假设我们知道一个时间字符串,并希望知道其东八区时间的时间戳,那么发送请求
import requests as req
pack_func = 'Base.inverse_time_str'
kwargs = {'time_str':'2024-02-15 11:11:11','bias_hours':-8}
req.post('http://127.0.0.1:8000/gfgo/',json = {'pack_func':pack_func,'kwargs':kwargs}).json()1707966671服务端:大约花了0.53ms来完成
[I 240215 08:31:48 web:2239] 200 POST /gfgo/ (127.0.0.1) 0.53ms

到这里,GlobalFunc的基础部分,也就是连通性部分已经完成。之后将可以基于这个基础进行下一步的开发,GF将会成为下一代CalNet的逻辑执行基础。
其他
原来在开发算网(CalNet)的时候,总是假设机器处在局域网中,但是实际上并不是。而且由于CalNet是基于微服务搭建的,网络配置应该作为一项独立的配置独立出来。在外网去调用微服务时,由于数据库操作代理服务默认使用了内网地址,在很多参数传递时又偷懒了,所以连接起来非常不方便。
Next,重新设计操作微服务的对象,通过分离配置、操作引导等方式改进体验。
相关文章:
Python一些可能用的到的函数系列124 GlobalFunc
说明 GlobalFunc是算网的下一代核心数据处理基础。 算网是一个分布式网络,为了能够实现真的分布式计算(加快大规模任务执行效率),以及能够在很长的时间内维护不同版本的计算方法,需要这样一个对象/服务来支撑。Globa…...
python中线程/线程池,进程/进程池的创建
创建子线程 # 创建子线程t1 threading.Thread(targetjob,args(1,))# 执行子线程t1.start()# 等待子线程执行print("waiting threading")t1.join()print("threading done")创建子进程 # 创建子进程p1 multiprocessing.Process(targetjob,args(1,),name&qu…...
【c++】vector的增删查改
1.先定义一个类对象vector 为了防止和库里面发生冲突,定义一个命名空间,将类对象放在命名空间 里面 #include<iostream> using namespace std; namespace zjw {class vector {public:private:}; }2.定义变量,需要一个迭代器ÿ…...
【研究生复试】计算机软件工程人工智能研究生复试——资料整理(速记版)——JAVA
1、JAVA 2、计算机网络 3、计算机体系结构 4、数据库 5、计算机租场原理 6、软件工程 7、大数据 8、英文 自我介绍 1. Java 1. 和 equals的区别 比较基本数据类型是比较的值,引用数据类型是比较两个是不是同一个对象,也就是引用是否指向同 一个对象&…...
JVM-JVM中对象的生命周期
申明:文章内容是本人学习极客时间课程所写,文字和图片基本来源于课程资料,在某些地方会插入一点自己的理解,未用于商业用途,侵删。 原资料地址:课程资料 对象的创建 常量池检查:检查new指令是否能在常量池…...
RegExp正则表达式左限定右限定左右限定,预查询,预查寻,断言 : (?<= , (?= , (?<! , (?!
RegExp正则表达式左限定右限定左右限定,预查询,预查寻,断言 : (?< , (? , (?<! , (?! 有好多种称呼 (?< , (? , (?<! , (?! 有好多种称呼 , 我称为: 左限定, 右限定, 左否定, 右否定 (?<左限定) (?右限定)(?<!左否定) (?!右限定) 再…...
相机图像质量研究(30)常见问题总结:图像处理对成像的影响--重影
系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结:光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结:光学结构对成…...
问题记录——c++ sort 函数 和 严格弱序比较
引出 看下面这段cmp函数的定义 //按照vector第一个元素升序排序 static bool cmp(const vector<int>& a, const vector<int>& b){return a[0] < b[0]; }int eraseOverlapIntervals(vector<vector<int>>& intervals) {//按区间左端排序…...
《Go 简易速速上手小册》第9章:数据库交互(2024 最新版)
文章目录 9.1 连接数据库 - Go 语言的海底宝藏之门9.1.1 基础知识讲解安装数据库驱动数据库连接 9.1.2 重点案例:用户信息管理系统准备数据库Go 代码实现连接数据库添加新用户查询用户信息用户登录验证主函数 9.1.3 拓展案例 1:批量添加用户准备数据库Go…...
redis的hash数据结构底层简记
hash:k和v都是string的hash表。 HSET(设置集合数据,4.0之前只能设置1个,之后可以设置多个),HSETNX(若k不存在则设置对应v),HDEL(删除指定kv,可以一次删除多个)…...
清除Django的管理员admin站点中“Recent Actions“最近活动面板上的所有信息
清除Django的管理员admin站点中"Recent Actions"最近活动面板上的所有信息 本文主要介绍了如何清除Django的管理员admin站点中"Recent Actions"最近活动面板上的所有信息 操作步骤如下 进入Django项目目录中运行代python manage.py shell进入Django shell…...
【JVM篇】ThreadLocal中为什么要使用弱引用
文章目录 🍔ThreadLocal中为什么要使用弱引用⭐总结 🍔ThreadLocal中为什么要使用弱引用 ThreadLocal可以在线程中存放线程的本地变量,保证数据的线程安全 ThreadLocal是这样子保存对象的: 在每个线程中,存放了一个…...
Stable Diffusion教程——stable diffusion基础原理详解与安装秋叶整合包进行出图测试
前言 在2022年,人工智能创作内容(AIGC)成为了AI领域的热门话题之一。在ChatGPT问世之前,AI绘画以其独特的创意和便捷的创作工具迅速走红,引起了广泛关注。随着一系列以Stable Diffusion、Midjourney、NovelAI等为代表…...
【JavaEE】_线程与多线程的创建
目录 1. 线程的概念 2. 创建与使用多线程 2.1 方式1:继承Thread类 2.2 方式2: 实现Runnable接口 2.3 以上两种创建线程方式的对比 3. 多线程的优势-增加运行速度 1. 线程的概念 进程的存在是由于系统的多任务执行需求,这也要求程序员进…...
【前端工程化面试题】如何优化提高 webpack 的构建速度
使用最新版本的 Webpack 和相关插件: 每个新版本的 Webpack 都会带来性能方面的改进和优化,因此始终确保你在使用最新版本。同时,更新你的相关插件也是同样重要的。 使用DllPlugin动态链接库: 使用DllPlugin和DllReferencePlugin来将第三方库的代码进行…...
免费chatgpt使用
基本功能如下: https://go.aigcplus.cc/auth/register?inviteCode3HCULH2UD...
OpenCV识别人脸案例实战
使用级联函数 基本流程 函数介绍 在OpenCV中,人脸检测使用的是cv2.CascadeClassifier.detectMultiScale()函数,它可以检测出图片中所有的人脸。该函数由分类器对象调用,其语法格式为: objects cv2.CascadeClassifier.detectMul…...
VOSK——离线语音库
文章目录 识别函数调用添加自定义热词表1. SetWords2. SetLatticeWords3. SetPartialWords使用示例注意1. SetMaxAlternatives2. SetNLSML3. SetSpkModel4. SetGrammar使用示例注意SetLogLevel示例用法注意事项 识别函数调用 在使用Vosk库进行语音识别时,PartialRe…...
ELAdmin 隐藏添加编辑按钮
使用场景 做了一个监控模块,数据都是定时生成的,所以不需要手动添加和编辑功能。 顶部不显示 可以使用 true 或者 false 控制现实隐藏 created() {this.crud.optShow {add: false,edit: false,del: true,download: true,reset: true}},如果没有 crea…...
浅谈Websocket
由于 http 存在⼀个明显的弊端(消息只能有客户端推送到服务器端,⽽服务器端不能主动推送到客户端),导致如果服务器如果有连续的变化,这时只能使⽤轮询,⽽轮询效率过低,并不适合。于是 WebSocket 被发明出来 WebSocket 是⼀种在 Web 应⽤程序中实现双向通信的协议。与传…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
