Contents

- Introduction: How far do LLMs understand "why?"
- Basics of DAGs and "adjustment"
- Terms used in this article
  - DAG (Directed Acyclic Graph)
  - Adjustment
  - Backdoor path
  - Adjustment set Z
  - d-separation
  - Collider / non-collider
- Step 1: Implement a DAG-based "independence / backdoor checker" in Python
  - 1-1. A class for causal graphs: CausalDAG
  - 1-2. Checking d-separation and backdoor paths: DSeparationChecker
- Step 2: Build a "causal-check AI agent" with LangGraph
  - 2-1. Designing the State
  - 2-2. Having the LLM propose the set of variables to adjust for
  - 2-3. Checking the proposal against the DAG
  - 2-4. Connecting the nodes with LangGraph
- Step 3: Running it on the advertising example
  - 3-0. Setting up the LLM (Gemini)
  - 3-1. Advertising spend and sales (AdSpend→Sales)
- Closing

Hello, I'm Igarashi, a lead data scientist at Insight Edge. This article is day 3 of the Insight Edge Advent Calendar 2025. I'll do my best to pass the baton cleanly to the next writer, so thank you in advance!

This time I combine LLMs, LangGraph, and causal graphs (DAGs) to tackle the following theme: "have an LLM choose the variables to adjust for in advertising data, and verify the validity of that choice in code."

- For business readers: a test of "when we throw a causal question at an LLM, how far can we trust its 'reasoning' and delegate to it?"
- For engineers and data scientists: a concrete method for implementing DAGs and d-separation and turning them into a working AI agent with LangGraph.

I hope both audiences will find something to enjoy, so please stay with me to the end!

Introduction: How far do LLMs understand "why?"

Large language models (LLMs) such as ChatGPT and Gemini are very good at things like:

- answering questions
- summarizing text
- writing code

On the other hand, people on the data-science side tend to wonder: "Does this model really understand causal relationships?"

Take a common question as an example:

"Will sales go up if we increase advertising spend?"

An LLM will probably return a fairly plausible answer along the lines of: "Advertising spend and sales often show a positive correlation, but other factors such as seasonality and campaigns also play a role, so care is needed to isolate the effect of advertising spend alone."
However, no matter how refined the explanation becomes, real data contains hidden factors that affect both advertising spend and sales, such as:

- season (Season)
- campaigns
- the economic climate

The real question is whether we can make the handling of such factors explicit as a graph and then check how far the explanation can be trusted.

So in this article we build an "AI agent for testing causal reasoning" that:

- asks the LLM, "Given this causal graph (DAG), which variables should be adjusted for?"
- checks, with Python code we prepare ourselves, whether that answer is correct according to the rules of causal inference

Here LangGraph serves as the workflow engine that connects the "let the LLM think" step to the "check the causal rules in Python" step.

In other words:

- LLM = the role that explains causal relationships and proposes "this variable should be adjusted for"
- Python code = the role that judges, based on the causal graph (DAG), whether that proposal actually holds up in theory

LangGraph bundles this division of labor into a single agent.

The code in this article was run in a notebook environment (Python) on Google Cloud's Vertex AI. With a similar setup, it should run in essentially the same way locally or on other clouds.

Basics of DAGs and "adjustment"

Let's briefly align on the underlying picture. A simple advertising example drawn as a DAG (causal graph) looks like this.

Figure: Example DAG (causal graph)

- Season … the season (e.g. whether it is the year-end sale period)
- AdSpend … advertising spend
- Sales … sales

The intuition is as follows:

- When the season (Season) is favorable, sales tend to rise on their own
- At the same time, advertising spend tends to be increased in favorable seasons
- In addition, increasing advertising spend should itself increase sales

So if we want to see the effect of advertising spend (AdSpend) alone as cleanly as possible, we need to even out the differences caused by the season (Season) as fairly as we can.

→ In statistics this is called "adjusting" (for Season).
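To make "adjusting for Season" concrete, the following is a minimal simulation sketch. It is not from the original article: the coefficients (a true AdSpend effect of 3.0, a Season effect of 5.0 on sales and 2.0 on spend), the sample size, and the helper names `slope` and `simulate` are all made-up assumptions. It compares a naive regression slope of Sales on AdSpend with the slope computed within each Season group.

```python
import random


def slope(xs, ys):
    """Ordinary least-squares slope of ys regressed on xs."""
    n = len(xs)
    mx = sum(xs) / n
    my = sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    var = sum((x - mx) ** 2 for x in xs)
    return cov / var


def simulate(n=20000, true_effect=3.0, seed=0):
    """Generate (season, ad_spend, sales) rows following the ad DAG (assumed coefficients)."""
    rng = random.Random(seed)
    rows = []
    for _ in range(n):
        season = rng.randint(0, 1)                   # 1 = favorable season
        ad = 1.0 + 2.0 * season + rng.gauss(0, 1)    # Season -> AdSpend
        sales = true_effect * ad + 5.0 * season + rng.gauss(0, 1)  # AdSpend -> Sales, Season -> Sales
        rows.append((season, ad, sales))
    return rows


rows = simulate()

# Naive estimate: ignores Season, so the backdoor path inflates the slope.
naive = slope([r[1] for r in rows], [r[2] for r in rows])

# Adjusted estimate: compute the slope inside each Season group, then average.
within = []
for s in (0, 1):
    group = [r for r in rows if r[0] == s]
    within.append(slope([r[1] for r in group], [r[2] for r in group]))
adjusted = sum(within) / len(within)

print(f"naive slope:    {naive:.2f}")
print(f"adjusted slope: {adjusted:.2f}  (true effect assumed to be 3.0)")
```

Under these assumed coefficients the naive slope is biased upward (about 4.25 in expectation), while the within-season slope recovers a value close to the assumed effect of 3.0. Averaging the two group slopes with equal weight is a simplification that works here because the two seasons are equally likely.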
Terms used in this article

To make the rest of the article easier to follow, let's first go over the technical terms that appear in it. (The aim is the overall picture rather than rigorous definitions.)

DAG (Directed Acyclic Graph)

A "causal map" in which variables are drawn as circles and causal relationships as arrows. A → B means "A (possibly) influences B."

Figure 2: Example DAG (causal graph), repeated

Adjustment

"Evening out the differences in some factor so that the comparison is fair." Examples:

- We want to compare a drug's effect between older and younger people → compare at the same age
- Remove seasonal differences in sales and look only at the effect of advertising

Backdoor path

A route that sneaks in like a "back road," separate from the causal arrow itself. A path such as AdSpend ← Season → Sales, formed by "Season → AdSpend" and "Season → Sales", can be read as a path along which advertising spend and sales merely move together because of Season. If it is left open, we cannot tell whether advertising spend or the season is doing the work.

Adjustment set Z

The set of variables that should be conditioned on in order to close the backdoor "back roads." Example: with Z = {Season}, the picture is "look only at differences in advertising spend among situations with the same season."

d-separation

A rule for mechanically checking whether any route along which information can flow still remains between X and Y on the graph.

- All paths are "closed" → d-separated → independent under that conditioning
- At least one path is "open" → d-connected → dependence may remain

Collider / non-collider

A distinction based on how the arrows enter the middle node of a path.

- In A → C ← B, where arrows converge on C from both sides, C is called a "collider" (a collision point).
- Shapes where the arrows "pass through," such as A → C → B or A ← C → B, are non-colliders.

In this article we implement d-separation-based decision logic in Python so that we can automatically check whether the adjustment set proposed by the LLM properly blocks the back roads. For real production analyses, I recommend combining the logic shown here with frameworks such as DoWhy/EconML.
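For reference, what "adjusting for Z" computes can be written with the standard backdoor adjustment formula. The original article does not spell this out; it is added here as background, and it assumes Z satisfies the backdoor criterion for (X, Y) and is discrete.

```latex
P(Y = y \mid \mathrm{do}(X = x)) \;=\; \sum_{z} P(Y = y \mid X = x,\, Z = z)\, P(Z = z)
```

For the advertising example with Z = {Season}, this reads: take the relationship between AdSpend and Sales within each season, then average those relationships over how often each season occurs.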
Step 1: Implement a DAG-based "independence / backdoor checker" in Python

First, as the foundation for working with causal graphs, we implement two classes:

- CausalDAG, which holds the structure of the DAG (parent/child relationships)
- DSeparationChecker, which checks d-separation and the backdoor condition

These two are purely utilities for "walking paths on the DAG and checking them against the rules of causal inference." They contain nothing that learns from or estimates on real data.

Note: if you do not want to follow the logic in detail and only want the overall structure, skimming the code below and taking away "there is a class for inspecting back roads" is enough.

1-1. A class for causal graphs: CausalDAG

We start by defining a class that represents the structure of the DAG. It handles the mapping between parent and child nodes, the list of nodes, and sets of ancestor nodes.

    from collections import defaultdict, deque
    from typing import Dict, List, Set, Iterable


    class CausalDAG:
        def __init__(self, edges: Iterable[tuple[str, str]]):
            """
            Define the DAG from a list of (parent, child) pairs.

            Example:
                edges = [
                    ("Season", "AdSpend"),
                    ("Season", "Sales"),
                    ("AdSpend", "Sales"),
                ]
            """
            self.parents: Dict[str, List[str]] = defaultdict(list)
            self.children: Dict[str, List[str]] = defaultdict(list)
            self.nodes: Set[str] = set()
            for u, v in edges:
                self.parents[v].append(u)
                self.children[u].append(v)
                self.nodes.add(u)
                self.nodes.add(v)

        def all_nodes(self) -> Set[str]:
            return set(self.nodes)

        def ancestors_of(self, zs: Iterable[str]) -> Set[str]:
            """
            Return Anc(Z), the set of all ancestors of the nodes in Z.

            d-separation needs this for the rule
            "a path through a collider opens when the collider is in Z
            or is an ancestor of a node in Z".
            """
            zs = set(zs)
            visited: Set[str] = set()
            queue: deque[str] = deque(zs)
            # Breadth-first walk up the parent links.
            while queue:
                z = queue.popleft()
                for p in self.parents[z]:
                    if p not in visited:
                        visited.add(p)
                        queue.append(p)
            return visited

In this CausalDAG class, the constructor takes the list of (parent, child) edges and builds:

- parents, the list of parents of each node
- children, the list of children of each node

all_nodes() returns the set of nodes, and ancestors_of(zs) computes the set of ancestors of a node set Z.

The d-separation check explained later has to decide whether a collider itself, or one of its descendants, has been conditioned on. That is equivalent to asking whether the collider belongs to Z or to Anc(Z), which is why this ancestor set is used.
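As a small illustration of why the ancestor set matters, here is a standalone sketch of the same breadth-first ancestor walk. The free function `ancestors_of` and the four-node `collider_edges` graph are hypothetical examples, not part of the original code. It shows that conditioning on a child of a collider puts the collider into Anc(Z).

```python
from collections import defaultdict, deque


def ancestors_of(edges, zs):
    """Return all ancestors of the nodes in zs, given (parent, child) edges."""
    parents = defaultdict(list)
    for parent, child in edges:
        parents[child].append(parent)

    visited = set()
    queue = deque(zs)
    while queue:
        node = queue.popleft()
        for p in parents[node]:
            if p not in visited:
                visited.add(p)
                queue.append(p)
    return visited


# The advertising DAG from the article.
ad_edges = [("Season", "AdSpend"), ("Season", "Sales"), ("AdSpend", "Sales")]

# Hypothetical graph: C is a collider (A -> C <- B) and D is a child of C.
collider_edges = [("A", "C"), ("B", "C"), ("C", "D")]

print(sorted(ancestors_of(ad_edges, {"Sales"})))
print(sorted(ancestors_of(collider_edges, {"D"})))
```

In the second graph, conditioning on D alone already places C in Anc({D}), so under the d-separation rule the path A → C ← B is treated as open even though C itself is not in Z.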
šãŠã®åçŽãã¹ãåæãã åãã¹ã d-separation ã®ã«ãŒã«ã«ç
§ãããŠãéããŠãããïŒéããŠãããããå€å®ããŸãã class DSeparationChecker : """ DAG ã«å¯Ÿã㊠d-separation / ããã¯ãã¢æ¡ä»¶ãå€å®ããã¯ã©ã¹ã """ def __init__ (self, dag: CausalDAG): self.dag = dag # ---------- d-separation é¢é£ ---------- def _is_collider_on_path (self, prev_node: str , mid_node: str , next_node: str ) -> bool : """ ãã¹äžã®3ç¹ prev -> mid -> next ã«ãããŠãmid ãã³ã©ã€ããŒãã©ãããå€å®ã å®çŸ©: mid ã«2æ¬ã®ç¢å°ãâåãã£ãŠããâãšããmid ã¯ã³ã©ã€ããŒã ã€ãŸã (prev -> mid) ã〠(next -> mid) ã®ãšãã """ return (prev_node in self.dag.parents[mid_node]) and \ (next_node in self.dag.parents[mid_node]) def _compute_ancestors_of_Z (self, Z: Set[ str ]) -> Set[ str ]: """ ã³ã©ã€ããŒã Z ãŸã㯠Z ã®ç¥å
ã«å«ãŸãããšãã ãã®ã³ã©ã€ããŒãéããã¹ã¯ãéããããã ãã®ãã Anc(Z) ãåãã£ãŠèšç®ããŠããã """ return self.dag.ancestors_of(Z) def _find_all_simple_paths (self, start: str , goal: str , max_len: int = 10 ) -> List[List[ str ]]: """ ç¡åã°ã©ããšããŠèŠããšãã®åçŽãã¹ããã¹ãŠåæããã DAG ã¯å°ããåæãªã®ã§ãæ·±ãå¶é max_len ã軜ããããŠããã """ neighbors: Dict[ str , List[ str ]] = {} for n in self.dag.all_nodes(): neighbors[n] = list ( set (self.dag.parents[n]) | set (self.dag.children[n])) paths: List[List[ str ]] = [] stack: List[ tuple [ str , List[ str ]]] = [(start, [start])] while stack: node, path = stack.pop() if node == goal: paths.append(path) continue if len (path) >= max_len: continue for nxt in neighbors[node]: if nxt in path: continue # simple path only stack.append((nxt, path + [nxt])) return paths def _path_is_active (self, path: List[ str ], Z: Set[ str ], ancestors_Z: Set[ str ]) -> bool : """ äžãããããã¹ããæ¡ä»¶éå Z ã®ããšã§ã¢ã¯ãã£ããã©ãããå€å®ã ã«ãŒã«ïŒçž®çŽçïŒ: - éã³ã©ã€ããŒäžéããŒã j: j â Z ãªããã¹ã¯ããã㯠- ã³ã©ã€ããŒäžéããŒã j: j â Z ãŸã㯠j â Anc(Z) ãªããã¹ãéããã ãã以å€ãªãããã㯠""" if len (path) <= 2 : # çŽæ¥ã€ãªãã£ãŠããå Žåã¯ãäžéããŒãããªãã®ã§åžžã«åè£ return True for i in range ( 1 , len (path) - 1 ): prev_node = path[i - 1 ] mid_node = path[i] next_node = path[i + 1 ] is_collider = self._is_collider_on_path(prev_node, mid_node, next_node) if not is_collider: # éã³ã©ã€ããŒã®å Žåããã®ããŒãã«æ¡ä»¶ã¥ãããšãã¹ã¯ããã㯠if mid_node in Z: return False else : # ã³ã©ã€ããŒã®å Žåã # ãã®ããŒãèªèº« or ãã®ç¥å
ã Z ã«å«ãŸããå Žåã«ãã¹ãéãããã if (mid_node not in Z) and (mid_node not in ancestors_Z): return False return True def d_separated (self, X: Iterable[ str ], Y: Iterable[ str ], Z: Iterable[ str ]) -> bool : """ X ãš Y ãæ¡ä»¶éå Z ã®ããšã§ d-separated ãã©ãããå€å®ããã æ»ãå€: True -> X â«« Y | Z ïŒç¬ç«ïŒ False -> X Ìžâ«« Y | ZïŒäŸåïŒ """ X = set (X) Y = set (Y) Z = set (Z) ancestors_Z = self._compute_ancestors_of_Z(Z) for x in X: for y in Y: paths = self._find_all_simple_paths( x, y, max_len= len (self.dag.all_nodes()) + 1 ) for p in paths: if self._path_is_active(p, Z, ancestors_Z): # 1æ¬ã§ãã¢ã¯ãã£ããã¹ãããã° d-connectedïŒäŸåïŒ return False # ã¢ã¯ãã£ããã¹ãèŠã€ãããªããã° d-separatedïŒç¬ç«ïŒ return True # ---------- ããã¯ãã¢ãã¹é¢é£ ---------- def has_active_backdoor_path ( self, treatment: str , outcome: str , Z: Iterable[ str ], ) -> bool : """ treatment -> outcome ã®å æå¹æãæšå®ããããšãã«ã ãããã¯ãã¢ãã¹ãã Z ã®äžã§ã¢ã¯ãã£ããã©ãããå€å®ããã ããã¯ãã¢ãã¹ãšã¯: - treatment ãã outcome ãžã®ãã¹ã®ãã¡ã - æåã®ãšããžãã芪 -> treatmentãã«ãªã£ãŠãããã®ã (äŸ: Season -> AdSpend ã®ããã«ãæåã 'å
¥ã£ãŠãã' ãã¹) """ Z = set (Z) ancestors_Z = self._compute_ancestors_of_Z(Z) # treatment ãã outcome ãžã®ãã¹ãŠã®åçŽãã¹ paths = self._find_all_simple_paths( treatment, outcome, max_len= len (self.dag.all_nodes()) + 1 , ) for p in paths: if len (p) < 2 : continue first_neighbor = p[ 1 ] # æåã®ãšããžããneighbor -> treatmentããããã§ã㯠# parent -> child ã®å®çŸ©ããã # "neighbor -> treatment" ãªã neighbor 㯠treatment ã®èŠªã§ããã¯ã if treatment not in self.dag.children[first_neighbor]: # neighbor -> treatment ã§ã¯ãªãã®ã§ããã¯ãã¢åè£ã§ã¯ãªã continue # ãã®ãã¹ã Z ã®ããšã§ã¢ã¯ãã£ããã©ãããå€å® if self._path_is_active(p, Z, ancestors_Z): return True # ã¢ã¯ãã£ããªããã¯ãã¢ãã¹ãååšãã return False # ã©ã®ããã¯ãã¢ãã¹ãã¢ã¯ãã£ãã§ã¯ãªã def is_valid_backdoor_adjustment_set ( self, treatment: str , outcome: str , Z: Iterable[ str ], ) -> bool : """ Z ã treatment -> outcome ã®å æå¹æãæšå®ããããã® ã劥åœãªããã¯ãã¢èª¿æŽéåããã©ãããå€å®ããã å®çŸ©: - treatment ãš outcome ã®éã«ãZ ã®ããšã§ã¢ã¯ãã£ããªããã¯ãã¢ãã¹ãååšããªããšã Trueã """ return not self.has_active_backdoor_path(treatment, outcome, Z) ãã®ã¯ã©ã¹ã§ã¯ã DAG äžã®ãã¹ãŠã®ãã¹ãæŽãåºãã åãã¹ã d-separation ã®ã«ãŒã«ã«åŸã£ãŠãéããŠãããïŒéããŠãããããå€å®ãã ãã®çµæãšã㊠ãX ãš Y ãæ¡ä»¶ä»ãã§ç¬ç«ã«ãªã£ãŠãããïŒd_separatedïŒã ãããã¯ãã¢ãã¹ããã¹ãŠéããŠããŠã調æŽéåãšããŠåŠ¥åœãïŒis_valid_backdoor_adjustment_setïŒã ãè¿ãä»çµã¿ããŸãšããŠããŸãã ãããŸã§ã§ãDAG äžã®ãã¹ã«å¯ŸããŠå ææšè«ã®åºæ¬ã«ãŒã«ãæ©æ¢°çã«é©çšãã LLM ã®ææ¡ããã§ãã¯ããããã®åå°ãæŽããŸããã ã¹ããã2ïŒLangGraphã§ãå æãã§ãã¯AIãšãŒãžã§ã³ãããçµã 次ã«ããã® d-separation ãã§ãã«ãŒã LLM ãšçµã¿åããã AI ãšãŒãžã§ã³ããšããŠåããããã«ãLangGraph ã䜿ã£ãŠã¯ãŒã¯ãããŒãçµã¿ç«ãŠãŸãã ãã®ãšãŒãžã§ã³ãã¯ã次ã®2ã¹ãããã§åããŸãã LLM ã«ã調æŽãã¹ã倿°éå Zããææ¡ããã ãã®ææ¡ Z ããDAG ã«åºã¥ããŠããã¯ãã¢ãéããéåã«ãªã£ãŠãããã©ããããã§ãã¯ãã 2-1. Stateã®èšèš LangGraph ã¯ãç¶æ
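One point the path-based check above does not cover: the standard backdoor criterion also requires that Z contain no descendant of the treatment. The sketch below shows how that extra condition could be checked. The helper names `descendants_of` and `treatment_descendants_in` and the mediator variable `Clicks` are hypothetical, introduced only for illustration.

```python
from collections import defaultdict, deque


def descendants_of(edges, node):
    """Return all descendants of node, given (parent, child) edges."""
    children = defaultdict(list)
    for parent, child in edges:
        children[parent].append(child)

    seen = set()
    queue = deque([node])
    while queue:
        cur = queue.popleft()
        for c in children[cur]:
            if c not in seen:
                seen.add(c)
                queue.append(c)
    return seen


def treatment_descendants_in(edges, treatment, Z):
    """List the members of Z that are descendants of the treatment (should be empty)."""
    return sorted(set(Z) & descendants_of(edges, treatment))


# Hypothetical variant of the ad DAG with a mediator: AdSpend -> Clicks -> Sales.
edges = [
    ("Season", "AdSpend"),
    ("Season", "Sales"),
    ("AdSpend", "Clicks"),
    ("Clicks", "Sales"),
]

print(treatment_descendants_in(edges, "AdSpend", {"Season"}))
print(treatment_descendants_in(edges, "AdSpend", {"Season", "Clicks"}))
```

In this variant, Z = {Season, Clicks} leaves no backdoor path open, so a check based only on path blocking would accept it, yet conditioning on the mediator Clicks would block part of the effect being measured. Running a descendant check like this one before the path check is one way to catch that case.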
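Step 2: Build a "causal-check AI agent" with LangGraph

Next, to combine this d-separation checker with an LLM and run it as an AI agent, we assemble a workflow with LangGraph. The agent works in two steps:

1. Have the LLM propose "the set of variables Z to adjust for"
2. Check, based on the DAG, whether the proposed Z is a set that closes the backdoor paths

2-1. Designing the State

Think of LangGraph as "a workflow engine that carries state (State)." Each node receives the State and passes an updated State on to the next node. For this agent we define the following State.

    from typing import TypedDict, List, Optional


    class CausalAgentState(TypedDict, total=False):
        # Inputs
        question: str    # the user's causal question (for explanation only)
        treatment: str   # treatment (intervention) variable X
        target: str      # outcome variable Y whose response we want to know

        # LLM output
        candidate_adjustment: List[str]  # adjustment set Z proposed by the LLM
        llm_raw_answer: str              # raw LLM answer

        # Check results
        d_separated: Optional[bool]  # whether X and Y are d-separated given Z (for reference)
        backdoor_ok: Optional[bool]  # whether Z is a valid backdoor adjustment set

        # Log
        debug_log: List[str]

Here:

- treatment / target hold variable names such as "advertising spend" and "sales"
- candidate_adjustment stores the adjustment set Z proposed by the LLM
- backdoor_ok records whether that Z is valid as a backdoor adjustment

debug_log keeps the internal state of each step and part of the raw LLM output as strings.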
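The "receive the State, return an updated State" pattern can be tried without LangGraph. The following is a minimal sketch using a cut-down, hypothetical `DemoState` and `demo_node` (neither is from the article). It shows that with `total=False` keys may be absent at the start, and that `{**state, ...}` builds a new dict instead of mutating the input.

```python
from typing import List, Optional, TypedDict


class DemoState(TypedDict, total=False):
    # total=False: every key is optional, so a state can start almost empty.
    treatment: str
    target: str
    candidate_adjustment: List[str]
    backdoor_ok: Optional[bool]
    debug_log: List[str]


def demo_node(state: DemoState) -> DemoState:
    """Hypothetical node: copies the log, appends to it, and returns a new state."""
    log = list(state.get("debug_log", []))  # copy so the input state is untouched
    log.append("[demo_node] proposed Z = ['Season']")
    return {**state, "candidate_adjustment": ["Season"], "debug_log": log}


initial: DemoState = {"treatment": "AdSpend", "target": "Sales"}
updated = demo_node(initial)

print(initial)
print(updated)
```

Keys that no node has written yet, such as `backdoor_ok` here, are simply missing, which is why reading them with `.get()` is the safer habit.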
2-2. Having the LLM propose the set of variables to adjust for

Next is the part that asks the LLM "which variables should be adjusted for?" We use LangChain's ChatPromptTemplate and instruct the model firmly to return only a JSON array.

    import json
    from typing import List

    from langchain_core.prompts import ChatPromptTemplate

    ADJUST_PROMPT = ChatPromptTemplate.from_template(
        """
    You are a careful causal inference assistant.

    We have a causal DAG over variables and we want to estimate the causal effect of {treatment} on {target}.

    Your task:
    1. Propose a set of variables Z to adjust for (back-door adjustment set).
    2. Return ONLY a JSON list of variable names, like: ["VarA", "VarB"]

    IMPORTANT:
    - Output MUST be a single JSON array.
    - Do NOT add any explanation.
    - Do NOT use Markdown code fences.
    - Do NOT wrap the JSON in ```json or ```.

    Variables available: {all_vars}

    Causal DAG description:
    {dag_text}
    """
    )


    def _extract_json_array_from_text(text: str) -> str:
        """
        Utility that pulls just the JSON array out of the text returned by the LLM.
        - Strips code fences such as ```json ... ```
        - Cuts out everything from the first '[' to the last ']' in the text
        """
        t = text.strip()

        # 1. Strip a code fence ```...```
        if t.startswith("```"):
            lines = t.splitlines()
            # Drop the leading ```xxx line
            if lines and lines[0].startswith("```"):
                lines = lines[1:]
            # Drop the trailing ``` line
            if lines and lines[-1].startswith("```"):
                lines = lines[:-1]
            t = "\n".join(lines).strip()

        # 2. Find the first '[' and the last ']'
        start = t.find("[")
        end = t.rfind("]")
        if start != -1 and end != -1 and start < end:
            return t[start : end + 1]

        # Not found: return as is (json.loads will fail and trigger the fallback)
        return t


    def propose_adjustment_node(
        state: CausalAgentState,
        dag: CausalDAG,
        dag_text: str,
        llm,
    ) -> CausalAgentState:
        treatment = state["treatment"]
        target = state["target"]
        all_vars = sorted(list(dag.all_nodes()))

        # Render the template to a single prompt string.
        prompt = ADJUST_PROMPT.format(
            treatment=treatment,
            target=target,
            all_vars=", ".join(all_vars),
            dag_text=dag_text,
        )
        resp = llm.invoke(prompt)
        raw_content = resp.content if hasattr(resp, "content") else str(resp)

        # Clean down to just the JSON array
        cleaned = _extract_json_array_from_text(raw_content)

        candidate_Z: List[str] = []
        try:
            parsed = json.loads(cleaned)
            if isinstance(parsed, list):
                # Normalize every element to a string
                candidate_Z = [str(x) for x in parsed]
        except Exception:
            # Unparseable answer: fall back to an empty proposal
            candidate_Z = []

        debug_log = list(state.get("debug_log", []))
        debug_log.append(f"[propose_adjustment_node] raw LLM: {raw_content[:120]}...")
        debug_log.append(f"[propose_adjustment_node] cleaned: {cleaned}")
        debug_log.append(f"[propose_adjustment_node] parsed Z: {candidate_Z}")

        new_state: CausalAgentState = {
            **state,
            "candidate_adjustment": candidate_Z,
            "llm_raw_answer": raw_content,
            "debug_log": debug_log,
        }
        return new_state

This node embeds the DAG information (variable names and a description of the structure) into the prompt, passes it to the LLM, and asks it to return the candidate variables Z to adjust for as a JSON array. It extracts just the JSON array from the returned text, parses it, and stores the result in the State as candidate_adjustment (the Z proposed by the LLM). The original output and the extraction result are also recorded in debug_log. This way, even if the LLM's output format wobbles a little, we can still pull out and use just the array of variable names.
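A parsed JSON array can still contain names that are not in the DAG, or the treatment or outcome themselves. The sketch below shows one way to screen the proposal before it is checked. The helper `split_known_unknown` is hypothetical and not part of the original code.

```python
from typing import Iterable, List, Set, Tuple


def split_known_unknown(
    candidate: Iterable[str],
    dag_nodes: Set[str],
    treatment: str,
    target: str,
) -> Tuple[List[str], List[str]]:
    """Split a proposed Z into usable names and rejected names.

    A name is usable if it is a DAG node and is neither the treatment
    nor the outcome. Duplicates of usable names are dropped silently.
    """
    known: List[str] = []
    rejected: List[str] = []
    for name in candidate:
        name = name.strip()
        if name in dag_nodes and name not in (treatment, target):
            if name not in known:
                known.append(name)
        else:
            rejected.append(name)
    return known, rejected


nodes = {"Season", "AdSpend", "Sales"}
known, rejected = split_known_unknown(
    ["Season", "Weather", " Season ", "Sales"], nodes, "AdSpend", "Sales"
)
print("usable Z:", known)
print("rejected:", rejected)
```

Screening matters because a name that does not exist in the DAG never appears on any path, so a path-based checker would treat a proposal such as ["Weather"] exactly like an empty set instead of flagging it. Logging the rejected names in debug_log would make such cases visible.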
2-3. Checking the proposal against the DAG

For the adjustment set Z proposed by the LLM, this node uses DSeparationChecker to determine:

- whether all backdoor paths are closed (backdoor_ok)
- for reference, whether X and Y are d-separated given Z (d_separated)

and simply writes the results into the State. It is a plain checking role.

    def check_adjustment_node(
        state: CausalAgentState,
        checker: DSeparationChecker,
    ) -> CausalAgentState:
        treatment = state["treatment"]
        target = state["target"]
        Z = state.get("candidate_adjustment", [])

        # 1. Is Z valid as a backdoor adjustment set?
        backdoor_ok = checker.is_valid_backdoor_adjustment_set(
            treatment=treatment,
            outcome=target,
            Z=Z,
        )

        # 2. Optional: also log d-separation
        #    (whether X and Y are conditionally independent given Z)
        d_sep = checker.d_separated([treatment], [target], Z)

        debug_log = list(state.get("debug_log", []))
        debug_log.append(
            f"[check_adjustment_node] X={treatment}, Y={target}, Z={Z}, "
            f"backdoor_ok={backdoor_ok}, d_separated={d_sep}"
        )

        new_state: CausalAgentState = {
            **state,
            "d_separated": d_sep,        # reference value only
            "backdoor_ok": backdoor_ok,  # the result we actually care about
            "debug_log": debug_log,
        }
        return new_state

What happens here is simple.

- checker.is_valid_backdoor_adjustment_set(...) decides whether the Z proposed by the LLM closes all backdoor paths. → This is backdoor_ok.
- checker.d_separated(...) decides whether X and Y become conditionally independent given Z. When the treatment has a real effect on the outcome, they should not be independent, so this value is kept in the log purely for reference.
2-4. Connecting the nodes with LangGraph

Finally, we use LangGraph's StateGraph to connect the two nodes:

- propose_adjustment_node (have the LLM propose an adjustment set)
- check_adjustment_node (inspect that proposal against the DAG)

into a single workflow.

    from langgraph.graph import StateGraph, END


    def build_causal_langgraph(
        dag: CausalDAG,
        dag_text: str,
        llm,
    ):
        graph = StateGraph(CausalAgentState)

        # d-separation / backdoor checker
        checker = DSeparationChecker(dag)

        # Node functions that close over dag / dag_text / llm
        def _propose_node(s: CausalAgentState) -> CausalAgentState:
            return propose_adjustment_node(
                s,
                dag=dag,
                dag_text=dag_text,
                llm=llm,
            )

        def _check_node(s: CausalAgentState) -> CausalAgentState:
            return check_adjustment_node(s, checker=checker)

        # Register the nodes
        graph.add_node("propose_adjustment", _propose_node)
        graph.add_node("check_adjustment", _check_node)

        # Define the flow
        graph.set_entry_point("propose_adjustment")
        graph.add_edge("propose_adjustment", "check_adjustment")
        graph.add_edge("check_adjustment", END)

        # Return the runnable application
        app = graph.compile()
        return app

This build_causal_langgraph function assembles the causal-check agent that "lets the LLM think up adjustment variables and checks them on the DAG side."

When you pass the agent a question (the question, for explanation), a treatment (the variable you want to intervene on), and a target (the outcome variable), then:

1. the LLM first proposes "candidate variables Z to adjust for"
2. DSeparationChecker then decides whether the backdoor paths are closed
3. as a result, the LLM's answer, the proposed Z, the verdict backdoor_ok and so on come back bundled in final_state
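Because the result depends on a live model call, it can be useful to exercise the propose → check flow with a canned response first. The following is a standalone sketch under strong simplifications. `FakeLLM`, `FakeResponse`, `run_pipeline`, `propose`, and `check` are all hypothetical stand-ins: the only assumption about the real model is that `invoke()` returns an object with a `.content` attribute, and `check` hard-codes the three-node advertising DAG instead of using a general d-separation check.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeResponse:
    content: str


class FakeLLM:
    """Stand-in chat model: returns a canned answer and records the prompts it saw."""

    def __init__(self, canned: str):
        self.canned = canned
        self.prompts = []

    def invoke(self, prompt):
        self.prompts.append(prompt)
        return FakeResponse(content=self.canned)


def propose(state, llm):
    """Ask the (fake) LLM for Z and parse it, falling back to [] on bad JSON."""
    raw = llm.invoke(f"Effect of {state['treatment']} on {state['target']}?").content
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        parsed = []
    z = [str(x) for x in parsed] if isinstance(parsed, list) else []
    return {**state, "candidate_adjustment": z, "llm_raw_answer": raw}


def check(state):
    # Simplified for the three-node ad DAG only: its single backdoor path
    # AdSpend <- Season -> Sales is blocked exactly when Season is in Z.
    return {**state, "backdoor_ok": "Season" in state["candidate_adjustment"]}


def run_pipeline(state, nodes):
    """Apply node functions in order, mimicking a linear two-node graph."""
    for node in nodes:
        state = node(state)
    return state


start = {"treatment": "AdSpend", "target": "Sales"}

good_llm = FakeLLM('["Season"]')
good = run_pipeline(start, [lambda s: propose(s, good_llm), check])

bad_llm = FakeLLM("Sure! Here is my answer.")
bad = run_pipeline(start, [lambda s: propose(s, bad_llm), check])

print(good["candidate_adjustment"], good["backdoor_ok"])
print(bad["candidate_adjustment"], bad["backdoor_ok"])
```

The second run shows the fallback path: an answer that is not valid JSON yields an empty Z, and the empty set fails the check because the Season backdoor path stays open.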
Step 3: Running it on the advertising example

From here we actually run the agent on the advertising DAG. For the LLM we use Gemini on Vertex AI.

3-0. Setting up the LLM (Gemini)

First we configure the call to Gemini on Vertex AI. The code in this article was run in a Vertex AI notebook environment (Python), but with appropriate authentication and project settings the same code can be called from a local environment and elsewhere.

    from langchain_google_vertexai import ChatVertexAI

    llm = ChatVertexAI(
        model="gemini-2.5-flash",
        project="your-gcp-project-id",  # your GCP project ID
        location="us-central1",
        temperature=0,
    )

Here we specify the model name, region, project ID and so on. temperature=0 is used because in cases like causal inference, where logical consistency matters, it is preferable to suppress randomness.
3-1. Advertising spend and sales (AdSpend→Sales)

We now translate the advertising DAG described earlier directly into code.

    # 1. The DAG and its text description
    edges = [
        ("Season", "AdSpend"),
        ("Season", "Sales"),
        ("AdSpend", "Sales"),
    ]
    dag = CausalDAG(edges)

    dag_text = """
    Variables:
    - Season: categorical (e.g., 'Holiday', 'Normal', ...)
    - AdSpend: continuous, amount of advertising spend
    - Sales: continuous, sales amount

    Causal structure (DAG):
    - Season -> AdSpend
    - Season -> Sales
    - AdSpend -> Sales

    Goal: We want to estimate the causal effect of AdSpend on Sales.
    """

    # 2. Build the LangGraph application
    causal_app = build_causal_langgraph(dag, dag_text, llm=llm)

    # 3. Define the initial state and run
    initial_state: CausalAgentState = {
        "question": "We want to estimate the causal effect of advertising spend (AdSpend) on sales (Sales).",
        "treatment": "AdSpend",
        "target": "Sales",
        "debug_log": [],
    }
    final_state = causal_app.invoke(initial_state)

    print("=== [AdSpend→Sales] Raw LLM answer ===")
    print(final_state.get("llm_raw_answer", ""))

    print("\n=== Adjustment set Z proposed by the LLM ===")
    print(final_state.get("candidate_adjustment"))

    print("\n=== Valid as a backdoor adjustment? ===")
    print(f"backdoor_ok -> {final_state.get('backdoor_ok')}")

    print("\n=== d-separation result (for reference) ===")
    print(f"(AdSpend ⫫ Sales | Z) ? -> {final_state.get('d_separated')}")

    print("\n=== Debug log ===")
    for log in final_state.get("debug_log", []):
        print(log)

In this code, edges first defines the causal structure (DAG) of the advertising example, which is passed to CausalDAG. dag_text summarizes the meaning of the DAG in English and is used as part of the prompt given to the LLM. build_causal_langgraph(...) creates the causal-check agent; setting the question text, the treatment variable AdSpend, and the outcome variable Sales in initial_state and calling causal_app.invoke(initial_state) runs the whole flow.

The run yields the raw LLM answer, the proposed adjustment set Z, whether that Z is valid as a backdoor adjustment (backdoor_ok), the d-separation result, and so on. In this example we expect the LLM to propose an adjustment set that includes Season, giving backdoor_ok -> True.

Below is the result of running the code above.

Figure: Output

This output shows the following:

- The LLM correctly proposes Season as the variable to adjust for in order to evaluate the advertising effect
- The d-separation checker on the Python side also judges that "adjusting for Season closes the backdoor path (AdSpend ← Season → Sales)", so backdoor_ok -> True
- On the other hand, the causal path AdSpend → Sales remains, so AdSpend and Sales do not become independent even after adjusting for Season (d_separated=False)

In other words, on this simple example we confirmed that the code side can mechanically check whether the LLM has proposed a valid adjustment set for the given DAG.
The implementation here is only a very small pipeline in which:

- the causal structure (DAG) is supplied by a human or by a separate process
- the LLM proposes "which variables to adjust for"
- Python (the d-separation checker) verifies whether that proposal is valid in causal terms

Even so, I think it gives a solid feel for a design that:

- adds a framework for checking "causal consistency with the DAG" instead of letting the LLM talk freely
- uses LangGraph to cleanly separate the "let the LLM think" step from the "verify with rules" step

Closing

In this article, with LangGraph implementation code along the way, I introduced an approach that combines an AI agent with a causal graph to have the model choose "the variables to adjust for."

To restate the scope: what we covered goes only as far as checking the adjustment set. Estimating causal effects from real data or evaluating counterfactuals requires rigorous statistical estimation and sensitivity analysis. For practical analyses, I recommend using the logic introduced here together with existing frameworks such as DoWhy and EconML.

Possible next steps include:

- testing the LLM on somewhat more complex DAGs (multi-stage confounding, colliders, mediators, and so on)
- having the LLM produce several candidate adjustment sets and checking further which one is best
- connecting to real data and having the LLM summarize the results estimated with DoWhy/EconML

Let the LLM talk about "why?" while shoring up the footing behind it with a causal graph and Python logic. I hope this article offers a useful hint as one example of combining them that way.

Thank you for reading to the end! And with that, please keep enjoying your Advent Calendar life (???)!!