大多数机器学习项目都是从单线程概念验证开始的,其中每个任务在下一个任务开始之前完成。下面描述的单线程 ML 管道就是一个示例。

但是,在某些时候,您将超出上面所示的管道。这可能是由于数据集不再适合单个进程的内存所致。当您的模型变得复杂并且实验需要数小时甚至数天才能完成时,您也会超出此管道。要解决这些问题,您可以在多个进程之间分配工作并添加并行度。若要完全分发上述管道,需要进行两项修改:

  1. 分发数据处理

  2. 分发模型训练。

单线程管道中的数据处理(或机器学习领域中通常称为数据预处理)通常是通过将整个数据集加载到内存中并在将数据交给模型进行训练之前对其进行转换来完成的。但是,一旦您希望利用分布式训练技术,这种情况就会改变。数据预处理可以从训练循环中以增量方式完成。在我上一篇关于 Ray Data 的文章中,我展示了如何使用 Ray 数据集来分发数据预处理,这些数据集可以映射到预处理任务或 Actor。具体来说,我展示了如何创建 Ray 数据集并将其映射到处理任务。然后,我们通过遍历该数据集并观察每次迭代执行的预处理任务来测试该数据集。此外,我还演示了如何使用 MinIO 作为数据集的源,因为数据集太大而无法完全加载到内存中。这需要在 MinIO 中查询对象列表,以便将对象检索进行训练和编码到我们的预处理任务中。如果您还没有阅读这篇文章,请立即快速阅读。

在这篇文章中,我将展示如何实现完全分发 ML 训练管道所需的第二个修改。我将展示如何在训练循环中分发模型训练和使用映射数据集。当运行需要很长时间才能完成的训练函数时,最好在每个 epoch 之后对模型进行检查点。检查点也是检索最终完全训练模型的方式。这篇文章将向你展示如何将你的模型检查到MinIO。

可视化分布式 ML 管道

在实现分布式管道时,它有助于可视化将要构建的内容。下图是完整的分布式管道的可视化,包括分布式预处理,这在我之前的文章中已经介绍过。在我完成分发模型训练所需的编码任务时,请参阅此图。

上图可能看起来很奇怪,因为用于预处理的任务(或参与者)是从负责分布式训练的 worker 调用的。 但是,请记住我上面的简要描述 - 预处理任务映射到数据集,当我们从训练循环中迭代数据集时,会调用这些任务。

在开始编码之前,让我们更好地了解 Ray Workers。

Ray Worker 是执行包含训练逻辑的 Python 函数的进程。此函数通常包含损失函数和优化器的设置代码,后跟训练模型的纪元循环。Ray Train 通过在集群中创建多个工作线程来分发模型训练。在生产环境中,您的集群可以是 Kubernetes 集群。在开发计算机上;Ray Train 将为每个工人创建一个流程。工作线程的数量决定了并行度,并且是可配置的。

第一个编码任务是创建一个能够在 Ray Worker 中运行的训练函数。

创建分布式训练函数

使用光线训练(和光线数据)的训练函数如下所示。我重点介绍了 Ray 函数的使用,这些函数有助于分布式训练。令人惊讶的是,与在单个线程中实现的函数相比,这需要最少的代码更改。

def train_func_per_worker(training_parameters):
  
   # Train the model and log training metrics.
   model = tu.MNISTModel(training_parameters['input_size'], 

                         training_parameters['hidden_sizes'],
                         training_parameters['output_size'])
   model = ray.train.torch.prepare_model(model)

   # Get the dataset shard for the training worker.
   train_data_shard = train.get_dataset_shard('train')

   loss_func = nn.NLLLoss()
   optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'], 

                         momentum=training_parameters['momentum'])

   metrics = {}
   batch_size_per_worker = training_parameters['batch_size_per_worker']
   for epoch in range(training_parameters['epochs']):
       total_loss = 0
       batch_count = 0
       for batch in train_data_shard.iter_torch_batches(batch_size=batch_size_per_worker):
           # Get the images and labels from the batch.
           images, labels = batch['X'], batch['y']
           labels = labels.type(torch.LongTensor)   # casting to long
           images, labels = images.to(device), labels.to(device)

           # Flatten MNIST images into a 784 long vector.
           images = images.view(images.shape[0], -1)
      
           # Training pass
           optimizer.zero_grad()           
           output = model(images)

           loss = loss_func(output, labels)
          
           # This is where the model learns by backpropagating
           loss.backward()
          
           # And optimizes its weights here
           optimizer.step()
          
           total_loss += loss.item()
           batch_count += 1

       metrics = {'training_loss': total_loss/batch_count}
       checkpoint = None
       if train.get_context().get_world_rank() == 0:
           temp_dir = os.getcwd()
           torch.save(model.module.state_dict(), os.path.join(temp_dir, 'mnist_model.pt'))
           checkpoint = Checkpoint.from_directory(temp_dir)
       train.report(metrics, checkpoint=checkpoint)

在此函数中,ray 框架的第一个用途是通过使用 ray.train.torch.prepare_model() 函数修改模型来为分布式训练准备模型。此函数创建一个新模型,该模型能够将梯度和参数与其他工作线程中以相同方式创建的模型同步。

接下来,上面的函数使用 train.get_dataset_shard() 函数获取数据分片。您使用的 worker 越多,这个分片就越小,您的训练运行速度就越快。最后,使用数据集分片的 iter_torch_batches() 方法返回 Pytorch 张量的批次。如果工作人员意外失败,请尝试使用较小的batch_size。在单台计算机上运行分布式训练时,内存不足错误很常见。它们也可能是由于使用过多的工人造成的。

总而言之,每个 worker 在启动分布式训练作业时都会执行此训练函数。每个 worker 将创建一个可以与其他 worker 的模型同步的模型副本,并且每个 worker 将获得自己的数据分片。

启动分布式工作线程

我们将使用下面的函数来创建一个本地集群,并为其配置所需数量的 worker。请注意,当我初始化 Ray 时,我必须告诉它安装 MinIO SDK。Ray 将自动安装自身和 PyTorch。如果您使用的是任何其他库,请将它们添加到 pip 列表中。初始化 Ray Train 后,此函数将检索我在上一篇文章中介绍的 Ray 数据集。

设置分布式训练是通过 TorchTrainer 类完成的 - 它的构造函数传递了我们之前编写的训练函数、训练参数、训练数据集、缩放配置对象和运行配置对象。

def distributed_training(training_parameters, num_workers: int, use_gpu: bool):
   logger = du.create_logger()
  
   logger.info('Initializing Ray.')
   initialize_ray()

   train_data, test_data, load_time_sec = du.get_ray_dataset(training_parameters)

   # Scaling configuration
   scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)

   # Initialize a Ray TorchTrainer
   start_time = time()
   trainer = TorchTrainer(
       train_loop_per_worker=train_func_per_worker,
       train_loop_config=training_parameters,
       datasets={'train': train_data},
       scaling_config=scaling_config,
       run_config=get_minio_run_config()
   )
   result = trainer.fit()
   training_time_sec = (time()-start_time)

   logger.info(result)
   logger.info(f'Load Time (in seconds) = {load_time_sec}')
   logger.info(f'Training Time (in seconds) = {training_time_sec}')
  
   model = tu.MNISTModel(training_parameters['input_size'], 

                         training_parameters['hidden_sizes'], 

                         training_parameters['output_size'])
   with result.checkpoint.as_directory() as checkpoint_dir:
       model.load_state_dict(torch.load(os.path.join(checkpoint_dir, "model.pt")))
   tu.test_model(model, test_data)
  
   ray.shutdown()

缩放配置对象(从 ScaleConfig 类创建)告诉 Ray Train 我们需要多少个工作线程,以及我们是否希望使用 GPU。run_config 参数确定 Ray Train 将指标和检查点发送到何处。在后面的章节中,我将展示如何使用 run_config 将数据发送到 MinIO。

在 TorchTrainer 对象中设置所有这些信息后,您可以调用其 fit() 方法,Ray Train 将创建工作线程并在工作线程中运行您的训练函数。此方法将阻塞,直到所有工作线程完成。返回的值将是一个字典,其中包含来自训练的指标和检查点。

让我们更详细地讨论指标和检查点。

报告指标和检查点~

在训练函数的底部,您将看到下面的代码片段。使用指标和检查点调用 train.report() 是 Ray Train 将此信息从工作线程发送回控制代码的方式。

metrics = {'training_loss': total_loss/batch_count}
checkpoint = None
if train.get_context().get_world_rank() == 0:
   temp_dir = os.path.join(os.getcwd(), 'checkpoint')
   torch.save(model.module.state_dict(), os.path.join(temp_dir, 'mnist_model.pt'))
   checkpoint = Checkpoint.from_directory(temp_dir)
train.report(metrics, checkpoint=checkpoint)

此代码仅从“世界排名”为零的工作线程创建检查点。“世界排名”只是识别工人的一种方式。请记住,随着训练的进行,每个工作人员的模型都会与其他工作人员同步。因此,让每个工作线程序列化模型并返回模型是浪费周期。模型也可能变得非常大,因此不必要的检查点会浪费存储空间。

train.report() 中的数据将位于 train.fit() 的返回值中。当您打印出此数据(以下示例)时,您只会看到来自其中一个工作线程的信息 - 此外,数据将仅是上一个纪元期间发送的值。这对检查点来说很有意义 - 大多数时候,您只需要模型的最终状态。然而,就指标而言,这是不幸的。我喜欢查看每个 epoch 的损失值,以确定更多的 epoch 是否会提高准确性,或者我是否超过了最佳损失值。

Result(
  metrics={'loss_0_4': 0.9962198138237},
  path='/Users/keithpij/ray_results/TorchTrainer_2023-12-14_08-43-53/TorchTrainer_d3b70_00000_0_2023-12-14_08-43-56',
  filesystem='local',
  checkpoint=Checkpoint(filesystem=local, path=/Users/keithpij/ray_results/TorchTrainer_2023-12-14_08-43-53/TorchTrainer_d3b70_00000_0_2023-12-14_08-43-56/checkpoint_000004)
)

上面示例输出中显示的检查点是对已序列化到临时目录的模型的引用。让我们看看如何向 MinIO 发送检查点。

向 MinIO 发送检查点

Ray Train 运行会生成报告的指标、检查点和其他工件的历史记录。其中一些工件是 Ray Train 日志,可以帮助您跟踪问题。默认情况下,此信息将保存到临时目录中。但是,您可以将其配置为保存到 MinIO。这是通过前面所示的 TorchTrainer 对象的 run_config 参数完成的。下面的函数将生成一个运行配置,该配置会将运行期间创建的信息发送到 MinIO。训练的 storage_path 参数。RunConfig() 函数是一个 MinIO 存储桶。

def get_minio_run_config():
   import s3fs
   import pyarrow.fs

   s3_fs = s3fs.S3FileSystem(
       key = os.environ['MINIO_ACCESS_KEY'],
       secret = os.environ['MINIO_SECRET_ACCESS_KEY'],
       endpoint_url = os.environ['MINIO_URL']
   )
   custom_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(s3_fs))

   run_config = train.RunConfig(storage_path='ray-train', storage_filesystem=custom_fs)
   return run_config
   

下面的代码片段演示了在创建 TorchTrainer 对象时如何使用此函数。

trainer = TorchTrainer(
   train_loop_per_worker=train_func_per_worker,
   train_loop_config=training_parameters,
   datasets={'train': train_data},
   scaling_config=scaling_config,
   run_config=get_minio_run_config()  # train.RunConfig(storage_path=os.getcwd(), name="ray_experiments")
   )
   result = trainer.fit()

从检查点加载模型

我们要做的最后一件事是从检查点加载经过训练的模型。如果要在生产环境中提供模型,则需要执行此操作。或者,您可以使用模型在训练期间未看到的测试数据来测试模型。在本文的代码示例中,我针对测试集测试模型。

result = trainer.fit()
  
model = tu.MNISTModel(training_parameters['input_size'], 

                      training_parameters['hidden_sizes'], 

                      training_parameters['output_size'])

with result.checkpoint.as_directory() as checkpoint_dir:
   model.load_state_dict(torch.load(os.path.join(checkpoint_dir, "model.pt")))
tu.test_model(model, test_data)

总结

在这篇文章中,我完成了我在上一篇文章中开始的工作,在那里我展示了如何使用 Ray Data 来分发在训练模型之前需要进行的任何预处理。我演示了如何分配模型的训练。我还演示了如何配置 Ray Train 以将指标和检查点发送到 MinIO。最后,我演示了如何加载检查点,以便您可以测试和部署模型。

示例代码可以通过替换数据访问函数和模型作为分布式训练项目的模板。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐