《深度强化学习实践》Actor-Critic

《深度强化学习实践》Actor-Critic

  • 算法
    • 一、baseline
      • 原理
      • 代码
    • 二、Actor-Critic
      • 代码
        • 参数部分
        • 多个环境同时
        • 网络
        • the same of 下面连续的对应部分
        • loss
          • Value
          • Policy
          • entroy
    • 三、连续动作Actor-Critic
      • 代码
        • 参数
        • env
        • agent
        • 优化器
        • batch和best_reward
        • wtiter
        • 获取样本
        • 记录
        • test
        • batch数据解压
        • loss
          • vloss
          • p loss
          • e loss
  • 小结
  • 应用

算法

一、baseline

原理

策略梯度算法
目标:通过训练,增加好动作的概率,减小不好动作被采集到的概率。
实现 ▽ J ≈ E [ Q ( s , a ) ▽ log ⁡ π ( a ∣ s ) ] {\triangledown }J\approx E[Q(s,a){\triangledown }\log \pi (a|s)] JE[Q(s,a)logπ(as)]

缺点不稳定,收敛速度慢的缺点(因此actor-critic算法致力于解决这两个问题。)

对于稳定这个问题,从数学上,可以通过减小梯度的方差来实现

在Reinforce方法中,使用贴现的总奖励作为策略梯度中关于net梯度前的系数。
下面主要指更新曲折。

对于一个有3种可以选择的动作的状态而言。
在情况一中,3个动作的Q值有两个正一个负,则一定会在前两个正的动作中进行选择;
在情况二中,3个动作的Q值三个都是正的,但是第三个动作的Q值特别小,但是在这种情况下,这个动作不可避免会被选到。
为了尽可能降低这种状况的影响,我们可以采取采样多个样本来应对这种情况;同时我们可以采用一种Q-Learning中提到的baseline的方法。

通过采用baseline的方法
1)、既可以产生有区分的动作,
2)、同时也可以减小训练过程中多次实验带来的方差
从而达到稳定的目的。
但是要注意,这里所说的baseline,是与状态无关的。

代码

代码如下
1、获取参数

GAMMA = 0.99
LEARNING_RATE = 0.001
ENTROPY_BETA = 0.01
BATCH_SIZE = 8

REWARD_STEPS = 10
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--baseline", default=False, action='store_true', help="Enable mean baseline")
    args = parser.parse_args()

2、创建环境

env = gym.make("CartPole-v0")

3、记录器

writer = SummaryWriter(comment="-cartpole-pg" + "-baseline=%s" % args.baseline)

4、定义网络

 net = PGN(env.observation_space.shape[0], env.action_space.n)
    print(net)

其中net为
class PGN(nn.Module):
def init(self, input_size, n_actions):
super(PGN, self).init()
self.net = nn.Sequential(
nn.Linear(input_size, 128),
nn.ReLU(),
nn.Linear(128, n_actions)
)
def forward(self, x):
return self.net(x)

5、定义行动者

    agent = ptan.agent.PolicyAgent(net, preprocessor=ptan.agent.float32_preprocessor,
                                   apply_softmax=True)

这里采用ptan中的打包好的策略智能体。

ptan.agent.float32_preprocessor表示数据格式时float32的tensor,使用softmax,默认根据概率分布选择动作。
关于softmax中维度的选择,参见链接,默认axis=1,就是得到行中最大值的索引。

6、定义“经验收集装置”

exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, gamma=GAMMA, steps_count=REWARD_STEPS)

定义好之后,可以通过之后的for循环不断获取一个episode的值,其中每个episode中步数为REWARD_STEPS。

经验收集有两个函数可以实现:
ExperienceSource、ExperienceSourceFirstLast
第一个函数返回每一步的s,a,r;
第二个函数返回头状态,头动作,尾状态以及总奖励(贴现)。这里我们选择的就是第二种。

还需要注意的是:使用ExperienceSource的每个episode中我们设置了一个固定的步数。所以在一个episode中,可能并没有跑完整个任务,因此当执行下一个episode时,起始状态就是上一个episode的最后状态???

7、定义优化器

optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)

8、定义各种变量

    total_rewards = [] #记录一幕的总奖励,这里指的是一幕中共进行的episode(这里指定10步为一个episode)数.
    step_idx = 0 #记录当前是第几个episode
    done_episodes = 0 #记录完整的一幕的个数
    reward_sum = 0.0 #以每次的episode为一个单位,累加

    batch_states, batch_actions, batch_scales = [], [], []

9、数据收集

##############每走10步收集一次数据#####################
    for step_idx, exp in enumerate(exp_source):
    ##一直累加总奖励
        reward_sum += exp.reward
    ##这里的baseline是平均奖励,与状态无关。
        baseline = reward_sum / (step_idx + 1)
        writer.add_scalar("baseline", baseline, step_idx)
    ##将这10步中的状态、动作作为batch中的一条append进去
        batch_states.append(exp.state)
        batch_actions.append(int(exp.action))
    ##!!!!!当baseline不是0的时候,就将此10步中每一步都减去baseline的差值作为系数
        if args.baseline:
            batch_scales.append(exp.reward - baseline)
        else:
            batch_scales.append(exp.reward)
##############当一幕结束时,会返回一幕中总共运行的episode数,以及最后100个episode数
        # handle new rewards
        new_rewards = exp_source.pop_total_rewards()
        if new_rewards:
            done_episodes += 1
            reward = new_rewards[0]
            total_rewards.append(reward)
            mean_rewards = float(np.mean(total_rewards[-100:]))
            print("%d: reward: %6.2f, mean_100: %6.2f, episodes: %d" % (
                step_idx, reward, mean_rewards, done_episodes))
            writer.add_scalar("reward", reward, step_idx)
            writer.add_scalar("reward_100", mean_rewards, step_idx)
            writer.add_scalar("episodes", done_episodes, step_idx)
            if mean_rewards > 195:
                print("Solved in %d steps and %d episodes!" % (step_idx, done_episodes))
                break
###############当记录的数据不满BATCH_SIZE也就是8的时候,先不计算损失,等前几个状态过去以后,之后每加一个数据都会计算损失以及更新参数。
        if len(batch_states) < BATCH_SIZE:
            continue

        states_v = torch.FloatTensor(batch_states)
        batch_actions_t = torch.LongTensor(batch_actions)
        batch_scale_v = torch.FloatTensor(batch_scales)

这里有几件事我们需要清楚:
1、CartPole-v0游戏奖励设定:当杆的角度大于15度的时候游戏就会终止,为了获得更多的奖励,需要动态调节小车的前进方向,以使agent可以较长时间获得奖励;
2、ExperienceSourceFirstLast虽然得到的只有初始和结束时的状态以及初始动作值,看源码实现以及print可以发现,其每一条记录(即10步)的初始状态,都是上一条记录中的第二条记录。也就是说,在电脑中游戏是一步一步运行,但是记录的确实隔一步记录此后的10步。
3、 由实验以及,以上分析可以得到exp_source.pop_total_rewards()得到的,其实是总共运行的步数。
可以通过在experiencesourcefirstlast的__iter__中print(exp)得到。
4、而exp.reward()是每一次获得的带贴现10步的奖励,这个reward才是真正训练用得到的奖励,total_rewards仅仅是为了展示总奖励以及方差等特征。
5、由于batch_value/batch_action中每次append的都是experiencesourcefirstlast中得到的初始状态以及初始动作,也就是说都是一个;又因为在后面if len(batch_states) < BATCH_SIZE: continue以及batch_states.clear() batch_actions.clear() batch_scales.clear()使得每次数据到达batch以后就会执行训练以及清空。
10、计算损失、优化

  states_v = torch.FloatTensor(batch_states)
        batch_actions_t = torch.LongTensor(batch_actions)
        batch_scale_v = torch.FloatTensor(batch_scales)
        
        optimizer.zero_grad()
        
        logits_v = net(states_v)
        log_prob_v = F.log_softmax(logits_v, dim=1)
        log_prob_actions_v = batch_scale_v * log_prob_v[range(BATCH_SIZE), batch_actions_t]
        loss_policy_v = -log_prob_actions_v.mean()

        loss_policy_v.backward(retain_graph=True)
        grads = np.concatenate([p.grad.data.numpy().flatten()
                                for p in net.parameters()
                                if p.grad is not None])

        prob_v = F.softmax(logits_v, dim=1)
        entropy_v = -(prob_v * log_prob_v).sum(dim=1).mean()
        entropy_loss_v = -ENTROPY_BETA * entropy_v
        entropy_loss_v.backward()
        
        optimizer.step()
        loss_v = loss_policy_v + entropy_loss_v

首先是转换为tensor,优化器梯度清空。
然后是计算两个损失的梯度,分别是策略梯度损失和熵损失。

(1)、策略梯度损失:
首先通过网络得到二维数据,分别是样本数*动作数,即8*2;
然后对于每一个样本,也就是对于动作取logsoftmax,得到对于每个动作的log概率分布;
其次将(每次的10步贴现奖励-baseline)作为系数,乘以对应动作的log概率得到8条数据;
最后做了期望处理,以及最大化奖励变最小化损失后,加负号。
注意!这里其实求的是损失,但是由于我们计算的梯度确实与前面的R、b无关,所以可以将公式写作:
▽ J 1 ≈ − E [ ( R − b ) ▽ log ⁡ π ( a ∣ s ) ] {\triangledown }J1\approx -E[(R-b){\triangledown }\log \pi (a|s)] J1E[(Rb)logπ(as)]

这里的关于python的技巧有:
1、梯度回传以后,防止变量的图结构丢失,通俗来讲就是先把梯度保存,等下次回传完熵损失以后一起进行优化
2、flatten()可以使numpy对象返回一维数组;np.concatenate实现对一维数组的拼接。

(2)、熵损失
其最终实现的公式为
▽ J 2 ≈ E [ β ▽ ( π ( s ) ∗ l o g π ( s ) ) ] {\triangledown }J2\approx E[{{\beta }\triangledown }(\pi (s)*log \pi (s))] J2E[β(π(s)logπ(s))]因为熵本身前面有负号,这样最大化熵,也就是最小化正的这个公式。
最大熵公式可以保证在探索的时候尽可能随机,而不是快速收敛到局部最优。

最后一部分就是执行优化参数。
11、计算参数更新前后的相对熵(KL散度)

        new_logits_v = net(states_v)
        new_prob_v = F.softmax(new_logits_v, dim=1)
        kl_div_v = -((new_prob_v / prob_v).log() * prob_v).sum(dim=1).mean()
        writer.add_scalar("kl", kl_div_v.item(), step_idx)

如题,用来验证更新前后两个分布之间的相似程度。

关于熵的这部分:详见强化学习中的基础数学知识

12、tensorboardX

        writer.add_scalar("kl", kl_div_v.item(), step_idx)
        writer.add_scalar("baseline", baseline, step_idx)
        writer.add_scalar("entropy", entropy_v.item(), step_idx)
        writer.add_scalar("batch_scales", np.mean(batch_scales), step_idx)
        writer.add_scalar("loss_entropy", entropy_loss_v.item(), step_idx)
        writer.add_scalar("loss_policy", loss_policy_v.item(), step_idx)
        writer.add_scalar("loss_total", loss_v.item(), step_idx)

        writer.add_scalar("grad_l2", np.sqrt(np.mean(np.square(grads))), step_idx)
        writer.add_scalar("grad_max", np.max(np.abs(grads)), step_idx)
        writer.add_scalar("grad_var", np.var(grads), step_idx)

值得关注的是
前面数据收集中提到的记录的一幕episode的R以及最后100幕R的平均;
以及梯度方差。

二、Actor-Critic

代码

参数部分

GAMMA = 0.99
LEARNING_RATE = 0.001
ENTROPY_BETA = 0.01
BATCH_SIZE = 128
NUM_ENVS = 50

REWARD_STEPS = 4
CLIP_GRAD = 0.1
这里的CLIP_GRAD用来当做梯度L2范数的阈值,步数在PG中10步,但是这里用了未来的值函数,即TD,所以不用那么多。

    parser = argparse.ArgumentParser()
    parser.add_argument("--cuda", default=False, action="store_true", help="Enable cuda")
    parser.add_argument("-n", "--name", required=True, help="Name of the run")
    args = parser.parse_args()
    device = torch.device("cuda" if args.cuda else "cpu")

这部分一样。

多个环境同时

    make_env = lambda: ptan.common.wrappers.wrap_dqn(gym.make("PongNoFrameskip-v4"))
    envs = [make_env() for _ in range(NUM_ENVS)]
    writer = SummaryWriter(comment="-pong-a2c_" + args.name)

这里我感觉也不需要太纠结,环境到底是怎么定义的,只需要记住这种方法既可以,对于wrap_dqn可以看看,然而看了以后还是看不太懂,等前面看dqn再说,感觉就是把最原始的环境按照自己的想法包装再包装。下面就是这个函数,这一些分别是前一个环境作为参数作为父类,传入。

def wrap_dqn(env, stack_frames=4, episodic_life=True, reward_clipping=True):
    """Apply a common set of wrappers for Atari games."""
    assert 'NoFrameskip' in env.spec.id
    if episodic_life:
        env = EpisodicLifeEnv(env)
    env = NoopResetEnv(env, noop_max=30)
    env = MaxAndSkipEnv(env, skip=4)
    if 'FIRE' in env.unwrapped.get_action_meanings():
        env = FireResetEnv(env)
    env = ProcessFrame84(env)
    env = ImageToPyTorch(env)
    env = FrameStack(env, stack_frames)
    if reward_clipping:
        env = ClippedRewardsWrapper(env)
    return env

网络

下面这个是网络模型,因为这个传入的是图片,所以需要用到卷积。
首先是三层卷积,分别用Relu函数激活,就是body部分,后面是策略函数和值函数,他们都公用面卷积的部分。这两个函数的输入是一个数字,代表输入数据的维度,其中使用np.prod将多维的tensor转为所有维度相乘的一个数字。这是作为网络结构的部分,作为真实的传入数据,是通过归一化,然后view(fx.size()[0], -1)将得到的batchsize行的一维数据,这里fx.size()的格式是(batchsize,channels,x,y)。
这里的策略函数输出的是动作的序号

class AtariA2C(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(AtariA2C, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        conv_out_size = self._get_conv_out(input_shape)
        self.policy = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )
        self.value = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, 1)
        )
    def _get_conv_out(self, shape):
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))
    def forward(self, x):
        fx = x.float() / 256
        conv_out = self.conv(fx).view(fx.size()[0], -1)
        return self.policy(conv_out), self.value(conv_out)

the same of 下面连续的对应部分

这一部分的一个新的发现可能是关于with里的这个if tracker.reward(new_rewards[0], step_idx): break,还有就是with也是顺序进行的,当进行到最后就退出了。

    exp_source = ptan.experience.ExperienceSourceFirstLast(envs, agent, gamma=GAMMA, steps_count=REWARD_STEPS)

    optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE, eps=1e-3)

    batch = []

    with common.RewardTracker(writer, stop_reward=18) as tracker:
        with ptan.common.utils.TBMeanTracker(writer, batch_size=10) as tb_tracker:
            for step_idx, exp in enumerate(exp_source):
                batch.append(exp)

                # handle new rewards
                new_rewards = exp_source.pop_total_rewards()
                if new_rewards:
                    if tracker.reward(new_rewards[0], step_idx):
                        break

                if len(batch) < BATCH_SIZE:
                    continue

                states_v, actions_t, vals_ref_v = unpack_batch(batch, net, device=device)
                batch.clear()

loss

Value

根据前面的分析,第二个是值函数,计算的均方误差损失就可以得到值函数的损失了。

                optimizer.zero_grad()
                logits_v, value_v = net(states_v)
                loss_value_v = F.mse_loss(value_v.squeeze(-1), vals_ref_v)
Policy

这个地方和前面baseline的方法一样

                log_prob_v = F.log_softmax(logits_v, dim=1)
                adv_v = vals_ref_v - value_v.squeeze(-1).detach()
                log_prob_actions_v = adv_v * log_prob_v[range(BATCH_SIZE), actions_t]
                loss_policy_v = -log_prob_actions_v.mean()
entroy
                prob_v = F.softmax(logits_v, dim=1)
                entropy_loss_v = ENTROPY_BETA * (prob_v * log_prob_v).sum(dim=1).mean()

三、连续动作Actor-Critic

代码

参数

一样的首先是关于参数的设置。默认关闭cuda,如果需要开启,就在终端运行的时候加上–cuda,就会因为store_true将args.cuda设置为True。-n这个参数是必须需要的,因为有required选项。

 #定义结构
 parser = argparse.ArgumentParser()
 #设置需要的参数
    parser.add_argument("--cuda", default=False, action='store_true', help='Enable CUDA')
    parser.add_argument("-n", "--name", required=True, help="Name of the run")
    #实例化
    args = parser.parse_args()
    #设置运行的设备,新建文件夹
    device = torch.device("cuda" if args.cuda else "cpu")
    save_path = os.path.join("saves", "a2c-" + args.name)
    os.makedirs(save_path, exist_ok=True)

env

设置两个一样的环境,用来一个用来产生数据,一个用来测试模型。

    env = gym.make(ENV_ID)
    test_env = gym.make(ENV_ID)

agent

    net = model.ModelA2C(env.observation_space.shape[0], env.action_space.shape[0]).to(device)
    agent = model.AgentA2C(net, device=device)
    exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, GAMMA, steps_count=REWARD_STEPS)

1、其中的net就是agent的核心部分,输入是状态,输出是动作的均值,方差,以及对于输入状态值函数的估计。其中的.to(device)的作用是将所有最开始读取数据时的tensor变量copy一份到device所指定的GPU上去,之后的运算都在GPU上进行。
关于策略函数
(1)之前离散状态的时候是一般只一个可以选择的动作,且这个动作有几个可以选择的值,每个值都有一定概率被选择;
(2)对于连续动作控制来说,一般是多个可以选择的动作,且每个动作的都有一个可以取值的区间。
如果我们只用网络的输出作为选择的动作,那不利于探索,为了增加随机性的成分。我们选择输出正太分布的均值和方差来描述值的概率分布
关于值函数
值函数就是在公用网络部分self.base输出之后,再接一层,输出一个scale。

(不知为啥,两个__中间的部分被加粗了)
class ModelA2C(nn.Module):
           def init(self, obs_size, act_size):
                      super(ModelA2C, self).init()

                      self.base = nn.Sequential(
                        nn.Linear(obs_size, HID_SIZE),
                        nn.ReLU(),
                        )

                        self.mu = nn.Sequential(
                        nn.Linear(HID_SIZE, act_size),
                        nn.Tanh(),
                       )
           
tanh使得均值取值在-1到1之间,这里因为我们将电机的取值也量化在0-1之间

                       self.var = nn.Sequential(
                       nn.Linear(HID_SIZE, act_size),
                        nn.Softplus(),
                        )
           
softplus可以看做正的平滑的relu函数

            self.value = nn.Linear(HID_SIZE, 1)
直接输出作为值函数

            def forward(self, x):
                        base_out = self.base(x)
                       return self.mu(base_out), self.var(base_out), self.value(base_out)

2、调用agent类

class AgentA2C(ptan.agent.BaseAgent):
    def __init__(self, net, device="cpu"):
        self.net = net
        self.device = device

    def __call__(self, states, agent_states):
        states_v = ptan.agent.float32_preprocessor(states).to(self.device)

        mu_v, var_v, _ = self.net(states_v)
        mu = mu_v.data.cpu().numpy()
        sigma = torch.sqrt(var_v).data.cpu().numpy()
        actions = np.random.normal(mu, sigma)
        actions = np.clip(actions, -1, 1)
        return actions, agent_states

可以看到agent类继承了父类BaseAgent。同时调用的时候,首先给赋给他之前定义的网络,以及网络需要运行的设备。这里再明确设备是因为,在定义网络的时候会定义网络运行的设备,同时呢,我们的数据也必须在那个设备上,因此有了states_v = ptan.agent.float32_preprocessor(states).to(self.device)
将得到的mu,和开方后的方差,也就是标准差移动到cpu,使用numpy进行运算采样(因为numpy只能运行在cpu),并限制区间,得到最终的action。
3、定义迭代器
这个函数一节讲过,但是当时理解不是很深入,于是再来重新补充下。
(1)我们知道这个迭代器输出的是一条从当前状态走REWARD_STEPS步过程中的记录;
(2)这个迭代器每次返回的是“一条数据”;
(3)因为可能遇到终止状态,因此每条这样的样本中包含的实际样本数可能小于REWARD_STEPS;
(4)这个迭代器其实是具有超前视角的,就是说虽然它能提前感知到结束,但是它实际中每次只走一步,所以对于last_states而言,会有REWARD_STEPS个None
(5)还有一个重要的点,是关于这个pop_total_rewards或者pop_rewards_steps函数输出的reward,它输出的reward的来源是在class ExperienceSource的迭代中,这里面的总奖励是不贴现的,所以也可以代表步数。当然这个步数,也就是最终得到的样本数。因为像前面说的,实际中还是走了一步。因此呢,还是得等到没有实际中没有下一步可以走的时候,就会输出这个结果。

优化器

optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
这个没什么好说的

batch和best_reward

batch = [] 
best_reward  = None

batch用来装样本,best_reward用来放目前最有的奖励,用来判断是否保存样本。

wtiter

writer = SummaryWriter(comment="-a2c_" + args.name)
    with ptan.common.utils.RewardTracker(writer) as tracker:
        with ptan.common.utils.TBMeanTracker(writer, batch_size=10) as tb_tracker:

首先是定义一个可以用来记录的tensorboard的记录器
1、第一句话是命名一个RewardTracker实例

class RewardTracker:
    def __init__(self, writer, min_ts_diff=1.0):
        """
        Constructs RewardTracker
        :param writer: writer to use for writing stats
        :param min_ts_diff: minimal time difference to track speed
        """
        self.writer = writer
        self.min_ts_diff = min_ts_diff

    def __enter__(self):
        self.ts = time.time()
        self.ts_frame = 0
        self.total_rewards = []
        return self

    def __exit__(self, *args):
        self.writer.close()

    def reward(self, reward, frame, epsilon=None):
        self.total_rewards.append(reward)
        mean_reward = np.mean(self.total_rewards[-100:])
        ts_diff = time.time() - self.ts
        if ts_diff > self.min_ts_diff:
            #这里因为是当一个回合结束时才能得到frame,而这个frame是总共的步数,不会因为done清0,所以这个差值就是主函数中的steps
            speed = (frame - self.ts_frame) / ts_diff
            self.ts_frame = frame
            self.ts = time.time()
            epsilon_str = "" if epsilon is None else ", eps %.2f" % epsilon
            print("%d: done %d episodes, mean reward %.3f, speed %.2f f/s%s" % (
                frame, len(self.total_rewards), mean_reward, speed, epsilon_str
            ))
            sys.stdout.flush()
            self.writer.add_scalar("speed", speed, frame)
        if epsilon is not None:
            self.writer.add_scalar("epsilon", epsilon, frame)
        self.writer.add_scalar("reward_100", mean_reward, frame)
        self.writer.add_scalar("reward", reward, frame)
        return mean_reward if len(self.total_rewards) > 30 else None

作为with进来的,需要有__enter__,进来就会执行,走的时候就__exit__。
这里重点来看这个reward函数。
传进来的参数从后面的代码(在下面)一节来看,有这一个回合的总建立,以及总共进行的步数。
(1)对于奖励的部分
首先添加总奖励
然后取最后100回合的平均总奖励,并记录
记录当前回合的奖励
(2)速度
这个地方比较迷
至于这个速度的表达为:
这个速度是当前这一回合步数(因为在主函数中的序号是不会停的),除以这一回合的所用的时间。

不太清楚这个值到底代表什么???
2、第二句话

class TBMeanTracker:
    """
    TensorBoard value tracker: allows to batch fixed amount of historical values and write their mean into TB

    Designed and tested with pytorch-tensorboard in mind
    """
    def __init__(self, writer, batch_size):
        """
        :param writer: writer with close() and add_scalar() methods
        :param batch_size: integer size of batch to track
        """
        assert isinstance(batch_size, int)
        assert writer is not None
        self.writer = writer
        self.batch_size = batch_size

    def __enter__(self):
        self._batches = collections.defaultdict(list)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.writer.close()

    @staticmethod
    def _as_float(value):
        assert isinstance(value, (float, int, np.ndarray, np.generic, torch.autograd.Variable)) or torch.is_tensor(value)
        tensor_val = None
        if isinstance(value, torch.autograd.Variable):
            tensor_val = value.data
        elif torch.is_tensor(value):
            tensor_val = value

        if tensor_val is not None:
            return tensor_val.float().mean().item()
        elif isinstance(value, np.ndarray):
            return float(np.mean(value))
        else:
            return float(value)

    def track(self, param_name, value, iter_index):
        assert isinstance(param_name, str)
        assert isinstance(iter_index, int)

        data = self._batches[param_name]
        data.append(self._as_float(value))

        if len(data) >= self.batch_size:
            self.writer.add_scalar(param_name, np.mean(data), iter_index)
            data.clear()

首先判断batch_size类型是否是int,writer如果不是None则通过。
接下来生成一个默认的字典,值类型是list。同时注意到这里的return self。这个语句可以用来返回实例自身,可以用于循环调用方法。见链接python中 return self的作用
_as_float是一个类内定义的函数,提供下面函数track使用。

torch.autograd.Variable是可以自动计算梯度的变量
torch.is_tensor判断是否是是否是tensor
这里首先判断是不是tensor类型,如果是并且非空,就取均值。如果是numpy类型,也取平均。这也就暗示输入的步数确实不是一个。

这里输出了float类型的值以后,首先以传入参数为键,定义空列表,然后将刚得到的float赋值给他。最终记录的是达到batch_size以后的一个回合平均步数,横坐标是总步数。

这里有个很奇怪的现象,就是上面提到的,输入的一个回合的步数确实不是一个值,可能是2或者3个或者一个,而且对应的总奖励确实也不是一个。这就很让人疑惑。
握草!!!要哭惹!!!
再次提醒,外置硬盘系统一定不要移动硬盘,要不它会悄悄掉线,然后将你写的csdn抹杀。。。。。。。。。。。。。
从来亿遍!!!
就是哈,刚才我在这个地方写了个,可把我牛逼坏了,叉会腰,是怎么回事呢?
就是我在reward函数中加了个标志位

    def reward(self, reward, frame, epsilon=None):
        self.total_rewards.append(reward)
        mean_reward = np.mean(self.total_rewards[-100:])
        ts_diff = time.time() - self.ts
        if ts_diff > self.min_ts_diff:
            #这里因为是当一个回合结束时才能得到frame,而这个frame是总共的步数,不会因为done清0,所以这个差值就是主函数中的steps
            speed = (frame - self.ts_frame) / ts_diff
            self.ts_frame = frame
            self.ts = time.time()
            epsilon_str = "" if epsilon is None else ", eps %.2f" % epsilon
            print("%d: done %d episodes, mean reward %.3f, speed %.2f f/s%s" % (
                frame, len(self.total_rewards), mean_reward, speed, epsilon_str
            ))
            sys.stdout.flush()
            self.writer.add_scalar("speed", speed, frame)
        else:
            print("恭喜找到bug")
        if epsilon is not None:
            self.writer.add_scalar("epsilon", epsilon, frame)
        self.writer.add_scalar("reward_100", mean_reward, frame)
        self.writer.add_scalar("reward", reward, frame)
        return mean_reward if len(self.total_rewards) > 30 else None

然后结果是这样
《深度强化学习实践》Actor-Critic_第1张图片
就是说原因是因为时间没有达到要求,因此没有出发输出函数。
第二个叉会腰呢是因为,顺藤摸瓜,我发现之前的平均步数的函数的理解也是不对的,也是标志位:

    def _as_float(value):
        # print('dfsdfdsf',value)

        assert isinstance(value, (float, int, np.ndarray, np.generic, torch.autograd.Variable)) or torch.is_tensor(value)
        tensor_val = None
        if isinstance(value, torch.autograd.Variable):
            tensor_val = value.data
        elif torch.is_tensor(value):
            tensor_val = value

        if tensor_val is not None:
            print('eee',tensor_val.float().mean().item())
            return tensor_val.float().mean().item()
        elif isinstance(value, np.ndarray):
            print('rrrrr')
            return float(np.mean(value))
        else:
            print('ttttt')
            return float(value)

得到下图
《深度强化学习实践》Actor-Critic_第2张图片
同样的东西写两遍真是恶心。。。
不多说了,自己看。
不对,还有个总结
-------------总结------------
总的来说
首先定义奖励函数,用来记录当前奖励以及平均奖励
然后定义步数函数,用来记录一个batch样本中的平均回合步数

获取样本

就是说我们获取的是一个包含起始状态与2步以后的状态,以及动作,奖励的数据,与是否是终止状态无关,或者说没有太大关系,唯一的关系是,可能一个样本记录的中间步数少于2步,当在接近终止状态时。

for step_idx, exp in enumerate(exp_source):

从迭代器中取出一条上面介绍的样本。其中包括样本的序号(从不清0),以及样本数据。

记录

这就用到上面介绍的记录器了

                rewards_steps = exp_source.pop_rewards_steps()
                if rewards_steps:
                    #zip多个可迭代对象的对应位置元素打包组成元祖对象,list(zip)可以转为数组,zip(*list(zip))可以将可迭代对象进行解压成多个列表
                    rewards, steps = zip(*rewards_steps)
                    tb_tracker.track("episode_steps", steps[0], step_idx)
                    tracker.reward(rewards[0], step_idx)

这里弹出的是回合奖励和回合步数的打包成的元祖对象,得到返回之后,首先进行解包,然后将当前奖励和(回合数、总步数)发给记录器进行记录。

test

这一段是测试代码

                if step_idx % TEST_ITERS == 0:
                    ts = time.time()
                    rewards, steps = test_net(net, test_env, device=device)
                    print("Test done is %.2f sec, reward %.3f, steps %d" % (
                        time.time() - ts, rewards, steps))
                    writer.add_scalar("test_reward", rewards, step_idx)
                    writer.add_scalar("test_steps", steps, step_idx)
                    if best_reward is None or best_reward < rewards:
                        if best_reward is not None:
                            print("Best reward updated: %.3f -> %.3f" % (best_reward, rewards))
                            name = "best_%+.3f_%d.dat" % (rewards, step_idx)
                            fname = os.path.join(save_path, name)
                            #这里是保存的模型的参数,到指定的.bat文件
                            torch.save(net.state_dict(), fname)
                        best_reward = rewards

其中测试网络如下:

def test_net(net, env, count=10, device="cpu"):
    rewards = 0.0
    steps = 0
    for _ in range(count):
        obs = env.reset()
        while True:
            obs_v = ptan.agent.float32_preprocessor([obs]).to(device)
            mu_v = net(obs_v)[0]
            action = mu_v.squeeze(dim=0).data.cpu().numpy()
            action = np.clip(action, -1, 1)
            obs, reward, done, _ = env.step(action)
            rewards += reward
            steps += 1
            if done:
                break
    return rewards / count, steps / count

这里是在cpu上定义了
哎wc,我要枯了啊,我昨晚写的又没保存。。。r哦

哎,真心枯了。。。
再来亿遍
以上是测试网络,网路里直接将均值作为动作。当平均奖励大于最好的,就更新。

batch数据解压

               states_v, actions_v, vals_ref_v = \
                    common.unpack_batch_a2c(batch, net, last_val_gamma=GAMMA ** REWARD_STEPS, device=device)
                batch.clear()
def unpack_batch_a2c(batch, net, last_val_gamma, device="cpu"):
    """
    Convert batch into training tensors
    :param batch:
    :param net:
    :return: states variable, actions tensor, reference values variable
    """
    states = []
    actions = []
    rewards = []
    not_done_idx = []
    last_states = []
    for idx, exp in enumerate(batch):

        states.append(exp.state)
        actions.append(exp.action)
        rewards.append(exp.reward)
        #当最后一个状态
        if exp.last_state is not None:
            not_done_idx.append(idx)
            last_states.append(exp.last_state)
    states_v = ptan.agent.float32_preprocessor(states).to(device)
    actions_v = torch.FloatTensor(actions).to(device)
    print(not_done_idx)
    # handle rewards
    rewards_np = np.array(rewards, dtype=np.float32)
    if not_done_idx:
        last_states_v = ptan.agent.float32_preprocessor(last_states).to(device)
        #得到网络中输出的当前状态的值函数
        last_vals_v = net(last_states_v)[2]
        #只要二维数组的第一个维度的值
        last_vals_np = last_vals_v.data.cpu().numpy()[:, 0]
        #得到初始状态的值函数V(s)=r+yr+yyV(s+1)
        rewards_np[not_done_idx] += last_val_gamma * last_vals_np

    ref_vals_v = torch.FloatTensor(rewards_np).to(device)
    return states_v, actions_v, ref_vals_v

这个地方我当时还测试了一下,确实就是如我所料,就是如下图所示,选5步的时候,中间5个没有。
在这里插入图片描述《深度强化学习实践》Actor-Critic_第3张图片还有就是几个需要注意的地方:
(1)vs = r+yr+yyvs+1…
(2)有的是ptan.agent.float32_preprocessor(last_states).to(device)
有的直接torch.FloatTensor(actions).to(device),看源码,两者的区别就是前者首先将数据变为ndarry再tensor。
(3)输出为s,a,vs(真)

loss

vloss
  optimizer.zero_grad()
                mu_v, var_v, value_v = net(states_v)
                loss_value_v = F.mse_loss(value_v.squeeze(-1), vals_ref_v)

就是显然的均方误差损失,真实值用的是TD。

p loss
                adv_v = vals_ref_v.unsqueeze(dim=-1) - value_v.detach()
                log_prob_v = adv_v * calc_logprob(mu_v, var_v, actions_v)
                loss_policy_v = -log_prob_v.mean()

其中calc_logprob为

def calc_logprob(mu_v, var_v, actions_v):
    p1 = - ((mu_v - actions_v) ** 2) / (2*var_v.clamp(min=1e-3))
    p2 = - torch.log(torch.sqrt(2 * math.pi * var_v))
    return p1 + p2

就是正常的的最大化的过程,后面在p loss中再乘以系数,并最小化损失。

e loss
entropy_loss_v = ENTROPY_BETA * (-(torch.log(2*math.pi*var_v) + 1)/2).mean()

然后最后就是加起来优化。

终于结束了,,,闹心。接下来才是第二部分Actor-Critic,我是跳着看的。。。

小结

应用

你可能感兴趣的