酒飲みによる機械学習日記

〜ただの酒好きが、なんとなく流行っている機械学習について語っていきます、たまに金融ネタも〜

直線のy切片と傾きを求める with Chainer

Introduction

初記事になります!!よろしくお願いします!間違っている箇所等ございましたら、すぐに教えていただけると嬉しいです🙇‍♂️

Deep Learningの導入はだいたいMNISTのサンプルを回すことがほとんどだな〜って感じてますが、結局独自のデータセットを用意して学習させるときに、なぜかつまづいてしまう方が非常に多いような気がしてます。CNNやLSTMをはじめとするRNNもサンプルなら簡単に回せるのに、いざ独自のデータセットとなると...

今回はDeep Learning(Deepでもないですが...)で簡単なy切片と傾きを求めるコードを書いてみようと思います。Chainerを使用しますが、CPUでも回るものなので、CPU用にコーディングしていきます!(コード全文は記事の最後にまとめてあります!!)

構成は、

  1. データについて
  2. ネットワークについて
  3. 学習方法について
  4. 学習結果
  5. 考察
  6. まとめ

Python3.6とChainer4.1を使用しました。Macbook Air上のAnaconda環境です。

1. データについて

今回はnumpyのrandom.rand()を使用したので、xy平面上の点は、x, yともに0以上1未満の数値をとります。学習用データ数は今回は10000個で統一していますが、実行時に変更できるようにしてあります。1個(1組の方がいいのかも?)での学習をやっている記事はいくつもあったので、多くした場合でできるのかやってみることにしました。

2. ネットワークについて

2点p1, p2を与えた時に、それぞれx, y座標の数値を持つので、4つの値があります。これをインプット層に入れて、隠れ層はインプット層と同じユニット数(=4)とし、アウトプット層は傾きとy切片用にユニット数を2としています。ポイントは傾きとy切片のlossをそれぞれ確認できるように、chainerのreporter.report()を使って、新たにlossを定義しているところです!!

class Net(Chain):
    def __init__(self):
        super(Net, self).__init__()
        with self.init_scope():
            self.lc1 = L.Linear(in_size=4, out_size=4)
            self.lc2 = L.Linear(in_size=4, out_size=4)
            self.lc3 = L.Linear(in_size=4, out_size=2)

    def __call__(self, p1, p2, true):
        ps = np.concatenate([p1, p2], axis=1)
        h = self.lc1(ps)
        h = self.lc2(h)
        h = self.lc3(h)
        pred = h

        loss_slope = F.mean_absolute_error(x0=pred[:,0], x1=true[:,0])
        loss_yinter = F.mean_absolute_error(x0=pred[:,1], x1=true[:,1])
        loss = loss_slope + loss_yinter
        reporter.report({'loss': loss, 'loss_slope': loss_slope, 'loss_yinter': loss_yinter}, observer=self)
        return loss

3. 学習方法について

特に深くは考えていないですが、以下のように設定しておきました。

  • エポック数:100
  • バッチサイズ:1024
  • Validation用データ:1000個(学習用データと同じように乱数で生成)

また独自の学習データセットを使う時、メモリとか特に気にしない場合は、タプルのリストで渡せば問題ないです。以下が学習データセットを作成するための関数です。この関数は1つの点に関するタプルを返すので、これを欲しい点の数だけ実行し、リストにまとめます!あとはnp.float32にしないとエラーになるので、注意が必要です。

def generate_data():
    p1 = np.random.random(2).astype(np.float32)
    p2 = np.random.random(2).astype(np.float32)

    if p1[1] > p2[1]:
        tmp = p2.copy()
        p2 = p1.copy()
        p1 = tmp.copy()

    slope = (p2[1] - p1[1]) / (p2[0] - p1[0])
    yinter = p1[1] - slope*p1[0]
    true = np.array([slope, yinter]).astype(np.float32)
    return (p1, p2, true)

4. 学習結果

傾きのlossと、y切片のloss、そしてそれらを足し合わせた全体のlossについて、学習用データとValidation用データのものを可視化しました!y切片よりも傾きのlossの方が高いことがわかります。また、あまり全体的にlossが下がっている印象がないのと、学習用データとValidation用データで差が結構あるなと感じています。

f:id:nts-524:20180913030815p:plain

5. 考察

5.1. 座標値の引き算(p1 - p2)を入れた場合

ネットワークを変えていくつかやってみようと思います。まずはx, yそれぞれの座標値の引き算を入れたものです。Validationのlossがだいぶ学習用に近づいていることがわかります。これは、やはり傾きがx, yのそれぞれの増加量から計算されることにあるのかなと感じています。

class Net2(Chain):
    def __init__(self):
        super(Net2, self).__init__()
        with self.init_scope():
            self.lc1 = L.Linear(in_size=6, out_size=6)
            self.lc2 = L.Linear(in_size=6, out_size=6)
            self.lc3 = L.Linear(in_size=6, out_size=2)

    def __call__(self, p1, p2, true):
        ps = np.concatenate([p1, p2, p1-p2], axis=1)
        h = self.lc1(ps)
        h = self.lc2(h)
        h = self.lc3(h)
        pred = h

        loss_slope = F.mean_absolute_error(x0=pred[:,0], x1=true[:,0])
        loss_yinter = F.mean_absolute_error(x0=pred[:,1], x1=true[:,1])
        loss = loss_slope + loss_yinter
        reporter.report({'loss': loss, 'loss_slope': loss_slope, 'loss_yinter': loss_yinter}, observer=self)
        return loss

f:id:nts-524:20180913031415p:plain

5.2. 座標値の引き算(p2 - p1)も入れた場合

Validation用データの傾きのlossが大きく下がり、逆に今度は学習用データよりも、Validation用データのlossの方が低くなりました。これは、ChainerのPlotReport()が、学習用データに関してはその時のミニバッチにおけるlossであるのに対し、validation用データに対しては、validation全体の平均であることので、validation用データのlossの方が、より全体の平均を表していることに起因している(折れ線グラフの振れ方の違いからも明らか)。

class Net3(Chain):
    def __init__(self):
        super(Net3, self).__init__()
        with self.init_scope():
            self.lc1 = L.Linear(in_size=8, out_size=8)
            self.lc2 = L.Linear(in_size=8, out_size=8)
            self.lc3 = L.Linear(in_size=8, out_size=2)

    def __call__(self, p1, p2, true):
        ps = np.concatenate([p1, p2, p1-p2, p2-p1], axis=1)
        h = self.lc1(ps)
        h = self.lc2(h)
        h = self.lc3(h)
        pred = h

        loss_slope = F.mean_absolute_error(x0=pred[:,0], x1=true[:,0])
        loss_yinter = F.mean_absolute_error(x0=pred[:,1], x1=true[:,1])
        loss = loss_slope + loss_yinter
        reporter.report({'loss': loss, 'loss_slope': loss_slope, 'loss_yinter': loss_yinter}, observer=self)
        return loss

f:id:nts-524:20180913031822p:plain

5.3. さらに2点の中点の座標を追加した場合

直線のパラメータ推定には2点あれば十分であるが、中点の座標も入れることで、3点分にしてみた。validation用データの全体のlossがまた下がった。

class Net4(Chain):
    def __init__(self):
        super(Net4, self).__init__()
        with self.init_scope():
            self.lc1 = L.Linear(in_size=10, out_size=10)
            self.lc2 = L.Linear(in_size=10, out_size=10)
            self.lc3 = L.Linear(in_size=10, out_size=2)

    def __call__(self, p1, p2, true):
        ps = np.concatenate([p1, p2, p1-p2, p2-p1, (p2-p1)/2.], axis=1)
        h = self.lc1(ps)
        h = self.lc2(h)
        h = self.lc3(h)
        pred = h

        loss_slope = F.mean_absolute_error(x0=pred[:,0], x1=true[:,0])
        loss_yinter = F.mean_absolute_error(x0=pred[:,1], x1=true[:,1])
        loss = loss_slope + loss_yinter
        reporter.report({'loss': loss, 'loss_slope': loss_slope, 'loss_yinter': loss_yinter}, observer=self)
        return loss

f:id:nts-524:20180913032551p:plain

6. まとめ

比較的とっつきやすい内容で、Chainerのコーディングをしてみました。傾きとy切片なので、計算式が分かっているので、なんとなく引き算の値や中点をデータとして加えてみることができました。時間があるときに、これのpredict用コードであったり、より改善するにはどうすればいいのかをちょっと考えてみたいですね。 傾きやy切片は実数全体の範囲を持つので、正直予測は意外に難しいかもしれない中、lossをみる限りは、ある程度いい値になっているなといった印象です。ある程度の数の整数だけとかにすれば、もっと設定が軽くなるので、lossは下がりやすいかもしれません。

付録:コード全文

# ===== modules ===== #
# chainer
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import Chain
from chainer import reporter
from chainer.iterators import SerialIterator
from chainer.training.updaters import StandardUpdater
from chainer.training import Trainer
from chainer.training import extensions
from chainer.serializers import save_npz
# others
import numpy as np
import argparse, os

# ===== utils ===== #
def generate_data():
    p1 = np.random.random(2).astype(np.float32)
    p2 = np.random.random(2).astype(np.float32)

    if p1[1] > p2[1]:
        tmp = p2.copy()
        p2 = p1.copy()
        p1 = tmp.copy()

    slope = (p2[1] - p1[1]) / (p2[0] - p1[0])
    yinter = p1[1] - slope*p1[0]
    true = np.array([slope, yinter]).astype(np.float32)
    return (p1, p2, true)

# ===== network ===== #
class Net(Chain):
    def __init__(self):
        super(Net, self).__init__()
        with self.init_scope():
            self.lc1 = L.Linear(in_size=4, out_size=4)
            self.lc2 = L.Linear(in_size=4, out_size=4)
            self.lc3 = L.Linear(in_size=4, out_size=2)

    def __call__(self, p1, p2, true):
        ps = np.concatenate([p1, p2], axis=1)
        h = self.lc1(ps)
        h = self.lc2(h)
        h = self.lc3(h)
        pred = h

        loss_slope = F.mean_absolute_error(x0=pred[:,0], x1=true[:,0])
        loss_yinter = F.mean_absolute_error(x0=pred[:,1], x1=true[:,1])
        loss = loss_slope + loss_yinter
        reporter.report({'loss': loss, 'loss_slope': loss_slope, 'loss_yinter': loss_yinter}, observer=self)
        return loss

class Net2(Chain):
    def __init__(self):
        super(Net2, self).__init__()
        with self.init_scope():
            self.lc1 = L.Linear(in_size=6, out_size=6)
            self.lc2 = L.Linear(in_size=6, out_size=6)
            self.lc3 = L.Linear(in_size=6, out_size=2)

    def __call__(self, p1, p2, true):
        ps = np.concatenate([p1, p2, p1-p2], axis=1)
        h = self.lc1(ps)
        h = self.lc2(h)
        h = self.lc3(h)
        pred = h

        loss_slope = F.mean_absolute_error(x0=pred[:,0], x1=true[:,0])
        loss_yinter = F.mean_absolute_error(x0=pred[:,1], x1=true[:,1])
        loss = loss_slope + loss_yinter
        reporter.report({'loss': loss, 'loss_slope': loss_slope, 'loss_yinter': loss_yinter}, observer=self)
        return loss

class Net3(Chain):
    def __init__(self):
        super(Net3, self).__init__()
        with self.init_scope():
            self.lc1 = L.Linear(in_size=8, out_size=8)
            self.lc2 = L.Linear(in_size=8, out_size=8)
            self.lc3 = L.Linear(in_size=8, out_size=2)

    def __call__(self, p1, p2, true):
        ps = np.concatenate([p1, p2, p1-p2, p2-p1], axis=1)
        h = self.lc1(ps)
        h = self.lc2(h)
        h = self.lc3(h)
        pred = h

        loss_slope = F.mean_absolute_error(x0=pred[:,0], x1=true[:,0])
        loss_yinter = F.mean_absolute_error(x0=pred[:,1], x1=true[:,1])
        loss = loss_slope + loss_yinter
        reporter.report({'loss': loss, 'loss_slope': loss_slope, 'loss_yinter': loss_yinter}, observer=self)
        return loss

class Net4(Chain):
    def __init__(self):
        super(Net4, self).__init__()
        with self.init_scope():
            self.lc1 = L.Linear(in_size=10, out_size=10)
            self.lc2 = L.Linear(in_size=10, out_size=10)
            self.lc3 = L.Linear(in_size=10, out_size=2)

    def __call__(self, p1, p2, true):
        ps = np.concatenate([p1, p2, p1-p2, p2-p1, (p2-p1)/2.], axis=1)
        h = self.lc1(ps)
        h = self.lc2(h)
        h = self.lc3(h)
        pred = h

        loss_slope = F.mean_absolute_error(x0=pred[:,0], x1=true[:,0])
        loss_yinter = F.mean_absolute_error(x0=pred[:,1], x1=true[:,1])
        loss = loss_slope + loss_yinter
        reporter.report({'loss': loss, 'loss_slope': loss_slope, 'loss_yinter': loss_yinter}, observer=self)
        return loss


# ===== main ===== #
def main():
    # - Argparse - #
    parser = argparse.ArgumentParser()
    parser.add_argument('--num', type=int, default=10000,
                        help='The number of points for training')
    parser.add_argument('--batchsize', type=int, default=1024,
                        help='Batchsize in mini batch')
    parser.add_argument('--epoch', type=int, default=100,
                        help='The number of epochs')
    parser.add_argument('--result', type=str, default='result/',
                        help='Path to saving directory')
    args = parser.parse_args()

    # - Generate data - #
    train_points = [generate_data() for i in range(args.num)]
    valid_points = [generate_data() for i in range(int(args.num*0.1))]
    train_iter = SerialIterator(dataset=train_points, batch_size=args.batchsize, shuffle=True, repeat=True)
    valid_iter = SerialIterator(dataset=valid_points, batch_size=args.batchsize, shuffle=True, repeat=False)

    # - Set model & optimizer - #
    model = Net()
    optimizer = chainer.optimizers.Adam()
    optimizer.setup(model)

    # - Define trainer - #
    updater = StandardUpdater(train_iter, optimizer, device=-1)
    trainer = Trainer(updater, (args.epoch, 'epoch'), out=args.result)

    # - Set extentions - #
    trigger = (1, 'epoch')
    trainer.extend(extensions.Evaluator(valid_iter, model, device=-1), trigger=trigger)
    trainer.extend(extensions.ProgressBar(update_interval=1))
    trainer.extend(extensions.LogReport(trigger=trigger), trigger=trigger)
    trainer.extend(extensions.PlotReport(
        ['main/loss', 'validation/main/loss', 'main/loss_slope', 'validation/main/loss_slope',
         'main/loss_yinter', 'validation/main/loss_yinter'],
         'epoch', file_name='loss.png', trigger=trigger
    ))
    trainer.extend(extensions.PrintReport(
        ['epoch', 'iteration', 'main/loss', 'validation/main/loss',
         'main/loss_slope', 'validation/main/loss_slope', 'main/loss_yinter', 'validation/main/loss_yinter', 'elapsed_time']
    ), trigger=trigger)
    #trainer.extend(extensions.snapshot(), trigger=trigger)

    # - Run trainer - #
    trainer.run()

    # - Save trained model - #
    save_npz( os.path.join(args.result, 'model.npz'), model )
    save_npz( os.path.join(args.result, 'optimizer.npz'), optimizer )

if __name__ == '__main__':
    main()