Amazon Web Services ブログ

Amazon Managed Service for Apache Flink のランダムカットフォレストによるリアルタイム異常検出

2023 年 8 月 30 日: Amazon Kinesis Data Analytics は Amazon Managed Service for Apache Flink に名称が変更されました。AWS News Blogこちらをご覧ください。

リアルタイム異常検出とは、ストリーミングデータで予期しない挙動が発生したときにそれを検出する手法です。そして、オンライン機械学習アルゴリズムは、明示的なルールを必要とせずに変化するベースラインに適応できるため、リアルタイム異常検出のユースケースで人気があります。このアルゴリズムは、受信するデータが時間の経過とともに継続的に変化する連続的なデータストリームに対して特に役立ちます。

ランダムカットフォレスト (RCF) は、異常検出のユースケースで広く使用されているアルゴリズムの 1 つです。一般的には、入力データに対して RCF アルゴリズムを高いスループットで実行したい場合が多く、ストリーミングデータ処理フレームワークはそのようなケースで役立ちます。Amazon Managed Service for Apache Flink 上で RCF が使用可能になり、ストリーミングデータ処理において異常検出ができるようになりました。Apache Flink は、データストリーム上でリアルタイムでステートフルな計算を行うための人気のオープンソースフレームワークで、高いスループットで RCF を入力ストリームに実行するために使用できます。

この記事では、Amazon Managed Service for Apache Flink を使用して、異常検出用のオンライン RCF アルゴリズムを実行する方法を説明します。

ソリューションの概要

今回作成するアーキテクチャを下の図に示します。アーキテクチャは、Amazon Kinesis Data Streams による入力データストリーム、Amazon Managed Service for Apache Flink によるFlink ジョブ、Amazon Kinesis Data Streams による出力データストリームの 3 つのコンポーネントで構成されています。データフローに関しては、Python スクリプトを使用して異常な正弦波データを入力データストリームに配信し、そのデータを Flink ジョブで RCF によって処理し、結果の異常スコアを出力データストリームに配信します。

以下のグラフは、予想される結果の例です。正弦波データソースが定数 -17 に異常に低下した際に、異常スコアがピークに達した様子を示しています。

このソリューションは以下の 3 つの簡単なステップで実装できます。

  1. AWS CloudFormation 経由で AWS リソースをセットアップします。
  2. 入力データストリームに正弦波データを配信するようにデータジェネレータを設定します。
  3. Amazon Managed Service for Apache Flink で RCF Flink Java コードを実行します。

AWS Cloud Formation で AWS リソースをセットアップする

次の CloudFormation スタックは、2 つの Kinesis Data Streams、1 つの Amazon Managed Service for Apache Flink アプリケーション、および Amazon Simple Storage Service (Amazon S3) バケットを含む、このチュートリアルに必要なすべての AWS リソースを作成します。

AWS アカウントにサインインし、[Launch Stack] を選択します。

BDB-2063-launch-cloudformation-stack

AWS CloudFormation コンソールのステップに従ってスタックを作成します。

データジェネレータをセットアップする

次の Python スクリプトを実行して、入力データストリームに異常な正弦波データを入力します。

import json 
import boto3 
import math

STREAM_NAME = "ExampleInputStream-RCF"


def get_data(time):
	rad = (time/100)%360 
	val = math.sin(rad)*10 + 10
	
	if rad > 2.4 and rad < 2.6: 
		val = -17 
		
	return {'time': time, 'value': val} 
	
def generate(stream_name, kinesis_client): 
	time = 0
	
	while True: 
		data = get_data(time) 
		kinesis_client.put_record( 
			StreamName=stream_name, 
			Data=json.dumps(data), 
			PartitionKey="partitionkey") 
			
		time += 1 
		
		
if __name__ == '__main__':
	generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2')) 

Amazon Managed Service for Apache Flink で RCF Flink Java コードを実行する

CloudFormation スタックは、RCF Flink ジョブ JAR ファイルを自動的にダウンロードしてパッケージ化します。そのため、Amazon Managed Service for Apache Flink コンソールにアクセスし、作成された分析アプリケーションを実行するだけでリアルタイム異常検出を体験できます。

分析アプリケーションを実行することにより、 Kinesis Data Streams からのデータを継続的に読み込み、過去のデータポイントを基に新しいデータポイントの異常スコアを計算する Flink ジョブが作成されました。

以下のセクションでは、RCF の実装と Flink のジョブコードについて詳しく説明します。

RCF の実装

RCF 実装方法は多数公開されています。このチュートリアルでは、AWS での実装を Flink ジョブで使用するカスタムラッパー (RandomCutForestOperator) でラップして使用します。

RandomCutForestOperator は Apache Flink ProcessFunction として実装されています。この関数を使用すると、ストリーム内のすべての要素を処理するカスタムロジックを記述できます。カスタムロジックは、inputDataMapper.apply によるデータ変換から始まり、続いて rcf.getAnomalyScore 経由で AWS RCF ライブラリを呼び出して異常スコアを取得します。RandomCutForestOperator の実装コードは GitHub で確認できます。

RandomCutForestOperatorBuilder には主に 2 つのタイプのパラメータが必要です。

  • RandomCutForestOperator のハイパーパラメータ
    • Dimensions – 入力データが float 型で構成される 1 次元の正弦波であるため、1 に設定します。
    • ShingleSize – 1 に設定します。この設定により、RCF アルゴリズムは異常スコア推定において過去と現在のデータポイントを使用します。季節性を持つデータを対象にする場合、この値を増やすことを考慮する必要があります。
    • SampleSize – 628 に設定します。この設定により、各ツリーのデータサンプルには最大 628 個のデータポイントが保持されます。
  • 入出力処理用の DataMapper パラメータ
    • InputDataMapperRandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPERを使用して、入力データを float から float[] にマップします。
    • ResultMapperRandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER を使用します。これは、異常スコアと対応する正弦波データポイントを結合してタプルにする BiFunction です。

Flink ジョブコード

次のコードスニペットは、Apache Flink ストリーミング Java コードのコアストリーミング構造を示しています。まず、ソース Kinesis Data Streams からデータを読み込み、次に RCF アルゴリズムを使用して処理します。次に、計算された異常スコアが出力 Kinesis Data Streams に書き込まれます。

DataStream<Float> sineWaveSource = createSourceFromStaticConfig(env);

sineWaveSource
		.process( 
				RandomCutForestOperator.<Float, Tuple2<Float, Double>>builder() 
						.setDimensions(1) 
						.setShingleSize(1) 
						.setSampleSize(628) 
						.setInputDataMapper(RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER) 
						.setResultMapper(RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER) 
						.build(), 
				TupleTypeInfo.getBasicTupleTypeInfo(Float.class, Double.class)) 
		.addSink(createSinkFromStaticConfig()); 

この例では、ベースライン入力データは正弦波です。次のスクリーンショットに示すように、データが規則的であれば低い異常スコアが返されます。ただし、データに異常がある場合 (正弦波入力データが定数まで低下した場合)、高い異常スコアが返されます。異常スコアは出力 Kinesis Data Streams に配信されます。この結果は、Amazon Managed Service for Apache Flink Studio アプリケーションを作成することで視覚化できます。手順については、「ストリーミングデータのインタラクティブ分析」を参照してください。

RCF は教師なしアルゴリズムなので、この手順には明示的なルールやラベル付きデータセットを指定する必要はありません。つまり、入力データストリーム、データ変換、および一部のハイパーパラメーターのみで実行できます。RCF 自身が入力データに基づいて予想されるベースラインを決定し、予期しない動作を特定します。

さらに、RCF はベースラインが時間の経過とともに変化しても、継続的に適応することができます。そのため、再トレーニングの頻度は最小限で済みます。季節的な傾向、インフレーション、機器のキャリブレーションのドリフトなどにより、データが時間の経過とともにゆっくりとドリフトすることはよくあるため、RCF はストリーミングデータの異常検出に有効なアルゴリズムです。

クリーンアップ

今後料金が発生しないようにするには、次の手順を実行してください。

  1. Amazon S3 コンソールで、CloudFormation スタックによって作成された S3 バケットを空にします。
  2. AWS CloudFormation コンソールで、CloudFormation スタックを削除します。

まとめ

この記事では、Amazon Managed Service for Apache Flink を使用するオンラインの教師なし機械学習アルゴリズムである RCF を使用して、入力ストリーミングデータの異常検出を実行する方法を説明しました。また、このアルゴリズムが自動的にデータのベースラインを学習し、時間の経過に伴うベースラインの変化に適応する方法についても説明しました。リアルタイム異常検出のユースケースにこのソリューションを検討していただければ幸いです。


著者について

Daren Wong は AWS のソフトウェアエンジニアです。彼は、AWS で Apache Flink アプリケーションを実行するためのマネージドサービスである Amazon Managed Service for Apache Flink に携わっています。

Aleksandr Pilipenko は AWS のソフトウェアエンジニアです。彼は、AWS で Apache Flink アプリケーションを実行するためのマネージドサービスである Amazon Managed Service for Apache Flink に携わっています。

Hong Liang Teoh は AWS のソフトウェアエンジニアです。彼は、AWS で Apache Flink アプリケーションを実行するためのマネージドサービスである Amazon Managed Service for Apache Flink に携わっています。

このブログは 2023 年 5 月 1 日に Daren Wong、Aleksandr Pilipenko、Hong Liang Teoh によって執筆された「Real-time anomaly detection via Random Cut Forest in Amazon Managed Service for Apache Flink」を日本語化したものです。
翻訳はソリューションアーキテクトの山下一樹が担当しました。