在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

怎么用Mars Remote API执行Python函数

134次阅读
没有评论

共计 9341 个字符,预计需要花费 24 分钟才能阅读完成。

这篇文章主要讲解了“怎么用 Mars Remote API 执行 Python 函数”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“怎么用 Mars Remote API 执行 Python 函数”吧!

Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。

启动 Mars 分布式环境可以参考:

命令行方式在集群中部署。

Kubernetes 中部署。

MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的可以直接使用。

如何使用 Mars Remote API

使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。

采用蒙特卡洛方法计算 π 为例。代码如下,我们编写了两个函数,calc_chunk 用来计算每个分片内落在圆内的点的个数,calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 π 值。

from typing import List
import numpy as np
def calc_chunk(n: int, i: int):
 #  计算 n 个随机点(x 和 y 轴落在 - 1 到 1 之间)到原点距离小于 1 的点的个数
 rs = np.random.RandomState(i)
 a = rs.uniform(-1, 1, size=(n, 2))
 d = np.linalg.norm(a, axis=1)
 return (d   1).sum()
def calc_pi(fs: List[int], N: int):
 #  将若干次  calc_chunk  计算的结果汇总,计算  pi  的值
 return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
fs = [calc_chunk(n, i)
 for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)

%%time 下可以看到结果:

3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s

在单机需要 12.3 s。

要让这个计算使用 Mars Remote API 并行起来,我们不需要对函数做任何改动,需要变动的仅仅是最后部分。

import mars.remote as mr
#  函数调用改成  mars.remote.spawn
fs = [mr.spawn(calc_chunk, args=(n, i))
 for i in range(N // n)]
#  把  spawn  的列表传入作为参数,再  spawn  新的函数
pi = mr.spawn(calc_pi, args=(fs, N))
#  通过  execute()  触发执行,fetch()  获取结果
print(pi.execute().fetch())

%%time 下看到结果:

3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s

结果一模一样,但是却有数倍的性能提升。

可以看到,对已有的 Python 代码,Mars remote API 几乎不需要做多少改动,就能有效并行和分布式来加速执行过程。

一个例子

为了让读者理解 Mars Remote API 的作用,我们从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和库可以选择,这里我们用 RandomForest、LogisticRegression,以及 XGBoost。

困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道。所以,我们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。

准备数据

这个例子里我们使用 otto 数据集。

首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
def gen_data():
 df = pd.read_csv(otto/train.csv)
 
 X = df.drop([target ,  id], axis=1)
 y = df[target]
 
 label_encoder = LabelEncoder()
 label_encoder.fit(y)
 y = label_encoder.transform(y)
 
 return train_test_split(X, y, test_size=0.33, random_state=123)
X_train, X_test, y_train, y_test = gen_data()

模型

接着,我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。

RandomForest:

from sklearn.ensemble import RandomForestClassifier
def random_forest(X_train: pd.DataFrame, 
 y_train: pd.Series, 
 verbose: bool = False,
 **kw):
 model = RandomForestClassifier(verbose=verbose, **kw)
 model.fit(X_train, y_train)
 return model

接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。

def gen_random_forest_parameters():
 for n_estimators in [50, 100, 600]:
 for max_depth in [None, 3, 15]:
 for criterion in [gini ,  entropy]:
 yield {
  n_estimators : n_estimators,
  max_depth : max_depth,
  criterion : criterion
 }

LogisticRegression 也是这个过程。我们先定义模型。

from sklearn.linear_model import LogisticRegression
def logistic_regression(X_train: pd.DataFrame,
 y_train: pd.Series,
 verbose: bool = False,
 **kw):
 model = LogisticRegression(verbose=verbose, **kw)
 model.fit(X_train, y_train)
 return model

接着生成供 LogisticRegression 使用的超参。

def gen_lr_parameters():
 for penalty in [l2 ,  none]:
 for tol in [0.1, 0.01, 1e-4]:
 yield {
  penalty : penalty,
  tol : tol
 }

XGBoost 也是一样,我们用 XGBClassifier 来执行分类任务。

from xgboost import XGBClassifier
def xgb(X_train: pd.DataFrame,
 y_train: pd.Series,
 verbose: bool = False,
 **kw):
 model = XGBClassifier(verbosity=int(verbose), **kw)
 model.fit(X_train, y_train)
 return model

生成一系列超参。

def gen_xgb_parameters():
 for n_estimators in [100, 600]:
 for criterion in [gini ,  entropy]:
 for learning_rate in [0.001, 0.1, 0.5]:
 yield {
  n_estimators : n_estimators,
  criterion : criterion,
  learning_rate : learning_rate
 }

验证

接着我们编写验证逻辑,这里我们使用 log_loss 来作为评价函数。

from sklearn.metrics import log_loss
def metric_model(model, 
 X_test: pd.DataFrame,
 y_test: pd.Series) -  float:
 if isinstance(model, bytes):
 model = pickle.loads(model)
 y_pred = model.predict_proba(X_test)
 return log_loss(y_test, y_pred)

 #  把训练和验证封装到一起  model = train_func(X_train, y_train, verbose=verbose, **train_params)  metric = metric_model(model, X_test, y_test)  return model, metric

找出最好的模型

做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把 n_jobs 设成 -1,这样能更好利用单机的多核。

results = []
# -------------
# Random Forest
# -------------
for params in gen_random_forest_parameters():
 print(f calculating on {params} )
 # fixed random_state
 params[random_state] = 123
 # use all CPU cores
 params[n_jobs] = -1
 model, metric = train_and_metric(random_forest, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({ model : model, 
  metric : metric})
 
# -------------------
# Logistic Regression
# -------------------
for params in gen_lr_parameters():
 print(f calculating on {params} )
 # fixed random_state
 params[random_state] = 123
 # use all CPU cores
 params[n_jobs] = -1
 model, metric = train_and_metric(logistic_regression, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({ model : model, 
  metric : metric})
 
# -------
# XGBoost
# -------
 
for params in gen_xgb_parameters():
 print(f calculating on {params} )
 # fixed random_state
 params[random_state] = 123
 # use all CPU cores
 params[n_jobs] = -1
 model, metric = train_and_metric(xgb, params,
 X_train, y_train,
 X_test, y_test)
 print(f metric: {metric} )
 results.append({ model : model, 
  metric : metric})

运行一下,需要相当长时间,我们省略掉一部分输出内容。

calculating on {n_estimators : 50,  max_depth : None,  criterion :  gini}
metric: 0.6964123781828575
calculating on {n_estimators : 50,  max_depth : None,  criterion :  entropy}
metric: 0.6912312790832288
#  省略其他模型的输出结果
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s

从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。

使用 Remote API 分布式加速

现在我们尝试使用 Remote API 通过分布式方式加速整个过程。

集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。

n_cores = 8
mem = 2 * n_cores # 16G
# o  是  MaxCompute  入口,这里创建  10  个  worker  的集群,每个  worker 8 核 16G
cluster = o.create_mars_cluster(10, n_cores, mem, image= extended)

为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到 MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。

if not o.exist_resource(otto_train.csv):
 with open(otto/train.csv) as f:
 #  上传资源
 o.create_resource(otto_train.csv ,  file , fileobj=f)
 
def gen_data():
 #  改成从资源读取
 df = pd.read_csv(o.open_resource( otto_train.csv))
 
 X = df.drop([target ,  id], axis=1)
 y = df[target]
 
 label_encoder = LabelEncoder()
 label_encoder.fit(y)
 y = label_encoder.transform(y)
 
 return train_test_split(X, y, test_size=0.33, random_state=123)

稍作改动之后,我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上运行。

import mars.remote as mr
# n_output  说明是  4  输出
# execute()  执行后,数据会读取到  Mars  集群内部
data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
# remote_  开头的都是  Mars  对象,这时候数据在集群内,这些对象只是引用
remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,我们要对 train_and_metric 稍作改动,把模型 pickle 了之后再返回。

def distributed_train_and_metric(train_func,
 train_params: dict,
 X_train: pd.DataFrame, 
 y_train: pd.Series, 
 X_test: pd.DataFrame, 
 y_test: pd.Series,
 verbose: bool = False
 ):
 model, metric = train_and_metric(train_func, train_params,
 X_train, y_train, 
 X_test, y_test, verbose=verbose)
 return pickle.dumps(model), metric

后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。

接着我们就对前面的执行过程稍作改动,把函数调用全部都用 mars.remote.spawn 来改写。

import numpy as np
tasks = []
models = []
metrics = []
# -------------
# Random Forest
# -------------
for params in gen_random_forest_parameters():
 # fixed random_state
 params[random_state] = 123
 task = mr.spawn(distributed_train_and_metric,
 args=(random_forest, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2
 )
 tasks.extend(task)
 #  把模型和评价分别存储
 models.append(task[0])
 metrics.append(task[1])
 
 
# -------------------
# Logistic Regression
# -------------------
for params in gen_lr_parameters():
 # fixed random_state
 params[random_state] = 123
 task = mr.spawn(distributed_train_and_metric,
 args=(logistic_regression, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2
 )
 tasks.extend(task)
 #  把模型和评价分别存储
 models.append(task[0])
 metrics.append(task[1])
# -------
# XGBoost
# -------
 
for params in gen_xgb_parameters():
 # fixed random_state
 params[random_state] = 123
 #  再指定并发为核的个数
 params[n_jobs] = n_cores
 task = mr.spawn(distributed_train_and_metric,
 args=(xgb, params,
 remote_X_train, remote_y_train,
 remote_X_test, remote_y_test), 
 kwargs={verbose : 2},
 n_output=2
 )
 tasks.extend(task)
 #  把模型和评价分别存储
 models.append(task[0])
 metrics.append(task[1])

#  把顺序打乱,目的是能分散到  worker  上平均一点 shuffled_tasks = np.random.permutation(tasks) _ = mr.ExecutableTuple(shuffled_tasks).execute()

可以看到代码几乎一致。

运行查看结果:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s

时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。

细心的读者可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的内容只会输出到 worker 的标准输出流,我们在客户端不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。

以第 0 个模型为例,我们可以在 Mars 对象上直接调用 fetch_log 方法。

print(models[0].fetch_log())

输出我们简略一部分。

building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
#  中间省略
building tree 49 of 50
building tree 50 of 50

要看哪个模型都可以通过这种方式。试想下,如果没有 fetch_log API,你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行,也不用担心日志混合在一起。

感谢各位的阅读,以上就是“怎么用 Mars Remote API 执行 Python 函数”的内容了,经过本文的学习后,相信大家对怎么用 Mars Remote API 执行 Python 函数这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-25发表,共计9341字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 成人免费视频一区二区 | 国产高清乱码又大又圆 | 久久99精品久久久久久园产越南 | 四虎hk网址| 99久久99视频 | 97干干干| 黄片一级毛片 | 色综合天天综合网国产成人网 | 一级做a爰性色毛片免费 | 日本理论视频 | 国精产品一二三区传媒公司 | 亚洲经典三级 | 第一福利在线观看永久视频 | 日韩草逼视频 | 大伊香蕉在线精品不卡视频 | 亚洲人成电影院在线观看 | 欧美一级毛片免费看高清 | 久久久久久人妻毛片a片 | 美女被扒开内裤桶屁股眼视频网站 | 尤物精品在线观看 | 无遮挡粉嫩小泬久久久久久久 | 欧美一级视频 | 成人一级大片 | 一级黄色大片视频 | 国产乱码卡二卡三卡老狼 | 国产无遮挡又黄又爽在线观看 | 一级做a爱片特黄在线观看yy | 国产高清在线精品 | 最近中文字幕视频高清 | 真实国产乱啪福利露脸 | 无码人妻丰满熟妇区毛片 | 影音先锋人妻啪啪av资源网站 | 国内最真实的xxxx人伦 | 色妺妺在线视频 | 亚洲人的天堂男人爽爽爽 | 日韩精品免费视频 | www婷婷av久久久影片 | 欧美精品亚洲精品日韩经典 | 国产真实老熟女无套内射 | 人妻系列av无码专区 | 亚洲av无码av制服另类专区 |