
python代码实现了基于物理信息神经网络(Physics-Informed Neural Network,PINN)的方法来求解一个二维稳态问题
这段代码实现了基于物理信息神经网络(Physics-Informed Neural Network,PINN)的方法来求解一个二维稳态问题(从代码中推测可能是类似泊松方程相关的问题)。代码的主要流程包括定义PINN模型的结构、初始化相关参数、设置训练过程以及进行预测和后处理,最终将一些结果保存到本地文件中。
import sys
sys.path.insert(0, '../Utilities/') # 便于快速导入模块
import tensorflow as tf
import numpy as np
import scipy.io
import time
np.random.seed(1234)
tf.random.set_seed(1234)
tf.compat.v1.disable_eager_execution()
class PINN:
def __init__(self, X_ub, ub, X_f1, layers1 , dudtb): # 定义初始化参数
self.lb = lb
self.uub = uub
self.x_ub = X_ub[:, 0:1]#用于拟合边界条件的边界点
self.y_ub = X_ub[:, 1:2]
self.ub = ub
self.dudtb = dudtb
# x_f1,y_f1为用于拟合控制方程的内点
self.x_f1 = X_f1[:, 0:1]
self.y_f1 = X_f1[:, 1:2]
# I初始化神经网络
self.layers1 = layers1
self.weights1, self.biases1, self.A1 = self.initialize_NN(layers1)
# tf placeholders and graph
self.sess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(allow_soft_placement=True,
log_device_placement=True))
# 用占位符定义tf中的变量即其精度,维度,后面会以字典的形式赋值它
self.x_ub_tf = tf.compat.v1.placeholder(tf.float32, shape=[None, self.x_ub.shape[1]])
self.y_ub_tf = tf.compat.v1.placeholder(tf.float32, shape=[None, self.y_ub.shape[1]])
self.x_f1_tf = tf.compat.v1.placeholder(tf.float32, shape=[None, self.x_f1.shape[1]])
self.y_f1_tf = tf.compat.v1.placeholder(tf.float32, shape=[None, self.y_f1.shape[1]])
# 用占位符定义tf中的变量即其精度,维度,后面会以字典的形式赋值它
# self.x_uq_tf = tf.placeholder(tf.float32, shape=[None, self.x_uq.shape[1]])
# self.y_uq_tf = tf.placeholder(tf.float32, shape=[None, self.y_uq.shape[1]])
# 神经网络预测和损失函数定义
self.ub1_pred = self.net_u1(self.x_ub_tf, self.y_ub_tf)
self.f1_pred = self.net_f(self.x_f1_tf, self.y_f1_tf)
self.dudtb_pred = self.net_dudtb(self.x_ub_tf, self.y_ub_tf)
# self.uq1_pred = self.net_uq(self.x_uq_tf, self.y_uq_tf)
self.loss1 = tf.reduce_mean(tf.square (self.ub -self.ub1_pred)) \
+ 10000*tf.reduce_mean(tf.square(self.f1_pred)) \
+ tf.reduce_mean(tf.square(self.dudtb - self.dudtb_pred))
# self.loss1 = tf.reduce_mean(tf.square(self.ub - self.ub1_pred)) + tf.reduce_mean(tf.square(self.f1_pred))+tf.reduce_mean(tf.square(self.uq - self.uq1_pred))
self.optimizer_Adam = tf.compat.v1.train.AdamOptimizer(0.0008) # 选择优化器和学习率
self.train_op_Adam1 = self.optimizer_Adam.minimize(self.loss1)
init = tf.compat.v1.global_variables_initializer()
self.sess.run(init)
def initialize_NN(self, layers):
weights = []
biases = []
A = []#自适应激活函数的系数,可以不用
num_layers = len(layers)
for l in range(0, num_layers - 1):
W = self.xavier_init(size=[layers[l], layers[l + 1]])
# in_dim = layers[0]
b = tf.Variable(tf.zeros([1, layers[l + 1]], dtype=tf.float32), dtype=tf.float32)
a = tf.Variable(0.05, dtype=tf.float32)
weights.append(W)
biases.append(b)
A.append(a)
return weights, biases, A
# 初始化参数W,b,a
def xavier_init(self, size):
in_dim = size[0]
out_dim = size[1]
xavier_stddev = np.sqrt(2 / (in_dim + out_dim))
# m = tf.Variable(np.array([in_dim, out_dim]).flatten(), dtype=tf.float32)
# m = tf.reshape(m, [-1])
# print(m.dtype)
v1 = tf.Variable((tf.random.truncated_normal([in_dim, out_dim], stddev=xavier_stddev, dtype=tf.float32)),
dtype=tf.float32, name='v1')
return v1
#定义激活函数
def neural_net_tanh(self, X, weights, biases, A):
num_layers = len(weights) + 1
H = 2.0 * (X - self.lb) / (self.uub - self.lb) - 1.0
# H = X
for l in range(0, num_layers - 2):
W = weights[l]
b = biases[l]
H = tf.tanh(20*A[l]*tf.add(tf.matmul(H, W), b))###################################
# H = tf.tanh(20*A[l]*tf.add(tf.matmul(H, W), b))#修改激活函数tanh为sin cos sigmoid
# H = tf.sin(tf.add(tf.matmul(H, W), b)) # 非适用激活函数
W = weights[-1]
b = biases[-1]
Y = tf.add(tf.matmul(H, W), b)
return Y
#用于神经网络计算温度
def net_u1(self, x, y):
u = self.neural_net_tanh(tf.concat([x, y], 1), self.weights1, self.biases1, self.A1)
return u
def net_dudtb(self, x, y):
dudtb1 = self.neural_net_tanh(tf.concat([x, y], 1), self.weights1, self.biases1, self.A1)
return dudtb1
# 用于神经网络计算控制方程残差
def net_f(self, x1, y1):
# Sub-Net1
u1 = self.net_u1(x1, y1)
u1_x = tf.gradients(u1, x1)[0]
u1_y = tf.gradients(u1, y1)[0]
u1_xx = tf.gradients(u1_x, x1)[0]
u1_yy = tf.gradients(u1_y, y1)[0]
# Residuals##################################################
f1 = u1_xx + tf.multiply(y1,u1) -5* tf.sin(x1)
return f1
#计算热流
# def net_uq(self, xq, yq):
# # Sub-Net1
# uq = self.net_u1(xq, yq)
#
# uq_y = tf.gradients(uq, yq)[0]
#
# uq_yy = tf.gradients(uq_y, yq)[0]
# return uq_yy
# return f1 u1_yy
def callback(self, loss):
print(loss)
#训练
def train(self, nIter, X_star1, u_exact1):
# 将之前定义的占位符变量赋值
tf_dict = {self.x_ub_tf: self.x_ub, self.y_ub_tf: self.y_ub, self.x_f1_tf: self.x_f1,
self.y_f1_tf: self.y_f1}
MSE_history1 = []
l2_err = []
for it in range(nIter):
self.sess.run(self.train_op_Adam1, tf_dict)
if it % 20 == 0:#每20步迭代显示结果一次
loss1_value = self.sess.run(self.loss1, tf_dict)
# Predicted solution
u_pred1 = model.predict(X_star1)
l2_error = np.linalg.norm(u_exact1 - u_pred1, 2) / np.linalg.norm(u_exact1, 2)
print('It: %d, Loss1: %.3e, L2_err: %.3e' %
(it, loss1_value, l2_error))
MSE_history1.append(loss1_value)
l2_err.append(l2_error)
return MSE_history1, l2_err
#预测
def predict(self, X_star1):
u_star1 = self.sess.run(self.ub1_pred, {self.x_ub_tf: X_star1[:, 0:1], self.y_ub_tf: X_star1[:, 1:2]})
return u_star1
#主程序
if __name__ == "__main__":
# 边界点数
N_ub = 100
# 控制方程点数
N_f1 = 3000
# 神经网络结构
layers1 = [2, 20, 20, 20, 20, 1]
# 从MATLAB中 加载数据
data = scipy.io.loadmat(r'C:\Users\许晋嘉\Desktop\pinn\二维稳态基础算例\10.25算例.mat') ## XPINN_2D_PoissonEqn
x_f1 = data['x_f1'].flatten()[:, None]
y_f1 = data['y_f1'].flatten()[:, None]
xb = data['xb'].flatten()[:, None]
yb = data['yb'].flatten()[:, None]
ub_train = data['ub'].flatten()[:, None]
u_exact = data['u_exact'].flatten()[:, None]
dudtb_train = data['dudtb'].flatten()[:, None]
#将同类数据连接在一块
X_f1_train = np.hstack((x_f1.flatten()[:, None], y_f1.flatten()[:, None]))
X_ub_train = np.hstack((xb.flatten()[:, None], yb.flatten()[:, None]))
X_star1 = np.hstack((x_f1.flatten()[:, None], y_f1.flatten()[:, None]))
lb = X_star1.min(0)
uub = X_star1.max(0)
# Randomly select the residual points from sub-domains
idx1 = np.random.choice(X_f1_train.shape[0], N_f1, replace=False)
X_f1_train = X_f1_train[idx1, :]
# Randomly select boundary points
idx4 = np.random.choice(X_ub_train.shape[0], N_ub, replace=False)
X_ub_train = X_ub_train[idx4, :]
ub_train = ub_train[idx4, :]
# XPINN model
model = PINN(X_ub_train, ub_train, X_f1_train, layers1, dudtb_train)
# 训练
Max_iter = 30000
# 修改最大迭代次数,建议你算的时候改小一点5000,10000,都可以,计算一次很慢,等到后面论文里需要的数据迭代次数就得大一点
start_time = time.time()
MSE_hist1, l2_err = model.train(Max_iter, X_star1, u_exact)
elapsed = time.time() - start_time
print('Training time: %.4f' % (elapsed))
# 用训练好的模型预测结果
u_pred1 = model.predict(X_star1)
# 后处理,算相对误差最大值,范数相对误差,保存结果
u_pred = u_pred1
jd_error = (u_exact - u_pred)
xd_error = np.abs(jd_error / (u_exact + 1e-10))
xd_max = max(xd_error)
error_u_total = np.linalg.norm(np.squeeze(u_exact) - u_pred.flatten(), 2) / np.linalg.norm(np.squeeze(u_exact), 2)
print('Error u_total: %e' % (error_u_total))
print(xd_max)
# print(MSE_hist1)
np.savetxt(r'C:\Users\许晋嘉\Desktop\pinn\二维稳态基础算例\xd_error.txt', xd_error)
np.savetxt(r'C:\Users\许晋嘉\Desktop\pinn\二维稳态基础算例\jd_error.txt', jd_error)
np.savetxt(r'C:\Users\许晋嘉\Desktop\pinn\二维稳态基础算例\MSE_hist1.txt', MSE_hist1)
np.savetxt(r'C:\Users\许晋嘉\Desktop\pinn\二维稳态基础算例\u_pred1.txt', u_pred1)
以下是对这段Python代码的详细分析:
代码整体功能概述
这段代码实现了基于物理信息神经网络(Physics-Informed Neural Network,PINN)的方法来求解一个二维稳态问题(从代码中推测可能是类似泊松方程相关的问题)。代码的主要流程包括定义PINN模型的结构、初始化相关参数、设置训练过程以及进行预测和后处理,最终将一些结果保存到本地文件中。
代码结构分析
- 模块导入部分:
import sys
sys.path.insert(0, '../Utilities/') # 便于快速导入模块
import tensorflow as tf
import numpy as np
import scipy.io
import time
导入了必要的模块,包括用于处理系统路径、深度学习框架TensorFlow、数值计算的NumPy、读取MATLAB数据文件的 scipy.io
以及用于记录时间的 time
模块。
- 随机种子设置部分:
np.random.seed(1234)
tf.random.set_seed(1234)
tf.compat.v1.disable_eager_execution()
设置了NumPy和TensorFlow的随机种子,确保每次运行代码时生成的随机数序列是可重复的。同时禁用了TensorFlow的 eager 执行模式,切换到基于图的执行模式(常用于旧版本的TensorFlow编程风格)。
PINN
类定义部分:__init__
方法:
def __init__(self, X_ub, ub, X_f1, layers1, dudtb): # 定义初始化参数
self.lb = lb
self.uub = uub
self.x_ub = X_ub[:, 0:1]#用于拟合边界条件的边界点
self.y_ub = X_ub[:, 1:2]
self.ub = ub
self.dudtb = dudtb
# x_f1,y_f1为用于拟合控制方程的内点
self.x_f1 = X_f1[:, 0:1]
self.y_f1 = X_f1[:, 1:2]
# I初始化神经网络
self.layers1 = layers1
self.weights1, self.biases1, self.A1 = self.initialize_NN(layers1)
# tf placeholders and graph
self.sess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(allow_soft_placement=True,
log_device_placement=True))
# 用占位符定义tf中的变量即其精度,维度,后面会以字典的形式赋值它
self.x_ub_tf = tf.compat.v1.placeholder(tf.float32, shape=[None, self.x_ub.shape[1]])
self.y_ub_tf = tf.compat.v1.placeholder(tf.float32, shape=[None, self.y_ub.shape[1]])
self.x_f1_tf = tf.compat.v1.placeholder(tf.float32, shape=[None, self.x_f1.shape[1]])
self.y_f1_tf = tf.compat.v1.placeholder(tf.float32, shape=[None, self.y_f1.shape[1]])
# 用占位符定义tf中的变量即其精度,维度,后面会以字典的形式赋值它
# self.x_uq_tf = tf.placeholder(tf.float32, shape=[None, self.x_uq.shape[1]])
# self.y_uq_tf = tf.placeholder(tf.float32, shape=[None, self.y_uq.shape[1]])
# 神经网络预测和损失函数定义
self.ub1_pred = self.net_u1(self.x_ub_tf, self.y_ub_tf)
self.f1_pred = self.net_f(self.x_f1_tf, self.y_f1_tf)
self.dudtb_pred = self.net_dudtb(self.x_ub_tf, self.y_ub_tf)
# self.uq1_pred = self.net_uq(self.x_uq_tf, self.y_uq_tf)
self.loss1 = tf.reduce_mean(tf.square (self.ub -self.ub1_pred)) \
+ 10000*tf.reduce_mean(tf.square(self.f1_pred)) \
+ tf.reduce_mean(tf.square(self.dudtb - self.dudtb_pred))
# self.loss1 = tf.reduce_mean(tf.square(self.ub - self.ub1_pred)) + tf.reduce_mean(tf.square(self.f1_pred))+tf.reduce_mean(tf.square(self.uq - self.uq1_pred))
self.optimizer_Adam = tf.compat.v1.train.AdamOptimizer(0.0008) # 选择优化器和学习率
self.train_op_Adam1 = self.optimizer_Adam.minimize(self.loss1)
init = tf.compat.v1.global_variables_initializer()
self.sess.run(init)
此方法是类的构造函数,用于初始化 PINN
类的实例对象。它接收边界点数据 X_ub
、边界条件值 ub
、用于拟合控制方程的内点数据 X_f1
、神经网络的层数结构 layers1
和其他相关参数(如 dudtb
)。在方法内部,进行了如下操作:
- 提取输入数据的不同维度信息,作为类的属性保存,用于后续操作。
- 通过 initialize_NN
方法初始化神经网络的权重、偏置等参数。
- 创建TensorFlow的会话(Session
),并定义了输入数据的占位符(placeholder
),用于在训练和预测时传入实际数据。
- 定义了神经网络的预测输出(如 ub1_pred
、f1_pred
等)以及损失函数 loss1
,损失函数综合考虑了边界条件拟合误差、控制方程残差误差等多方面因素,并设置了 Adam
优化器及对应的学习率,最后初始化所有的TensorFlow变量。
- **`initialize_NN` 方法**:
def initialize_NN(self, layers):
weights = []
biases = []
A = []#自适应激活函数的系数,可以不用
num_layers = len(layers)
for l in range(0, num_layers - 1):
W = self.xavier_init(size=[layers[l], layers[l + 1]])
# in_dim = layers[0]
b = tf.Variable(tf.zeros([1, layers[l + 1]], dtype=tf.float32), dtype=tf.float32)
a = tf.Variable(0.05, dtype=tf.float32)
weights.append(W)
biases.append(b)
A.append(a)
return weights, biases, A
用于初始化神经网络的权重、偏置以及自适应激活函数的系数(代码中注释提到可不用)。通过循环按照每层的输入输出维度,使用 xavier_init
方法初始化权重,初始化偏置为全零张量,系数 a
初始化为固定值,并将这些参数分别添加到对应的列表中返回。
- **`xavier_init` 方法**:
def xavier_init(self, size):
in_dim = size[0]
out_dim = size[1]
xavier_stddev = np.sqrt(2 / (in_dim + out_dim))
# m = tf.Variable(np.array([in_dim, out_dim]).flatten(), dtype=tf.float32)
# m = tf.reshape(m, [-1])
# print(m.dtype)
v1 = tf.Variable((tf.random.truncated_normal([in_dim, out_dim], stddev=xavier_stddev, dtype=tf.float32)),
dtype=tf.float32, name='v1')
return v1
按照Xavier初始化方法的原理,根据输入输出维度计算标准差,然后生成符合截断正态分布的随机张量作为神经网络层的权重初始化值并返回。
- **`neural_net_tanh` 方法**:
def neural_net_tanh(self, X, weights, biases, A):
num_layers = len(weights) + 1
H = 2.0 * (X - self.lb) / (self.uub - self.lb) - 1.0
# H = X
for l in range(0, num_layers - 2):
W = weights[l]
b = biases[l]
H = tf.tanh(20*A[l]*tf.add(tf.matmul(H, W), b))###################################
# H = tf.tanh(20*A[l]*tf.add(tf.matmul(H, W), b))#修改激活函数tanh为sin cos sigmoid
# H = tf.sin(tf.add(tf.matmul(H, W), b)) # 非适用激活函数
W = weights[-1]
b = biases[-1]
Y = tf.add(tf.matmul(H, W), b)
return Y
定义了神经网络的前向传播过程,使用 tanh
作为激活函数(代码中也给出了可以替换为其他激活函数如 sin
、cos
、sigmoid
的注释),通过循环对输入数据依次经过各层的线性变换和激活函数操作,最后返回输出结果。
- **`net_u1`、`net_dudtb`、`net_f` 等方法**:
def net_u1(self, x, y):
u = self.neural_net_tanh(tf.concat([x, y], 1), self.weights1, self.biases1, self.A1)
return u
def net_dudtb(self, x, y):
dudtb1 = self.neural_net_tanh(tf.concat([x, y], 1), self.weights1, self.biases1, self.A1)
return dudtb1
def net_f(self, x1, y1):
# Sub-Net1
u1 = self.net_u1(x1, y1)
u1_x = tf.gradients(u1, x1)[0]
u1_y = tf.gradients(u1, y1)[0]
u1_xx = tf.gradients(u1_x, x1)[0]
u1_yy = tf.gradients(u1_y, y1)[0]
# Residuals##################################################
f1 = u1_xx + tf.multiply(y1,u1) -5* tf.sin(x1)
return f1
这些方法分别用于构建神经网络对不同物理量的预测,比如 net_u1
用于预测温度(从代码上下文推测),net_f
用于计算控制方程的残差,在计算残差过程中涉及到对神经网络输出求偏导数(通过 tf.gradients
函数)来得到二阶偏导数等,以构建符合物理规律的残差表达式。
- **`callback` 方法**:
def callback(self, loss):
print(loss)
简单的回调函数,用于在训练过程中打印损失值,不过在当前代码的 train
方法中并没有实际调用这个回调函数(可能后续可以补充相关调用逻辑用于更灵活的监控训练过程)。
- **`train` 方法**:
def train(self, nIter, X_star1, u_exact1):
# 将之前定义的占位符变量赋值
tf_dict = {self.x_ub_tf: self.x_ub, self.y_ub_tf: self.y_ub, self.x_f1_tf: self.x_f1,
self.y_f1_tf: self.y_f1}
MSE_history1 = []
l2_err = []
for it in range(nIter):
self.sess.run(self.train_op_Adam1, tf_dict)
if it % 20 == 0:#每20步迭代显示结果一次
loss1_value = self.sess.run(self.loss1, tf_dict)
# Predicted solution
u_pred1 = model.predict(X_star1)
l2_error = np.linalg.norm(u_exact1 - u_pred1, 2) / np.linalg.norm(u_exact1, 2)
print('It: %d, Loss1: %.3e, L2_err: %.3e' %
(it, loss1_value, l2_error))
MSE_history1.append(loss1_value)
l2_err.append(l2_error)
return MSE_history1, l2_err
实现了模型的训练过程,在每次迭代中运行优化器对模型参数进行更新,每隔20次迭代计算并打印当前的损失值、预测结果与精确解的 L2
相对误差,并将损失值和相对误差分别记录到对应的列表中,最后返回训练过程中的损失值历史记录和 L2
相对误差历史记录。
- **`predict` 方法**:
def predict(self, X_star1):
u_star1 = self.sess.run(self.ub1_pred, {self.x_ub_tf: X_star1[:, 0:1], self.y_ub_tf: X_star1[:, 1:2]})
return u_star1
用于使用训练好的模型进行预测,将输入数据传入已经构建好的神经网络中,获取预测结果并返回。
- 主程序部分(
if __name__ == "__main__"
块):- 数据准备阶段:
N_ub = 100
N_f1 = 3000
layers1 = [2, 20, 20, 20, 20, 1]
data = scipy.io.loadmat(r'C:\Users\Desktop\pinn\二维稳态基础算例\10.25算例.mat') ## XPINN_2D_PoissonEqn
x_f1 = data['x_f1'].flatten()[:, None]
y_f1 = data['y_f1'].flatten()[:, None]
xb = data['xb'].flatten()[:, None]
yb = data['yb'].flatten()[:, None]
ub_train = data['ub'].flatten()[:, None]
u_exact = data['u_exact'].flatten()[:, None]
dudtb_train = data['dudtb'].flatten()[:, None]
X_f1_train = np.hstack((x_f1.flatten()[:, None], y_f1.flatten()[:, None]))
X_ub_train = np.hstack((xb.flatten()[:, None], yb.flatten()[:, None]))
X_star1 = np.hstack((x_f1.flatten()[:, None], y_f1.flatten()[:, None]))
lb = X_star1.min(0)
uub = X_star1.max(0)
idx1 = np.random.choice(X_f1_train.shape[0], N_f1, replace=False)
X_f1_train = X_f1_train[idx1, :]
idx4 = np.random.choice(X_ub_train.shape[0], N_ub, replace=False)
X_ub_train = X_ub_train[idx4, :]
ub_train = ub_train[idx4, :]
首先定义了边界点数量、控制方程点数以及神经网络的结构层数等参数。然后从MATLAB数据文件(.mat
文件)中加载了多种数据,包括控制方程内点的坐标、边界点坐标、边界条件值、精确解以及其他相关物理量数据,并对数据进行了整理(如拼接、随机选择部分数据等操作),同时计算了数据的最小值和最大值用于后续的数据归一化等处理(虽然代码中没看到明确的归一化使用,但相关计算已经准备好)。
- **模型创建与训练阶段**:
model = PINN(X_ub_train, ub_train, X_f1_train, layers1, dudtb_train)
Max_iter = 30000
start_time = time.time()
MSE_hist1, l2_err = model.train(Max_iter, X_star1, u_exact)
elapsed = time.time() - start_time
print('Training time: %.4f' % (elapsed))
创建了 PINN
类的实例对象,传入准备好的数据进行模型初始化,然后设定最大迭代次数,记录训练开始时间,调用 train
方法进行模型训练,并在训练结束后计算并打印训练所花费的时间。
- **预测与后处理阶段**:
u_pred1 = model.predict(X_star1)
jd_error = (u_exact - u_pred)
xd_error = np.abs(jd_error / (u_exact + 1e-10))
xd_max = max(xd_error)
error_u_total = np.linalg.norm(np.squeeze(u_exact) - u_pred.flatten(), 2) / np.linalg.norm(np.squeeze(u_exact), 2)
print('Error u_total: %e' % (error_u_total))
print(xd_max)
np.savetxt(r'C:\Users\Desktop\pinn\二维稳态基础算例\xd_error.txt', xd_error)
np.savetxt(r'C:\Users\Desktop\pinn\二维稳态基础算例\jd_error.txt', jd_error)
np.savetxt(r'C:\Users\Desktop\pinn\二维稳态基础算例\MSE_hist1.txt', MSE_hist1)
np.savetxt(r'C:\Users\Desktop\pinn\二维稳态基础算例\u_pred1.txt', u_pred1)
使用训练好的模型对给定的数据进行预测,然后计算预测结果与精确解之间
更多推荐
所有评论(0)