この記事は、 NTT Communications Advent Calendar 2024 12日目の記事です。 Azure Databricksを使ってレイクハウスアーキテクチャのログ基盤を構築し、 構造化されていないアプリケーションログの保管や加工、分析を試します。 はじめに レイクハウスアーキテクチャ ログ基盤とレイクハウス Azure Databricksでアプリケーションログを分析する Azure Databricksの準備 Terraformを使ったリソース作成 カタログとスキーマの作成 ログの取り込み ログの加工 BronzeからSilver SliverからGold ログの分析 (可視化) まとめ 参考文献 はじめに こんにちは、コミュニケーション&アプリケーションサービス部の吉仲です。 2022年度に入社し、初期配属からメール系システムと文書要約APIの開発・運用業務に取り組んでいます。 今回は業務から少し離れ、自身が興味のあるデータエンジニアリングの分野を題材にします。 本記事では、Azure Databricksを使ってレイクハウスアーキテクチャのログ基盤を構築し、 そこへ非構造化ログを取り込み、データの加工やダッシュボードでの可視化を試していきます。 本記事に含まれる内容は以下の通りです。 なぜログ基盤をレイクハウスアーキテクチャにしたいか Azure Databricksの構築 (Terraformを使用) Delta Live Tablesを使ったパイプラインの実装 サンプルログを地図上にマッピングして可視化するダッシュボードの作成 レイクハウスアーキテクチャ レイクハウスとは、 Databricks が提唱している新しいタイプのデータマネジメントアーキテクチャです。 www.databricks.com 簡単に言うと、レイクハウスはデータ レイク とデータウェア ハウス の良いとこ取りをしたアーキテクチャです。 レイクハウスは、構造化データから非構造化データまでを一元的かつ効率的に扱うことができ、 ビジネスインテリジェンス (BI) とAI/機械学習のどちらにも対応します。 ログ基盤とレイクハウス ログ基盤を構築する目的として、ログの 保管 と 分析 の2つがあると思います。 この保管と分析を 低コスト かつ 効率的 に実現できる点に、ログ基盤でレイクハウスアーキテクチャを取るメリットがあると考えます。 各種のアプリケーションやシステムにおいて、全てが構造化ロギングを行っているとは限りません。 例えば、OSSのメール転送エージェント (MTA) であるpostfixでは、メール配送時に以下のようなログを出力します。 見ての通りjsonやxml形式で出力されておらず、なおかつ1回のメール配送の情報が複数行に点在しています。 Dec 12 08:13:10 mtaserver postfix/smtpd[1180]: connect from example.com[192.0.2.1] Dec 12 08:13:10 mtaserver postfix/smtpd[1180]: ABCDEF01: client=example.com[192.0.2.1] Dec 12 08:13:10 mtaserver postfix/cleanup[1187]: ABCDEF01: message-id=<message.id@mtaserver.example.com> Dec 12 08:13:11 mtaserver postfix/smtpd[1180]: disconnect from example.com[192.0.2.1] ehlo=1 mail=1 rcpt=1 data=1 quit=1 commands=5 Dec 12 08:13:11 mtaserver postfix/qmgr[264]: ABCDEF01: from=<from@example.com>, size=2200, nrcpt=1 (queue active) Dec 12 08:13:11 mtaserver postfix/smtp[1738]: ABCDEF01: to=<to@example.net>, relay=example.net[192.0.2.2]:25, delay=0.3, delays=0.2/0/0.07/0.03, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as ABCDEF01) Dec 12 08:13:11 mtaserver postfix/qmgr[264]: ABCDEF01: removed このようなログは未加工のままでは扱いが難しいため、加工して構造化したログを用意した上で分析します。 ただし、加工によって少なからず情報が抜け落ちてしまうことがあります。 将来的に、加工方法や分析目的が変化/追加されることを考慮すると、未加工のログ (生ログ) も保管しておきたいです。 データレイクとデータウェアハウスを組み合わすことで、生ログの保管と加工済みログの分析を効果的に実現できます。 しかし、この構成ではデータを二重で持つことになり、データ保存コストが高くなりやすいです。 ここで、レイクハウスの出番です。 データレイク : さまざまなデータを低コストで保管できる データウェアハウス : 高度な分析を効率的に行える この両方の強みを兼ね備えたレイクハウスにより、生ログの保管と加工済みログの分析を 低コスト かつ 効率的 に実現できると考えます。 また、後述するマルチレイヤーのデータ設計により、加工方法や分析目的の変化/追加に対して柔軟で拡張性の高い基盤を実現できます。 Azure Databricksでアプリケーションログを分析する ここでは、 Azure Databricks を使ってログ基盤としてのレイクハウスを構築し、サンプルのログを分析してみます。 具体的には、前述のpostfixサンプルログを保管し、分析するところまでをAzure Databricksで試します。 なお、ログ基盤を構築・運用する上ではログの 収集 も非常に重要なポイントですが、今回はフォーカスしません。 簡単のため、データはAzure Databricksに直接アップロードすることとします。 Azure Databricksの準備 まずはじめに、以下のような構成でAzure Databricksを構築します。 リソースグループの中に以下のリソースを作成します。 仮想ネットワーク ネットワークセキュリティグループ Azure Databricksワークスペース ワークスペースを作成すると、自動的に専用のリソースグループと以下のリソースが作成されます。 ストレージアカウント (Azure Data Lake Storage Gen2, ADLS2) Azure Databricks用のアクセス コネクタ マネージドID Databricks上のデータマネジメント機能 Unity Catalog の外部ロケーションとしてADLS2を登録している状態が出来上がります。 1 Terraformを使ったリソース作成 今回は Terraform を使ってリソースを作成していきます。 前提は以下の通りです。 Azureで「共同作成者」ロールが割り当てられたアカウントを保持していること Azure CLI およびTerraformを実行できる環境があること Azure CLIで上記のアカウントにログイン済みであること こちら を参考に、 Azureのリソースグループと仮想ネットワーク、Azure Databricksワークスペースを定義します。 今回はリソースの名前に "databricksdemo" というprefixを付けます。 Terraformコード terraform { required_providers { azurerm = "~> 4.0" random = "~> 3.6" databricks = { source = "databricks/databricks" version = "1.59.0" } } } data "azurerm_client_config" "current" {} data "external" "me" { program = [ "az" , "account" , "show" , "--query" , "user" ] } locals { subscription_id = "<AzureサブスクリプションID>" prefix = "databricksdemo" tags = { "Environment" = "Demo" "Owner" = lookup (data.external.me.result, "name" ) } region = "japaneast" cidr = "10.179.0.0/20" } provider "azurerm" { subscription_id = local.subscription_id features {} } provider "databricks" { host = azurerm_databricks_workspace.this.workspace_url } resource "azurerm_resource_group" "this" { name = "$ { local.prefix } -rg" location = local.region tags = local.tags } resource "azurerm_virtual_network" "this" { name = "$ { local.prefix } -vnet" resource_group_name = azurerm_resource_group.this.name location = azurerm_resource_group.this.location address_space = [ local.cidr ] tags = local.tags } resource "azurerm_network_security_group" "this" { name = "$ { local.prefix } -nsg" resource_group_name = azurerm_resource_group.this.name location = azurerm_resource_group.this.location tags = local.tags } resource "azurerm_subnet" "public" { name = "$ { local.prefix } -public" resource_group_name = azurerm_resource_group.this.name virtual_network_name = azurerm_virtual_network.this.name address_prefixes = [ cidrsubnet (local.cidr, 3 , 0 ) ] delegation { name = "databricks" service_delegation { name = "Microsoft.Databricks/workspaces" actions = [ "Microsoft.Network/virtualNetworks/subnets/join/action" , "Microsoft.Network/virtualNetworks/subnets/prepareNetworkPolicies/action" , "Microsoft.Network/virtualNetworks/subnets/unprepareNetworkPolicies/action" , ] } } } resource "azurerm_subnet_network_security_group_association" "public" { subnet_id = azurerm_subnet.public.id network_security_group_id = azurerm_network_security_group.this.id } resource "azurerm_subnet" "private" { name = "$ { local.prefix } -private" resource_group_name = azurerm_resource_group.this.name virtual_network_name = azurerm_virtual_network.this.name address_prefixes = [ cidrsubnet (local.cidr, 3 , 1 ) ] delegation { name = "databricks" service_delegation { name = "Microsoft.Databricks/workspaces" actions = [ "Microsoft.Network/virtualNetworks/subnets/join/action" , "Microsoft.Network/virtualNetworks/subnets/prepareNetworkPolicies/action" , "Microsoft.Network/virtualNetworks/subnets/unprepareNetworkPolicies/action" , ] } } } resource "azurerm_subnet_network_security_group_association" "private" { subnet_id = azurerm_subnet.private.id network_security_group_id = azurerm_network_security_group.this.id } resource "azurerm_databricks_workspace" "this" { name = "$ { local.prefix } -workspace" resource_group_name = azurerm_resource_group.this.name location = azurerm_resource_group.this.location sku = "premium" managed_resource_group_name = "$ { local.prefix } -workspace-rg" tags = local.tags custom_parameters { no_public_ip = true virtual_network_id = azurerm_virtual_network.this.id private_subnet_name = azurerm_subnet.private.name public_subnet_name = azurerm_subnet.public.name public_subnet_network_security_group_association_id = azurerm_subnet_network_security_group_association.public.id private_subnet_network_security_group_association_id = azurerm_subnet_network_security_group_association.private.id } } data "databricks_node_type" "smallest" { local_disk = true depends_on = [ azurerm_databricks_workspace.this ] } data "databricks_spark_version" "latest_lts" { long_term_support = true depends_on = [ azurerm_databricks_workspace.this ] } resource "databricks_cluster" "this" { cluster_name = "$ { local.prefix } -cluster" node_type_id = data.databricks_node_type.smallest.id spark_version = data.databricks_spark_version.latest_lts.id data_security_mode = "SINGLE_USER" autotermination_minutes = 10 spark_conf = { "spark.databricks.cluster.profile" : "singleNode" , "spark.master" : "local[*]" , "spark.databricks.unityCatalog.volumes.enabled" : "true" , } custom_tags = { "ResourceClass" : "SingleNode" } } output "databricks_host" { value = "https://$ { azurerm_databricks_workspace.this.workspace_url } /" } Terraform コマンドにより、上記で定義したリソースを作成します。 terraform init terraform plan terraform apply # 途中で "yes" を入力 # ... # Outputs: # databricks_host = "https://<AzureDatabricksワークスペースのURL>/" リソースグループ <prefix>-rg (今回は databricksdemo-rg ) の中に以下のリソースが作成されます。 <prefix>-nsg <prefix>-vnet <prefix>-workspace また、リソースグループ <prefix>-workspace-rg が自動的に作成され、その中に以下のリソースが作成されます。 dbmanagedidentity dbstorage<randam-string> unity-catalog-access-connector 以上で準備は完了です。 terraform apply 実行後に表示されるDatabricksワークスペースのURLをクリックし、Databricks UIを開きます。 カタログとスキーマの作成 Databricksでは、 メダリオンアーキテクチャ というマルチレイヤーのデータ設計が推奨されています。 具体的な説明はここでは省略しますが、このアーキテクチャにより、加工方法や分析目的の変化/追加に対して柔軟になります。 メダリオンアーキテクチャの実装方法については こちら を参考にし、以下のような設計とします。 demo というカタログを作成し、その中にBronze/Sliver/Goldレイヤー用のスキーマを作成します。 bronze :postfixの生ログをそのまま保存 silver :各行から必要な情報 (client IP等) を抽出し、構造化したログを保存 gold :複数行に跨るメール配送の情報を結合し、分析用に整備したログを保存 上記のカタログとスキーマは、以下の操作により作成します。 サイドバーで [カタログ] をクリック カタログ一覧の右上の [⚙]>[外部ロケーション] をクリックし、外部ロケーションのURLを控える サイドバー上部の [新規]>[ノートブック] から新しいノートブックを作成して以下を実行 (適宜、クラスターの起動とノートブックのアタッチを実施) %sql CREATE CATALOG demo MANAGED LOCATION ' <外部ボリュームのURL>/demo ' ; CREATE SCHEMA demo.bronze MANAGED LOCATION ' <外部ボリュームのURL>/demo/bronze ' ; CREATE SCHEMA demo.silver MANAGED LOCATION ' <外部ボリュームのURL>/demo/silver ' ; CREATE SCHEMA demo.gold MANAGED LOCATION ' <外部ボリュームのURL>/demo/gold ' ; 【参考】スキーマ作成後の状態 以上でカタログとスキーマの作成は完了です。 ログの取り込み 次に、Databricks UI上で前述のpostfixサンプルログをBronzeレイヤーに取り込みます。 なお、本格的なログ基盤を構築する場合は、Azureの Event Hubs や Data Factory 等を使ってデータソースからログを取り込みます。 以下の操作でボリュームとディレクトリを作成し、サンプルログをアップロードします。 先ほどのノートブックを開き、以下のクエリを実行 %sql CREATE VOLUME demo.bronze.maillog; サイドバーの [新規]>[データを追加またはアップロード] をクリック サンプルログを maillog というファイルとしてアップロードし、 送信先パスに /Volumes/demo/bronze/maillog/postfix/2024-12-12 を入力して [アップロード] をクリック 同様に、日時等を変えたサンプルログ②を /Volumes/demo/bronze/maillog/postfix/2024-12-13 にアップロードしておきます。 サンプルログ② Dec 13 18:15:30 mtaserver postfix/smtpd[1181]: connect from example.com[192.0.2.3] Dec 13 18:15:30 mtaserver postfix/smtpd[1181]: ABCDEF02: client=example.com[192.0.2.3] Dec 13 18:15:30 mtaserver postfix/cleanup[1188]: ABCDEF02: message-id=<message.id@mtaserver.example.com> Dec 13 18:15:31 mtaserver postfix/smtpd[1181]: disconnect from example.com[192.0.2.3] ehlo=1 mail=1 rcpt=1 data=1 quit=1 commands=5 Dec 13 18:15:31 mtaserver postfix/qmgr[265]: ABCDEF02: from=<from@example.com>, size=2200, nrcpt=1 (queue active) Dec 13 18:15:31 mtaserver postfix/smtp[1739]: ABCDEF02: to=<to@example.net>, relay=example.net[192.0.2.2]:25, delay=0.5, delays=0.4/0/0.07/0.03, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as ABCDEF02) Dec 13 18:15:31 mtaserver postfix/qmgr[265]: ABCDEF02: removed アップロードしたサンプルログをDatabricksで確認してみます。 先ほどのノートブックで以下を実行します。 %sql SELECT * from read_files( " /Volumes/demo/bronze/maillog/postfix/2024-12-*/maillog " , format => " text " ) Databricks上で、保管したサンプルログの中身を表示できるようになりました。 ただし、このままではログの検索や分析が難しいため、データ加工により構造化ログを作成していきます。 ログの加工 ここでは、 Delta Live Tables (DLT) パイプラインを使ってログを加工することで、 BronzeからSilver、そしてGoldレイヤーにデータをインジェストしていきます。 BronzeからSilver Bronzeレイヤーに保存した生ログを構造化してSilverレイヤーにインジェストします。 postfixログの構造化のために、 Logstash で使われるGrokという正規表現と、PythonでGrokを扱うための pygrok を利用します。 postfixログ用のGrokパターンは以下のものを使います。 これはあくまでサンプルログ用のパターンで、実際のログを構造化するためにはより複雑なパターンが必要になります。 postfixサンプルログ用のGrokパターン # common patterns PROCESS ([\w._\/%\-]+) PROGRAM (postfix[\w\-.]*) PROCESS_AND_PID %{PROGRAM:program}\/%{PROCESS:process}(?:\[%{NUMBER:pid}\])? QUEUEID (?:[0-9A-F]{6,}|[0-9a-zA-Z]{12,}|NOQUEUE) EMAIL_ADDRESSPART [a-zA-Z0-9_.+-=:~]+ EMAIL_ADDRESS %{EMAIL_ADDRESSPART}@%{EMAIL_ADDRESSPART} RELAY (?:%{HOSTNAME:relayhost}(?:\[%{IP:relayip}\](?::[0-9]+(.[0-9]+)?)?)?) CLIENT (?:%{HOSTNAME:clienthost}(?:\[%{IP:clientip}\](?::[0-9]+(.[0-9]+)?)?)?) POSREAL [0-9]+(.[0-9]+)? STATUS sent|deferred|bounced|expired|delivery RESPONSECODE [0-9][0-9][0-9] DSN %{NONNEGINT}.%{NONNEGINT}.%{NONNEGINT} # postfix/smtp POSTFIX_SMTP %{QUEUEID:qid}: to=<%{EMAIL_ADDRESS:to_addr}>, relay=<?%{RELAY}>?, delay=%{POSREAL:delay}, delays=%{DATA:delays}, dsn=%{DSN:dsn}, status=%{STATUS:result} \(%{DATA:reason}\) # postfix/smtpd POSTFIX_SMTPD %{POSTFIX_SMTPD_CLIENT}|%{POSTFIX_SMTPD_CONNECTS} POSTFIX_SMTPD_CLIENT %{QUEUEID:qid}: client=<?%{CLIENT}>? POSTFIX_SMTPD_CONNECTS (?:dis)?connect from %{CLIENT}(?: %{DATA:command_counts})? # postfix/cleanup POSTFIX_CLEANUP %{QUEUEID:qid}: (resent-)?message-id=<?%{DATA:messageid}>? # postfix/qmgr POSTFIX_QMGR %{QUEUEID:qid}: (?:removed|from=<(?:%{EMAIL_ADDRESS:from_addr})?>(?:, size=%{NUMBER:size}, nrcpt=%{NUMBER:nrcpt} \(%{GREEDYDATA:queuestatus}\))?) # aggregate all patterns POSTFIX_PREFIX (?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{PROCESS_AND_PID}: POSTFIX_LOG %{POSTFIX_PREFIX} (?:%{POSTFIX_SMTP}|%{POSTFIX_SMTPD}|%{POSTFIX_QMGR}|%{POSTFIX_CLEANUP}) このファイルを前述の手順で demo.bronze の管理ボリュームの other/grok/postfix.grok にアップロードしておきます。 続いて、DLTパイプラインを作成します。 サイドバーから [Delta Live Tables] を開き、画面右上の [パイプラインを作成] をクリックします。 以下の内容で設定後、[作成]をクリックします。(他はデフォルトでOK) パイプライン名: databricksdemo-bronze-to-silver 製品エディション: Core パス: (空のまま) ストレージオプション: Unity Catalog カタログ: demo ターゲットスキーマ: silver クラスターモード: 固定サイズ ワーカー: 1 ワーカータイプ: Standard_D3_v2 作成完了後、パイプライン詳細画面の右サイドバーからソースコード (ノートブック) のURLを開きます。 開いたノートブックに以下のソースコードを記載します。 # 依存関係インストール %pip install pygrok== 1.0 . 0 regex== 2024.11 . 6 # Grokパターンの定義 import json from pygrok import Grok grok_path = "/Volumes/demo/bronze/other/grok/" grok_pattern = "^%{POSTFIX_LOG}$" grok = Grok(grok_pattern, custom_patterns_dir=grok_path) # スキーマ、データ変換処理の定義 from pyspark.sql.functions import ( col, concat, current_date, from_json, lit, month, pandas_udf, regexp_extract, to_timestamp, year, ) from pyspark.sql.types import StringType, StructField, StructType keys = [ "timestamp" , "logsource" , "program" , "process" , "pid" , "qid" , "clienthost" , "clientip" , "messageid" , "command_counts" , "from_addr" , "size" , "nrcpt" , "queuestatus" , "to_addr" , "relayhost" , "relayip" , "delay" , "delays" , "dsn" , "result" , "reason" , ] schema = StructType([StructField(k, StringType(), True ) for k in keys]) @ pandas_udf ( "string" ) def parse_grok_udf (log_line): return log_line.apply( lambda x: json.dumps({k: v for k, v in grok.match(x).items() if v is not None }) ) # Bronzeレイヤーからのデータ読込み (一時ビュー作成) from datetime import datetime import dlt volume_path = f "/Volumes/demo/bronze/maillog/postfix/{datetime.now():%Y-%m}-*/maillog" # デモのため月次バッチ @ dlt.view () def raw_data (): return spark.read.format( "text" ).load(volume_path) # Silverレイヤーのテーブル作成 this_year = year(current_date()) dt_format = "yyyy MMM dd HH:mm:ss" @ dlt.table ( name= "maillog_postfix" , table_properties={ "quality" : "silver" }, ) def structured_data (): return ( dlt.read( "raw_data" ) .withColumn( "parsed" , from_json(parse_grok_udf(col( "value" )), schema)) .select( "parsed.*" ) .withColumn( "timestamp" , to_timestamp(concat(lit(this_year), lit( " " ), col( "timestamp" )), dt_format)) .sort(col( "timestamp" ).asc()) ) DLTパイプラインのクラスター databricksdemo-bronze-to-silver に接続後、[検証] をクリックします。 検証が完了して画面下部にグラフが描画された後、[起動] をクリックしてパイプラインを実行します。 すると、以下のように demo.silver.maillog_postfix というテーブルが作成されます。 作成されたテーブルは以下の通りです。 今回はGrokパターンを作り込んでいないので、完全には構造化できていない部分もあります ( command_counts や reason など)。 この部分も含めた構造化の難易度はそれほど高くないように思いますが、今回はここまでにします。 SliverからGold Goldレイヤーには、分析の目的に合わせたテーブルを作成します。 ここでは、メール送信もしくは受信の接続元IPを地図上にマッピングし、 異常な接続元の有無を可視化するような分析を実施していきます。 Goldレイヤーにテーブルを作成するために、以下の内容を実施します。 GeoLite2 から以下のCSVファイルをダウンロードし 2 、Databricksに取り込む GeoLite2-City-Blocks-IPv4.csv GeoLite2-City-Locations-en.csv 複数行に跨るメール配送の情報を結合 (1キューID/1レコードの状態にする) GeoLite2を使って、各メール配送ログのクライアントIPの地理情報を取得 1.について、postfixサンプルログやGrokパターンファイルと同様の手順で、 demo.bronze.other.geolite2 にCSVファイルをアップロードします。 その後、以下のコードを実行して demo.silver にテーブルを作成します。 ( spark.read.format( "csv" ) .option( "Header" , True ) .option( "inferSchema" , True ) .load( "/Volumes/demo/bronze/other/geolite2/<csv_file>" ) # CSVファイルを指定 .write.format( "delta" ) .mode( "overwrite" ) .saveAsTable( "demo.silver.<table_name>" ) # テーブル名を指定: `geolite2_city_blocks_ipv4` or `geolite2_city_locations_en` ) 2.について、前述の手順と同様にDLTパイプライン databricksdemo-silver-to-gold を作成し、 以下のソースコードを記載した上でパイプラインを実行します。 なお、地理情報のマッピング方法は こちら を参考にさせていただきました。 # IPアドレスと地理情報のマッピングのための変換処理の定義 import ipaddress as ip from pyspark.sql.functions import pandas_udf, col @ pandas_udf ( 'long' ) def to_network_address (cidr): return cidr.apply( lambda x: int (ip.IPv4Network(x).network_address)) @ pandas_udf ( 'long' ) def to_broadcast_address (cidr): return cidr.apply( lambda x: int (ip.IPv4Network(x).broadcast_address)) @ pandas_udf ( 'long' ) def to_address_int (cidr): return cidr.apply( lambda x: int (ip.IPv4Address(x))) # GeoLite2のテーブルの読み込み df_ip_blocks = spark.table( "demo.silver.geolite2_city_blocks_ipv4" ) df_locations = spark.table( "demo.silver.geolite2_city_locations_en" ) # 1キューID/1レコードに結合したビューの作成 import dlt from pyspark.sql.functions import collect_list, first from pyspark.sql.types import DateType @ dlt.view () def concatenated_data (): return ( spark.table( "demo.silver.maillog_postfix" ) .alias( "df" ) .groupBy(col( "timestamp" ).cast(DateType()).alias( "date" ), "logsource" , "qid" ) .agg( first( "timestamp" ).alias( "timestamp" ), first( "program" ).alias( "program" ), collect_list( "process" ).alias( "processes" ), first( "clienthost" , ignorenulls= True ).alias( "clienthost" ), first( "clientip" , ignorenulls= True ).alias( "clientip" ), first( "messageid" , ignorenulls= True ).alias( "messageid" ), first( "from_addr" , ignorenulls= True ).alias( "from_addr" ), first( "to_addr" , ignorenulls= True ).alias( "to_addr" ), first( "result" , ignorenulls= True ).alias( "result" ), first( "reason" , ignorenulls= True ).alias( "reason" ), ) .filter(col( "qid" ).isNotNull()) ) # Goldレイヤーのテーブル作成 @ dlt.table ( name= "maillog_postfix_with_locations" , table_properties={ "quality" : "gold" }, ) def maillog_postfix_with_locations (): return ( dlt.read( "concatenated_data" ) .hint( "range_json" , 65536 ) .join( df_ip_blocks.alias( "b" ), [ to_address_int(col( "clientip" )) > to_network_address(col( "b.network" )), to_address_int(col( "clientip" )) < to_broadcast_address(col( "b.network" )), ], "left" , ) .alias( "f" ) .join( df_locations.withColumnRenamed( "geoname_id" , "geoname_id_2" ).alias( "l" ), col( "f.geoname_id" ) == col( "l.geoname_id_2" ), "left" , ) ) パイプライン実行後は、以下のような demo.gold.maillog_postfix_with_locations テーブルが作成されます。 ログの分析 (可視化) 最後に、Goldレイヤーに作成したテーブルを使って、地図上に接続元をマッピングするダッシュボードを作成します。 なお、上で記載したpostfixサンプルログは例示用IPアドレスを使っているため、地理情報を取得できません。 以降の例では、適当なIPアドレスを使ったサンプルログでダッシュボードを作成しています。 ダッシュボード作成の手順は以下の通りです。 カタログから demo.gold.maillog_postfix_with_locations テーブルを開き、画面右上の [作成]>[ダッシュボード] をクリック 作成画面で、下部のナビゲーションから [ビジュアライゼーションを追加] をクリックしてボックスを配置 (デフォルトで配置されているビジュアライゼーションは削除してOK) 右側のメニューで、ビジュアライゼーションのパラメータを以下の通りに設定 可視化: ポイントマップ 緯度: latitude 経度: longitude 画面右上の [公開] をクリックし、アクセス許可のある人を確認した上で再度 [公開] をクリック 作成したダッシュボードは以下の通りです。 これにより、例えば社内メールが海外から送信されているといった事象を可視化できるようになりました。 本記事でのログ分析はここまでとします。 もちろん、Goldレイヤーのテーブルを使ってより高度な分析 (異常検知のモデル作成など) も可能だと思います。 まとめ 本記事では、Azure Databricksを使って非構造化ログ (postfixのサンプルログ) の保管から加工、可視化までを試しました。 Databricksでは、ログが構造化されているかを問わず、一元的なログ管理・分析が可能です。 同じUI上でパイプライン実装からデータ探索、ダッシュボード作成まで可能であり、非常に便利なプラットフォームと感じました。 また、今回は紹介しませんでしたが、Databricksにはマネージドの MLflow が組み込まれています。 レイクハウスはBI/AIの両方と親和性が高いことも強みなので、次はDatabricksを使ったMLOpsにも挑戦してみたいです。 最後までご覧いただきありがとうございました!それでは、明日の記事もお楽しみに! 参考文献 https://www.databricks.com/ https://github.com/databricks/terraform-provider-databricks/ https://registry.terraform.io/providers/databricks/databricks/latest/docs https://learn.microsoft.com/ja-jp/azure/databricks/ データブリックス・ジャパン (2022)『データブリックスクイックスタートガイド』 プロジェクトによっては、自動作成されるもの以外のストレージを外部ロケーションとしてUnity Catalogに登録する方法が適している場合もあります。 ↩ MaxMind のウェブサイトでユーザー登録することで、GeoLite2のデータをダウンロードできるようになります。 ↩