简单网站建设价格,网站开发外包报价,网站建设季度考核评价工作,wordpress二次开发主题MPC算法#xff0c;在行动前推演一下
MPC#xff08;Model Predictive Control#xff0c;模型预测控制#xff09;是一种先进的控制策略#xff0c;它利用未来预测模型来优化当前的控制动作。MPC的核心思想是#xff0c;在每一个控制步骤中#xff0c;都基于当前系统状…MPC算法在行动前推演一下
MPCModel Predictive Control模型预测控制是一种先进的控制策略它利用未来预测模型来优化当前的控制动作。MPC的核心思想是在每一个控制步骤中都基于当前系统状态的测量值通过预测模型向前看若干步并计算出一个使未来输出最优化的控制序列。然后只应用这个序列的第一个控制动作并在下一个时间步重复这个过程。
MPC算法的主要特点包括 滚动时域优化MPC在每一个控制步骤中解决一个有限时间范围内的优化问题而不是试图直接解决整个控制任务。 模型预测利用系统模型来预测未来的行为。这个模型可以是物理模型也可以是基于数据的模型。 约束处理MPC可以自然地处理系统的输入和输出约束例如限制控制输入的最大和最小值或者状态变量的运行范围。 优化目标通常包括跟踪预定轨迹、最小化能耗、保证系统稳定性等。 反馈校正由于实际系统与模型之间存在差异MPC通常会在每个控制周期中使用最新的测量状态来更新预测并重新优化。
MPC算法的一般步骤包括 状态测量获取当前系统状态的实际测量值。 模型预测使用当前状态和预测模型来预测未来的状态和输出。 优化问题设置定义目标函数和约束条件设置优化问题。 求解优化问题求解优化问题得到最优控制序列。 应用控制动作将优化问题的解即控制序列的第一个控制动作应用到系统上。 重复过程在下一个控制周期重复上述步骤。
MPC算法广泛应用于化工、石油、交通、航空等众多领域特别是在需要考虑复杂约束和多步预测的场合。然而MPC算法的性能在很大程度上依赖于模型的准确性和优化算法的选择。随着计算能力的提升和优化算法的发展MPC在实际工业应用中的使用越来越广泛。
import gym
#创建环境
env gym.make(Pendulum-v1)定义样本池对象
import numpy as np
import torchclass Pool:def __init__(self,limit):#样本池self.datas []self.limit limitdef add(self,state,action,reward,next_state,over):if isinstance(state,np.ndarray) or isinstance(state,torch.Tensor):#检查变量 state 是否是 bo.ndarray 或者 torch.Tensor 类型state state.reshape(3).tolist()action float(action)reward float(reward)if isinstance(next_state,np.ndarray) or isinstance(next_state,torch.Tensor):next_state next_state.reshape(3).tolist()over bool(over)self.datas.appen((state,action,reward,next_state,over))#数据上限超出时从最古老的开始删除while len(self.datas)self.limit:self.datas.pop(0)#获取一批数据样本def get_sample(self):samples self.datasstate torch.FloatTensor([i[0] for i in samples]).reshape(-1,3)action torch.FloatTensor([i[1] for i in samples]).reshape(-1,1)reward torch.FloatTensor([i[2] for i in samples]).reshape(-1,1) next_state torch.FloatTensor([i[3]for i in samples]).reshape(-1,3)over torch.LongTensor([i[4]for i in samples]).reshape(-1,1)input torch.cat([state,action],dim1)label torch.cat([rewarrd,next_state-state],dim1)return input,labeldef _len_(self):return len(self_datas)
初始化样本池并添加一局游戏的数据
pool Pool(1000000)#初始化一局游戏的数据
def _():#初始化游戏state env.reset()#玩到游戏结束为止over Falsewhile not over:#随机一个动作action env.action_space.sample()[0]#执行动作得到反馈next_state,reward,over,_ env.step([action])#记录数据样本pool.add(state,action,reward,next_state,over)#更新游戏状态开始下一个动作state next_state
定义主模型
import random
#定义主模型
class Model(torch.nn.Module):#swish激活函数class Swish(torch.nn.Module):def __init__(self):super().__init__()def forward(self,x):return X*torch.sigmoid(x)
主模型中的FC层
#定义一个工具层
class FCLayer(torch.nn.Module):def __init__(self,in_size,out_size):super().__init__()self.in_size in_size#初始化参数std in_size**0.5std *2std 1/stdweight torch.empty(5, in_size,out_size)torch.nn.init.normal_(weight,mean0.0,stdstd)self.weight torch.nn.Parameter(weighr)self.bias torch.nn.Parameter(torch.zeros(5,1,out_size))def forward(self,x):x torch.bmm(x,self.weight)x xself.biasreturn x
主模型初始化函数
def __init__(self):super().__init__()self.sequential torch.nn.Sequential(self.FCLayer(4,200), #全连接层线性层self.Swish(), #激活函数层self.FCLayer(200,200),self.Swish(),self.FCLayer(200,200),self.Swish(), self.FCLayer(200,200),self.Swish(),self.FCLayer(200,8),torch.nn.Identity(),)self.softplus torch.nn.Softplus() #Softplus 是一个平滑的ReLU激活函数self.optimizer torch.optim.Adam(self.parameters(),lr1e-3)
主模型计算过程,计算结果是一个均值和log方差对logvar的加减操作是为了调整logvar的最大最小值
def forward(self,x):x self.sequential(x)mean x[...,:4]logvar x[...,4:]logvar 0.5 - logvarlogva 0.5 -self.softplus(logvar)logvar logvar10logvar self.softplus(logvar)-10return mean,logvar
主模型的训练函数
def train(self,input,label):#反复训练N次for _ in range(len(input)//64*20):#从全量数据中抽样64个反复抽5遍形成5份数据select [torch.randperm(len(input))[:64] for _ in range(5)]select torch.stack(select)input_select input[select]label_select label[select]del select#模型计算mean,logvar model(input_select)#计算lossmse_loss (mean - label_select)**2*(-logvar).exp()mse_loss mse_loss.mean(dim1).mean()var_losslogvar.mean(dim1).mean()loss mse_lossvar_lossself.optimizer.zero_grad()loss.backward()self.optimizer.step()
初始化主模型
model Model()a,b model(torch.randn(5,64,4))
a.shape,b.shape
MPC fake step函数功能是根据state和action估计reward和next_state
class MPC:def _fake_step(self,state,action):input torch.cat([state,action],dim1)#重复5遍input input.unsqueeze(dim0).repeat([5,1,1])#模型计算with torch.no_grad():mean,std model(input)std std.exp().sqrt()del input#means的后3列加上环境数据mean[:,:,1:]state#重采样sample torch.distributions.Normal(0,1).sample(mean.shape)#0-4的值域采样b个元素select [random.choice(range(5))for _ in range(mean.shape[1])]#重采样结果第0个维度0-4随机选择第二个维度0-b顺序选择sample sample[select,range(mean.shape[1])]#切分一下就成了rewards,next_statereward,next_state sample[:,:1],sample[:,1:]return reward,next_state
MPC cem优化函数虚拟N条动作链并优化M次求与孤寂结果最佳的分布 def _cem_optimizer(self,state,mean):state torch.FloatTensor(state).reshape(1,3)var torch.ones(25)#当前游戏的环境信息复制50次state state.repeat(50,1)#循环5次寻找最优解for _ in range(5):#采样50个标准正态分布数据作为actionactions torch.distributions.Normal(0,1).sample([50,25])#乘以标准差加上均值actions *var**0.5actions mean#计算每条动作序列的累计奖励reward_sum torch.zeros(50,1)#遍历25个动作for i in range(25):action actions[:,i].unsqueeze(dim1)#现在是不能真的去玩游戏的只能去预测reward和next_statereward,next_state self._fake_step(state,action)reward_sum rewardstate next_state#按照reward_sum从小到大排序select torch.sort(reward_sum.squeeze(dim1)).indicesactions actions[select]del select#取发聩最优的10个动作链actions actions[-5:]#在下一次随机动作时希望贴近这些动作的分布new_mean actions.mean(dim 0)new_var actions.var(dim0)#增量更新mean 0.1*mean 0.9*new_meanvar 0.1*var0.9*new_varreturn mean
MPC mpc函数每次动作都预演N次求预演结果最佳的分布
def mpc(self):#初始化动作的分布均值都是0mean torch.zeros(25)reward_sum 0state env.reset()over Falsewhile not over:#当前状态下找25个最优都知道均值actions self._cem_optimize(state,mean)#执行第一个动作action actions[0].item()#执行动作next_state,reward,over,_ env.step([action])#增加数据pool.add(state,action,reward,next_state,over)state next_statereward_sum reward#下个动作的均值在当前动作均值的基础上寻找mean torch.empty(actions.shape)mean[:-1] actions[1:]mean[-1] 0return reward_sum
初始化MPC对象
mpc MPC()
a,b mpc._fake_step(torch.randn(200,3),torch.randn(200,1))
print(a.shape,b.shape)
print(mpc._cem_optimize(torch.randn(1,3),,torch.zeros(25)).shape)
训练
for i in range(10):input,label pool.get_sample()model.train(input,label)reward_sum mpc.mpc()print(i,len(pool),reward_sum)