Introduction

Hello! I'm …澤 from the DELISH KITCHEN team in Development Department 1. I currently work on Delish Research (デリッシュリサーチ), a food-trend analysis tool. In this article I'll describe what we did to guarantee the quality of the data that Delish Research provides.

Why data quality needs to be guaranteed

Delish Research offers a variety of data through dashboards so that users can analyze food trends. Incorrect data can lead to wrong decisions, so we have to pay close attention to accuracy. In addition, because the data we provide is so wide-ranging, the dependencies between tables keep getting more complex. Left unattended, this risks mistakes in the aggregation logic and inconsistencies between similar datasets. Even an excerpt of the actual ER diagram is fairly complex. So that customers can use the service with confidence despite this complexity, we put particular effort into guaranteeing data quality.

What we actually did

We worked on the following two initiatives:

- Unit tests for the table-creation jobs
- Alerts on increases and decreases in the source search log data

I'll go through each in detail.

Unit tests for the table-creation jobs

As the ER diagram suggests, there are many tables and the ETL is complex. On top of that, each table-creation job does complex work by itself, so we unit test every job individually, which lets us guarantee quality table by table.
For each job we prepare input data and the expected output data, run the real table-creation job on the input, and check that the actual output matches the expected output exactly.

Here is a concrete example of how we test. Our ETL is built on Databricks.

Processing

- Extract the rows of the search log data that fall within a given period (2024-01-01 to 2025-12-31).
- Remove leading and trailing spaces from the search word column.

Note: this example is deliberately simple; the real jobs are considerably more complex.
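The two rules above can be sketched in plain Python, independent of Spark, to make the intended behavior concrete. This is only an illustration under assumptions: the function name `transform` and the tuple layout are hypothetical, and the regular expression simply mirrors the idea of trimming half-width (U+0020) and full-width (U+3000) spaces at both ends.

```python
import re
from datetime import date

# Runs of half-width (U+0020) or full-width (U+3000) spaces at either end.
EDGE_SPACES = re.compile(r"^[\u0020\u3000]+|[\u0020\u3000]+$")


def transform(rows, start=date(2024, 1, 1), end=date(2025, 12, 31)):
    """Keep rows whose event_date lies in [start, end] and trim the search word.

    `rows` is an iterable of (event_date, user_id, search_word) tuples,
    where event_date is an ISO string such as "2024-01-01" (an assumption).
    """
    result = []
    for event_date, user_id, search_word in rows:
        if start <= date.fromisoformat(event_date) <= end:
            result.append((event_date, user_id, EDGE_SPACES.sub("", search_word)))
    return result


rows = [
    ("2023-12-31", 1, "キャベツ"),            # before the period: dropped
    ("2024-01-01", 2, "\u3000キャベツ "),      # spaces at both ends: trimmed
    ("2025-12-31", 3, " キャベツ 豚肉\u3000"),  # inner space is kept
    ("2026-01-01", 4, "キャベツ"),            # after the period: dropped
]
print(transform(rows))
```

A small reference implementation like this can also help when writing the expected output by hand, since it spells out which rows survive and which characters are removed.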
Code example

Below is a simplified version of the test code we actually wrote.

First, create the expected output data and store it in a DataFrame. Because the final check is an exact DataFrame comparison, the rows are sorted by all columns.

    import pandas as pd

    columns = ['event_date', 'user_id', 'search_word']

    # キャベツ = cabbage, 豚肉 = pork
    expected_data = [
        ('2024-01-01', 2, 'キャベツ'),
        ('2024-01-01', 5, 'キャベツ'),
        ('2024-01-01', 6, 'キャベツ'),
        ('2024-01-01', 7, 'キャベツ'),
        ('2024-01-01', 8, 'キャベツ'),
        ('2024-01-01', 9, 'キャベツ'),
        ('2024-01-01', 10, 'キャベツ'),
        ('2024-01-01', 11, 'キャベツ 豚肉'),
        ('2024-01-01', 12, 'キャベツ 豚肉'),
        ('2025-12-31', 3, 'キャベツ'),
    ]

    # Sort by every column so that row order cannot cause a false mismatch.
    expected_df = (
        pd.DataFrame(expected_data, columns=columns)
        .sort_values(columns)
        .reset_index(drop=True)
    )
Next, create the input data (search log data). It is designed to verify the following:

- Only rows from 2024-01-01 to 2025-12-31 are extracted.
- Leading and trailing spaces are removed.
- Spaces that are not at the start or end are left untouched.

    columns = ['event_date', 'user_id', 'search_word']

    input_data = [
        # Check that only rows in the expected period are kept
        ('2023-12-31', 1, 'キャベツ'),
        ('2024-01-01', 2, 'キャベツ'),
        ('2025-12-31', 3, 'キャベツ'),
        ('2026-01-01', 4, 'キャベツ'),
        # Check space removal (\u3000 is the full-width space)
        ('2024-01-01', 5, ' キャベツ'),
        ('2024-01-01', 6, 'キャベツ '),
        ('2024-01-01', 7, '\u3000キャベツ'),
        ('2024-01-01', 8, 'キャベツ\u3000'),
        ('2024-01-01', 9, ' キャベツ '),
        ('2024-01-01', 10, '\u3000キャベツ\u3000'),
        ('2024-01-01', 11, 'キャベツ 豚肉'),
        ('2024-01-01', 12, ' キャベツ 豚肉 '),
    ]

    input_df = pd.DataFrame(input_data, columns=columns)
    # createOrReplaceTempView returns None, so its result is not assigned.
    spark.createDataFrame(input_df).createOrReplaceTempView("spark_input_df")
Write the input data to a Delta table in the development environment.

    # delta_table_path is a placeholder for the base path of the dev Delta tables.
    input_data = spark.sql("""
        SELECT
            event_date,
            user_id,
            search_word
        FROM spark_input_df
    """)

    (
        input_data.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("event_date")
        .save(f"{delta_table_path}/search_log")
    )

Run the actual ETL job in the development environment. The details of the job are shown later.

    args = {"env": "dev", "date": "2025-12-31"}
    # A timeout of 0 means the notebook run has no timeout.
    dbutils.notebook.run("../01_SearchLog", 0, args)

Read back the data written by the job and store it in a DataFrame. The rows are sorted by all columns for the exact-match comparison.

    output = spark.sql(f"""
        SELECT
            event_date,
            user_id,
            search_word
        FROM delta.`{delta_table_path}/search_data`
        ORDER BY event_date, user_id, search_word
    """)
    output_df = output.toPandas()
Finally, compare the actual output with the expected output.

    from pandas.testing import assert_frame_equal

    def assert_output_equals_expected(output_df: pd.DataFrame, expected_df: pd.DataFrame) -> None:
        output_df = output_df.reset_index(drop=True)
        try:
            assert_frame_equal(output_df, expected_df)
            print("The DataFrames match exactly.")
        except AssertionError as e:
            # Stop the notebook and return the diff as its exit value.
            dbutils.notebook.exit(f"The DataFrames did not match:\n{e}")

    assert_output_equals_expected(output_df, expected_df)
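When an exact comparison fails, it can help to see which rows are missing and which are unexpected. The following is a minimal, hypothetical helper (`diff_rows` is not part of the original test code) that compares two collections of row tuples as multisets, so row order does not matter and duplicates are counted.

```python
from collections import Counter


def diff_rows(actual, expected):
    """Compare two row collections as multisets, ignoring order.

    Returns (missing, unexpected): rows that were expected but are absent,
    and rows that are present but were not expected.
    """
    actual_counts = Counter(actual)
    expected_counts = Counter(expected)
    missing = sorted((expected_counts - actual_counts).elements())
    unexpected = sorted((actual_counts - expected_counts).elements())
    return missing, unexpected


expected = [("2024-01-01", 2, "キャベツ"), ("2025-12-31", 3, "キャベツ")]
# The first row still carries a leading space, i.e. trimming did not happen.
actual = [("2025-12-31", 3, "キャベツ"), ("2024-01-01", 2, " キャベツ")]

missing, unexpected = diff_rows(actual, expected)
print("missing:", missing)
print("unexpected:", unexpected)
```

With pandas DataFrames, the row tuples could be obtained with `df.itertuples(index=False, name=None)` before calling the helper.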
The 01_SearchLog notebook that the test runs looks like this.

    dbutils.widgets.text("env", "dev")
    dbutils.widgets.text("date", "yyyy-MM-dd")
    end_date = dbutils.widgets.get("date")
    env = dbutils.widgets.get("env")

    # delta_table_path is a placeholder for the base path of the Delta tables.
    search_data = spark.sql(f"""
        select
            event_date,
            user_id,
            -- remove leading and trailing half-width and full-width spaces
            regexp_replace(search_word, '^[\\u0020\\u3000]+|[\\u0020\\u3000]+$', '') as search_word
        from delta.`{delta_table_path}/search_log`
        where event_date >= '2024-01-01'
          and event_date <= '{end_date}'
    """)

    (
        search_data.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("event_date")
        .save(f"{delta_table_path}/search_data")
    )

Issues and improvements

Changing one job can affect other jobs, so in principle we run the tests for every job. Because there are so many of them, a full run currently takes about an hour and a half. Sorting out the dependencies and running only the tests that are strictly necessary should improve this.
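One way to narrow a test run down to what a change can actually affect is to walk the table dependency graph downstream from the changed tables. The sketch below assumes a hand-maintained dependency map; apart from search_log and search_data, every table name in it is hypothetical, and `tables_to_test` is an illustrative helper rather than something from our codebase.

```python
from collections import deque

# Hypothetical dependency map: table -> tables built directly from it.
DOWNSTREAM = {
    "search_log": ["search_data"],
    "search_data": ["daily_search_count", "search_word_ranking"],
    "daily_search_count": ["trend_summary"],
    "search_word_ranking": [],
    "trend_summary": [],
    "recipe_view_log": ["recipe_view_data"],
    "recipe_view_data": [],
}


def tables_to_test(changed, downstream=DOWNSTREAM):
    """Return the changed tables plus everything reachable downstream (BFS)."""
    seen = set(changed)
    queue = deque(changed)
    while queue:
        table = queue.popleft()
        for child in downstream.get(table, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return sorted(seen)


# A change to search_data does not require re-testing the recipe_view_* tables.
print(tables_to_test(["search_data"]))
```

The trade-off is that the map has to be kept in sync with the real ETL; a stale map would silently skip tests that should have run.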
Alerts on increases and decreases in the source log data

The unit tests for the table-creation jobs guarantee the quality of the tables we build for Delish Research, but they cannot deal with problems in the source search log itself. For example, if the ETL for the search log data is delayed and the row count is abnormal, the unit tests described here will not catch it.

So we built a mechanism that uses the source search log data to calculate the week-over-week change in the number of search log rows every day, and raises an alert when the rate of increase or decrease exceeds a threshold. We built the alert in Redash so that PdMs (product managers) can work with it too. We wrote a query like the one below and raise an alert whenever a record with alert_flag = 1 is present.

SQL for the alert

In practice we also check the number of log rows per user attribute, but here I'll show a query that checks only the overall count.

    WITH recent_search_log AS (
        SELECT
            event_date,
            user_id,
            search_word
        FROM search_log
        WHERE event_date >= DATE_SUB(FROM_UTC_TIMESTAMP(CURRENT_DATE(), 'Asia/Tokyo'), 8)
          AND event_date < FROM_UTC_TIMESTAMP(CURRENT_DATE(), 'Asia/Tokyo')
    ),
    daily_count AS (
        SELECT
            event_date,
            COUNT(*) AS log_count
        FROM recent_search_log
        GROUP BY event_date
    ),
    growth_rate AS (
        SELECT
            event_date,
            log_count AS current_count,
            -- count from seven rows (days) earlier
            LAG(log_count, 7) OVER (ORDER BY event_date) AS prev_week_count,
            ROUND(
                (log_count - LAG(log_count, 7) OVER (ORDER BY event_date)) * 100.0
                    / LAG(log_count, 7) OVER (ORDER BY event_date),
                2
            ) AS growth_rate
        FROM daily_count
    )
    SELECT
        event_date,
        current_count,
        prev_week_count,
        -- flag changes of more than 20% in either direction
        CASE
            WHEN growth_rate < -20 OR growth_rate > 20 THEN 1
            ELSE 0
        END AS alert_flag
    FROM growth_rate
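The same week-over-week check can be sketched in plain Python, which makes the arithmetic easy to verify on small made-up numbers. The function name `week_over_week` and the sample counts are assumptions for illustration only. Unlike `LAG(..., 7)`, which looks back seven rows, this sketch looks back seven calendar days, so a day that is missing entirely from the log does not shift the comparison.

```python
from datetime import date, timedelta


def week_over_week(daily_counts, threshold_pct=20.0):
    """Compute week-over-week growth and an alert flag per day.

    daily_counts maps a date to that day's log count. Days without a usable
    value seven days earlier are skipped.
    Returns (day, current, previous, growth_rate, alert_flag) tuples.
    """
    rows = []
    for day in sorted(daily_counts):
        prev_day = day - timedelta(days=7)
        if prev_day not in daily_counts or daily_counts[prev_day] == 0:
            continue  # no baseline to compare against
        current, previous = daily_counts[day], daily_counts[prev_day]
        growth_rate = round((current - previous) * 100.0 / previous, 2)
        alert_flag = 1 if abs(growth_rate) > threshold_pct else 0
        rows.append((day, current, previous, growth_rate, alert_flag))
    return rows


start = date(2025, 1, 1)
counts = {start + timedelta(days=i): 1000 for i in range(8)}
counts[start + timedelta(days=7)] = 700  # 30% drop versus one week earlier

for row in week_over_week(counts):
    print(row)
```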
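Issues and improvements

Currently, based on past data, we set the alert_flag threshold to the rate of increase or decrease that occurs roughly n times a year. That n is hard-coded, however, which makes it difficult to choose an appropriate threshold. If the threshold is too hard to exceed, we fail to notice real problems; if it is too easy to exceed, alerts fire so often that they lose credibility. We want to find the right balance as we operate the system.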
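To make the "about n times a year" idea concrete, here is a minimal sketch of deriving a threshold from historical growth rates. It is an assumption about how such a threshold could be computed, not our actual procedure: `threshold_for_n_per_year` is a hypothetical helper, the history is made up, and the resulting threshold is meant to be used with "alert when the absolute rate is at least the threshold".

```python
def threshold_for_n_per_year(growth_rates, n_per_year, days_per_year=365):
    """Pick a threshold that about n_per_year days per year would reach.

    growth_rates is a history of daily week-over-week rates in percent.
    The threshold is the k-th largest absolute rate, where k is n_per_year
    scaled to the length of the history.
    """
    magnitudes = sorted((abs(rate) for rate in growth_rates), reverse=True)
    if not magnitudes:
        raise ValueError("growth_rates must not be empty")
    k = round(n_per_year * len(magnitudes) / days_per_year)
    k = max(1, min(k, len(magnitudes)))
    return magnitudes[k - 1]


# Made-up history: one year of quiet days plus five larger swings.
history = [1.0] * 360 + [30.0, -28.0, 25.0, -22.0, 21.0]

for n in (3, 5):
    print(n, threshold_for_n_per_year(history, n))
```

Sweeping n like this shows the trade-off directly: a smaller n gives a higher threshold and fewer alerts, while a larger n lowers the threshold and makes alerts more frequent.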
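Summary

I've described what we did to guarantee data quality. There are still open issues, and pain points such as the considerable time that reviews and test writing take, but since it lets us guarantee data quality, I'm glad we did it! I hope this is useful to anyone facing similar challenges. Thank you for reading to the end.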