コンサルでデータサイエンティスト

仕事でPythonを書いてます。機械学習、Webマーケティングに興味があります。趣味は旅です。

異常検知ビジネスで活用できる外れ値検知手法まとめ

機械学習の中でも教師なし学習に分類される分野として異常検知という技術があります。研究分野としては近年下火になりつつあるものの、人工知能やAIを使った異常検知技術はビジネス界隈では期待の大きい分野として有名です。

本記事では、異常検知分野のなかでも外れ値検知の一般的な手法についてまとめました。


目次

はじめに

外れ値検知はビジネスにおけるさまざまな場面で使用されています。

外れ値検知とは、正常時の状態から外れた点を見つけ出す異常検知の種類のひとつです。クレジットカードの利用状況の解析などでは、異常な行動を即時に検出することで被害の拡大を防ぐことができます。また、工場などのモニタリングでは、機械の異常状態をセンサなどで検知してアラートを出すなどのユースケースが考えられます。大量のデータをモニタリングする必要のあるセキュリティ分野や機械などを無人でモニタリングするスマートファクトリー分野のほか、高齢者の転倒検知や患者の体調急変といった予測に需要があるヘルスケア分野などでも実用化が始まっています。

それでは、最初におさえておきたい4つの基本的な外れ値検知手法について確認していきます。

ホテリング理論

最も基本的な手法として、ホテリング理論による外れ値検知について紹介します。

この手法は平均や分散といったデータの基本的な分布情報をもとに、観測値 x' より算出した異常度 a(x') を用いて外れ値検知を行うものです。まずは一般的な外れ値手法を理解するところから始めましょう。

はじめに、ほとんど外れ値が含まれていない1次元のデータセットを用意します。ホテリング理論ではこのデータセット正規分布に従うことを大前提としているため、ヒストグラムやQQ Plot などを使って正規分布であることを確認しておきましょう。もし正規分布でない場合は対数変換ボックスコックス変換などをかけて正規化する必要があります。

正規分布を仮定した場合、ホテリング理論をもとに算出した異常度はデータ数が十分に大きければ自由度1のカイ二乗分布に従うとされています。

chi_squared_distribution

異常発生の確率値 α を定義すれば、カイ二乗分布の表から異常度の閾値を求めてることができます。たとえば、発生確率が 1% 以下のものを異常値とするのであれば、α = 0.01 に対応する異常度を閾値に設定すればよいということになります。

異常度は次式で求めることができます。

 異常度  \,\,  a(x')= \Bigl(\frac{x'-\hat{\mu}}{\hat{\sigma}}\Bigr)


以上が1次元データにおけるホテリング理論による外れ値検知でした。



多次元データ(2次元以上)にこれを適用する場合は、マハラノビス距離という指標を異常度として採用します。マハラノビス距離とは、データの分布を考慮した距離の指標です。

1次元データでは平均と分散を求めましたが、多次元データでは標本平均と標本共分散行列を下記のように求めます。

 標本平均 \qquad \quad \, \, \hat{\mu} = \frac{1}{N} \sum_{n=1}^{N} x^{(n)}
 標本共分散行列 \quad \hat{\Sigma} \frac{1}{N}\sum_{n=1}^{N} (x^{(n)} - \hat{\mu})(x^{(n)} - \hat{\mu})^T


また、異常度はマハラノビス距離を用いて以下のように求めることができます。

 異常度  \,\,  a(x')= (x'-\hat{\mu})^T \hat{\Sigma}^{-1}(x'-\hat{\mu})


1次元でも多次元でも、異常度というスカラー値に落としこんでいるという点では共通していることがわかります。このように求めた異常度に閾値を設定することで、異常か正常かを識別することができます。
明らかに分布から外れている異常点を検出したいときには有効な手法です。

k-近傍法 (k-NN)

あまり有名ではないものの、シンプルに実装ができる古典的な手法として k-近傍法 (k-NN) による外れ値検知 があります。k-近傍法は k-nearest neighbor 法とも呼ばれていますが、異常検知におけるk-近傍法は機械学習の最も基本的な分類手法である k-NNとは少し異なることに注意してください。

想像ができている方もいらっしゃると思いますが、この手法ではk-NNと同様にある点から最も近い k 個の点を考慮して外れ値検知を行います。対象の点から近傍のk個の点を含むような円を描きます。

次の例では k=5 のときを考えます。

knn_anomaly

このとき、データ群から離れた異常点が描く円の半径 ε は正常データ群のものよりも大きくなっていることがわかります。
この性質に基づき、ε を異常度として使用して外れ値検知を行います。具体的には、εがある閾値を超えた場合は異常、そうでない場合は正常といったようにスカラー値に落として外れ値を検出することができます。
メリットとして、多次元データにもそのまま適用できるということがあります (ただし、次元数が10以下を目安と考えたほうがよいでしょう)。一方で、すべての点との距離を計算する必要があることから、計算量が大きくなってしまうというデメリットがあります。


Local Outlier Factor (LOF)

同じく距離をベースにした外れ値検知手法として、Local Outlier Factor (LOF) を紹介します。
この手法では 局所密度 (Local density) という指標に注目します。 局所密度は、周辺の点との密度を表します。点Aは近傍の点 B, C, D との距離が大きいため、局所密度が低いということになります。一方で、点B, C, D はそれぞれの近傍点との距離が近いため局所密度が高いといえます。

Local Outlier Factor (LOF)

つまり、近傍点の局所密度が類似している場合には近くにデータが集まっているということがわかります。点Aのようにデータ群と離れているような点の場合、自身の局所密度と近傍点の局所密度の差が大きいということになります。この差を利用して外れ値を検出することができるのが LOF です。

より詳しい内容については次の記事でまとめたので興味がある方はご確認ください。

hktech.hatenablog.com


One class SVM (Support Vector Machine)

分類アルゴリズムである Support Vector Machine (SVM) を応用した外れ値検知手法として、One class SVM を紹介します。

One class SVMとは、機械学習の分類アルゴリズムである Support Vector Machine (SVM) を教師なしの1クラスに適用した手法です。正常データとして1つのクラス分を学習させ、識別境界を決定することで、その境界を基準として外れ値を検出します。異常がほとんど発生せず、異常クラスのデータを集めにくいようなシステムで異常検知を実現したい場合には有効な外れ値検知手法です。

One class SVMではすべての学習データをクラスタ 1とし、原点のみをクラスタ -1に属するようにカーネルトリックと呼ばれる手法を用いて、高次元空間の特徴空間へデータを写像します。このとき、学習データは原点から遠くに配置されるように写像されるため、もとの学習データと類似していないデータは原点の近くに集まるようになります。

one_class_svm

One class SVM のメリットとしては、学習したデータをもとに複雑な境界線を引くことができるという点があります。一方で、パラメータをチューニングする必要があるというデメリットもあります。

One class SVM については次の記事でより詳細にまとめています。
hktech.hatenablog.com

まとめ

異常検知ビジネスで活用できる一般的な外れ値検知手法についてまとめました。外れ値検知をサービスや研究などで使おうと考えている方には参考になる内容になっているかと思います。Deep Learning などを使ってなにかを始めようとする前に、まずはこの記事で紹介されているような基本的なアルゴリズムで試してみてはいかがでしょうか。



異常検知と変化検知 (機械学習プロフェッショナルシリーズ)

One class SVM による外れ値検知についてまとめた

はじめに

異常検知技術が実用システムに導入される例が増えています。今回は外れ値検知手法として人気が高いアルゴリズムのひとつであるOne class SVMについてご紹介します。

One class SVMとは、機械学習の分類アルゴリズムである Support Vector Machine (SVM) を教師なしの1クラス分類に応用した手法です。正常データとして1つのクラス分を学習させ、識別境界を決定することで、その境界を基準に外れ値を検出します。異常がほとんど発生せず、異常クラスのデータが集まらないようなシステムで異常検知を実現したい場合には有効な外れ値検知手法です。

最後にPythonによる簡単な実装例についてご紹介します。

One Class SVMSVMの違い

One Class SVMSVM の違いについて理解するためには、まずSupport Vectorとは何かということを把握する必要があります。Support Vectorとは学習データの中で最も他クラスと近い位置に点を指します。Support Vector Machineでは、各クラスのSupport Vectorを基準として、それらのユークリッド距離が最大化するように識別境界を設定します。これをマージン最大化と呼びます。

f:id:hktech:20181011233116p:plain


また、識別境界が非線形の場合はカーネルを用いてデータを特徴空間に写像します。カーネルを適切に選択することで、複雑なデータ配置でも識別境界を引くことができます。

f:id:hktech:20181011233133p:plain


通常のSVMでは、学習データとして複数クラスのデータが用意されています。したがって、学習時に識別境界を決定すれば、あるデータがどのクラスに分類されるかを判別する分類器として使うことができます。

しかし、One Class SVMでは学習データとして正常データの1つのクラスしか用意されていません。One class SVMは識別境界を設定するという点では共通しているものの、分類手法としての役割を持つSVMと異なり、識別境界を境に正常データと異常データを識別する外れ値検知手法として使うことができます。

One class SVMによる外れ値検知

SVMでは複数のクラスのSupport Vectorを基準として識別境界を決定するということは理解できました。One class SVMでは1つのクラスのみでどのように識別境界を求めるのかということについて明らかにしていきます。

One class SVMではすべての学習データをクラスタ 1とし、原点のみをクラスタ -1に属するようにカーネルトリックと呼ばれる手法を用いて、高次元空間の特徴空間へデータを写像します。このとき、学習データは原点から遠くに配置されるように写像されるため、もとの学習データと類似していないデータは原点の近くに集まるようになります。この性質を用いて正常および異常データの区別をすることができます。

f:id:hktech:20181011233151p:plain

Python + Scikit-learn を使った実装

PythonでScikit-learn (sklearn) を使えば、One class SVMを数行で実装することができます。

from sklearn.svm import OneClassSVM
clf = OneClassSVM(nu=0.003, kernel='rbf', gamma='auto')
clf.fit(X_train)
pred = clf.predict(X_test)


SVMのパラメータとしてkernelとgamma、One class SVM特有のパラメータとして nu があります。
学習データはすべて綺麗なデータであるとは限らないため、識別境界の決定時に考慮しないデータの割合をnuで定義することができます。

predictの返り値としては 正常: 1、異常: -1のリストが返ってくるため、異常と判定されたサンプルのインデックス番号を確認するには次のコードを実行します。

import numpy as np
np.where(pred < 0)

このあたりはLOFの実装のときと同じですね。このように手法が変わっても返り値が同じ形式で得られるのもScikit-learnパッケージの強みです。

まとめ

異常検知技術のひとつであるOne class SVMについてご紹介しました。One class SVMSupport Vector Machineを1クラス分類に応用することで外れ値検知を実現する手法であるということがわかりました。また、Pythonを使って簡単に実装できることも確認することができました。SVMの中身は理解が難しいことで有名であるため、機会があればスクラッチから実装してみたいと思います。

関連記事
hktech.hatenablog.com



異常検知と変化検知 (機械学習プロフェッショナルシリーズ)

PythonでLocal Outlier Factor (LOF)を実装してみた

目次

はじめに

教師なし学習のひとつとして異常検知という分野があります。その中に含まれる手法として、正常時の状態から外れた点を見つけ出す外れ値検知手法があります。外れ値検知は実アプリケーションにも数多く導入されており、機械学習分野で注目を浴びている技術です。今回はその入門編として Local Outlier Factor (LOF) という手法をPythonで実装してみたいと思います。LOF の理論と仕組みについては次の記事でまとめたので確認してみてください。

hktech.hatenablog.com


Scikit-learnによるLOFの実装

機械学習パッケージであるScikit-learnを使って実装していきます。 今回は、Local Outlier Factor (LOF) のアルゴリズムに基づいてデータXから外れ値検知を行います。

パッケージが入っていなければ、インストールしておきましょう。

pip install scikit-learn
pip install matplotlib
pip install seaborn


早速、Scikit-learnを使って実装していきます。ここでは考慮する近傍点の数を n_neighbors=7 としています。

from sklearn.neighbors import LocalOutlierFactor
clf = LocalOutlierFactor(n_neighbors=7)
pred = clf.fit_predict(X)

Scikit-learnを使うとたった3行で外れ値を検出することができます。簡単ですね。


fit_predictの返り値としては 正常: 1、異常: -1のリストが返ってくるため、異常と判定されたサンプルのインデックス番号を確認するには次のコードを実行します。

import numpy as np
np.where(pred < 0)


試しに実際のデータを使って外れ値検知を行い、検出結果の可視化をしてみたいと思います。今回は、irisデータの一部変数を使い、2次元での可視化を行いました。

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.neighbors import LocalOutlierFactor
from sklearn import datasets
%matplotlib inline
 
iris = datasets.load_iris()
X = iris.data
 
clf = LocalOutlierFactor(n_neighbors=7, contamination=0.005)
pred = clf.fit_predict(X[:, (1, 2)])
 
# 正常データのプロット
plt.scatter(X[:,1][np.where(pred > 0)], X[:,2][np.where(pred > 0)])
# 異常データのプロット
plt.scatter(X[:,1][np.where(pred < 0)], X[:,2][np.where(pred < 0)])

f:id:hktech:20181010230057p:plain


図をみてみると、左上と右下の2つクラスタのいずれにも含まれないオレンジのサンプルが外れ値として検出されていることがわかります。このように、LOFはデータが複数のクラスタに分かれているような場合でも、それらを考慮しつつ外れ値を検出できるモデルであるということが確認できました。

まとめ

Python で Local Outlier Factor (LOF) を実装する方法についてご紹介しました。今回はScikit-learnを使って実装を進めましたが、LOFのアルゴリズム自体はそんなに難しくないため、スクラッチから実装してみるのもいい練習になるかと思います。

git rebase -i コマンドで複数のコミットを1つにまとめる

一度commitしたブランチに対してレビュー修正が入り再度コミットする必要がある場合など、複数のコミットを1つにまとめたい時がありますよね。そんなときに便利なgit rebase -i コマンドをご紹介します。

git rebase -i コマンドを使う場面

具体例を使って手順を確認していきましょう。fix api function というコミットでmerge request を出したものの、レビューでバグを指摘されたために修正コミット fix bug を出したという例を考えます。このようにMerge requestの出すまでの流れについては、次の記事にまとめてありますのでご確認ください。

hktech.hatenablog.com


このような場面でfix api function と fix bug という2つのコミットがログに残したままにしてしまうと、あとでチームメンバーが振り返ったときに何が行われていたのかわからなくなることがあります。また、チームやOSSによっては1 request 1 commitといったルールが採用されている場合もあります。そこで、複数のコミットを1つにまとめてくれる git rebase -iコマンドを使います。この -i とは -interactiveの略となっています。

git rebaseコマンドでコミットをまとめる手順

まずは、git log でgitの歴史を確認します。

git log


fix api function と fix bug というメッセージのコミットが2つあることを確認できます。

commit *****
Author: hktech
Date:   Thu Sep 27 20:58:09 2018 +0900

    fix api function

commit ******
Author: hktech
Date:   Wed Sep 19 14:19:37 2018 +0900

    fix bug


ここからいよいよ git rebase -i コマンドを使っていきます。
上から2つのコミットを1つにまとめたい場合、次のコマンドを打ちます。

git rebase -i HEAD~2 


これは、最新のコミットであるHEADから2つ目までを含めるということを表しています。
するとviなどのエディタで次のようなファイルが開きます。

pick t13rvfp fix api function
pick n2odp4q fix bug

#
# Commands:
# p, pick = use commit
# r, reword = use commit, but edit the commit message
# e, edit = use commit, but stop for amending
# s, squash = use commit, but meld into previous commit
# f, fixup = like "squash", but discard this commit's log message
# x, exec = run command (the rest of the line) using shell
# d, drop = remove commit
#
# These lines can be re-ordered; they are executed from top to bottom.
#
# If you remove a line here THAT COMMIT WILL BE LOST.
#
# However, if you remove everything, the rebase will be aborted.
#


ファイルの文面には丁寧にコマンドの用法が記載されていますが、今回はこの中のpickとsquashを使っていきたいと思います。これらのコマンドはそれぞれ次のような意味を持ちます。

  • Pick (p): コミットをそのまま残す
  • Squash (s): コミットを前のコミットに統合する

今回はfix bugのコミットをfix api function に統合したいので、fix bug をpick から s(squash) に変更し、fix api functionはpickのまま残します。

pick t13rvfp fix api function
s n2odp4q fix bug


vi の場合は:wqで保存してファイルを抜けます。
ここで、再度git log をして変更を確認します。

git log


コミットが1つにまとまっていることが確認できました。

commit *****
Author: hktech
Date:   Thu Sep 27 20:58:09 2018 +0900

    fix api function
    fix bug


この状態で再度リモート(origin)にpushしてmergeをする準備をします。

git push origin fix-api-function


おそらくここで前コミットと競合してしまうため、前のコミットを上書きする形で強制的にpushを行います。このときに使う-f オプションは -forceを意味します。これはリモートレポジトリを強制的に変更してしまうため、使う際には注意が必要です。

git push -f origin fix-api-function

無事に複数のコミットをまとめることができました。

まとめ

git rebase -iコマンドで複数のコミットをまとめる方法についてご紹介しました。せっかくgitを運用するからにはログをキレイに整理しておきたいですよね。細かいコミットをたくさんしてしまっている方は、この記事を参考にコミットをまとめる習慣をつけてみてはいかがでしょうか。

Pythonで決定木分類器をフルスクラッチで実装してみた

機械学習モデルをスクラッチから実装しようと思い立ったので、第一歩として決定木分類器(Decision Tree Classifer) Pythonで実装してみました。RandomForestやXGBoostなどといった決定木系の機械学習アルゴリズムを使う場面も多いと思うので、その基礎となる決定木分類器について実装をしながら仕組みを理解していきます。

決定木とは

決定木とは木構造を用いて分類または回帰のモデルを構築する手法です。変数を基準に条件分岐を行い、末端のノード(葉ノード)にたどりつくまで分岐をたどることで、分類クラスや回帰の結果を決定するものです。木が深くなるほど過学習が起きやすくなってしまうため、学習時には少ない分岐でより高い識別能力を持つ木を構築することが求められます。

分割の基準となる説明変数とその閾値が優先度の高いものから順に並ぶため、分類の解釈がしやすいといったメリットがあります。また、変数の重要度 (Feature importance) を算出することで、どの説明変数が識別に寄与しているかということを定量的に把握できるという特長もあります。

f:id:hktech:20181002225736p:plain
参考: wikipedia 決定木


決定木の学習を実装

Pythonを使って決定木の学習をスクラッチから実装していきます。スクラッチとはいえnumpyを使用しているという点についてはご容赦ください。決定木アルゴリズムにはいくつかの種類がありますが、今回は2分木を前提とし、ノードの分割条件を求める評価基準としてGini係数を使うCARTアルゴリズムで実装を進めます。実装はPython3で、numpyおよびscikit-learnがインストールされていることを前提としています。手早く環境構築をしたい方はanacondaを使いましょう。

hktech.hatenablog.com

Gini係数による不純度の計算

決定木を構築する際の最も重要な指標として不純度 (impurity) というものがあります。これは、あるノードに分岐したサンプルのクラスがどれほど散らばっているかということを表します。分岐させた結果、あるノードに1種類のクラスだけが分類された場合(きれいに分割できた場合)、そのノードの不純度は0となります。このように、不純度を求める方法としては交差エントロピーとGini係数がありますが、今回はCARTアルゴリズムに従ってGini係数を使います。
ノードtにおけるクラス C_iの割合を P(C_i|t)とすると、Gini係数は次の式を使って求めることができます。

 1-\sum_{i=1}^{K}P^2(C_i|t)

よくわからないので具体例を使って説明をしていきます。
文系・理系のサンプルが各200人ずついるとし、数学または国語の点数でこれらを分類したいとします。

f:id:hktech:20181005002646p:plain


より識別性能が高いものを木の上のほうに配置したいため、分岐条件AとBの不純度をそれぞれ求めます。

分割条件Aの ’yes’ グループのGini係数は
【Gini(yes)】1-((\frac{40}{170})^2+(\frac{130}{170})^2) = 0.36

分割条件Aの ’no’ グループのGini係数は
【Gini(no)】 1-((\frac{160}{230})^2+(\frac{70}{230})^2) = 0.42

分割ルールAによる不純度は次のよう二求めることができます。
 \frac{170}{400} \times \mathrm{Gini(yes)} + \frac{230}{400} \times \mathrm{Gini(no)}=0.39


同様に、分割条件Bの不純度を計算すると0.49となりました。
分割条件Aのほうが不純度が低いため、分割条件Aのほうが識別能力が高い分岐条件であることがわかります。Gini係数を計算することで、ノードを分割する最適な説明変数および閾値を求めていき、決定木を構築していきます。

Gini係数を求める関数は次のように実装しました。

def gini_score(data, target, feat_idx, threshold):
    gini = 0
    sample_num = len(target)
   
    div_target = [target[data[:, feat_idx] >= threshold], target[data[:, feat_idx] < threshold]]
   
    for group in div_target:
        score = 0
        classes = np.unique(group)
        for cls in classes:
            p = np.sum(group == cls)/len(group)
            score += p * p
        gini += (1- score) * (len(group)/sample_num)
    return gini

ノードを分割する最適な説明変数と閾値の選択

Gini係数が小さい分岐条件を見つけることで、識別能力の高い木が構築できるということがわかりました。今回は、最適な分岐条件をみつけるためにすべての説明変数について閾値を変化させていき、Gini係数が最小となる説明変数と閾値のペアをノードの分岐条件とする方針で実装していきます。
ノードに流れてくるdataおよびそれらに付与されたクラスラベル targetを渡すことで、変数giniが最小となるときの閾値best_thrsおよび説明変数best_fを返り値として得ることができます。

def search_best_split(data, target):   
    features = data.shape[1]
    best_thrs = None
    best_f = None
    gini = None
    gini_min = 1
 
    for feat_idx in range(features):
        values = data[:, feat_idx]
        for val in values:
            gini = gini_score(data, target, feat_idx, val)
            if gini_min > gini:
                gini_min = gini
                best_thrs = val
                best_f = feat_idx
    return gini_min, best_thrs, best_f    

再帰関数を使った木の構築

木の構築方法はいくつか方法が考えられますが、ここではオブジェクトと再帰関数を使った実装を行います。まずDecisionTreeNodeクラスを定義し、splitメソッドを用いることでノードに対応するDecisionTreeNodeの子オブジェクトを再帰的に生成していきます。停止条件としては、以下の2通りを定義しています。

  • ノードのGini係数が0になったとき(ノードに1つのクラスのサンプルしかないため、以降分岐の必要がない)
  • 木の深さがあらかじめ定義したmax_depthに達したとき


各ノードはleftとrightというクラス変数を持ち、ここに子ノードのオブジェクトが格納されていくため、仮想的に木を構築することができます。

def split(self, depth):
    self.depth = depth
   
    self.gini_min, self.threshold, self.feature = search_best_split(self.data, self.target)
    print('Depth: {}, Sep at Feature: {},Threshold: {}, Label: {}'.format(self.depth, self.feature, self.threshold, self.label))
   
   if self.depth == self.max_depth or self.gini_min == 0:
        return
   
    idx_left = self.data[:, self.feature] >= self.threshold
    idx_right = self.data[:, self.feature] < self.threshold
   
    self.left = DecisionTreeNode(self.data[idx_left],  self.target[idx_left], self.max_depth)
    self.right = DecisionTreeNode(self.data[idx_right], self.target[idx_right], self.max_depth)
    self.left.split(self.depth +1)
    self.right.split(self.depth +1)

DecisionTreeNodeクラス全体は次のように実装しました。

class DecisionTreeNode(object):
    def __init__(self, data, target, max_depth):
        self.left = None
        self.right = None
        self.max_depth = max_depth
        self.depth = None
        self.data = data
        self.target = target
        self.threshold = None
        self.feature = None
        self.gini_min = None
        self.label = np.argmax(np.bincount(target))
   
    def split(self, depth):
        self.depth = depth
        self.gini_min, self.threshold, self.feature = search_best_split(self.data, self.target)
        print('Depth: {}, Sep at Feature: {},Threshold: {}, Label: {}'.format(self.depth, self.feature, self.threshold, self.label))
       
        if self.depth == self.max_depth or self.gini_min == 0:
            return       
        idx_left = self.data[:, self.feature] >= self.threshold
        idx_right = self.data[:, self.feature] < self.threshold
   
        self.left = DecisionTreeNode(self.data[idx_left],  self.target[idx_left], self.max_depth)
        self.right = DecisionTreeNode(self.data[idx_right], self.target[idx_right], self.max_depth)
        self.left.split(self.depth +1)
        self.right.split(self.depth +1)
 
    def predict(self, data):
        if self.gini_min == 0.0 or self.depth == self.max_depth:
            return self.label
        else:
            if data[self.feature] > self.threshold:
                return self.left.predict(data)
            else:
               return self.right.predict(data)



決定木のテストを実装

さきほど作成したDecisionTreeNodeクラスに、テスト時の予測メソッドを実装しました。各ノードのオブジェクトには、best_splitで求めた最適な説明変数と閾値が格納されているためこれを使用します。先ほどの停止条件と同様に、Gini係数が0または深さがmax_depthに達した場合は末端ノード(葉ノード)となるため、ここでラベルを返します。これはノードに含まれているサンプルのうち最も多いクラスを返しており、これが予測ラベルとなります。葉ノードに達していない場合は、閾値に従って分岐を進んでいき、末端に達した時点で予測ラベルを返すという実装になります。

def predict(self, data):
    if self.gini_min == 0.0 or self.depth == self.max_depth:
        return self.label
    else:
        if data[self.feature] > self.threshold:
            return self.left.predict(data)
        else:
           return self.right.predict(data)

自作Pythonコード

全体のPythonコードは次の通りです。scikit-learnのメソッドにならって、fitとpredictを持つクラスDesicionTreeClassifierを定義しました。

import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
def gini_score(data, target, feat_idx, threshold):
    gini = 0
    sample_num = len(target)
   
    div_target = [target[data[:, feat_idx] >= threshold], target[data[:, feat_idx] < threshold]]
   
    for group in div_target:
        score = 0
        classes = np.unique(group)
        for cls in classes:
            p = np.sum(group == cls)/len(group)
            score += p * p
        gini += (1- score) * (len(group)/sample_num)
    return gini
 
def search_best_split(data, target):   
    features = data.shape[1]
    best_thrs = None
    best_f = None
    gini = None
    gini_min = 1
 
    for feat_idx in range(features):
        values = data[:, feat_idx]
        for val in values:
            gini = gini_score(data, target, feat_idx, val)
            if gini_min > gini:
                gini_min = gini
                best_thrs = val
                best_f = feat_idx
    return gini_min, best_thrs, best_f       
 
class DecisionTreeNode(object):
    def __init__(self, data, target, max_depth):
        self.left = None
        self.right = None
        self.max_depth = max_depth
        self.depth = None
        self.data = data
        self.target = target
        self.threshold = None
        self.feature = None
        self.gini_min = None
        self.label = np.argmax(np.bincount(target))
   
    def split(self, depth):
        self.depth = depth
        self.gini_min, self.threshold, self.feature = search_best_split(self.data, self.target)
        print('Depth: {}, Sep at Feature: {},Threshold: {}, Label: {}'.format(self.depth, self.feature, self.threshold, self.label))
       
        if self.depth == self.max_depth or self.gini_min == 0:
            return       
        idx_left = self.data[:, self.feature] >= self.threshold
        idx_right = self.data[:, self.feature] < self.threshold
   
        self.left = DecisionTreeNode(self.data[idx_left],  self.target[idx_left], self.max_depth)
        self.right = DecisionTreeNode(self.data[idx_right], self.target[idx_right], self.max_depth)
        self.left.split(self.depth +1)
        self.right.split(self.depth +1)
 
    def predict(self, data):
        if self.gini_min == 0.0 or self.depth == self.max_depth:
            return self.label
        else:
            if data[self.feature] > self.threshold:
                return self.left.predict(data)
            else:
               return self.right.predict(data)
 
class DesicionTreeClassifier(object):
    def __init__(self, max_depth):
        self.max_depth = max_depth
        self.tree = None
   
    def fit(self, data, target):
        initial_depth = 0
        self.tree = DecisionTreeNode(data, target, self.max_depth)
        self.tree.split(initial_depth)
   
    def predict(self, data):
        pred = []
        for s in data:
            pred.append(self.tree.predict(s))
        return np.array(pred)
 
if __name__ == '__main__':
iris = load_iris()
data = iris.data
target = iris.target
 
X_train, X_test, y_train, y_test = train_test_split(data, target, test_size=0.3, random_state=0)
 
clf = DesicionTreeClassifier(max_depth=3)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
score = sum(y_pred == y_test)/float(len(y_test))
print('Classification accuracy: {}'.format(score))




自作コードを使った分類結果

フルスクラッチで書いたコードを使って、irisデータセットをもとに学習および評価を行いました。
学習した決定木は次のようになりました。可視化していないのでわかりにくいですが、深さ1の右側の木(一番下の行)はそれ以降分岐がないためGini係数が0であったことがわかります。このときの予測ラベルは0番のクラスです。また、逆側のブランチは深さ3まで伸びています。

Depth: 0, Sep at Feature: 2,Threshold: 3.0, Label: 2
Depth: 1, Sep at Feature: 2,Threshold: 5.0, Label: 2
Depth: 2, Sep at Feature: 2,Threshold: 5.1, Label: 2
Depth: 3, Sep at Feature: 0,Threshold: 6.5, Label: 2
Depth: 3, Sep at Feature: 0,Threshold: 6.7, Label: 2
Depth: 2, Sep at Feature: 3,Threshold: 1.7, Label: 1
Depth: 3, Sep at Feature: 1,Threshold: 3.2, Label: 2
Depth: 3, Sep at Feature: 0,Threshold: 5.0, Label: 1
Depth: 1, Sep at Feature: 0,Threshold: 4.8, Label: 0



最後に、正解率を求めることで分類器が機能していることを確認しました。
学習データ:テストデータ=7:3、max_depth=3としたときの識別精度は以下の通りでした。

Classification accuracy: 0.977

いい感じに分類できていることがわかりますね。

まとめ

Pythonで決定木分類器をフルスクラッチで実装してみました。とても単純なアルゴリズムかと思いきや、再帰関数などを使って実装を工夫する必要があったためなかなか大変でした。決定木アルゴリズムの中でもポピュラーなRandomForestなども実装してみたいと思います。

Pythonを使ってカメラ映像をプレビュー表示しながら動画として保存する

画像を扱う仕事をしていると、カメラを使って自ら画像を撮影しなければいけない場面がありますよね。今回はPCの内臓カメラやUSBカメラを使って、カメラ映像を動画として保存するコードを実装したのでご紹介します。

準備

OpenCVをインストールします。

pip install opencv-python

Pythonソースコード

引数でいろいろな設定をできるようにしています。

import argparse
import cv2
import datetime
import os
 
def set_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument('device_id', type=int, help="通常は、 0: 内蔵カメラ, 1: USBカメラ")
    parser.add_argument('limit', type=int, help="撮影フレーム数")
    parser.add_argument('-f', '--fps', default=30, help="撮影FPS")
    parser.add_argument('-o', '--output_directory', help="出力先ディレクトリ")
    parser.add_argument('-p', '--preview', type=int, default=1, help="プレビュー画面の表示有無 0:表示しない 1:表示する(デフォルト)")
    return parser.parse_args()
 
def get_outputpath(output_directory):
    now = datetime.datetime.now()
    file_name = '{}.avi'.format(now.strftime('%Y%m%d_%H%M%S'))
    if output_directory is None:
        output_path = file_name
    else:
        os.makedirs(output_directory, exist_ok=True)
        output_path = os.path.join(output_directory, file_name)
   return output_path
 
def set_camera(device_id, fps):
    fps = int(fps)
    camera = cv2.VideoCapture(device_id)
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    camera.set(cv2.CAP_PROP_FPS, fps)
 
    if camera is None:
        raise Exception("Camera not found. Check device id.")
    return camera
 
def capture(camera, output_filepath, show_preview, limit = None):
    frame_number = 0
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    ret, frame = camera.read()
 
    fps = camera.get(cv2.CAP_PROP_FPS)
    height = camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
    width = camera.get(cv2.CAP_PROP_FRAME_WIDTH)
    writer = cv2.VideoWriter(output_filepath, fourcc, fps, (int(width), int(height)))
 
    while(ret):
        frame_number += 1
        writer.write(frame)
        if show_preview:
            cv2.imshow("preview", frame)
        if cv2.waitKey(int(1 / fps * 1000)) == 27: # ESC Key
            break
        if limit is not None and frame_number >= limit:
           break
        ret, frame = camera.read()
 
if __name__ == '__main__':
    args = set_arguments()
    output_path = get_outputpath(args.output_directory)
    camera = set_camera(args.device_id, args.fps)
    capture(camera, output_path, args.preview, args.limit)

カメラ映像を動画に保存する

プログラムの使い方について説明します。
引数の渡し方は-hオプションで見ることができます。

python camera_recorder.py -h


こんな感じです。

usage: camera_recorder.py [-h] [-f FPS] [-o OUTPUT_DIRECTORY] [-p PREVIEW]
                          device_id limit
 
positional arguments:
  device_id             通常は、 0: 内蔵カメラ, 1: USBカメラ
  limit                 撮影フレーム数
 
optional arguments:
  -h, --help            show this help message and exit
  -f FPS, --fps FPS     撮影FPS
  -o OUTPUT_DIRECTORY, --output_directory OUTPUT_DIRECTORY
                        出力先ディレクトリ
  -p PREVIEW, --preview PREVIEW
                        プレビュー画面の表示有無 0:表示しない 1:表示する(デフォルト)


それでは実際に動かしてみましょう。これは内臓カメラ(device_id = 0)で300フレーム分撮影してくれる実行スクリプトです。デフォルトではFPSは30なので、10秒間撮影をしてくれます。

python camera_recorder.py 0 300


f:id:hktech:20181002220736p:plain
カメラのプレビュー映像もちゃんと見えていますね。撮影時にはプレビュー表示があるとかなり便利です。

外部接続のUSBカメラで撮影をしたい場合は、次のようにするとできるはずです。

python camera_recorder.py 1 300

まとめ

Pythonを使ってカメラ映像を動画として保存するコードを紹介しました。このコードを使えば画像の学習データを自作できるのですが、アノテーションがとても大変だったりします。特に物体検出タスクなどでは、クラスだけでなくバウンディングボックスの座標なども指定しなくてはいけなくて地獄です。今後の記事ではアノテーションを楽にするような撮影方法や、バウンディングボックス座標を半自動で認識するようなプログラムについて紹介できればと思います。

Pythonを使ってDynamoDBにJSONデータをインポート(アップロード)する

Pythonを使って、AWSを代表するNoSQLデータベースであるDynamoDBにJSONデータをインポート(アップロード)する手順およびスクリプトについてご紹介します。

目次

DynamoDBのテーブルを作成する

DynamoDBはNoSQLであるため、テーブル設計時にスキーマを定義する必要がありません。この辺りについては、NoSQLに関する記事でまとめましたので確認してみてください。

hktech.hatenablog.com


AWSにサインインし、DynamoDBマネジメントコンソールにアクセスします。
https://ap-northeast-1.console.aws.amazon.com/dynamodb/home

テーブルを作成します。

f:id:hktech:20180929003641p:plain


今回はセンサーの出力値を格納するテーブルを作成していきます。

f:id:hktech:20180929003814p:plain


プライマリキーは必須項目ですが、RDBMSとは異なり残りのカラム名を指定する必要がありません。テーブル名、プライマリキーはそれぞれ次のように設定します。

  • テーブル名: sensor_value
  • プライマリキー: id

Python を使ってDynamoDBにJSONデータをインポートする

PythonによるDynamoDBへのデータインポート方法について説明します。
まず、JSON形式のリストデータsensor_data.jsonを用意します。

[{"id": "data_000001", "value": "20"}, {"id": "data_000002", "value": "30"}]

aws_access_key_id, aws_secret_access_keyの中身をそれぞれの環境に合わせて変更し、次のプログラムを実行します。

from boto3.session import Session
from decimal import Decimal
import json
 
def get_dynamo_table(key_id, access_key, table_name):
    session = Session(
            aws_access_key_id=key_id,
            aws_secret_access_key=access_key,
            region_name='ap-northeast-1'
    )
 
    dynamodb = session.resource('dynamodb')
    dynamo_table = dynamodb.Table(table_name)
    return dynamo_table
 
def insert_data_from_json(table, input_file_name):
    with open(input_file_name, "r") as f:
        json_data = json.load(f)
        with table.batch_writer() as batch:
            for record in json_data:
                record["value"] = Decimal("{}".format(record["value"])) # テーブル側で数値型を指定している場合はこのような処理が必要
                batch.put_item(Item=record)
    print('Successfully inserted data.')
 
if __name__ == '__main__':
    aws_access_key_id='idhogehoge'
    aws_secret_access_key='keyhogehoge'
    input_file_name = './sensor_data.json' 
 
    dynamo_table = get_dynamo_table(aws_access_key_id, aws_secret_access_key, 'sensor_value')
    insert_data_from_json(dynamo_table, input_file_name)


DynamoDBマネジメントコンソールにアクセスし、データがちゃんと入っていることを確認できました。

f:id:hktech:20180929003944p:plain

まとめ

Pythonを使ってDynamoDBにJSONデータをインポートする手順についてご紹介しました。大量のデータをPythonスクリプトで一気にインポートしたいときに使ってみてはいかがでしょうか。