Python一些可能用的到的函数系列124 GlobalFunc

说明

GlobalFunc是算网的下一代核心数据处理基础。

算网是一个分布式网络,为了能够实现真的分布式计算(加快大规模任务执行效率),以及能够在很长的时间内维护不同版本的计算方法,需要这样一个对象/服务来支撑。GlobalFunc支持通过web请求来完成计算,通常要求输入和输出都是可json序列化的;同时也支持作为一个本地对象被引入和使用。

另外一个核心点是智能化。智能化的一个前提是代理化,而不是直接调用。GlobalFunc允许使用参数化字符串来声明调用,这样每一次的操作是可以被其他算法/程序解读的。

内容

本次讨论4个主题:存储形态、基本操作、使用场景以及服务化。

1 存储形态

git项目

首先讨论GF的存储形态。传统的python包是以文件形式组织的,加上__init__.py文件指示就可以。GF本质上也是这样的,不过项目文件夹同时也是一个git项目文件夹,这意味着GF可以利用git的诸多特性:

  • 1 分支。通过分支可以针对不同的项目或者探索进行微调,甚至可以支持未来由agent来进行微调实验。
  • 2 比较(暂未使用)。git可以具备代码比较的能力,方便查探差异。

当前的GF可以视为一个文件夹,下面有若干子文件夹。每个子文件夹是一个独立的包,下面有若干py文件,每个文件的文件名和函数名是一致的。

如果在其他机器上拉取了该项目,那么对应的代码是一致的。当然,这仍然需要对应的环境支持才可以。所以一般来说没有必要去拉取项目,而是使用某个镜像。

除了文件形态,GF采用数据库存储,这种数据存储又是由Redis和Mongo两部分构成的。长期的持久化由Mongo完成,而零碎的取数由Redis完成。

RedisOrMongo本来的设计是为了通用的,大量的,不同程序间的数据存储服务的。为了区别不同程序的缓存,ROM约定了k(键)的命名规范,这个对于区分不同的变量非常有用。

如下,按三段命名法,k由三部分构成。总体上三段分别对应,项目、子项目、变量。或者从数据库上理解,三段分别是数据库、数据表和记录。
在这里插入图片描述
本次的存储:

  • 1 tier1: sp_GlobalFunc 这个是GF的空间
  • 2 tier2: Base_master 通过模块名+分支构成了第二级名称
  • 3 tier2: 函数名

基于数据库的存储带来了很多便利:

  • 1 首先,数据库方式允许了更智能的搜索。很多时候重复建设的根源就是在于找不到过去开发的结果。

一个有趣的实操是和chatgpt结合起来:将函数写好后,发给gpt解读,形成文本。

import os
def popen(some_cmd):with os.popen(some_cmd)  as f:res = f.read()return res
def popen1(some_cmd):with os.popen(some_cmd)  as f:res = f.read()return res.split()[0]def get_machine_name(given_name = None):if given_name is None :try:default_name = popen1('cat /etc/hostname')except:default_name = 'xx'else:default_name = given_namereturn default_name'''
你的 `popen` 函数用于执行命令并返回命令的输出,而 `popen1` 函数执行命令并返回输出的第一个单词。这些函数可以用于执行系统命令并获取输出。`get_machine_name` 函数用于获取机器名,默认情况下它尝试从 `/etc/hostname` 文件中读取机器名,如果失败,则返回 'xx'。如果提供了 `given_name` 参数,它将使用该参数作为机器名。这些函数看起来能够满足获取机器名和执行系统命令的需求。如果你有任何特定问题或需要进一步的帮助,可以随时提出。'''

因为这些文本已经全部在数据库中了,因此很方便进一步将其处理为向量,或者直接使用es进行搜索。

这样的本质是帮助使用者面对信息过载,其实也就是在推荐。泛推荐类算法将是我2024年的一个主要方向。

2 基本操作对象封装

以下讨论关于GF的基本操作。

例如查看当前分支、扫描文件变动、简单git提交、包的初始化、存储函数信息、生成函数、列出包的所有函数。

测试应用是参数化调用。

开发和执行两部分需要分开,假设GlobalFunc是一个git项目,同时也是顶级目录和函数包(有init文件)。在这个项目下有若干子文件夹,每个文件夹也是一个函数包。

开发时要切入到GlobalFunc目录下,而执行时则保持与GlobalFunc平级。

2.1 开发

对象将一些功能整合到一起

# 依赖 RedisOrMongo -> WMongo
# tier1 是sp+项目名
class GFBase:def __init__(self, project_path = None, redis_agent_host = 'http://172.17.0.1:24021/',redis_connection_hash = None,tier1 = None):self.project_path = project_pathself.redis_cfg = Naive()self.redis_cfg.redis_agent_host = redis_agent_hostself.redis_cfg.redis_connection_hash = redis_connection_hashself.tier1 = tier1# 查看当前分支def _get_current_branch(self):return get_current_branch(self.project_path)# 扫描目录下的所有文件信息def _get_scan_files(self):return scan_directory(self.project_path)# 简单提交gitdef _simple_commit_git(self, commit_message = None):git_commit_and_push(commit_message, self.project_path)# 为包增加初始文件:针对CreateOrUpdate一个包下的函数def _generate_init_py(self, packname = None):package_dir = '/'.join([self.project_path, packname])generate_init_py(package_dir)# 保存一个文件到ROM(RedisOrMongo) some_k 就是「包.函数名」def _save_a_file_rom(self, some_k):# 校验,应该包含两个点some_func = some_ksome_func_list = some_func.split('.')current_branch = self._get_current_branch()scan_dict = self._get_scan_files()tier1 = self.tier1if len(some_func_list) == 3:print('长度为3段,正确')tier2 = some_func_list[0] + '_' + current_branchvar_space_name = '.'.join([tier1,tier2])var_name = some_func_list[1]is_new = not verify_redis_var_name(var_space_name)rom = RedisOrMongo(var_space_name, self.redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065', is_new = is_new)# 持久化存储 # some_func_jstr = json.dumps(scan_dict[some_func]) # 不必压缩rom.setx(var_name,  scan_dict[some_func], persist=True)# 生成一个文件到指定位置def _gen_a_file(self,func_name = None, target_folder = None, tier1 = None, pack_name = None, branch_name ='master'):tier1 = self.tier1 or tier1 assert  all([func_name,target_folder,pack_name]), 'unc_name,target_folder,pack_name 不为空'tier2 = '_'.join([pack_name, branch_name])the_space_name = '.'.join([tier1,tier2])rom = RedisOrMongo(the_space_name, self.redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065')        # 获取 meta,data : data就是代码字符the_data = rom.getx(func_name)target_folder1 = '/'.join([target_folder,  pack_name])os.makedirs(target_folder1, exist_ok=True)filename = the_data['meta']['name']filedata = the_data['data']create_file(target_folder1, filename, filedata)# 列出包的所有函数def _list_file_names_without_extension(self, pack_name):pack_path = self.project_path + pack_namereturn list_file_names_without_extension(pack_path)# 重载包def _reload_package(self, pack_name= None):reload_package(pack_name)# 初始化
git_project_path = './GlobalFunc/'  # 切到GlobalFunc下执行
tier1 = 'sp_GlobalFunc' # 对应rom的命名规范
redis_agent_host = 'http://公网IP:24021/' # 需要通过微服务执行
gfbase = GFBase(project_path = git_project_path,redis_agent_host = redis_agent_host,tier1 = tier1 )

来看具体的应用和功能代码

2.1.1 查看当前分支

通过subprocess 调用git 命令查看分支,这与具体的应用相关。目前都是master。

gfbase._get_current_branch()
# ----
import subprocess
# 获取当前分支
def get_current_branch(directory=None):try:# 构建 git 命令git_command = ['git', 'rev-parse', '--abbrev-ref', 'HEAD']if directory:# 如果指定了目录,则将命令添加到目录中执行git_command = ['git', '-C', directory, 'rev-parse', '--abbrev-ref', 'HEAD']# 运行 git 命令获取当前分支result = subprocess.check_output(git_command)# 将结果解码为字符串并去除换行符current_branch = result.decode('utf-8').strip()return current_branchexcept subprocess.CalledProcessError:# 如果出现错误,例如不在 Git 仓库中,返回 Nonereturn None# 例子:获取当前分支(在指定的目录下执行)
# current_branch = get_current_branch(directory='/path/to/your/git/repository')
2.1.2 扫描所有文件的信息

对项目下的所有py文件进行扫描,提取元数据,并将代码保存为数据。这个操作是向数据提交函数的前提。

scan_dict = gfbase._get_scan_files()
# ----
import os
import hashlib
import time
import json# 计算文件的 MD5 哈希值
def calculate_file_hash(file_path):md5 = hashlib.md5()with open(file_path, "rb") as f:while True:data = f.read(65536)if not data:breakmd5.update(data)return md5.hexdigest()def get_line_count(file_path):with open(file_path, "r", encoding="utf-8") as f:return sum(1 for line in f)def get_file_metadata(file_path):metadata = {}try:stat = os.stat(file_path)metadata["hash"] = calculate_file_hash(file_path)# python似乎没法正确读取create_time, linux的stat命令可以显示正确时间,但是不同的系统间格式又不同,算了# metadata["created_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_ctime))metadata["modified_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_mtime))metadata["line_count"] = get_line_count(file_path)except Exception as e:print(f"Error getting metadata for {file_path}: {str(e)}")return metadatadef get_file_data(file_path):data = Nonetry:with open(file_path, "r") as f:content = f.read()# 使用 json.dumps 将文件内容序列化为 JSON 格式data = json.dumps(content)except Exception as e:print(f"Error getting data for {file_path}: {str(e)}")return datadef build_key(root= None, xfile=None,directory_path = None, base_directory = None):relative_path = os.path.relpath(root, directory_path)relative_path = relative_path.replace('.', '')  # 将 . 替换为空字符串if relative_path == '.':return xfileelse:return f"{base_directory}.{relative_path.replace(os.path.sep, '.')}.{xfile}"def scan_directory(directory_path):result = {}base_directory = os.path.basename(directory_path)# 遍历指定目录及其子目录for root, dirs, files in os.walk(directory_path):# 过滤以 . 开头的文件夹 和以下划线开头的文件夹dirs[:] = [d for d in dirs if not d.startswith('.') and not d.startswith('_')]for file in files:if not file.startswith(".") and not file.startswith("_") and file != "__init__.py" and not file.endswith('.pkl'):file_path = os.path.join(root, file)key = build_key(root, file, directory_path, base_directory)key = key.lstrip('.')value = {}value["meta"] = get_file_metadata(file_path)value["meta"]['name'] = filedata = get_file_data(file_path)if data is not None:value["data"] = dataresult[key] = valuereturn result# scan_dict = scan_directory('/Users/yukai/m4git/GlobalFunc/')
n [2]: scan_dict = gfbase._get_scan_files()In [3]: scan_dict.keys()
Out[3]: dict_keys(['WMongo_V9000_012.py', 'remark.md', 'test.md', 'RedisOrMongo_v100.py', 'debug_pys.d010_参数化调用.py', 'debug_pys.d002_扫描信息.py', 'debug_pys.d008_列出某个包的所有函数.py', 'debug_pys.d006_存储一个函数信息.py',...In [4]: scan_dict['Base.inverse_time_str.py']
Out[4]:
{'meta': {'hash': 'df80f0d4afec7ce0aa0ef00d7bc536ee','modified_time': '2023-10-27 15:25:39','line_count': 11,'name': 'inverse_time_str.py'},'data': '"import time\\n\\ndef inverse_time_str(time_str  = None, bias_hours = 0):\\n    ts = time.mktime (time.strptime(time_str,\'%Y-%m-%d %H:%M:%S\')) + bias_hours* 3600\\n    return ts \\n\\n\'\'\'\\n\\u4f60\\u7684 inverse_time_str \\u51fd\\u6570\\u7528\\u4e8e\\u5c06\\u65f6\\u95f4\\u5b57\\u7b26\\u4e32\\u8f6c\\u6362\\u56de\\u65f6\\u95f4\\u6233\\uff0c\\u8003\\u8651\\u4e86\\u65f6\\u533a\\u504f\\u79fb\\u3002\\u8be5\\u51fd\\u6570\\u63a5\\u53d7\\u4e00\\u4e2a\\u65f6\\u95f4\\u5b57\\u7b26\\u4e32\\u548c\\u504f\\u79fb\\u5c0f\\u65f6\\u6570\\uff0c\\u5e76\\u8fd4\\u56de\\u4e00\\u4e2a\\u65f6\\u95f4\\u6233\\uff08\\u4ee5\\u79d2\\u4e3a\\u5355\\u4f4d\\uff09\\u3002\\n\\n\\u4f60\\u7684\\u51fd\\u6570\\u5b9e\\u73b0\\u770b\\u8d77\\u6765\\u662f\\u6b63\\u786e\\u7684\\uff0c\\u5b83\\u4f7f\\u7528\\u4e86 time.mktime \\u548c time.strptime \\u6765\\u6267\\u884c\\u5b57\\u7b26\\u4e32\\u5230\\u65f6\\u95f4\\u6233\\u7684\\u8f6c\\u6362\\u3002\\u504f\\u79fb\\u5c0f\\u65f6\\u6570\\u88ab\\u8003\\u8651\\u5728\\u5185\\uff0c\\u4ee5\\u4fbf\\u6839\\u636e\\u9700\\u8981\\u8c03\\u6574\\u65f6\\u95f4\\u6233\\u3002\\n\'\'\'"'}
2.1.3 提交git项目

当开发作为git项目时,需要进行提交。这个在本地分发或者是在分支开发时有用。

gfbase._simple_commit_git()In [5]: gfbase._simple_commit_git()
[master cca9732] None 2024-02-13 13:53:033 files changed, 1 insertion(+)create mode 100644 __pycache__/__init__.cpython-39.pyc
Enumerating objects: 16, done.
Counting objects: 100% (16/16), done.
Delta compression using up to 10 threads
Compressing objects: 100% (9/9), done.
Writing objects: 100% (9/9), 837 bytes | 837.00 KiB/s, done.
Total 9 (delta 7), reused 0 (delta 0), pack-reused 0
To ssh://101.89.161.51:10600/home/git/GlobalFunc.gitdee7abd..cca9732  master -> master
Git操作成功完成。import subprocess
from datetime import datetimedef git_commit_and_push(commit_message= '简单提交', git_directory='.'):try:current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")full_commit_message = f"{commit_message} {current_datetime}"# 添加所有更改subprocess.run(['git', '-C', git_directory, 'add', '.'])# 提交更改subprocess.run(['git', '-C', git_directory, 'commit', '-m', full_commit_message])# 推送到远程仓库subprocess.run(['git', '-C', git_directory, 'push'])print("Git操作成功完成。")except subprocess.CalledProcessError as e:print("Git操作失败:", e)
2.1.4 刷新一个包的初始化文件

在增加了函数之后,这几乎是一定要执行的。增加函数 -> 刷新包初始化文件 -> git 提交,这是一个固定流程。

gfbase._generate_init_py('Base')以下自动为包增加了这行语句,从而导入新开发的函数
...
from .test1_save_test import test1_save_testimport os
def generate_init_py(package_dir):if not os.path.exists(package_dir):print(f"Package directory '{package_dir}' not found.")returnwith open(os.path.join(package_dir, "__init__.py"), "w") as init_py:for filename in os.listdir(package_dir):if filename.endswith(".py") and not filename.startswith("_") and not filename.startswith("."):module_name = filename[:-3]  # Remove the .py extensioninit_py.write(f"from .{module_name} import {module_name}\n")
2.1.5 尝试存储一个函数

除了以文件方式增加持久化的方式之外,还需要将函数持久化到数据库。

Base.test1_save_test.py
def test1_save_test():print('test1_save_test')gfbase._save_a_file_rom('Base.test1_save_test.py')In [7]: gfbase._save_a_file_rom('Base.test1_save_test.py')
长度为3段,正确
【Loading cur_w】from pickle
1.当前使用的MongeAgent:http://公网IP:24011/
2.Tier1:NameSpace, Tier2:RedisSpace
3.ConnectionHash:6552982f4bcd003477c5e2170250ccce
4.FilterDict:{'_is_enable': 1}
5.Limits:10000
6.Sort:
7.Skip:0
约定空间名称以sp_开头,节点以node_开头,临时变量以tem_开头
【提供节点与子图的元数据记录】
【Loading cur_w】from pickle
【Loading cur_w】from pickle
ok sp_GlobalFunc.Base_master  存在,可以使用
description
2.1.6 读取并生成一个函数 - 需要伴随一个包的初始化文件

这个功能的应用场景是在某个新的位置,重现(可能是部分)包函数。

target_folder = '~/Desktop/test_func'
gfbase._gen_a_file(func_name='from_pickle',target_folder=target_folder,pack_name='Base')【提供节点与子图的元数据记录】
【Loading cur_w】from pickle
【Loading cur_w】from pickle
ok sp_GlobalFunc.Base_master  存在,可以使用
description
文件 'from_pickle.py' 已成功创建于路径 '~/Desktop/test_func/Base'import os
def create_file(path, filename, content):# 组合完整的文件路径file_path = os.path.join(path, filename)try:# 打开文件以写入内容with open(file_path, 'w') as file:file.write(content)print(f"文件 '{filename}' 已成功创建于路径 '{path}'")except Exception as e:print(f"创建文件时出现错误:{e}")
2.1.7 列出包的所有函数

通过扫描文件夹下文件列表的方式列出所有函数。

gfbase._list_file_names_without_extension('Base')
{'Naive','WMongo','__init__','clear_digits','color_print','cols2s','create_folder','flat_dict',
...import os 
def list_file_names_without_extension(directory):return set(os.path.splitext(file)[0] for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file)))
2.1.8 重载一个包

在调试时可以更新然后重载

gfbase._reload_package('Base')
成功重新加载模块: Base
import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)

2.2 执行

需要切换到GlobalFunc的同级目录执行

2.2.1 位置参数调用

这种方式将逐渐被弃用。位置参数主要是方便,但是在函数复杂时并不方便。另外,既然采用间接方式调用,那么以关键字参数显然更加清晰。

import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)# 用于GlobalFunc参数化获取函数 func_name like 
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目录执行,把GlobalFunc当成总包 | 在编辑的时候,我们会沉入到内部操作,在应用时则要挂在最外层
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 执行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重载包@staticmethoddef _reload_package(package_name):reload_package(package_name)gfgo = GFGo()

以下执行一个函数,将一个数据 'a’存储为pickle, 文件名为‘a_data.pkl’

import GlobalFunc as funcs
pack_func = 'Base.to_pickle'
args1 = ['a', 'a_data', './']
gfgo._exe_func_args(global_func =funcs,pack_func= pack_func, args = args1)In [4]: import GlobalFunc as funcs...: pack_func = 'Base.to_pickle'...: args1 = ['a', 'a_data', './']...: gfgo._exe_func_args(global_func =funcs,pack_func= pack_func, args = args1)...:
data save to pickle:  ./a_data.pkl
2.2.2 关键字参数调用

这是未来的主流(规范):约定GlobalFunc函数的参数都以关键字参数

有一个工程是逐步将以前用位置参数的函数全部进行改造。

原函数

import pickle
def to_pickle(data, file_name,  path='./'):output = open(path+file_name+'.pkl', 'wb')pickle.dump(data, output)output.close()print('data save to pickle: ', path+file_name+'.pkl')

改造后

import pickle
def to_pickle(data = None, file_name = None,  path='./'):output = open(path+file_name+'.pkl', 'wb')pickle.dump(data, output)output.close()print('data save to pickle: ', path+file_name+'.pkl')

重新调用

# 重载包
In [10]: gfgo._reload_package('GlobalFunc')...:
成功重新加载模块: GlobalFuncIn [11]: pack_func = 'Base.to_pickle'...: kwargs1 = {'data':'a',...:             'file_name':'a_data',...:             'path':'./'}
In [12]: gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs1)
data save to pickle:  ./a_data.pkl

将这个函数也刷新到数据库,重新切回gfbase

gfbase._save_a_file_rom('Base.to_pickle.py')

到这里,基础功能就算告一段落。

3 场景及操作对象封装

先只讨论一种场景,就是镜像场景

镜像场景是指使用docker容器开发,完成后封装为镜像使用。由于镜像是连同对应的开发环境一并封装的,开箱即用,所以是一个更成熟的方式。

简单来说,就是打开一个容器,将GlobalFunc文件夹拷贝进去,然后将操作对象,例如GFBase或者GFGo拷贝进去,就可以使用了。

源镜像(registry.cn-hangzhou.aliyuncs.com/andy08008/pytorch_jupyter:v205)是我之前维护的一个常用镜像,镜像装有gpu版 pytorch1.x,并安装了一些其他有用的包。

现在基于源镜像构建,目标是myregistry.domain.com:24052/worker.andy.globalfunc:v101

3.1 建立新容器

docker run -it registry.cn-hangzhou.aliyuncs.com/andy08008/pytorch_jupyter:v205 bash

3.2 拷贝GlobalFunc包

在算网机上先执行拉取git pull,更新GlobalFunc项目
注意m7.24065.pkl mymeta.pkl两个数据库连接文件要根据不同的情况刷新一下。

在宿主机执行
docker cp /opt/GlobalFunc df5a340570ab:/workspace/
此时

root@df5a340570ab:/workspace# ls
funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e.so  GlobalFunc  test.ipynb

3.3 拷贝GFGo对象

这里只考虑执行的情况

import subprocess
import importlib.utildef reload_package(package_name):try:# 检查包是否已经被导入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未导入,先尝试导入它print(f"警告:未找到包 '{package_name}',尝试导入...")exec(f"import {package_name}")# 构造重新加载模块的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加载模块: {package_name}\')"'# 使用 subprocess 调用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加载模块失败:", e)# 用于GlobalFunc参数化获取函数 func_name like 
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目录执行,把GlobalFunc当成总包 | 在编辑的时候,我们会沉入到内部操作,在应用时则要挂在最外层
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 执行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重载包@staticmethoddef _reload_package(package_name):reload_package(package_name)

3.4 提交保存

docker commit df5a340570ab myregistry.domain.com:24052/worker.andy.globalfunc:v101

docker push myregistry.domain.com:24052/worker.andy.globalfunc:v101

小插曲

自建的docker 镜像仓库需要使用证书到期了,因此要重新刷新一下。

制作一个10年的证书

openssl req \-newkey rsa:4096 -nodes -sha256 -keyout /home/certs/domain.key \-addext "subjectAltName = DNS:myregistry.domain.com" \-x509 -days 3650 -out /home/certs/domain.crt

制作好证书后进行分发和授信。

# 1)安装工具包
apt-get install -y ca-certificates
# 2)复制ca.crt
cp /home/certs/domain.crt /usr/local/share/ca-certificates/domain.crt
# 3)更新被信任的CA证书
update-ca-certificates
# 4)重启本地dockerd(此步骤是必须的,否则依然报错x509: certificate signed by unknown authority)
systemctl daemon-reload
systemctl restart docker

要注意的是,镜像仓库的服务由于要存储镜像,需要较大的空间,因此项目的启动位置会存放在存储空间较大的位置。

3.5 调用执行

启动容器

docker run -it  --rm myregistry.domain.com:24052/worker.andy.globalfunc:v101 bash 

执行命令

from GFGo import GFGo
import GlobalFunc as funcs
gfgo = GFGo()
pack_func = 'Base.to_pickle'
kwargs1 = {'data':'a', 'file_name':'a_data', 'path':'./'}
gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs1)data save to pickle:  ./a_data.pkl

4 服务化

GlobalFunc有两种使用方式:本地模式和API模式,其中本地模式也就是普通的包调用模式,而API模式是本次要讨论的模式。

由于每个GlobalFunc的容器都会有资源限制,且由于计算资源的分布式化,未来其容器(分身)将会呈现随机化的态势。

GlobalFunc本身是为了提供无差别的函数服务,所以端口将采取分配一个地址段的方式,而不是按照规则指定。端口区间采用26000~27000。

一个容器将由所在的宿主机、镜像版本、内存上限、显存上限以及端口号决定。例如: m7_v101_30G_0G_26000表示在m7上运行的容器,采用v101版本,最大内存为30G,最大显存0G(无显卡),占用端口26000.

服务会分为两部分,服务端与客户端。

服务仅接受和返回json字符串,客户端完成格式转换。

所以服务端承担的功能相对简单,而客户端则需要处理相对复杂的翻译功能。另外,服务端所执行的必然是有返回的函数操作,而不能是本地文件操作。例如使用to_pickle,文件将会存在服务端的文件系统。

服务端采用tornado搭建,一方面其格式非常简单,另一方面效率非常高。

安装了tornado包之后,只需要两个文件就可以启动server了。

文件1: server_funcs.py

过去,我会把服务需要的函数放在这里,现在的话,理论上可以不需要,因为所有的依赖函数都会在GlobalFunc包中。

不过从设计的角度,我认为仍然应该保留这个文件(即使是空的)。因为未来有可能有一些服务的配置,以及一些固定的数据库连接可以放在这里。这样会比较便于管理,避免反复建立数据库连接。

文件2: server.py

在当前容器中,必然存在GFGo.pyGlobalFunc, 在server.py中引入他们。同时也存在,即使是空的server_funcs.py。

from server_funcs import * 
from GFGo import GFGo
import GlobalFunc as funcs 

一个启动示例

from server_funcs import * 
from GFGo import GFGo
import GlobalFunc as funcs import tornado.httpserver  # http服务器
import tornado.ioloop  # ?
import tornado.options  # 指定服务端口和路径解析
import tornado.web  # web模块
from tornado.options import define, options
import os.path  # 获取和生成template文件路径app_list = []IndexHandler_path = r'/'
class IndexHandler(tornado.web.RequestHandler):def get(self):self.write('【GET】This is Website for Internal API System')self.write('Please Refer to API document')print('Get got a request test')# print(buffer_dict)def post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)print(some_dict)msg_dict = {}msg_dict['info'] = '【POST】This is Website for Internal API System'msg_dict['input_dict'] = some_dictself.write(json.dumps(msg_dict))print('Post got a request test')
IndexHandler_tuple = (IndexHandler_path,IndexHandler)
app_list.append(IndexHandler_tuple)if __name__ == '__main__':#tornado.options.parse_command_line()apps = tornado.web.Application(app_list, **settings)http_server = tornado.httpserver.HTTPServer(apps)define('port', default=8000, help='run on the given port', type=int)http_server.listen(options.port)# 单核# 多核打开注释# 0 是全部核# http_server.start(num_processes=10) # tornado将按照cpu核数来fork进程# ---启动tornado.ioloop.IOLoop.instance().start()

启动服务

python3 server.py

本地调用

import requests as req In [4]: req.get('http://127.0.0.1:8000/').text
Out[4]: '【GET】This is Website for Internal API SystemPlease Refer to API document'In [7]: req.post('http://127.0.0.1:8000/',json = {}).json()
Out[7]: {'info': '【POST】This is Website for Internal API System', 'input_dict': {}}

参数化调用一个函数

函数用于将时间字符串转为时间戳(在统一时间轴之下,已经不再需要用这个函数)

import timedef inverse_time_str(time_str  = None, bias_hours = 0):ts = time.mktime (time.strptime(time_str,'%Y-%m-%d %H:%M:%S')) + bias_hours* 3600return ts '''
你的 inverse_time_str 函数用于将时间字符串转换回时间戳,考虑了时区偏移。该函数接受一个时间字符串和偏移小时数,并返回一个时间戳(以秒为单位)。你的函数实现看起来是正确的,它使用了 time.mktime 和 time.strptime 来执行字符串到时间戳的转换。偏移小时数被考虑在内,以便根据需要调整时间戳。
'''

假设我们知道一个时间字符串,并希望知道其东八区时间的时间戳,那么发送请求

import requests as req 
pack_func = 'Base.inverse_time_str'
kwargs = {'time_str':'2024-02-15 11:11:11','bias_hours':-8}
req.post('http://127.0.0.1:8000/gfgo/',json = {'pack_func':pack_func,'kwargs':kwargs}).json()1707966671服务端:大约花了0.53ms来完成
[I 240215 08:31:48 web:2239] 200 POST /gfgo/ (127.0.0.1) 0.53ms

在这里插入图片描述

到这里,GlobalFunc的基础部分,也就是连通性部分已经完成。之后将可以基于这个基础进行下一步的开发,GF将会成为下一代CalNet的逻辑执行基础。

其他

原来在开发算网(CalNet)的时候,总是假设机器处在局域网中,但是实际上并不是。而且由于CalNet是基于微服务搭建的,网络配置应该作为一项独立的配置独立出来。在外网去调用微服务时,由于数据库操作代理服务默认使用了内网地址,在很多参数传递时又偷懒了,所以连接起来非常不方便。

Next,重新设计操作微服务的对象,通过分离配置、操作引导等方式改进体验。





本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/258021.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【闲谈】开源软件的崛起与影响

随着信息技术的快速发展,开源软件已经成为软件开发的趋势,并产生了深远的影响。开源软件的低成本、可协作性和透明度等特点,使得越来越多的企业和个人选择使用开源软件,促进了软件行业的繁荣。然而,在使用开源软件的过…

内网穿透工具

1. nps-npc 1.1 简介 nps是一款轻量级、高性能、功能强大的内网穿透代理服务器。目前支持tcp、udp流量转发,可支持任何tcp、udp上层协议(访问内网网站、本地支付接口调试、ssh访问、远程桌面,内网dns解析等等……)&#xff0c…

CVE-2022-25487 漏洞复现

漏洞描述:Atom CMS 2.0版本存在远程代码执行漏洞,该漏洞源于/admin/uploads.php 未能正确过滤构造代码段的特殊元素。攻击者可利用该漏洞导致任意代码执行。 其实这就是一个文件上传漏洞罢了。。。。 打开之后,/home路由是个空白 信息搜集&…

基于BP算法的SAR成像matlab仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 BP算法的基本原理 4.2 BP算法的优点与局限性 5.完整工程文件 1.课题概述 基于BP算法的SAR成像。合成孔径雷达(SAR)是一种高分辨率的雷达系统,能够在各种天气和光…

【51单片机】直流电机实验和步进电机实验

目录 直流电机实验直流电机介绍ULN2003 芯片介绍硬件设计软件设计实验现象 步进电机实验步进电机简介步进电机的工作原理步进电机极性区分双极性步进电机驱动原理单极性步进电机驱动原理细分驱动原理 28BYJ-48 步进电机简介软件设计 橙色 直流电机实验 在未学习 PWM 之前&…

Chat with RTX 安装

1、Chat With RTX 介绍 Chat With RTX 是一个 Demo,用来将您自己的资料(文档、笔记、视频或其他数据)与大语言模型建立连接,从而令 LLM 更具个性化。利用检索增强生成 (RAG)、TensorRT-LLM 和 RTX 加速,您可以与自定义…

postgresql 手动清理wal日志的101个坑

新年的第一天,总结下去年遇到的关于WAL日志清理的101个坑,以及如何相对安全地进行清理。前面是关于WAL日志堆积的原因分析,清理相关可以直接看第三部分。 首先说明,手动清理wal日志是一个高风险的操作,尤其对于带主从的…

用Jmeter进行接口测试

web接口测试工具: 手工测试的话可以用postman ,自动化测试多是用到 Jmeter(开源)、soupUI(开源&商业版)。 下面将对前一篇Postman做接口测试中的接口用Jmeter来实现。 一、Jmeter 的使用步骤 打开Jme…

论文阅读-面向公平性的分布式系统负载均衡机制

摘要 当一组自利的用户在分布式系统中共享多个资源时,我们面临资源分配问题,即所谓的负载均衡问题。特别地,负载均衡被定义为将负载分配到分布式系统的服务器上,以便最小化作业响应时间并提高服务器的利用率。在本文中&#xff0…

[CTF]-PWN:C++文件更换libc方法(WSL)

C文件与C文件更换libc有很多不一样的地方,我是在写buu的ciscn_2019_final_3才意识到这个问题,C文件只需要更换libc和ld就可以了,但是C文件不同,除了更换libc和ld,它还需要更换libstdc.so.6和libgcc_s.so.1 更换libc和…

Linux实用指令

Linux实用指令 1.指定运行级别 运行级别说明: 0 :关机 1 :单用户【找回丢失密码】 2:多用户状态没有网络服务 3:多用户状态有网络服务 4:系统未使用保留给用户 5:图形界面 6:系统重…

Netty应用(七) 之 Handler Netty服务端编程总结

目录 15.Handler 15.1 handler的分类 15.1.1 按照方向划分 15.1.2 handler的结构 15.2 输入方向ChannelInboundHandlerAdapter 15.2.1 输出方向Handler的顺序 15.2.2 多个输入方向Handler之间的数据传递 15.2.2.1 handler消失了 15.2.2.2 手动编写netty提供的new Strin…

磁盘database数据恢复: ddrescue,dd和Android 设备的数据拷贝

ddrescue和dd 区别: GNU ddrescue 不是 dd 的衍生物,也与 dd 没有任何关系 除了两者都可用于将数据从一台设备复制到另一台设备。 关键的区别在于 ddrescue 使用复杂的算法来复制 来自故障驱动器的数据,尽可能少地造成额外的损坏。ddrescue…

备战蓝桥杯---图论之最短路dijkstra算法

目录 先分个类吧: 1.对于有向无环图,我们直接拓扑排序,和AOE网类似,把取max改成min即可。 2.边权全部相等,直接BFS即可 3.单源点最短路 从一个点出发,到达其他顶点的最短路长度。 Dijkstra算法&#x…

UI文件原理

使用UI文件创建界面很轻松很便捷,他的原理就是每次我们保存UI文件的时候,QtCreator就自动帮我们将UI文件翻译成C的图形界面创建代码。可以通过以下步骤查看代码 到工程编译目录,一般就是工程同级目录下会生成另一个编译目录,会找到…

rbd快照管理、rbd快照克隆原理与实现、rbd镜像开机自动挂载、ceph文件系统、对象存储、配置对象存储客户端、访问Dashboard

目录 快照 快照克隆 开机自动挂载 ceph文件系统 使用MDS 对象存储 配置服务器端 配置客户端 访问Dashborad 快照 快照可以保存某一时间点时的状态数据快照是映像在特定时间点的只读逻辑副本希望回到以前的一个状态,可以恢复快照使用镜像、快照综合示例 #…

【刷题记录】合并两个有序数组、移除元素

本系列博客为个人刷题思路分享,有需要借鉴即可。 1.题目链接: T1:LINK T2:LINK 2.详解思路: T1: 思路1:弄个新数组,比较两个数组中的值,哪个小就把哪个值放到新数组中。 分析1&a…

自定义类型详解 结构体,位段,枚举,联合

目录 结构体 1.不完全声明 2.结构体的自引用 3.定义与初始化 4.结构体内存对齐与结构体类型的大小 结构体嵌套问题 位段 1.什么是位段? 2.位段的内存分配 枚举 1.枚举类型的定义 2.枚举的优点 联合(共同体) 1.联合体类型的声明以…

Vue语法

1.vue模板语法2大类 插值语法: 功能:用于解析标签内容 用途:用于标签内容定义 写法:{{xxx}},xxx是js表达式,且可以直接读取到data中的所有属性 指令语法: 功能:用于解析标签&am…

去空行小工具Html + Javascript

这是一个平常用到的小工具&#xff0c;为了节省屏幕空间把空行去掉&#xff0c;怕要用的时候找不到故记录在此。 效果图 网页版&#xff0c;放在浏览器里就可以用 <!doctype html> <html><head><meta charset"utf-8"><title>去回车…