TECH PLAY

Finatextホールディングス

Finatextホールディングス の技術ブログ

32

A view of Oku-Nikko, Mt Nantai and Lake Chuzenji (taken by me last week from the summit of Mt Nikko Shirane) Season’s Greetings, I’m Todd, a Data Engineer and Data Scientist at Nowcast, and this article is day 14 of the 2025 Nowcast Advent Calendar. The Problem (Multi Region Data Transfer) At Nowcast we provide our clients access to Japanese Alternative Data. To deliver this data we maintain various data feeds, which are usually updated on a daily basis. We have clients of our data products based all over the world, and one of the first things we did when migrating to Snowflake around 3 years ago was to set up a data delivery system, so that we can share our datasets through Snowflake. In fact, Snowflake shares were one of the features that motivated us to adopt Snowflake in the first place — in effect we could allow clients direct access to tables in our data warehouse, and we wouldn’t have to worry about things going wrong in the data export processes that are necessary for delivering data on AWS S3 for example. The reality however, was a little more complicated, It is not possible to set up cross region sharing with a direct Snowflake share [1] . Because of this limitation, and the fact our clients have Snowflake accounts in many different regions, and are using various cloud providers, the initial version of our Snowflake delivery system was more complex than originally anticipated. The Archaic Solution In our initial solution, we handled all replication into ‘secondary’ regions on our side. Our main data warehouse (Internally referred to as the ‘Datahub’) is set up in the AWS AP-Northeast region. The delivery flow was something like below: Upstream ETL pipelines load new data into the Nowcast DATAHUB DB. Daily Data Transfer job loads data from the DATAHUB DB to the primary DATA_SHARING DB. Some client specific filtering or post-processing will happen here. Replicate the primary DATA_SHARING DB to other accounts, where the relevant tables are shared to clients. Shares are configured on the secondary accounts, so that tables can be delivered to clients. The Upstream ETL Process and Data Transfer jobs are scheduled by Airflow, and the Replication job was triggered by a Snowflake task on the ‘secondary (replica)’ tables’ accounts. It is worth stating that this architecture did work, and we didn’t experience any delivery failures during the 2+ year period that it’s been operational, but there are many downsides to this architecture. Many Managed Snowflake Accounts We have to create a new Snowflake account every time we need to send data to a client that is using an account that we have not previously delivered to. This causes a lot of issues, such as a lot of terraform code being required to set up the account, and some added complexity when it comes to data monitoring — as we need to set up monitoring infrastructure in every account that we are delivering data to. The end result is that it takes a long time to set up new deliveries if we need to create a new account — this isn’t something that we needed to worry about when delivering through other channels, such as S3. Needless Replication Due to the way the system was designed, we ended up replicating all data to all accounts. We didn’t want to make a ‘primary data sharing’ DB for every single customer, or every single product — as this would cause even more terraform/infrastructure bloat than we already had. Because of this, we ended up replicating data into accounts where it wasn’t being utilized — for instance if we had a client in us-east who subscribed to data product A and B, and another client in us-west who subscribed to product C, we would be needlessly replicating C into us-east, and A and B into us-west. Complexity around shutting off feeds If we had to shut off a feed, if it was the only feed in a given account, we were faced with a choice — do we delete the account? We might need to use it again in the future, but we might not. If we do end up deleting the account, we could have a scenario where we create a whole new Snowflake account and associated infrastructure, just to delete it again 6 months down the line. Cost The amalgamation of the above issues is an unwieldy and ultimately overly expensive process. We need to manage the state of multiple accounts, we need to regularly replicate data from our account to the accounts where we can share data to clients. In addition to this, due to the complexity of the system, the engineers didn’t want to deal with it — it was costly not just in the monetary sense, but also when it came to the onboarding cost for engineers using the system. When we caught wind of Snowflake Private Listings [2] we knew we had to take a look. Reworking the Architecture using Snowflake Listings The most attractive point about private listings was the fact that cross account replication was managed by Cross-Account Auto Fulfillment [3] . In other words we would no longer have to manage the accounts to which the data was being shared, we would no longer have to worry about cleaning up unused accounts, and would no longer have to manually replicate data across accounts. After some re-architecting, the new delivery process was deployed earlier this year, and has proven to be cheaper and easier to use than the old system. The flow is as follows: (Same as before) Upstream ETL pipelines load new data into the Nowcast DATAHUB DB. (Same as before) Daily Data Transfer job loads data from the DATAHUB DB to the primary DATA_SHARING DB. Some client specific filtering or post-processing will happen here. shares are configured on the primary account — we create one share for each data product we create one listing for each data product, and then can use this to control which data goes to which client At first glance it may seem more complex than the original architecture, but the key is in what infrastructure needs to be managed by Nowcast. We need to manage a share and listing for each product, but we no longer need to keep track of which account each client is using — and most importantly we no longer need to manage an account for each client. All of this is handled by Snowflake behind the scenes — so the only thing we really need to keep track of is which data needs to be send to which client, and this can all be done from a single Snowflake account. On top of these architectural changes, we also implemented terraform modules for Snowflake shares and listings — so we can define which files should be send to a given client using configuration in hcl like below: Share # example sample data module "data_transfer_sample_example_raw_general_share" { source = "./modules/share-with-grants/v1" share_name = "EXAMPLE_RAW_GENERAL_SHARE" share_comment = "generic sample data share for example data" database_name = "NOWCAST_DATA_SHARING" target_tables = [ "NOWCAST_DATA_SHARING.EXAMPLE.TABLE_A_SAMPLE", "NOWCAST_DATA_SHARING.EXAMPLE.TABLE_B_SAMPLE", "NOWCAST_DATA_SHARING.EXAMPLE.TABLE_C_SAMPLE", ] } Listing module "data_transfer_example_raw_general_listing" { source = "./modules/listing/v2" listing_name = "EXAMPLE_RAW_GENERAL_LISTING" listing_subtitle = "general listing for raw data" listing_description = "listing for transfer raw example tables" share_name = module.data_transfer_sample_example_raw_general_share.share_name replication_config = "0 0 1 JAN *" # static - this is a config that refreshes once a year target_accounts = [ # for testing purposes "XXYYZZ.NOWCAST_TEST_ENV", # other client ID "ABCDEG.12345678" ] } This is much simpler than the previous architecture, which for something like this could potentially require the creation of a whole new account — new databases, schemas, users, and of course the share. This reduction of complexity makes the new Listing based architecture much more favourable among the engineers that need to set up data deliveries! The Benefits The new architecture comes with a huge list of benefits over the old system: Much less infrastructure administration work on our side, especially when managing whole accounts just to transfer a single dataset to a single client. No more manually replicating data using Snowflake tasks All our listing config is in the same account — and can therefore be managed in the same terraform file Utilization of snowflake cost optimization for auto fulfillment for additional cost reduction [4] The new architecture is much more user (engineer) friendly than before — thanks to both the architectural design as well as the new terraform module — providing the data is already being prepared for as given delivery, setting up the data feed takes hours as opposed to days (or even weeks…) We can see which products are being shared with who very easily from the Snowflake private listing tab. There were also some unexpected benefits to the new architecture — for example when you add a consumers account ID to the listing’s config, Snowflake will automatically email the client about the new listing — this makes the data feed feel much more like a professional product. Example of email: In addition to this — to test if the listing is working properly, we can simply add another Nowcast managed account to the listing consumers list — and if we do this for all ongoing data feeds, we are able to test the integrity of our data feeds all from the same place. Previously we would have to monitor the data feeds on the ‘secondary’ accounts, meaning the complexity of the monitoring process could also be reduced due to private listings. Summary This tech blog goes over the history of Nowcast’s data delivery systems — illustrating the flaws in our old architecture — and how we were able to alleviate many of these issues by using Snowflake’s Private Listings. Hopefully this article can help other teams solve the issue oif multi-region data delivering in Snowflake! References https://docs.snowflake.com/guides-overview-sharing#options-for-sharing https://docs.snowflake.com/en/release-notes/2024/other/2024-04-24-pl https://docs.snowflake.com/en/collaboration/provider-listings-auto-fulfillment https://docs.snowflake.com/collaboration/provider-listings-auto-fulfillment-eco Optimizing Multi-Region Data Delivery using Snowflake Listings was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
Finatextグループのテックブログは、MediumからZennに移行しました。以下からアクセスください。 Finatext Tech Blog 記事コンテンツ自体は移行していませんので、過去の記事はこのままMediumでお読みいただけます。また、英語圏向けの記事は今後もMediumで公開するかもしれません。 新ブログ、旧ブログとも、よろしくお願いいたします。 テックブログをZennに移行しました was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
こんにちは、Finatextグループのナウキャストでデータエンジニアをしているけびん(X: @Kevinrobot34 ) です。 先日「 みんなの考えた最強のデータ基盤アーキテクチャ2024前半おまとめ拡大版SP! 」というイベントに登壇し、 Snowflake でデータ基盤を作る際に大事になる権限管理とコスト管理の方法について、ナウキャストの事例を紹介させていただきました。 https://medium.com/media/f06fe7134446e775d34dbd4149635d29/href 要は「ロールの階層・ロールの分け方」「ウェアハウスの分け方」をどうするかという話で、ナウキャストの場合は以下のような構成にしています。 ナウキャストにおけるロール・ウェアハウスの構成のイメージ図。一部検証中の部分もあります。 これらについてポイントを改めて紹介します。 このロール・ウェアハウス構成を見る前に、Snowflakeにおける権限管理とコスト管理のポイントを整理しておきましょう。 Snowflake の権限管理 Snowflake における権限管理は DAC と RBAC を組み合わせたようなものとなっています。 オブジェクトの権限はロールに付与される オブジェクトはロールに所有される ロールはユーザーに付与される https://docs.snowflake.com/en/user-guide/security-access-control-overview より また Snowflake ではロールの階層構造を作ることが可能です。これにより、親のロールは子孫のロールの権限も継承することになります。 https://docs.snowflake.com/en/user-guide/security-access-control-overview より またロールにも主要なもので Account Role と Database Role の2種類あり、どれを使うか?もポイントになります。Database Role については以下の記事が分かりやすいです。 【Snowflake】データベース内のオブジェクトはDatabase Roleで権限管理しよう! よって Snowflake の権限管理における具体的なポイントとしては ロールをどう切り分けるか ロールの階層をどう作るか ロールとして Account Role と Database Role のどちらを作るか などがあります。 Snowflake のコスト管理 Snowflake ではウェアハウスが計算資源であり、これを何秒使ったかで課金されます。ウェアハウスは簡単にいくつでも作ることができまたウェアハウス別でコストを確認するのは容易です。 よって Snowflake のコスト管理ではウェアハウスをどう切り分けるかがポイントになります。 ちなみにコスト管理の観点以外にも、ワークロードごとにウェアハウスを切り分けておくことは大事です。例えばウェアハウスを共有してしまうと、あるワークロードの重い処理が別のワークロードに影響を与えてしまうといったこともあり得ます。 ロール・ウェアハウスの全体像 改めてナウキャストのロール・ウェアハウスの全体像を見てみましょう。 ナウキャストにおけるロール・ウェアハウスの構成のイメージ図。一部検証中の部分もあります。 ポイントとしては以下のあたりになります。 ロールについては Service / Functional / Access Role 層の3つのグループを作り管理 Service / Functional Role 層は Account Role で、 Access Role 層は Database Role で実装する Access Role 層で必要に応じて階層を一つ作る ウェアハウスについては Service / Functional Role 層でロールとセットで作成する Service / Functional / Access Role 層などは以下のブログなど、最近よく見かけますが、ウェアハウスの組み合わせ方などでナウキャスト独自の工夫をしています。 Our Top 7 Snowflake RBAC Best Practices 層ごと詳しく見ていきましょう。 Access Role 層について Access Role 層は各オブジェクトの細かい権限を隠蔽し程よくまとめておくことで、ロールの付与の有無で権限を管理できるようにするための層です。 例えば、「ある Schema の任意の table の read 権限」を用意するためには Database の USAGE Schema の USAGE All Tables の SELECT Future Tables の SELECT の4つの権限が必要です。これらを一つのロールに table-select にまとめておくことで、このロールを Functional / Service Role に付与するかどうかで簡単に table の read 権限を付与することができ、管理が楽になります。 Access Role 層のロールはあくまで権限をまとめて利用しやすくしたもののため、 Database Role で実装するのが便利です。これにより ウェアハウスの権限は付与できないため、ウェアハウスの乱立を防げる Access Role 層のロールを直接ユーザーに付与することを防ぐ Snowsight のロール選択画面に Access Role 層のロールは表示させない といったことが可能になります。 Functional Role 層について Functional Role 層は人間の利用するユーザーのためのロールを管理する層です。 Type property が PERSON のユーザーに付与するためのロールを管理しているとも言えます。 この層ではロールだけでなく、ウェアハウスもセットで管理するようにしています。これにより権限管理とコスト管理を同時に考えやりやすくしています。 Functional Role 層のロールはユーザーに付与しウェアハウスの利用権限も付与する必要があるため Account Role で実装します。 また、Functional Role 層同士で階層は作らず、 Access Role 層の Database Role を適切に付与することで権限管理するようにしています。必要以上に階層が増えると認知負荷が増え権限管理が難しくなるためです。 Service Role 層について Service Role 層はシステムの利用するユーザーのためのロールを管理する層で、 Type property が SERVICEのユーザーに付与するためのロールを管理しているとも言えます。 ロール付与対象のユーザーがシステムであること以外は全て Functional Role 層と同じです。 システムの利用するユーザーと人間の利用するユーザーが使うロール・ウェアハウスを別々のものにしておくというのがポイントになります。 システムと人間とで似たような権限を使うことが多いかと思いますが、ロールを共有してしまうと過度に強い権限のロールが使われていく傾向が出てしまい、最小権限の法則に反しよくないです。 またコスト管理の観点からは、システムと人間が使うウェアハウスは分離しておかないと定期バッチとAdhocな開発・分析のコストの分離が面倒になります。 これらのポイントから Functional Role 層と Service Role 層を分けておくことにしています。 おわりに ナウキャストでは一緒に働く仲間を募集中です!興味のある方はTwitter で @Kevinrobot34 にご連絡ください! 会社紹介ページも是非ご覧ください! データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス また最近 Finatext グループ公式のエンジニア向けXアカウント @FinatextDev も作成しさまざまな発信をしているのでぜひフォローしてください! Snowflake における権限管理とコスト管理 was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
Photo by Rubaitul Azad on  Unsplash Finatext でプロダクト開発を行っている土田です。 社内向けに「 InnoDB のロックアーキテクチャを理解してデッドロックを説明できるようにする 」という記事を書いたところ、いろいろと反応をいただいたので Zenn 記事として公開することにしました。 InnoDB のロックアーキテクチャを理解してデッドロックを説明できるようにする 開発中に起こったデッドロックを題材にして InnoDB のロックアーキテクチャを解説しています。インデックス、ロックアーキテクチャ、実際のロックの確認、デッドロックを説明する、という順序で書いています。興味の有る方はぜひ読んでみてください。 過去の自分は、DBのロックに対して「なんとなくレコードをロックするんだろう」という理解でした。ですが、今回の調査と執筆を経て、かなり解像度が上がりました。アウトプットは大切ですね。 弊社では、こういったアウトプットを推奨しており、社内でも多くの反応をいただけます。これは嬉しい環境だと思います。 Finatext では一緒に働く方を募集しています。ご興味ある方は覗いてみてください。 https://medium.com/media/0e21039193734ed8b6607f59dbc55bbc/href InnoDB のロックアーキテクチャに関する記事を書きました was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
シカゴの象徴「Cloud Gate」、通称「豆」 / 筆者撮影 こんばんは。 Finatext のクレジット事業でエンジニアをしています、東郷です。 先日、シカゴの GopherCon 2024 に参加してきたのですが、「 Advanced Generics Patterns 」を発表した Axel Wagner さんに飲み会で「なんで generics で interface Union を使えないの?」と相談したところ興味深い回答をいただけたので、ここに紹介させていただきます。 大切なお話: なにぶん飲み会の立ち話であり、もしかすると筆者が盛大に勘違いしている可能性もあります。 Axel さんの公式なご見解ではなく、あくまで筆者の認識となりますことをご承知おきください。 Axel による GopherCon 2024 の Axel の登壇 / 筆者撮影 Generics Go 1.18 で generics が登場してからというもの、我々の生活は一変しました。 分かりやすい例でいうと builtin の slices.Equal のような、非常に便利な関数を型に囚われることなく実装・利用できるようになりました。 import "slices" func main() { println(slices.Equal([]int{1, 2, 5}, []int{1, 2, 5})) // true println(slices.Equal([]int{1, 3, 5}, []int{1, 2, 5})) // false println(slices.Equal([]int32{1, 2, 5}, []int64{1, 2, 5})) // compile error: type []int64 of []int64{…} does not match inferred type []int32 for S } slices.Equal の signature は下記のようになっており、比較可能な任意の型の要素を持つスライスを渡せるようになっています ( ソースコード ) func Equal[S ~[]E, E comparable](s1, s2 S) bool Interface Union Type これらの generics を使って便利に開発を進められるようになった一方、一つの壁にぶつかりました。 「いずれかの interface」を示す型を union で定義できなかったのです。 import "fmt" type Byteser interface { Bytes() []byte } // cannot use fmt.Stringer in union (fmt.Stringer contains methods) func Stringify[T interface{ fmt.Stringer | Byteser }](v T) string Type Sets Interface union とは何ぞやという点ですが、まずは下記のコードを御覧ください。 import "golang.org/x/exp/constraints" func Sum[T interface{ constraints.Integer | constraints.Float }](vs ...T) T { var sum T for _, v := range vs { sum += v } return sum } func main() { println(Sum(1, 2, 5)) // 8 println(Sum(1.2, 1.5)) // 2.7 println(Sum(int(1), 1.2)) // cannot use 1.2 (untyped float constant) as int value in argument to Sum } ここでの constraints.Integer および constraints.Float はそれぞれ下記のようになっています ( ソースコード ) type Signed interface { ~int | ~int8 | ~int16 | ~int32 | ~int64 } type Unsigned interface { ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr } type Integer interface { Signed | Unsigned } type Float interface { ~float32 | ~float64 } ここで示されているのは、たとえば constraints.Integer ならば「任意の整数型を受け取れる」型が定義されています。これを「 Type Set 」といいます。 これにより、前述の例の Sum 関数は、任意の整数型または浮動小数点方を引数で受け取れるようになっていて、かつ同じ型で合計値を返せるようになっています。 なぜ Type Set はよくて Interface Union がだめなのか これが疑問でした。 type set で primitive は複数の値を union にできるのに、 interface の組み合わせができないのか。 Axel さんに伺った話では、その理由はずいぶんとシンプルなものでした。 Generics は Type Switch を前提とした機能ではない 冒頭で挙げた Stringify 関数を例に挙げると、その中身を仮に実装するなら下記のようなコードになります (interface union が使えないのでコンパイルエラーになります) import "fmt" type Byteser interface { Bytes() []byte } // cannot use fmt.Stringer in union (fmt.Stringer contains methods) func Stringify[T interface{ fmt.Stringer | Byteser }](v T) string { switch v := any(v).(type) { case fmt.Stringer: return v.String() case Byteser: return string(v.Bytes()) default: panic("unreachable") // compiler requires return-equiv statement } } ここでは内部で type switch を行っており、関数の signature 以外は any と何ら変わらない処理となっています。 そもそも generics が導入された動機はこういった type switch や reflection を削減することにメリットがあり 「type switch を前提とするユースケースはそもそも generics の目的に合致しないのでは?」 ということでした。 なるほど言われてみれば確かにその通り。 type sets は互換のある演算子のみを使っており、 type switch 等は一切不要となっています。 回避策 とはいえ「じゃあ any で」としてしまうのは型安全性の不便さが残ってしまうので回避策を考えると、このようなケースで generics が向かないならば、いっそ generics を使わず関数を分離してしまうという手もあるかもしれません。 import "fmt" type Byteser interface { Bytes() []byte } func StringifyStringer[T fmt.Stringer](v T) string { return v.String() } func StringifyByteser[T Byteser](v T) string { return string(v.Bytes()) } あるいは Axel さんの登壇でも例示されたように、 builtin の関数のように func を渡す形の別関数を用意するのもよさそうです ( 例 ) func Equal[S ~[]E, E comparable](s1, s2 S) bool func EqualFunc[S1 ~[]E1, S2 ~[]E2, E1, E2 any](s1 S1, s2 S2, eq func(E1, E2) bool) bool おまけ 実は type switch による performance downside を計測して付記しようとしたのですが、思った結果が得られず「type switch しても速度がほぼ変わらない」という、思ってたんと違う形になりました。が、その謎も思わぬ形で早々と解決しました。 GopherCon 2024 の最終日、 Google Go team の Keith Randall さんが登壇「 Interface Internals 」にて 「Go 1.22 で interface type switch のパフォーマンス改善しといたよ!」 と仰られていたのです。 試してみると、たしかに Go 1.21 では type switch のほうが遅い結果となりました。1.22 ではほぼ差がなくなっており、しっかりと改善の効果を実感できます。 $ asdf local golang 1.21.5 $ go test -benchmem -bench . goos: darwin goarch: arm64 pkg: github.com/ktogo/benchtest BenchmarkStringifyAnyStringer-12 178512208 6.670 ns/op 0 B/op 0 allocs/op BenchmarkStringifyAnyByteser-12 50326334 23.01 ns/op 5 B/op 1 allocs/op BenchmarkStringifyStringe-12 494067585 2.430 ns/op 0 B/op 0 allocs/op BenchmarkStringifyByteser-12 248494794 4.812 ns/op 0 B/op 0 allocs/op PASS ok github.com/ktogo/benchtest 6.380s $ asdf local golang 1.22.1 $ go test -benchmem -bench . goos: darwin goarch: arm64 pkg: github.com/ktogo/benchtest BenchmarkStringifyAnyStringer-12 393826981 2.943 ns/op 0 B/op 0 allocs/op BenchmarkStringifyAnyByteser-12 82505652 13.98 ns/op 5 B/op 1 allocs/op BenchmarkStringifyStringe-12 493270286 2.414 ns/op 0 B/op 0 allocs/op BenchmarkStringifyByteser-12 250246922 4.802 ns/op 0 B/op 0 allocs/op PASS ok github.com/ktogo/benchtest 6.000s We’re Hiring! Finatext グループでは一緒に働く仲間を募集中です!様々なエンジニア系のポジションがあるので気軽に覗いてみてください! 株式会社Finatextホールディングス の全ての求人一覧 サーバーサイドエンジニア(クレジット事業) - 株式会社Finatextホールディングス https://medium.com/media/690dbcea386e5161a6c8c1f482bee1d7/href Go with Generics: なぜ Interface Union が使えないのか was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
はじめまして!ナウキャストでデータエンジニア / データサイエンティストをしている大森 (X: @yukimooori )です。普段はオルタナティブデータを活用した民間統計の作成や、データパイプラインの構築などを行なっています。 ナウキャストでは2023年から Snowflake x dbt の環境を導入し、全社の各プロジェクトで利活用を進めています。 今回の記事ではこの Snowflake x dbt 環境 を利用し、「 HRog賃金Now 」という民間統計プロダクトをゼロから開発した中で感じた 分析・開発体験の大幅な向上について まとめてみようと思います! Snowflake 単体でのメリットなども多いですが最後まで読んでいただけると幸いです。 ナウキャストにおける Snowflake x dbt 環境ってどんなもの? インフラについては弊社のデータエンジニアのけびん(X: @Kevinrobot34 )が DataOps Night #4 で登壇した際に詳しく解説しているのでそちらをぜひご参照ください! https://medium.com/media/2afeb664a62891c4a26cf12b4679544e/href 本記事では簡単に、 DWH として Snowflake を利用、モデルの管理として dbt を使っている ということだけ抑えていただければ大丈夫です。 オルタナティブデータを使った民間統計の作成ってどんな作業? 次に環境のメリットを語るために簡単に業務のイメージをご紹介できればと思います! オルタナティブデータを使った民間統計の作成と聞くと皆さんの業務と関係ないように思われるかもしれませんが、 ある程度の規模のデータを使って何かしらの計算モデルを作成するという業務であればそのまま使える ものが多いと思います。 作業の流れとしては以下です!一般的なモデル作成そのままです。 EDA モデルの試作 評価 それぞれ簡単に紹介すると、EDA (Exploratory Data Analysis) は探索的データ分析と訳されるもので、扱うデータの特徴を理解するために様々な角度でデータを見た結果をまとめていく作業です。 EDA がある程度済んだら次はモデルの試作です。ナウキャストでは今回作成した募集賃金指数の「 HRog賃金Now 」の他に国内の消費動向指数の「 JCB消費NOW 」などのモデルを作成しています。ここではできる限りデータのバイアスを取り除き、今の経済状況を適切に捉えられるような加工を EDA の結果を踏まえながら加えていきます。 最後に作成したモデルの評価です。すでにある公的統計などと比較しながら作成した指数を評価していきます。 上記の流れが一巡したらまた EDA に戻ってということを繰り返しながら最終的な統計が完成していきます。 これまでの分析環境 グループ全体で AWS を採用しており、データの格納や分析ツールについては全て AWS のリソースを利用していました。 EDA は Python から Amazon Athena 経由で Amazon S3 のデータへアクセスし、取得したデータを pandas で整形して plotly などで可視化を行っていました。ちょっとデータを見たい時にもいちいち可視化が必要だったのが面倒に感じるポイントでした。 モデル作成時の計算は Amazon ECR に image を push して AWS Batch で並列計算していたため、 分析のコンピューティングリソースの管理なども自分たちで行なっていました。 色々なパターンを並列で回すということをしていましたがこれが一回数時間ととんでもなくかかっていたんですよね。。。 評価については local の notebook などで行い、結果を notebook 形式で共有していました。 Snowflake x dbt への移行によりどんな開発体験の向上があったのか EDA Snowflake では Amazon S3 を source として分析を行うことが可能です。S3 のデータをそのまま load すると毎回時間がかかってしまうため、一度 Snwoflake 上に table を作成し、それを使って分析をしています。 EDA には Snowsight の worksheets を用いましたが、table 形式での出力に加え、簡単な可視化までをサポートしてくれているのが非常に便利 です。 Snowsight worksheets での分析は簡単な可視化までサポートされている この当時は Snowflake Notebooks がまだリリースされていなかったため worksheet を使っていましたが、 今 EDA をするなら Snowflake Notebooks はかなり有力な選択肢 かと思います! Notebooks の機能についてはこちらの記事で詳しく解説されていたので詳細が気になる方はこちらも是非参照ください。 ついにSnowflakeにNotebookが搭載された! 個人的には Notebook 形式で結果が残せるというのはもちろん、 セル間のデータのやり取りができる というのがすごく魅力的です! 大きなデータの処理は SQL で、細かな微調整は Python でということをやりたくなりますが、まさにそれが Notebook の中で完結しているのが体験として最高です! モデルの試作 Snowflake x dbt 環境の導入により、EDA の延長でそのままある程度までは worksheet でモデル作り、大枠ができたタイミングで dbt に移行。 dbt では Jinja テンプレートやマクロを活用して細かな調整や色々なパターンを試していました。 {% macro preprocess(param_a, param_b, param_c, ....) %} {% set employment_types = ["full_time", "part_time"] %} with {% for employment_type in employment_types %} {% set employment_type_loop = loop %} {% set suffix = "_" + employment_type %} wage_table_{{ suffix }} as ( select scraping_date, scraping_location, ... from ... where {% if employment_type == "part_time" %} ... {% endif %} ... {% endfor %} {% for employment_type in employment_types %} {% set employment_type_loop = loop %} {% set suffix = "_" + employment_type %} select * from wage_table_{{ suffix }} {% if not employment_type_loop.last %} union all {% endif %} {% endfor %} {% endmacro %} 実行については local から dbt run でまとめて実行できます。リソースが足りない時には Snowflake の warehouse のサイズを変えれば簡単にスケールアップできるので、長くても一回20分程度で計算が終わるようになりました。これにより コンピューティングに頭を使わずモデルの作成に集中できたため、非常に高速に試行錯誤を繰り返すことができました。 ただし、Jinja テンプレートを使いすぎると可読性が落ちたり、巨大なクエリによりハードウェア側のリミットに当たるという事象も発生したため、ある程度のところでモデルの分割や複雑なロジックを逃すなどを検討しても良いと思います。こちらについては私と同じチームのデータエンジニアの福井(Linkedin: Masahiro Fukui )が記事を書いているので合わせてご覧ください! Snowflake x dbt: クエリの並行実行で直面した課題と並行数のコントロール Snowflake におけるモデル作成で他に便利だった点としては、 Snowflake Marketplace 上に PODB(Prepper Open Data Bank) という e-Stat で国が公開している情報などがTruestar社によって整備されたデータセットを活用できる ところです。 例えば、各企業に従業員数の情報をつけて従業員数区分ごとの分析をしたいといった際に、データを拾ってきて載せるということなく既に Snowflake にあるデータセットとして利用することが可能です。 さらに dbt の generic test によりモデルの品質の担保ができる 点も有用です。これはモデル作成時にも unique テストや not null テストに引っかかっていることを検知できたり、プロダクトへ組み込んだ際も最新の実データについてテストができるという点がプロダクトの品質向上に繋がっています。 例えば分析に用いる企業と業種のマスタについて、以下のように models を定義しておくことで、company_id は unique かつ not null であることが保証されます。 version: 2 models: - name: company_sector_master description: | Mapping of the company id to the corresponding industry sector. columns: - name: scraping_date description: date of scraping data_type: timestamp - name: company_id description: company id data_type: varchar tests: - not_null - unique - name: sector description: the industry sector of the company data_type: varchar tests: - not_null 評価 以前は local の Notebook で結果をまとめていましたが、今回は Streamlit で簡単な評価アプリケーションを作りそこで可視化や評価を行いました。 求人広告データから募集賃金指数を作っていたため、公的統計である毎月勤労統計との比較を行っています。 Streamlit で作成した評価アプリケーション 選択画面 Streamlit で作成した評価アプリケーション 結果表示画面 Streamlit の活用についてはデータエンジニアの瀬能が社内勉強会の資料を上げてくれているのでそちらもご覧ください! Streamlit Bootcamp #1 使っている感触としては、簡単なアプリケーションをサクッと作るには SiS(Streamlit in Snowflake) でやるのが便利ですが、少し凝ったことをするのであれば Streamlit を ECS などで自前でホストしても良いと思います。 SiS だと インストール可能なパッケージ が限られたり、階層構造などが作れなかったりする ためですが、インストール可能なパッケージも順次増えている(例えば以前使えなかった polars が使えるようになっていた)ので徐々に使い勝手も良くなってくると思います。 今回はファイルの階層構造を作るのと Git でのコード管理をしたかったため local でアプリケーションを開発できるようにしておき、 アプリのデプロイ先として SiS を選ぶという方式 をとりました。 民間統計においては公的統計などとの比較など、notebook だと大量の出力で散らかるところを Streamlit でシンプルにできた点、 エンジニア以外のアナリストや営業メンバーもこのアプリケーションを使って結果を見れる点 が良かったと感じています。 全体を通して Snowflake のエコシステムに乗ることにより、 データや結果の共有がスムーズに進んだ のが良かったです。 全体として 以前の環境であればおおよそ半年ほどかかっていた新規の指数開発が二ヶ月ほどに縮まった ため、Snowflake x dbt 環境の導入は大きなコスト削減と開発速度の向上に寄与したと感じています! さいごに Snowflake x dbt 環境を用いた指数モデルの作成における開発体験の向上について分析の三段階に分けて紹介してきましたがいかがだったでしょうか? 弊社環境もまだまだアップデートしていきますし、Snowflake もますます便利になっていくことを期待しています! 弊社の最先端の分析環境で分析したい方、一緒に分析環境をよくしていきたい方、民間統計を作りたい方、この記事を読んで弊社に興味をお持ちいただけた方がいればぜひお話ししましょう! 【共通】カジュアル面談 - 株式会社Finatextホールディングス ナウキャストでは一緒に働く仲間を募集中です! データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス Snowflake x dbt 環境で改善したデータ分析・指数開発体験を大公開! was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
こんにちは。プラットフォームチームの taiki45 です。7月9日に行われたPlatform Engineering Kaigi 2024にて、「セキュリティとコンプライアンスを保ちつつ生産性の高い開発を実現するためのプラットフォーム」というタイトルで、Finatextのプラットフォームチームでの取り組みを発表したので紹介します。 https://medium.com/media/8b71885c7c807cd3eb80663c5879a10c/href Platform Engineering Kaigi 2024 Platform Engineering Kaigi 2024では、“Team Topologies” の共同著者のManuel Paisさんのキーノートがあったり、長年プラットフォームエンジニアリングに取り組んでいるメルカリさんの発表があったり、他にもおもしろい発表が数多くありました。プラットフォームエンジニアリングに関する日本で最初のカンファレンスということもあり盛り上がりを感じました。スケジュールは以下のページから参照できるので、タイトルから検索すると公開されている発表資料にたどり着けると思います。 PEK2024 タイムテーブル 後日Youtubeにアーカイブ動画がアップロードされるようなので、そちらもぜひご覧ください。 Platform Engineering Meetup Finatextグループでは、この発表にあるようなおもしろい技術的チャンレンジがあり、またそれに挑戦できる環境があります。興味を持っていただけたらぜひ採用情報を見てみてください〜! https://medium.com/media/0e21039193734ed8b6607f59dbc55bbc/href Platform Engineering Kaigi 2024での発表 was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
はじめに こんにちは。Finatextグループ保険事業でデータサイエンティスト / データエンジニアを担当している高橋です。 先日、Finatextグループで行われたAIアイデアソンに出場した件について書きます。アイデアソンの概要については次の記事でまとめていますので、よろしければそちらをご覧ください。 生成AIを用いた業務改善アイデアソンを開催しました さっそくどのように準備を行い発表を迎えたか、時系列でお話ししたいと思います。 アイデアの選定 アイデアソンにはスマートプラス少額短期保険の代表である小山とチームを組んで出場しました。 チームとして最初の活動はアイデア出しです。まず大まかな方向性として、最近研究が進んでいる保険×LLMの範囲で考えることにしました。幸いなことに保険はデータ・AIと親和性が高く、その関連分野であるLLMのアイデアは比較的考えやすかったです。 アイデア出しの後はどのアイデアを発表題材にするか検討します。下記のアイデアソン評価観点を参考に決めました。 1. 効率化対象の業務が生成AIに適した課題か 2. システム化の実現可能性 3. 効率化効果の大きさ AIアイデアソンという名目から、「効率化対象の業務が生成AIに適した課題か」を特に意識しました。そのために、まず LLMと一般的な機械学習モデルとの違いを整理し、なぜLLMでないといけないのかを突き詰めました。 最終的に、RAGやマルチモーダルなどをLLMの特徴と考え、その特徴を活かしやすい「損害査定の効率化」を発表題材に採用しました。 損害査定とは保険会社が契約者から保険金請求を受けて行う一連の処理のことです。これまでの損害査定には資料の継続的蓄積・共有や、高度な判断能力の要求など様々な課題がありました。しかし、LLMを導入することでこれらの課題を解決できると私達は考えました。 損害査定にLLMを導入することで既存の課題を解決し効率化できる デモ実装 発表のために実際に動いているデモを用意しました。実装には、アマゾン ウェブ サービス(AWS)のBedrockを採用し、LLMはマルチモーダル対応のClaude 3を使いました。 デモの大まかな仕組み RAGはKendraとOpenSearchのどちらを使用するか迷いましたが、扱うデータにPDFやWordなどの非構造データが多かったためそれらの扱いに優れるKendraを採用しました。 結果的にPDFのS3保存などの一部作業だけで容易にRAGを構築できました。 発表 損害査定という馴染みの薄い業務の効率化をいかに短時間で分かりやすく伝えるか苦心しました。最終的に、背景や課題の説明を必要最低限にして、デモや総括で有用性を強く強調する構成にしました。 実際のデモ画面 総括として実用性を強調する 最優秀賞を受賞 後日結果発表が行われ、私たちのチームは最優秀賞&AWS優秀賞を受賞できました!(最優秀賞が最高賞なので、つまり優勝です。)これは、データサイエンティストとして、保険×LLMの可能性を示せた結果であるとも思い非常に満足しています。 受賞の様子 また、このような素晴らしいアイデアソン開催に関わっていただいた運営の方々、AWS賞のご提供など各方面でご協力くださったAWSの方々にお礼申し上げたいです。 今後について LLMを用いた損害査定の効率化について、保険商品の種類によっては、複雑なマルチモーダルの入力を処理する難易度の高いケースも考えられます(例えば自動車保険の損害査定で車の画像から損傷度合いを判定するなど)。 そのような場合でもプロダクトとして安定した品質を提供するためには、まだまだ実験や改良を重ねる必要があると考えています。今後も研究を続けていきたいと思います。 最後に Finatextではデータエンジニアなどデータ系の職種で募集を行っています。ぜひご応募ください! データエンジニア|保険事業 - 株式会社Finatextホールディングス https://medium.com/media/0e21039193734ed8b6607f59dbc55bbc/href カジュアル面談も募集しています。保険×データに興味ある方いましたらぜひお話ししましょう。 【共通】カジュアル面談 - 株式会社Finatextホールディングス 社内AIアイデアソンにて「LLMを用いた損害査定の効率化」で優勝しました was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
はじめに この記事は、Go Conference 2024 で発表した「自動生成されたhttpエンドポイントごとにカスタムミドルウェアを挿入したい話」にて、スライドには収まらなかったサンプル実装について書きます。 概要をざっくり掴みたい方は、スライドも併せてご覧ください 👋 https://medium.com/media/966e637c49df57058f5e0bc7e42c0bb7/href 参加レポも書いたので、こちらもチェックよろしくです 👍 https://medium.com/media/3aec84d9c25956d2d937f15a594dbdd6/href 発表で話したことのおさらい OpenAPI定義からGoコードを生成するツールに oapi-codegen があります。oapi-codegenでは現状、個別のエンドポイントに対してミドルウェア処理を行う方法がサポートされていません。 そのため、発表ではOpenAPI側からのアプローチと独自のContextを用意することで解決した話をしました 👋 方針 発表で説明したとおり、個別のエンドポイントに対してミドルウェアを設定するように実装します。 最終的なコードはこちらです。 GitHub - uh-zz/gocon2024: This is a code sample to supplement the slides presented at Go Conference 2024 OpenAPI 通常のエンドポイントに加えて、Admin用のエンドポイントを用意しています。 ここで注目するのは secrutiry と securitySchemes の部分です。 paths: /things: get: security: - Role: - "normal" responses: 200: description: a list of things content: application/json: schema: type: array items: $ref: '#/components/schemas/ThingWithID' /admin/things: get: security: - Role: - "admin" responses: 200: description: a list of things content: application/json: schema: type: array items: $ref: '#/components/schemas/ThingWithID' components: securitySchemes: Role: type: http scheme: bearer どちらもOpenAPI 3.0 で定義されているキーワードです。 Authentication securitySchemes キーワードを使って、任意の名前でスコープを作成します。 作成したスコープは、 security キーワードを使って各APIに対して設定します。 上記の内容で、 oapi-codegen にてGoコードを生成します。 生成したコードには指定したルーティングエンジンの処理が含まれます。 サンプルでは、ルーティングエンジンは Echo を指定しています。 RegisterHandlers を呼び出すことで、エンドポイントとハンドラ関数の紐づけを行います。 // RegisterHandlers adds each server route to the EchoRouter. func RegisterHandlers(router EchoRouter, si ServerInterface) { RegisterHandlersWithBaseURL(router, si, "") } // Registers handlers, and prepends BaseURL to the paths, so that the paths // can be served under a prefix. func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL string) { wrapper := ServerInterfaceWrapper{ Handler: si, } router.GET(baseURL+"/admin/things", wrapper.GetAdminThings) router.GET(baseURL+"/things", wrapper.GetThings) } ルーティングされるハンドラ関数も以下のように生成されます。 // ServerInterfaceWrapper converts echo contexts to parameters. type ServerInterfaceWrapper struct { Handler ServerInterface } // GetAdminThings converts echo context to params. func (w *ServerInterfaceWrapper) GetAdminThings(ctx echo.Context) error { var err error ctx.Set(RoleScopes, []string{"admin"}) // Invoke the callback with all the unmarshaled arguments err = w.Handler.GetAdminThings(ctx) return err } // GetThings converts echo context to params. func (w *ServerInterfaceWrapper) GetThings(ctx echo.Context) error { var err error ctx.Set(RoleScopes, []string{"normal"}) // Invoke the callback with all the unmarshaled arguments err = w.Handler.GetThings(ctx) return err } OpenAPIで設定したスコープが設定されていることが確認できます。 スコープは、 RoleScopes という名前で ctx に格納されます。 アプリケーションではスコープを取り出して、スコープに紐づくミドルウェアを実行するような実装を行います。 また、上記のハンドラ関数で呼び出している w.Handler の型はServereInterface です。 これはルーティング関数、ハンドラ関数と同様に生成されたインタフェースです。 // ServerInterface represents all server handlers. type ServerInterface interface { // (GET /admin/things) GetAdminThings(ctx echo.Context) error // (GET /things) GetThings(ctx echo.Context) error } アプリケーションは、このインタフェースを満たす実装を用意していきます。 アプリケーション 生成されたインタフェースを満たすように実装したコードは以下のとおりです。 var _ gen.ServerInterface = (*Server)(nil) type Server struct {} func NewServer() Server { return Server{} } func (s Server) GetAdminThings(ctx echo.Context) error { log.Printf("GetAdminThings") return nil } func (s Server) GetThings(ctx echo.Context) error { log.Printf("GetThings") return nil } 上記の実装と生成されたコードを、main関数にて紐づけを行います。 func main() { s := server.NewServer() e := echo.New() gen.RegisterHandlers(e, s) // And we serve HTTP until the world ends. log.Fatal(e.Start("0.0.0.0:8080")) } 独自のContext 各エンドポイントごとのミドルウェア実行を実現するために、独自のContextとそれに紐づくレシーバメソッドを用意します。 実装は、以下の記事を参考にしています。 echo.Context を最大限に活用する - アルパカ三銃士 type ( Middleware func() Middlewares map[string][]Middleware ) type OriginalContext struct { echo.Context } func NewOriginalContext(ctx echo.Context) *OriginalContext { return &OriginalContext{ctx} } func (c OriginalContext) BindValidate(m Middlewares, i interface{}) error { scopes, ok := c.Get(gen.RoleScopes).([]string) if !ok { scopes = []string{} } for _, scope := range scopes { if middleware, ok := m[scope]; ok { for _, mw := range middleware { mw() } } } if i == nil { return nil } if err := c.Bind(i); err != nil { return err } if err := c.Validate(i); err != nil { return err } return nil } 独自のContextであるOriginalContext は、 echo.Context をラップしています。 生成されたコードで確認した通り、echo.Context からOpenAPIで定義したスコープを取得できます。 取得したスコープをもとに、割り当てたミドルウェアのリストを取り出して順次実行することができます。 ミドルウェアは以下のように、サーバを初期化するときにDIすることができます。 type Server struct { m Middlewares } func NewServer() Server { m := Middlewares{ "admin": []Middleware{ func() { log.Printf("admin middleware") }, func() { log.Printf("admin middleware 2") }, }, "normal": []Middleware{ func() { log.Printf("normal middleware") }, }, } return Server{m: m} } func (s Server) GetAdminThings(ctx echo.Context) error { log.Printf("GetAdminThings") c := ctx.(*OriginalContext) if err := c.BindValidate(s.m, nil); err != nil { return err } return nil } func (s Server) GetThings(ctx echo.Context) error { log.Printf("GetThings") c := ctx.(*OriginalContext) if err := c.BindValidate(s.m, nil); err != nil { return err } return nil } まとめ 発表では説明できなかった具体的な実装について紹介しました。 あくまで oapi-codegen で現状サポートされていないという前提のアプローチなので、参考程度に見ていただけると幸いです。 さいごに 引き続き、issueの動向を見守りつつ、アップデートがあれば共有しますので、ぜひフォローもよろしくお願いします 👋 感想・ご意見ありましたら、メンション・DMでコメントよろしくお願いします(引用ポストでも泣いて喜びます)。 x.com 採用情報 Finatext グループでは一緒に働く仲間を募集中です。 https://medium.com/media/0e21039193734ed8b6607f59dbc55bbc/href 会社のことも含めてカジュアルにお話したい方はこちらのフォームからもご応募お待ちしております 👍 【共通】カジュアル面談 - 株式会社Finatextホールディングス 自動生成されたhttpエンドポイントごとにカスタムミドルウェアを挿入したい話 ~実装編~ was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
はじめに こんにちは!Finatextのソフトウェアエンジニアの和田と申します。Finatextグループ内のナウキャストで、データの開発基盤を作るチームに所属しています。 ナウキャストでは、データ基盤にSnowflakeを採用しています。 今回は2024年の2月にVersion2がリリースされ、Public PreviewとなったSnowflake CLIを使って構築した、Python UDFs・Stored Proceduresの自動デプロイの仕組みを紹介します。 また、Python UDFs・Stored Proceduresに独特の仕様に触れ、それに対応した具体的なリソースの構成についても紹介します。 自動デプロイをする上での既存の課題 現在、SnowflakeでPython UDFやストアドプロシージャを作成する際には、以下のような選択肢があります: インラインでSQLにソースコードを埋め込む Stage上にソースコードをアップロードし、それをimportする これらの方法を使う場合、以下のような課題が発生します: 管理のための作り込み: ソースコードをGitHubなどのバージョン管理システムで管理するためには、まずStageにファイルをアップロードし、その後にCREATE FUNCTIONなどのSQL文を実行する必要があります。これらを自分たちで作り込むには一定の工数が必要となります。 自動化の難しさ: 例えばSnowSQLでこれらの作業をCI/CDツール(例えばGitHub Actions)から自動化しようとすると、Snowflakeへの接続やツールのセットアップに手間がかかります。 Snowflake CLIでの解決方法 ここで登場するのが Snowflake CLI です。このツールを使用することで、上記の課題を簡単に解決できます。具体的には以下のような利点があります: 簡単なコマンド: snow snowpark buildやsnow snowpark deployのようなコマンドを使うだけで、StageへのアップロードからCREATE FUNCTIONの実行までを自動化できます。 簡単な導入: pip installで簡単にインストールできるため、セットアップが非常に簡単です。 Snowflake CLI Snowflake CLIは開発者向けのオープンソースのツールです。Snowflakeのリソースを手軽に作成・管理することができます。 ドキュメントはこちら↓ Snowflake CLI | Snowflake Documentation GitHub - snowflakedb/snowflake-cli: Snowflake CLI is an open-source command-line tool explicitly designed for developer-centric workloads in addition to SQL operations. 実際に利用しているコマンドは以下の4つです。 snow connection test … Snowflakeとの接続の確認 snow snowpark init … ソースコードを管理するディレクトリの初期化 snow snowpark build … ソースコードをまとめてzip化 snow snowpark deploy … Python UDFs・Stored Proceduresのデプロイ 具体的な実装例 実際にPython UDFs・Stored Proceduresをデプロイするまでの流れは下記です。 ソースコードを配置するディレクトリの初期化 GitリポジトリにPush Github Actionsで自動デプロイ 1. ソースコードを配置するディレクトリの初期化 snow snowpark init コマンドで以下のようなディレクトリが作成されます。 example_snowpark/ ├ app/ │ ├ __init__.py │ ├ common.py │ ├ functions.py │ └ procedures.py │ ├ requirements.txt └ snowflake.yml pyファイルの構成などは異なりますが、ほとんど同じ構成を利用しています。後述の snow snowpark build などのコマンドも上記の構成を前提として動作するため、あまり変えないほうが良いでしょう。 2. GitリポジトリにPush ソースコードを配置し snowflake.yml でPython UDFs・Stored Proceduresの設定をしたら、対象となるリポジトリにPushします。ここでは説明を省略しますが、実際にはここでpytestを利用したCIも設定しています。 3. Github Actionsで自動デプロイ snow snowpark build snow snowpark deploy を利用して、自動でソースコードをデプロイします。 GitHub Actionsで下記のWorkflowを設定しています。 steps: - uses: actions/checkout@v4 - name: Set up Python uses: actions/setup-python@v5 with: python-version: "${{ inputs.PYTHON_VERSION }}" - name: Install dependencies run: | python -m pip install --upgrade pip pip install snowflake-cli-labs snow --version - name: Check Connection run: | chown $USER ${{ inputs.CONFIG_FILE }} chmod 0600 "${{ inputs.CONFIG_FILE }}" mkdir ~/.config/snowflake cp ${{ inputs.CONFIG_FILE }} ~/.config/snowflake/config.toml snow connection test - name: Deploy run: | bash scripts/snowpark_deploy.sh ${{ inputs.WORKING_DIR }} - name: Send notification about finished deployment when succeeded ... #!/bin/bash set -eu TARGET_DIR=${1} cd $TARGET_DIR # build the source file zip snow connection test snow snowpark build # deploy the functions echo "Deploying functions to Snowpark..." snow snowpark deploy --replace 作成にあたって、以下の注意点があります。 snow snowpark build snow snowpark deploy は snowflake.yml が配置されているディレクトリでしか実行できない snow connection で接続先の設定(user_name, databaseなど)をどこから参照するか考える必要がある。 現状Snowflake CLIではkey pair認証の秘密鍵を設定する際にファイルしか指定できない(直接注入できない)ので、GitHub Actionsから利用するときはBasic認証(user_name, password)の方が利用しやすい。 このうち、接続先の設定については config.toml ファイル内に記載 環境変数から指定 が選べます。パスワードなどは環境変数からの指定が好ましいです。 具体的なリソース構成 下図はプロジェクトのDEV環境のリソース構成と、自動デプロイの流れを示しています。masterブランチでPython UDFs・Stored Proceduresの変更がPushされると、GitHub Actions上で自動デプロイが起動します。 Python UDFs・Stored Proceduresを作成する上での注意点 上図のように、 ソースコード格納先のStageを配置するコンテナ(Database・Schema) Python UDFs・Stored Proceduresの実体が作成されるコンテナ を分離しています。 これは主に、コンテナのCloneを行った際のUDFs・Stored Proceduresの挙動と関連しています。 SnowflakeのZero Copy Cloningは QA用のテスト環境 CI環境 などを作成したい際に低コストで利用できる便利な選択肢ですが、いくつか考慮しなければならない点があります。 その1つとして、UDFs・Stored ProceduresはClone時の挙動がその作成方法で変わるのは気を付けるべきポイントです。 Clone対象のコンテナ内部のStageからファイルをimportしているUDFs・Stored ProceduresはCloneしない そうでないUDFs・Stored ProceduresはCloneする という挙動となっており、シンプルにPython UDFs・Stored Proceduresの実体の作成先とソースコードの配置先を同じコンテナにすると、Cloneを利用する際に別途移行の作業が必要となってしまいます。 この点への対応策として上記の分離を行うことで、コンテナがCloneされたときに常にUDFs・Stored ProceduresもCloneされるようにしています。これで気軽にCloneを行って、パイプラインの検証を行えます! まとめ 以上のようにSnowflake CLIを利用することで、簡単に自動デプロイの仕組みを構築することができます。みなさんぜひ使ってみてください! 仲間を募集中です! Finatextホールディングスでは一緒に働く仲間を募集中です!様々なエンジニア系のポジションがあるので気軽に覗いてみてください! エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス https://medium.com/media/0e21039193734ed8b6607f59dbc55bbc/href また、7月10日に DataOps Night を開催予定です!お気軽にご参加ください! DataOps Night #4 データプロダクト/分析基盤開発の舞台裏 (2024/07/10 19:00〜) Automating Python UDFs and Procedures Deployment in Snowflake via CLI was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
Intro Hi, I’m Masa (Masa Fukui, LinkedIn ), and I work as a data engineer at Nowcast (a Finatext Group company), primarily working on data pipelines that process various types of data, including job advertisement data and credit card transaction data. Our data infrastructure is built on Snowflake, dbt, and Airflow. Here is a summary of this blog post: In Snowflake and dbt, while it is convenient to perform many operations in a single model/query using dbt macros, excessive use of macros might lead to unexpected resource issues. When errors occur in Snowflake due to issues such as resource exhaustion, the query profile may become unavailable. In such cases, checking the state of the Queue or Warehouse can be helpful to identify and resolve the underlying issues. Snowflake and dbt each have methods to control the number of concurrent query executions, which can potentially resolve issues that arise from resource contention. Snowflake×dbt×Terraformでモダンなデータ基盤開発してみた As mentioned in the summary, in this blog, I will share a brief story about a performance issue we encountered in our pipelines and how we addressed it by monitoring the query queue and warehouse state in Snowflake, and controlling concurrency for our query execution. This experience also led us to review our warehouse settings and work on pipeline optimizations which in turn resulted in overall cost reduction (details to follow). When we hear the phrase “ a performance issue ,” many things come to mind, but in this story, the main issues come down to: In our pipeline, there are cases where the data volume in the output is substantially larger than the input during query execution, leading to potential performance issues. Excessive resource usage in the Snowflake Warehouse, occasionally resulting in a state where even the Query Profile cannot be displayed. Queries Where the Output Data Volume Exceeds the Input As previously mentioned, this article focuses on the challenges arising from cases where the data volume after processing (Output) is larger than before processing (Input) in a data pipeline. Usually, in data pipelines where big data sources such as large amounts of transaction data are involved, the data volume will often decrease after processing, resulting in Input > Output. Examples of such processing include: Data Selection : Selecting only the necessary columns and excluding unnecessary data from the result and the downstream. Filtering : Extracting only the data that meets specific conditions. Aggregation : Reducing the amount of data by grouping and summarizing it. On the other hand, examples of Input < Output include: JOIN (especially CROSS JOIN) JOIN involves combining data from different sources, which often results in an increase in data volume. While INNER JOIN returns only matching rows and may reduce data volume, CROSS JOIN generates all possible combinations, leading to a significant increase in output data volume. /* total rows: 20,000 */ select * from table1 -- 100 rows cross join table2; -- 200 rows UNION / UNION ALL : UNION, especially UNION ALL which does not exclude duplicates, combines the result sets vertically, often increasing the total data volume. /* total rows: 300 */ select column1, column2 from table1 -- 100 rows union all select column1, column2 from table2 -- 200 rows; Melting : Melting (un-pivoting) converts wide-format data into long-format data, often increasing the number of rows. It is often used in combination with UNION ALL in SQL. /* total rows: 300 */ select id, 'variable1' as variable, value1 as value from table1 -- returns 100 rows union all select id, 'variable2' as variable, value2 as value from table1 -- returns 100 rows union all select id, 'variable3' as variable, value3 as value from table1; -- returns 100 rows In our actual queries, we often use combinations of UNION ALL and the melt technique. In the following section, I will demonstrate an example for this using dbt-labs’ jaffle_shop_duckdb . Background Before we move on, I want to elaborate on why we often write queries that result in Input < Output. In some of the projects I am involved in, we calculate indices such as wage indices and consumption trend indices from job advertisements and transaction data, which requires complex calculations within our pipeline. HRog Wage Now : “Wage index” and “Job Postings Index” created from job advertisements JCB Consumption NOW : “Consumption Trend Index” created from credit card transaction data Since these processes were traditionally done in Python and ran on AWS ECS, our migration to Snowflake and dbt required the extensive use of complex dbt macros and UDFs. Hence the queries introduced in the next section may not necessarily be typical in standard dbt and Snowflake pipeline development. Example of Input < Output Queries with dbt Macros Side Note : The jaffle_shop_duckdb provided by dbt-labs is an amazing sandbox for dbt. The only requirement is Python installed on my machine, and now I have a mini dbt sandbox running locally powered by duckdb , which comes in handy when I want to test a new version of dbt release or just want to run some experiments. Now, let’s look at a simplified example of a query we actually run in our pipelines using dummy code. First, let’s say we have the following orders table as the input which has 99 rows. ( Source ) ┌──────────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐ │ column_name │ column_type │ null │ key │ default │ extra │ │ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │ ├──────────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤ │ order_id │ INTEGER │ YES │ │ │ │ │ customer_id │ INTEGER │ YES │ │ │ │ │ order_date │ DATE │ YES │ │ │ │ │ status │ VARCHAR │ YES │ │ │ │ │ credit_card_amount │ DOUBLE │ YES │ │ │ │ │ coupon_amount │ DOUBLE │ YES │ │ │ │ │ bank_transfer_amount │ DOUBLE │ YES │ │ │ │ │ gift_card_amount │ DOUBLE │ YES │ │ │ │ │ amount │ DOUBLE │ YES │ │ │ │ └──────────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘ ┌──────────┬─────────────┬────────────┬───────────┬────────────────────┬───────────────┬──────────────────────┬──────────────────┬────────┐ │ order_id │ customer_id │ order_date │ status │ credit_card_amount │ coupon_amount │ bank_transfer_amount │ gift_card_amount │ amount │ │ int32 │ int32 │ date │ varchar │ double │ double │ double │ double │ double │ ├──────────┼─────────────┼────────────┼───────────┼────────────────────┼───────────────┼──────────────────────┼──────────────────┼────────┤ │ 1 │ 1 │ 2018-01-01 │ returned │ 10.0 │ 0.0 │ 0.0 │ 0.0 │ 10.0 │ │ 2 │ 3 │ 2018-01-02 │ completed │ 20.0 │ 0.0 │ 0.0 │ 0.0 │ 20.0 │ │ 3 │ 94 │ 2018-01-04 │ completed │ 0.0 │ 1.0 │ 0.0 │ 0.0 │ 1.0 │ │ 4 │ 50 │ 2018-01-05 │ completed │ 0.0 │ 25.0 │ 0.0 │ 0.0 │ 25.0 │ │ 5 │ 64 │ 2018-01-05 │ completed │ 0.0 │ 0.0 │ 17.0 │ 0.0 │ 17.0 │ │ 6 │ 54 │ 2018-01-07 │ completed │ 6.0 │ 0.0 │ 0.0 │ 0.0 │ 6.0 │ │ 7 │ 88 │ 2018-01-09 │ completed │ 16.0 │ 0.0 │ 0.0 │ 0.0 │ 16.0 │ │ 8 │ 2 │ 2018-01-11 │ returned │ 23.0 │ 0.0 │ 0.0 │ 0.0 │ 23.0 │ │ 9 │ 53 │ 2018-01-12 │ completed │ 0.0 │ 0.0 │ 0.0 │ 23.0 │ 23.0 │ │ 10 │ 7 │ 2018-01-14 │ completed │ 0.0 │ 0.0 │ 26.0 │ 0.0 │ 26.0 │ ├──────────┴─────────────┴────────────┴───────────┴────────────────────┴───────────────┴──────────────────────┴──────────────────┴────────┤ │ 10 rows 9 columns │ └─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ In our pipeline, we use dbt macros quite extensively to perform processing which resembles the following: -- Outer loop {% set payment_types = [ "credit_card", "coupon", "gift_card", "bank_transfer", ] %} -- Inner loop {% set processors = [ "processor1", "processor2", "processor3", ] %} {% for payment_type in payment_types %} {% set col_name = payment_type + "_amount" %} {% for processor in processors %} select order_id, customer_id, order_date, status, amount as original_amount, -- Call some UDF (argument: processor) custom_udf.process("{{ processor }}") as processed_amount, '{{ payment_type }}' as payment_type from {{ ref("orders") }} where 1=1 and {{ col_name }} > 0 -- Add union all if not the last iteration of the loop {% if not loop.last %} union all {% endif %} {% endfor %} -- Add union all if not the last iteration of the loop {% if not loop.last %} union all {% endif %} {% endfor %} Although this query is solely for demonstration purposes and the resulting data means nothing, the results will look like this, which in our case is expected by the downstream processes: ┌──────────┬─────────────┬────────────┬────────────────┬─────────────────┬─────────────────┬───────────────┐ │ order_id │ customer_id │ order_date │ status │ original_amount │ processed_value │ payment_type │ │ int32 │ int32 │ date │ varchar │ double │ varchar │ varchar │ ├──────────┼─────────────┼────────────┼────────────────┼─────────────────┼─────────────────┼───────────────┤ │ 1 │ 1 │ 2018-01-01 │ returned │ 10.0 │ xxx │ credit_card │ │ 2 │ 3 │ 2018-01-02 │ completed │ 20.0 │ xxx │ credit_card │ │ 6 │ 54 │ 2018-01-07 │ completed │ 6.0 │ xxx │ credit_card │ │ 7 │ 88 │ 2018-01-09 │ completed │ 16.0 │ xxx │ credit_card │ │ 8 │ 2 │ 2018-01-11 │ returned │ 23.0 │ xxx │ credit_card │ . . . Lets break down what this made-up dbt model does: Create nested loops with payment_types and processors list defined using the {% set %} macro. Use each element of these lists as a column for payment_type and as an argument for the made-up Snowflake UDF , processor respectively. In the SELECT statement, filter the data for each corresponding payment_type. Finally, add UNION ALL to combine the result sets of each loop. This will increases the number of rows from the original 99 to around 320. As the input grows or we introduce more processor-payment type combinations, the data volume can easily expand. Our pipeline executes queries like the one above with large volumes of data. While it’s possible to use intermediate tables to ease the processing load of a single model, we often use models that extensively utilize macros as they simplify code management and refactoring by keeping everything in a single model file. We wanted to keep it that way as long as no performance issues occur (foreshadowing…). Issues Encountered in Snowflake and Queue Status The Actual Problem We Faced in Snowflake One day we encountered the following error while executing 4 concurrent queries in Snowflake using a dbt model which is similar to the one introduced in the previous section, Processing aborted due to error 300005:4035471279; incident xxxxxx. As mentioned in this Snowflake KB article , this error is categorized as a Snowflake Internal Error, indicating that the memory of the warehouse has been exhausted due to excessive memory usage. We suspected that the increase in data volume during the execution of dynamically generated queries might have led to unpredictable resource consumption pattern. At this point, potential responses include further query optimization (possibly by using intermediate tables) or reviewing our concurrency settings. However, as mentioned earlier, we decided to first explore if we could solve the issue without changing the current code. Query Profile Unavailable ! When there’s a problem with query execution, Snowflake’s Query Profile is a reliable tool. However, when we tried to open it, it resulted in a timeout and we couldn’t even display the profile. In Snowflake, if a query fails due to execution timeouts for example, you can usually view the execution plan and progress up to the point of failure. However, in our case, where a hardware failure was caused, it seems that the profile-saving process also got interrupted, rendering it unavailable. Duration Metrics and Queue Status With the Query Profile unavailable, we had to rely on other information, such as Duration which is available even in a case of an Internal Error. Upon examining the details, we found that, in addition to the usual Compilation and Execution, there were also Queued Overload and Queued Repair status. The definitions for those from the official docs are as follows QUEUED_OVERLOAD_TIME : This is the time (in milliseconds) the query spent in the warehouse queue, due to the warehouse being overloaded by the current query workload. This status indicates that the warehouse resources are overwhelmed, causing the query to be in a waiting state. QUEUED_PROVISIONING_TIME : The time (in milliseconds) a query waits in the warehouse queue for resources to be allocated, due to warehouse creation, resuming, or resizing. This status indicates that the query is waiting for resources to be allocated and is also commonly seen in normal executions. QUEUED_REPAIR_TIME : The duration (in milliseconds) that a query spends in the warehouse queue waiting for compute resources to be repaired. This situation arises when a faulty server is automatically replaced by a healthy one. This is a rare condition that can happen in the case of hardware failures. As described, this status is rarely seen and occurs when the server fails and is attempting to recover. As a test, we decided to run just 1 model instead of 4 to see if the queue issue would still occur and as a result we confirmed that running a single model did not lead toQUEUED_REPAIR_TIME or QUEUED_OVERLOAD_TIME. From this information, we deduced that the resource-intensive queries running concurrently might have been causing the warehouse resources to be overwhelmed. As each query dynamically increased the data volume, the warehouse resources became strained, leading to a situation where resources were being contended for by the queries. Another possible explanation is that the failure occurred because the resource allocation couldn’t keep up with the unpredictable pattern of volume increase. The Solution and Lessons Learned Solution The solution was quite simple: we reduced the number of concurrent queries. Considering that the original concurrent execution with 4 concurrent queries was failing, we ran benchmarks with the following combinations: Warehouse sizes : MEDIUM, LARGE, XLARGE Number of concurrent queries : 2, 3 Through these tests, we found that setting the concurrent query count to 2 prevented resource contention (QUEUED_REPAIR / OVERLOAD) regardless of the warehouse size. Additionally, we observed that increasing the warehouse size did not linearly reduce execution time. Considering cost efficiency, running with a MEDIUM warehouse size and 2 concurrent queries seemed to be the most reasonable setup. For our lessons learned, we initially implemented the process naively using a 2XLARGE warehouse size as we knew the query was quite resource-intensive. However, we later discovered that it was experiencing QUEUED_OVERLOAD, resulting in unnecessarily long execution times despite not wasting credits. By checking the queue status and benchmarking, we found that reducing the number of concurrent queries could actually shorten the total execution time, as it prevents resource contention, and led us to also reconsider the warehouse size. In this specific case we checked the queue status in the Duration section in the query history, but with MONITOR privileges on the warehouse, you can also track the warehouse load including the queue status over time from the Snowflake Admin tab. As we reduced the number of concurrent queries to 2 and changed the warehouse size from (naively chosen) 2XLARGE to MEDIUM, we were able to also reduce the overall cost of our pipeline to about one-seventh of what it was. (The following chart was generated in SELECT.DEV ) This experience taught us the importance of not only focusing on query optimization via the Query Profile but also paying attention to the queue state and warehouse load during development and monitoring. Going forward, we plan to incorporate queue monitoring into our routine to ensure more efficient query execution. Controlling Concurrency in Snowflake and dbt To conclude this article, I’d like to share a few methods for controlling concurrency in Snowflake and dbt. Snowflake In Snowflake, you can set and adjust the MAX_CONCURRENCY_LEVEL parameter for each warehouse. This can be done by running the following: ALTER WAREHOUSE MY_WAREHOUSE SET MAX_CONCURRENCY_LEVEL = 4; For existing warehouses, you can check the current settings with: SHOW PARAMETERS IN WAREHOUSE MY_WAREHOUSE; dbt In dbt, I will introduce two methods to control concurrency: 1. --threads CLI Argument When using the dbt CLI, you can specify the number of models to execute concurrently using the --threads option. For example, the following command limits the maximum number of concurrently executed models to 2, regardless of how many models could theoretically be run based on their dependencies: $ dbt run --threads 2 Using threads | dbt Developer Hub 2. Forcing Dependencies In dbt, dependencies between models are typically created using the ref macro. However, you can also manually create dependencies by adding comments to models, forcing them to run in a specific order. For example: -- depends_on: {{ ref('upstream_parent_model') }} select * from {{ ref("another_model") }} Forcing Dependencies -- depends_on example code We’re Hiring! Finatext Holdings is looking for new team members! We have various engineering positions available. エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス For data engineers: データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス If you have any questions or are interested, feel free to DM me on LinkedIn ! Optimizing Pipelines: Controlling Concurrency in Snowflake and dbt was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
はじめに こんにちは! Nowcast (株式会社ナウキャスト, Finatextグループ)でデータエンジニアをしているマサ(Masahiro Fukui, Linkedin )と申します。求人広告データやクレジットカードの決済データをはじめとする様々なデータを処理するパイプラインとデータプロダクトの開発に携わってきました。 HRog賃金Now : 求人広告から作成される募集賃金指数や求人数指数 JCB消費NOW : クレジット決済データから作成される消費動向指数 弊社のデータ基盤の構成要素はSnowflake、dbtとAirflowで最近はdbtとSnowflakeを利用した開発が主な仕事になってきました。 本記事の要点を記載すると以下のような内容になります。 Snowflake x dbtの処理ではdbt macroで多くの処理を一つのモデル・クエリで行うことができて便利な一方で過剰なmacro使用はリソース消費の面から注意が必要になる リソース枯渇などWarehouseの異常に起因するエラーがSnowflakeで発生するとクエリプロファイルが表示できなくなることがあり、その際はQueueやWarehouseの状態を参照することも有効なケースがある dbtとSnowflakeにはそれぞれ並行数をコントロールする方法があるため、クエリの並行実行などでリソースの問題が起きる際は一つの解決策となり得る Snowflake×dbt×Terraformでモダンなデータ基盤開発してみた Nowcastにおける主なデータ基盤の構成 本記事ではSnowflakeとdbtを利用したパイプラインを利用する中で直面したパフォーマンス面の課題、特にクエリを同時に並行実行することで発生した課題、その解決にいたるまでの道を体験談として記載していきます。 また、課題に直面する中でWarehouseサイズの見直しや最適化にも着目したことにより、コストの削減にも繋がったので後ほどそちらにも触れていきます。 パフォーマンス面での課題解決といっても色々なケースが考えられますが、本記事で取り上げるテーマや課題は以下になります。 パイプラインにおけるSQL実行においてInputよりOutputのデータ量が大きくなるケースとそれにより起こり得るパフォーマンス面の課題 Snowflake Warehouseにおけるリソース過剰利用により(稀に)起きるQuery Profileすら表示ができないような状態 InputよりもOuputのデータ量が増えるクエリ 前述した通り、パイプラインの処理において処理前(Input)より後(Output)の方がテーブルのデータ量が増えるケースにより起きた課題が本記事のフォーカスになります。 一方で、大量の決済データなどビッグデータをソースとするパイプライン処理においては、データ量の推移として多くのケースでInput > Outputであることが多いのではないでしょうか。 その例として以下のような処理が良くあるかと思います データの選別 : 必要なカラムのみを選択し不要なデータを除外する。 フィルタリング : 特定の条件に合致するデータのみを抽出する 集約 : データをグループ化し集計することでデータの量を削減する 逆にInput < Outputになる例としては以下のような例があるかと思います JOIN (特にCROSS JOIN) JOIN はその名の通りデータの結合なのでデータ量が増えるケースが多いです。 INNER JOIN は両方のテーブルに存在するマッチする行のみを返すため、データ量が減少する場合もありますが、 CROSS JOIN はすべての組み合わせを生成するため出力データの量が大幅に増加することが見込まれます。 /* total rows: 20,000 */ select * from table1 -- 100 rows cross join table2; -- 200 rows UNION / UNION ALL UNION 、特に重複除外をしない UNION ALL は SELECT の結果セットを縦に結合するため最終的なデータ量としては増えることが多いです。 /* total rows: 300 */ select column1, column2 from table1 -- 100 rows union all select column1, column2 from table2 -- 200 rows; Melt データのピボット解除(Melt)はワイドフォーマットのデータをロングフォーマットに変換する処理となるため出力データの行数としては増加することが多いです。SQLでは UNION ALL などと組み合わせて使われる印象です。 /* total rows: 300 */ select id, 'variable1' as variable, value1 as value from table1 -- returns 100 rows union all select id, 'variable2' as variable, value2 as value from table1 -- returns 100 rows union all select id, 'variable3' as variable, value3 as value from table1; -- returns 100 rows 実際に我々が実行しているクエリでは上記の例のうち UNION ALL と Melt の組み合わせのようなものになり、以下でそれを模倣した例をdbt-labsの jaffle_shop_duckdb を用いて記載しています。 少し背景を紹介すると、自分の携わっているプロジェクトでは求人広告や決済データをもとに募集賃金指数や消費動向指数などを算出しており、複雑な数値の計算などをパイプラインで処理する必要があります。 伝統的にはPythonで開発されていた処理などをSnowflakeにて行っているため少し特殊な処理になることがしばしばあり、次項で紹介するようなクエリはdbt、Snowflakeのパイプライン開発においては一般的ではないかもしれません。 dbtマクロを利用したInput < Outputなクエリの例 余談: dbt公式が用意してくれている jaffle_shop_duckdb は、Pythonの環境さえあえればdbtと duckdb を使ってLocalで色々な実験ができる良い感じのSandboxです!自分はdbtの新バージョンなどが出た際などに新機能の挙動の確認などによく使っています。 それでは実際にパイプラインで実行しているクエリをダミーコードで簡易化した例としてみていきます。 まず以下の orders テーブルがInputとしてあったとします。( ソースコード ) ┌──────────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐ │ column_name │ column_type │ null │ key │ default │ extra │ │ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │ ├──────────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤ │ order_id │ INTEGER │ YES │ │ │ │ │ customer_id │ INTEGER │ YES │ │ │ │ │ order_date │ DATE │ YES │ │ │ │ │ status │ VARCHAR │ YES │ │ │ │ │ credit_card_amount │ DOUBLE │ YES │ │ │ │ │ coupon_amount │ DOUBLE │ YES │ │ │ │ │ bank_transfer_amount │ DOUBLE │ YES │ │ │ │ │ gift_card_amount │ DOUBLE │ YES │ │ │ │ │ amount │ DOUBLE │ YES │ │ │ │ └──────────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘ ┌──────────┬─────────────┬────────────┬───────────┬────────────────────┬───────────────┬──────────────────────┬──────────────────┬────────┐ │ order_id │ customer_id │ order_date │ status │ credit_card_amount │ coupon_amount │ bank_transfer_amount │ gift_card_amount │ amount │ │ int32 │ int32 │ date │ varchar │ double │ double │ double │ double │ double │ ├──────────┼─────────────┼────────────┼───────────┼────────────────────┼───────────────┼──────────────────────┼──────────────────┼────────┤ │ 1 │ 1 │ 2018-01-01 │ returned │ 10.0 │ 0.0 │ 0.0 │ 0.0 │ 10.0 │ │ 2 │ 3 │ 2018-01-02 │ completed │ 20.0 │ 0.0 │ 0.0 │ 0.0 │ 20.0 │ │ 3 │ 94 │ 2018-01-04 │ completed │ 0.0 │ 1.0 │ 0.0 │ 0.0 │ 1.0 │ │ 4 │ 50 │ 2018-01-05 │ completed │ 0.0 │ 25.0 │ 0.0 │ 0.0 │ 25.0 │ │ 5 │ 64 │ 2018-01-05 │ completed │ 0.0 │ 0.0 │ 17.0 │ 0.0 │ 17.0 │ │ 6 │ 54 │ 2018-01-07 │ completed │ 6.0 │ 0.0 │ 0.0 │ 0.0 │ 6.0 │ │ 7 │ 88 │ 2018-01-09 │ completed │ 16.0 │ 0.0 │ 0.0 │ 0.0 │ 16.0 │ │ 8 │ 2 │ 2018-01-11 │ returned │ 23.0 │ 0.0 │ 0.0 │ 0.0 │ 23.0 │ │ 9 │ 53 │ 2018-01-12 │ completed │ 0.0 │ 0.0 │ 0.0 │ 23.0 │ 23.0 │ │ 10 │ 7 │ 2018-01-14 │ completed │ 0.0 │ 0.0 │ 26.0 │ 0.0 │ 26.0 │ ├──────────┴─────────────┴────────────┴───────────┴────────────────────┴───────────────┴──────────────────────┴──────────────────┴────────┤ │ 10 rows 9 columns │ └─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ こちらに対してdbtの Macro を駆使して以下のような処理を加えることをパイプラインの中ではしています。 -- 外側のloop {% set payment_types = [ "credit_card", "coupon", "gift_card", "bank_transfer", ] %} -- 内側のloop {% set processors = [ "processor1", "processor2", "processor3", ] %} {% for payment_type in payment_types %} {% set col_name = payment_type + "_amount" %} {% for processor in processors %} select order_id, customer_id, order_date, status, amount as original_amount, -- 架空のprocess UDFを呼ぶ (引数: processor) custom_udf.process("{{ processor }}") as processed_amount, '{{ payment_type }}' as payment_type from {{ ref("orders") }} where 1=1 and {{ col_name }} > 0 -- loopの最後でない場合はunion allを追加 {% if not loop.last %} union all {% endif %} {% endfor %} -- loopの最後でない場合はunion allを追加 {% if not loop.last %} union all {% endif %} {% endfor %} デモ用のクエリなのでデータに意味はないですが結果は以下の通り下流の処理にて想定されるフォーマットになっています。 ┌──────────┬─────────────┬────────────┬────────────────┬─────────────────┬─────────────────┬───────────────┐ │ order_id │ customer_id │ order_date │ status │ original_amount │ processed_value │ payment_type │ │ int32 │ int32 │ date │ varchar │ double │ varchar │ varchar │ ├──────────┼─────────────┼────────────┼────────────────┼─────────────────┼─────────────────┼───────────────┤ │ 1 │ 1 │ 2018-01-01 │ returned │ 10.0 │ xxx │ credit_card │ │ 2 │ 3 │ 2018-01-02 │ completed │ 20.0 │ xxx │ credit_card │ │ 6 │ 54 │ 2018-01-07 │ completed │ 6.0 │ xxx │ credit_card │ │ 7 │ 88 │ 2018-01-09 │ completed │ 16.0 │ xxx │ credit_card │ │ 8 │ 2 │ 2018-01-11 │ returned │ 23.0 │ xxx │ credit_card │ . . . こちらは説明のために作成した架空のdbtモデルですが少しブレークダウンすると以下のような処理になります。 payment_types と processors のリストを {% set %} マクロを用いて作成しNested Loopを作成する それぞれのリストの要素を payment_type のカラムと架空の Snowflake UDF への引数 processor に使用する SELECT においてはそれぞれの該当 payment_type にデータを絞る 最後に UNION ALL を加え各Loopの結果セットを結合 実際に行数としては、上記のような処理を加えると99行 -> 320行ほどに増えていきます。Inputや処理の種類が大きいとデータ量が容易に大きく膨らむことがわかります。 我々のパイプラインでは上記のようなクエリを大きなボリュームのデータを用いて行っています。 中間テーブルを上手く用いるなどをして1モデルの処理量を軽くすることも可能ではあるものの、1モデルファイルで済むことによるコードの管理・リファクタのし易さやなどから上記のようにJinjaを多用したモデルを多く利用していました。 実際にSnowflakeで起きた問題とQueueのステータス 直面したエラー パイプライン上で前項で紹介したようなdbtモデルをSnowflake上でクエリとして4つ並行して実行をしていたところある日実際に起きたエラーがこちらです。 Processing aborted due to error 300005:4035471279; incident xxxxxx. こちらのエラーは Snowflake KB記事 にもある通りSnowflakeのInternal Errorに区分され、メモリの過剰な利用によりWarehouseのメモリが使い尽くされていることを示唆しています。 前項で紹介したMacroで動的に生成されたクエリによる実行中のデータ量の増加などは、Query Optimizerがあったとしても予測不能なリソース消費をしてしまうのかもしれません。 この時点で対応としてはクエリのさらなる最適化 (場合によっては中間テーブルの利用)や並行して実行するクエリ数の見直しが考えられますが、前述の通り現在のコードを一旦変えないで解決できないかをまず探ることとしました。 表示されないクエリプロファイル 何かクエリ実行に問題があった際に頼りになるのがSnowflakeのクエリプロファイル(Query Profile)ですが、いざ見ようとしてみるとプロファイルのローディングにおいてTimeoutが発生し 表示すらできなくなっておりました 。 Snowflakeでは実行時間のTimeoutでクエリ実行が失敗するなどの際にはもちろんクエリ失敗までの実行プランと経過が確認できます。 一方で、推測するに今回のようなハードウェア障害で失敗してしまうケースではプロファイルの保存処理も中断されてしまい、結果的に表示ができなくなっているのではないかと思います。 DurationとQueueステータスの確認 さてクエリプロファイルも見れない中でどうしようか、というところですがクエリプロファイル以外で参照できる情報として Duration があります。 こちらInternal Errorが起きたとしても確認することができます。 実際にこちらの詳細を確認すると以下の通り通常の Compilation と Execution 以外にも Queued overload と Queued repair のステータスが存在しています。 ここで公式の Docs も参照にしつつ、Queueにおけるステータスの定義を確認すると QUEUED_OVERLOAD_TIME : This is the time (in milliseconds) the query spent in the warehouse queue, due to the warehouse being overloaded by the current query workload. Warehouseのリソースが逼迫しておりクエリ自体が実行待ちになってしまっている状態です。 QUEUED_PROVISIONING_TIME : The time (in milliseconds) a query waits in the warehouse queue for resources to be allocated, due to warehouse creation, resuming, or resizing. クエリがリソースが充てられるのを待っている状態であり正常な実行においても良くみられるステータスかと思います。 QUEUED_REPAIR_TIME : The duration (in milliseconds) that a query spends in the warehouse queue waiting for compute resources to be repaired. This situation arises when a faulty server is automatically replaced by a healthy one. This is a rare condition that can happen in the case of hardware failures. こちらは説明にもある通り稀に見られるステータスで、サーバーごとFailしてしまった際に復帰を試みている状態です。 ここでテストとして元々4つ並行して実行していたモデルを1つだけ実行し、その状態でもQueueの逼迫が発生するかを確認することにしました。 結果的には1つのモデル実行の場合 QUEUED_REPAIR_TIME や QUEUED_OVERLOAD_TIME のステータスにならず、スムーズにCompilation -> Executionと実行を完了できることが確認できました。 この情報から、元々リソース消費の激しいクエリが並行して同時に走っていることにより、各クエリで動的にデータ量が膨らむ中でWarehouseのリソースが逼迫し、リソース食い合いのような状況になっていたと推測しました。そして最終的にはリソース配分が回らずにFailしていることを疑いました。 解決策と学び 解決策としては非常にシンプルで並行して実行するクエリ数を減らす対応をしました。 その際にベンチマーキングとして元々並行数 4 にてFailしていることを踏まえ、以下の組み合わせにてクエリをそれぞれ実行してみました。 Warehouseサイズ: MEDIUM ・ LARGE ・ XLARGE 並行数: 2 ・ 3 試してみたところ我々のケースでは並行数 2 に設定をすることでWarehouseのサイズに関わらずリソースの食い合い( QUEUED_REPAIR / OVERLOAD )が起こらないことがわかりました。 加えてWarehouseのサイズ向上と実行時間の短縮は線形ではなく、コスト面も踏まえると MEDIUM を並行数 2 で回すことがリーズナブルでありそうなことがわかりました。 反省点としては、リソースを大量に使うからという理由で元々ナイーブに 2XLARGE を利用して実装していた処理に関して、実は QUEUED_OVERLOAD になっており、クレジットこそ無駄には消費されていないものの無駄に実行完了まで時間がかかっていた状態を作っていたことがあります。 実際には、Queueを確認し、ベンチマークをして並行数を減らした方が今回のように合計の実行時間が短くなるケースがあり、さらにWarehouseサイズの見直しにもつながりました。 今回はクエリ履歴のDurationからQueueの確認をしましたが、Warehouseの MONITOR 権限があればSnowflakeのAdminタブより時系列でQueueのステータスなどWarehouseのLoadをトラックすることができます。 またこのベンチマーキングを経て、並行数を 2 に減らし 2XLARGE のWarehouseサイズを MEDIUM に変更したことにより、結果的にパイプライン上の実行コストも約7分の1に抑えることができました。 (Chartは SELECT.DEV にて生成) 今後はSnowflakeとdbtにおける開発とワークロード監視において、クエリプロファイルを用いてクエリ自体の最適化に取り組むだけではなく、QueueやWarehouseの負荷状態にも注意を払っていこうと思わせてくれる経験になりました。 Snowflakeとdbtにおける並行実行のコントロール方法 並行数をコントロールしたい場合にSnowflakeとdbtそれぞれで方法があるため、そちらを最後に幾つかご紹介してこの記事を締めくくりたいと思います。 Snowflake SnowflakeにおいてはWarehouseごとに 最大並行レベル ( MAX_CONCURRENCY_LEVEL )をパラメータとして設定ができます。 設定は以下のように行います。 ALTER WAREHOUSE MY_WAREHOUSE SET MAX_CONCURRENCY_LEVEL = 4; また既存のWarehouseについて以下のように現在の設定を確認することができます。 SHOW PARAMETERS IN WAREHOUSE MY_WAREHOUSE; dbt dbtにおいてはここでは2つの方法を紹介します。 --threads CLI オプション dbtではCLIにて dbt run や dbt build を使用する際に --threads を使用することで並行して実行を試みるモデル実行数を指定することができます。 例えば以下のコマンドでは仮に依存関係上幾つでもモデルを実行できたとしても最大で並行実行されるモデルの数は2つになります。 $ dbt run --threads 2 Using threads | dbt Developer Hub モデルの強制依存 dbtにおいてはrefマクロなどで参照を作ることでモデル間の依存関係・DAGが生成されますが、以下のようにコメントをモデルに追加することで依存関係を人工的に作成することができます。 -- depends_on: {{ ref('upstream_parent_model') }} select * from {{ ref("another_model") }} Forcing Dependencies -- depends_on example code 仲間を募集中です! Finatextホールディングスでは一緒に働く仲間を募集中です!様々なエンジニア系のポジションがあるので気軽に覗いてみてください! エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス 何か気になることがあればお気軽に Linkedin などでDMお待ちしております! また、NowcastではDataOps Nightという勉強会イベントをシリーズで開催しており、7/10 (水) 19:00に第4回目が 「データプロダクト/分析基盤開発の舞台裏」 というタイトルで企画されています! DataOps Night #4 データプロダクト/分析基盤開発の舞台裏 (2024/07/10 19:00〜) データの基盤やデータを用いたプロダクト開発に関する様々な知見が得られる勉強会になっていますので、是非ご興味のある方は以下のリンクからご参加いただければと思います! Snowflake x dbt: クエリの並行実行で直面した課題と並行数のコントロール was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
Unsplash の Kristina Tripkovic が撮影した写真 おはこんばんちは。 Finatextグループのクレジット事業でソフトウェアエンジニアをしている Hanake です。 ところで、皆様はローカルでのメール送信をどのようにテストされていますか? SMTP サーバを立ててテストしていますか?もしくは開発環境用にマネージドなメール送信サービスを利用していますか? いずれにせよ、本番との環境の差異やコスト面での課題があるかと思います。 今回は、 LocalStack と aws-ses-v2-local を使って可能な限り本番に近い環境で非同期のメール送信をローカルで再現する方法をご紹介します。 LocalStack とは LocalStack は、 Amazon Web Services(以下 AWS) のクラウドサービスをローカルでエミュレートするためのツールです。 AWS の主要サービスは ほぼ全てサポート されており、Docker Imageとして提供されているため、簡単にローカル環境で AWS のサービスを利用することができます。 一方で有料版と無料版があり、メール送信のマネージドサービスである Amazon Simple Email Service(以下 SES) のメール送信機能は有料版でしか利用することができません。 また Apache License 2.0 で提供されているため、無料版でも商用利用が可能です。 aws-ses-v2-local とは AWS の SES を使ったメール送信をローカルで再現するための LocalStack とは別のツールです。 Node.js で実装されており、 Nodemailer を ラップする形で実装がされています。 aws-ses-v2-local は npm package として提供されており、簡単にインストールすることができます。 また MIT License で提供されているため、こちらも商用利用も可能です。 ディレクトリ構成 されそれでは構成の説明に入っていこうかと思います。 今回の検証では以下のディレクトリ構成を想定しています。 . ├── build │ ├── api │ │ └── Dockerfile │ ├── aws_ses │ │ └── Dockerfile │ └── worker │ └── Dockerfile ├── cmd │ ├── api │ │ └── main.go │ └── worker │ └── main.go ├── docker-compose.yml ├── go.mod ├── go.sum ├── internal │ ├── client │ │ ├── ses │ │ │ └── client.go │ │ └── sqs │ │ └── client.go │ ├── config │ │ └── config.go │ └── job │ └── send_email.go └── scripts └── localstack └── init.sh Docker Compose まずは Docker Compose の構成ですが以下のようになっています。 version: "3.8" services: api: build: context: . dockerfile: build/api/Dockerfile platform: linux/amd64 container_name: ses_local_api env_file: - .env ports: - "8080:8080" worker: build: context: . dockerfile: build/worker/Dockerfile platform: linux/amd64 container_name: ses_local_worker env_file: - .env localstack: image: localstack/localstack:3.5 container_name: ses_local_localstack ports: - "127.0.0.1:4566:4566" - "127.0.0.1:4510-4559:4510-4559" environment: - DEBUG=${DEBUG:-0} volumes: - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack" - "/var/run/docker.sock:/var/run/docker.sock" - ./scripts/localstack:/etc/localstack/init/ready.d aws_ses: build: context: . dockerfile: build/aws_ses/Dockerfile container_name: ses_local_aws_ses ports: - "8005:8005" environment: AWS_SES_ACCOUNT: '{"SendQuota":{"Max24HourSend":1000000,"MaxSendRate":250,"SentLast24Hours":0}}' SMTP_TRANSPORT: '{"host":"smtp","port":25,"secure":false}' 大まかな流れは以下になります http://localhost:8080 をlistenしているapi コンテナにメール送信のリクエストを送信 api コンテナは localstack コンテナの Amazon Simple Queue Service(以下 SQS) にメッセージを送信 worker コンテナは SQS からメッセージを受信し、aws_sesコンテナ にメール送信をリクエストを送信 ブラウザで http://localhost:8005/ にアクセスして aws_sesコンテナ に送信されたメールを確認 各種コンテナの説明 各種コンテナの説明は以下の通りです。 apiコンテナ apiコンテナ は実際にメール送信を行うための API サーバコンテナです。 今回は Go にてメール送信をするだけの軽量な API サーバを実装しています。 実装 package main import ( "log/slog" "net/http" "github.com/kelseyhightower/envconfig" "github.com/khanake/ses_local/internal/client/sqs" "github.com/khanake/ses_local/internal/config" ) func enqueueEmailHandler(c config.Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } client := sqs.NewSQSClient(c) if err := client.Enqueue(r.Context(), "Send Email"); err != nil { slog.Error(err.Error()) w.WriteHeader(http.StatusInternalServerError) return } } } func main() { slog.Info("Starting server on port 8080") var c config.Config if err := envconfig.Process("", &c); err != nil { panic(err) } http.HandleFunc("/send_mail", enqueueEmailHandler(c)) http.ListenAndServe(":8080", nil) } 実際の業務でメール送信を実装される場合は、非同期でメール処理を行うことが多いかと思います。 そのため、今回の実装でも client.Enqueue の部分で AWS SQS にメッセージを送信しています。 Dockerfile FROM golang:1.22.4 WORKDIR /app COPY go.mod go.sum ./ RUN go mod download COPY . . RUN CGO_ENABLED=0 go build -o api cmd/api/main.go CMD ./api workerコンテナ 非同期処理を実行するための worker コンテナです。 api コンテナ と同様に Go にて SQS からメッセージを受信してメール送信を行うだけの軽量な worker を実装しています。 package main import ( "context" "log/slog" "time" "github.com/kelseyhightower/envconfig" "github.com/khanake/ses_local/internal/client/ses" "github.com/khanake/ses_local/internal/client/sqs" "github.com/khanake/ses_local/internal/config" "github.com/khanake/ses_local/internal/job" ) func main() { var c config.Config err := envconfig.Process("", &c) if err != nil { panic(err) } sqsClient := sqs.NewSQSClient(c) sesClient := ses.NewSESClient(c) sendEmailJob := job.NewSendEmailJob(sqsClient, sesClient, c) slog.Info("Worker is running") for { slog.Info("Fetching new jobs") ctx := context.Background() if err := sendEmailJob.Execute(ctx); err != nil { slog.Error(err.Error()) } time.Sleep(5 * time.Second) } } Dockerfile FROM golang:1.22.4 WORKDIR /app COPY go.mod go.sum ./ RUN go mod download COPY . . RUN CGO_ENABLED=0 go build -o worker cmd/worker/main.go CMD ./worker 実際には受け取ったメッセージ内容によって処理を分岐させる必要がありますが、今回はメール送信のみを行うようにしています。 localstackコンテナ LocalStack を動かすためのコンテナですが、 公式のサンプル からほぼ変更はありません。 変更点としては 必要な AWS リソースをコンテナ起動時に作成するために volumes に ./scripts/localstack:/etc/localstack/init/ready.d を追加しています。 これにより、コンテナ起動時に ./scripts/localstack/init.sh が実行され、必要なリソースが作成されます。(今回の場合は SQS のキューを作成しています) init.sh #!/bin/bash echo "localstack setup start" awslocal sqs create-queue\ --queue-name sample-task-queue aws_sesコンテナ aws-ses-v2-local を動かすためのコンテナです。 aws-ses-v2-local は npm パッケージ では配布されていますが、Docker Image は提供されていないため、自前でビルドする必要があります。 Dockerfile FROM node:18 WORKDIR /app RUN npm install -g aws-ses-v2-local ENTRYPOINT ["aws-ses-v2-local", "--host=0.0.0.0"] AWS Client の設定 Docker Compse により、 api コンテナは 8080 ポートで、localstackコンテナ は 4566 ポート、aws_sesコンテナ は 8005 ポートで立ち上がるようになりました。 ただ、検証環境や本番環境では実際の AWS に接続を行う必要があるため、開発環境のみ SQS を localstackコンテナ に接続するように、SES を aws_ses に接続するように設定を行います。 今回は環境変数での分岐を採用しました。設定値は以下になります。 AWS_SES_QUEUE_URL=http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sample-task-queue AWS_SES_SENDER_EMAIL_ADDRESS=test@test.com AWS_ACCESS_KEY_ID=test AWS_SECRET_ACCESS_KEY=test LOCAL_SES_ENABLED=1 LOCAL_SQS_ENABLED=1 LOCAL_SES_ENABLED は SES を aws_sesコンテナ に接続するかどうかを判定するための環境変数となり、 同様に LOCAL_SQS_ENABLED は SQS を localstackコンテナ に接続するかどうかを判定するための環境変数となります。 フラグが有効な場合にリクエスト先を分岐するために、以下のように SES Client を実装します。 package ses import ( "context" "github.com/aws/aws-sdk-go-v2/aws" aws_config "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sesv2" "github.com/khanake/ses_local/internal/config" ) type SESClient struct { client *sesv2.Client } func NewSESClient(c config.Config) *SESClient { sdkConfig, err := aws_config.LoadDefaultConfig(context.Background()) if err != nil { panic(err) } client := sesv2.NewFromConfig(sdkConfig, func(o *sesv2.Options) { if c.LocalSESEnabled { o.BaseEndpoint = aws.String("http://aws_ses:8005") } }) return &SESClient{client} } func (s *SESClient) SendEmail(ctx context.Context, input *sesv2.SendEmailInput) (*sesv2.SendEmailOutput, error) { return s.client.SendEmail(ctx, input) } これにより、LOCAL_SES_ENABLED が有効な場合は http://aws_ses:8005 にリクエストを送信し、無効な場合は実際の AWS にリクエストを送信するようになります。 同様に SQS に対しても以下のように SQS Client を実装します。 package sqs import ( "context" "github.com/aws/aws-sdk-go-v2/aws" aws_config "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/khanake/ses_local/internal/config" ) type Cleanup func(context.Context) error type SQSClient struct { client *sqs.Client queueURL string } func NewSQSClient(c config.Config) *SQSClient { sdkConfig, err := aws_config.LoadDefaultConfig(context.Background()) if err != nil { panic(err) } client := sqs.NewFromConfig(sdkConfig, func(o *sqs.Options) { o.BaseEndpoint = aws.String("http://localstack:4566/") }) return &SQSClient{client, c.AWSSESQueueURL} } func (s *SQSClient) Enqueue(ctx context.Context, task string) error { _, err := s.client.SendMessage(ctx, &sqs.SendMessageInput{ MessageBody: aws.String(task), QueueUrl: aws.String(s.queueURL), }) if err != nil { return err } return nil } func (s *SQSClient) Dequeue(ctx context.Context) (*string, Cleanup, error) { var noop Cleanup = func(context.Context) error { return nil } resp, err := s.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ QueueUrl: aws.String(s.queueURL), MaxNumberOfMessages: 1, }) if err != nil { return nil, nil, err } if len(resp.Messages) == 0 { return nil, noop, nil } var cu Cleanup = func(ctx context.Context) error { _, err := s.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{ QueueUrl: aws.String(s.queueURL), ReceiptHandle: resp.Messages[0].ReceiptHandle, }) return err } return resp.Messages[0].Body, cu, nil } 上記の Client を使ってメールを送信する非同期 job は以下のように実装します。 package job import ( "context" "log/slog" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sesv2" "github.com/aws/aws-sdk-go-v2/service/sesv2/types" "github.com/khanake/ses_local/internal/client/ses" "github.com/khanake/ses_local/internal/client/sqs" "github.com/khanake/ses_local/internal/config" ) type sendEmailJob struct { sqsClient *sqs.SQSClient sesClient *ses.SESClient config config.Config } func NewSendEmailJob( sqsClient *sqs.SQSClient, sesClient *ses.SESClient, config config.Config, ) *sendEmailJob { return &sendEmailJob{ sqsClient: sqsClient, sesClient: sesClient, config: config, } } func (j *sendEmailJob) Execute(ctx context.Context) error { res, cleanup, err := j.sqsClient.Dequeue(ctx) if err != nil { return err } if res == nil { slog.Info("No jobs found") return nil } slog.Info("Recieved message", slog.String("message", *res)) if err := j.sendEmail(ctx); err != nil { return err } return cleanup(ctx) } func (j *sendEmailJob) sendEmail(ctx context.Context) error { input := &sesv2.SendEmailInput{ FromEmailAddress: aws.String(j.config.AWSSESSenderEmailAddress), Destination: &types.Destination{ ToAddresses: []string{"test@test.com"}, }, Content: &types.EmailContent{ Simple: &types.Message{ Subject: &types.Content{ Data: aws.String("Test email"), }, Body: &types.Body{ Text: &types.Content{ Data: aws.String("This is a test email"), }, }, }, }, } _, err := j.sesClient.SendEmail(ctx, input) if err != nil { return err } return nil } 実際には送信先や送信内容は DB や各種メールのテンプレートから取得するとはおもいますが、今回は固定値で送信しています。 動作確認 それでは実際に動作確認を行っていきます。 まずは以下のコマンドでコンテナを起動します。 docker-compose up メール送信のリクエストは http://localhost:8080/send_mail に POSTリクエストを送信することで行うことができます。 以下のコマンドでリクエストを送信します。 curl -X POST http://localhost:8080/send_mail すると、worker コンテナにて SQS からメッセージを受信し、aws_ses にメール送信のリクエストを送信します。 実際にメールが送信されたか aws_sesのweb UIにアクセスして確認するために http://localhost:8005 をブラウザで開きます。 まとめ 今回は LocalStack と aws-ses-v2-local を使って非同期のメール送信をローカルで再現する方法をご紹介しました。 昨今はパスワードレス認証の実装方法としてメール送信にリンクを埋め込むことも多いかと思いますが、こういった方法でローカル環境でのメール送信を再現することで、開発効率を向上させることができるかと思います。 仲間を募集中です! Finatext グループでは一緒に働く仲間を募集中です!様々なエンジニア系のポジションがあるので気軽に覗いてみてください! 株式会社Finatextホールディングス の全ての求人一覧 サーバーサイドエンジニア(クレジット事業) - 株式会社Finatextホールディングス https://medium.com/media/690dbcea386e5161a6c8c1f482bee1d7/href LocalStackとaws-ses-v2-localを使って非同期のメール送信をローカルで再現する was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
こんにちは、プラットフォームチームの taiki45 です。この記事ではGitHubのorganization-wide workflowsを実現するソフトウェアである “orgu” を紹介します。 https://github.com/Finatext/orgu GitHub organizationの全リポジトリまたは特定の属性のリポジトリ群に特定のジョブを実行し続けたい GitHubが用意している “Ruleset workflows” は似たことを実現できるがEnterpriseプランが必要 “orgu” というOSSを開発して、AWS Lambda(他のプラットフォーム上でも可)上で動かすことにより、organization-wide workflowsを実現しました ちなみにorguの読み方は「オルグ」です。Organization-wide workflowsを連想させ、かつ短い名前ということで選びました。トルコ語で編み物という意味のようです。 Motivation 例えばsecrets scannningのようなCIジョブはGitHubの全リポジトリに対して実行を行いたいです。他にもGitHub Actionsのworkflowファイルのlinterやgosecのようなlinterなど、全リポジトリあるいは特定の属性のリポジトリ群に対してCIジョブを実行し続けたい、という要求は常にありました。 「ジョブ自体は reusable workflow を使って実装しつつ、テンプレートとなるリポジトリを用意してテンプレートに必須なジョブを入れておく」方法により部分的に解決を試みていたり、リポジトリ群に特定のファイルを同期するソフトウェアを開発して問題の解決を試みたりもしました。 しかし、Finatextはマルチドメイン・マルチプロダクトな会社であり、リポジトリ数は現在700を超えていて毎月二桁ペースでリポジトリ数が増えています。初期のアーキテクチャがマイクロサービスに寄りすぎていたためリポジトリ数が多かった過去はありますが、現在はその教訓を活かし必要程度の分散アーキテクチャを採用しています。それでも毎年40%という事業の急成長に従い開発するソフトウェアは増加しています。 マルチドメイン・マルチプロダクトであることの技術的なデメリットはありますが、そのおかげで高い成長率を維持できる強い事業ポートフォリオを作れています。つまり、この状況はテクノロジーで解決すべき問題です。 GitHub Ruleset workflows and post-ZIRP era 2023年1月にGitHubは “organization-wide required workflows” という機能の公開ベータを発表しました。 これはまさにFinatextが求めていた機能であり導入し便利に使っていました。そしてこのorganization-wide required workflowsの上でsecrets scanなど各種ジョブを動かす計画でジョブの開発を進めていました。 しかし、2023年8月に状況は変わります。Organization-wide required workflowsの公開ベータ卒業とともにorganization-wide required workflowsを “Repository Rules” という機能へ統合することを発表しました 。 Organization-wide required workflowsには「リポジトリを管轄する」ような側面があるので、そのような機能と統合することは理にかなっています。問題はGitHubのプランとの兼ね合いでした。Organization-wide required workflowsはTeamプランでも使える機能であり、なのでFinatextでも積極的に活用する予定で導入を進めていました。しかし、Repository Rules機能はEnterprise以上のプランのユーザーのみが使える機能で、FinatextはTeamプランなので、organization-wide required workflowsを使っていたジョブは10月に止まることになりました。 もちろん、GitHubはソフトウェア開発をする場としては最も適していると判断していて、積極的にGitHubの機能を活用して開発生産性を高めています。しかし費用対効果を冷静に判断することは重要です。特にこの post-ZIRPの時代 は、利益や調達した資金をどう使うかという意思決定はより重要になってきています。Finatextも例外ではなく、時代の流れを捉えた意思決定をし続けた結果、急成長の会社ながら昨年度黒字化を達成し、今年度も引き続き黒字化を見込んでいます。 Finatextでは、積極的にGitHubを活用していることもあり、希望するメンバー全員にGitHubにアクセスできるようにしています。そのため、ユーザーあたり月$4が月$21に変化することと、それで享受できるメリットを考慮した結果、今のところTeamプランを利用し続けることにしています。 以上のような背景・経緯で、organization-wide workflowsを実現するためのソフトウェアの開発を始めました。 Introducing orgu GitHubには Checks API という「GitHub Actions外でCIジョブを実行し結果をフィードバックする仕組み」があります。Checks APIについての予備知識があると読解がスムーズになると思います。 Architecture orguのアーキテクチャは、GitHubからのイベントを受け取る “orgu-front” と実際にCIジョブを実行する “orgu-runner” の2つに分かれています。デフォルトではAWS Lambda上で動かす想定で、Finatextでも実際にLambda上でジョブを実行しています。orgu自体はKubernetes上などLambda以外で動かす拡張ができるように設計・実装しています。 GitHub Appsのwebhookイベントを利用して “pull_request synchronize” や “check_suite requested” などのイベントをorgu-frontが受け取ります。orgu-frontはイベントを “Event Queue” に送ります。このEvent QueueはキューイングとファンアウトができるものならなんでもOKです。デフォルトではAmazon EventBridge Event BusをEvent Queueとして使っています。orgu-frontはイベントをキューしたことを GitHubのChecks API を使って開発者に伝えています。 Event Queueによってイベントはファンアウトされます。ファンアウトした先でイベントのフィルター処理をはさんでいます。これは、特定の属性のリポジトリに対してのみジョブを実行したい時に、他の関係ないリポジトリのイベントでorgu-runnerを起動するとコンピューティングリソースの無駄になるからです。デフォルトではAmazon EventBridgeの “event patterns” という機能を使って実現しています。また、Finatextではリポジトリの属性は管理はGitHubの “custom properties” という機能を使って行っていて、orguでもそれを使っています。 orgu-runnerとCIジョブの関係 Event Queueのフィルターを通過したイベントによってorgu-runnerのイベントループが実行されます。orgu-runnerはジョブを実行するリポジトリをテンポラリなディレクトリにクローンしてきて、その中でジョブの実行をします。orgu-runnerもGitHubのChecks APIを使ってジョブの起動や終了ステータスやログを開発者に伝えています。 orgu-runnerはジョブを実行する時に、一時的に有効な GitHub installation access token やリポジトリ名やpull request情報やリポジトリのcustom propertiesなどをジョブに環境変数経由で伝えます。ジョブは受け取った情報を利用してジョブを実行しつつ、受け取ったアクセストークンを利用してGitHubのAPIにジョブの実行結果を投げます。ジョブの実行結果を伝える方法は自由に選択できるようになっていますが、 reviewdog というソフトウェアがおすすめです。従来のようにpull requestにコメントする形でフィードバックもできますし、Checks APIを使ってより見やすくうるさくない方法でフィードバックすることもできます。 Secrets scanのようなジョブでは、GitHubのUI経由で開発者にフィードバックするだけでなく、 Opsgenie というアラート管理のサービスを使ってアラートを作成して、秘匿値となりえる値の検出から対応完了までをトラックできるようにしています。この辺りの「どのようにジョブの結果をアウトプットするか」はorguでは管理していないので、ジョブ毎に自由にカスタマイズできるようになっています。 orgu-runnerの構成についてもう少し。orgu-frontはorguが用意しているコンテナイメージを使って動かすことができますが、orgu-runnerのコンテナイメージは各CIジョブ毎にユニークなものを作成します。これはジョブが必要な依存をコンテナイメージ内に用意するためで、各コンテナイメージ内にorguのバイナリを入れて、コンテナ実行のエントリーポイントはorgu-runnerのコマンドになります。orgu-runnerのイベントループ内でCIジョブが起動して実行されるイメージです。言葉で説明するとわかりにくいですが、リポジトリの example/Dockerfile に実際の Dockerfile があるのでそれを見てもらうと理解が早いと思います。 User experience orguの使用イメージを掴んでもらうため、いくつかのスクリーンショットを貼ります。ここではsecrets scan という、リポジトリにプッシュされた秘匿値らしきものを検知するCIジョブを走らせてみます。 Status checksの様子。GitHub Actionsのワークフローと同じように、今はチェックマーク(✅)になっている箇所がジョブの実行中は黄色で実行中っぽいUIになります。 orgu-runnerがジョブの実行結果をフィードバックしているUIです。GitHubのChecks APIを使って開発者へフィードバックしています。このsecrets scanジョブでは実行対象のリポジトリだけでなく、secrets scanの設定を置いているリポジトリ(ここでは gitleaks-config)をチェックアウトする必要があるため、ジョブの内部で orgu checkout コマンドを使ってクローンしてきています。Status checksと同じく、ジョブの実行中はチェックマークが黄色になっていてグルグル動いていてジョブ実行中っぽいUIになります。ジョブが失敗した時はここで、ジョブの失敗を終了コードやログとともに開発者へ伝えます。 ちなみに、スクリーンショットからは見切れてますが、右上に “Re-run all checks” というリトライのためのボタンが用意されていて、ここからジョブを再実行できるようにもなっています。 CIジョブが特に問題を見つけなかった時は、上記のようなUIで開発者へジョブの実行だけフィードバックします。ジョブが、例えば秘匿値を見つけた場合など、なにか問題を見つけた時はジョブ自身がpull requestコメントやChecks APIを使って開発者へフィードバックします。 Checks APIではアノテーションを使ってフィードバックします。スクショではマークダウンがうまく適用できてないのとHTMLタグも残念な感じになってますが修正したので見逃してください… Checks APIを使った時のpull requestの “Checks” タブのUIです。ジョブの実行自体は成功に終わってるので、orgu-runnerが run-secrets-scan check経由で成功を報告しています。ジョブの中でChecks APIを使ったcheckが secrets-scan で、このcheckは秘匿値が見つかったので失敗を報告しています。 Pull requestコメントでフィードバックする場合のイメージです。アノテーションより良くも悪くも注目度があるので、ジョブによってはこの報告スタイルが向いてることもあります。 Getting Started Mediumだとコードブロックが見にくいと思うので、 リポジトリのREADME に用意しています。簡単に手元で動作させることもできるようになっているのでぜひ試してみてください。 Outside of AWS Lambda orguはAWS Lambda以外でも動くように設計をしています。その辺りは READMEに書いた ので気になった方は読んでみてください。 orguの実装 orguはRustで実装しています。特徴としては、gitコマンドを呼び出すのではなく libgit2 というライブラリを使ってgit周りのオペレーションをしているので、orgu-runnerのコンテナイメージにgitコマンドがなくても動くgitコマンドフリーになっているところがあります。git fetch中のログ出力もいい感じにカスタマイズできるのも利点です。欠点としては、libgit2のRustバインディングである git2 crate がかなりがんばってくれてはいるんですが、async対応なpure RustなGitライブラリが望まれますね… gitoxide ゴゴゴ 他はありきたりですが、 clap や tracing あたりのライブラリが強力で、言語自体の表現力が高いことやコンパイラが賢くて書きやすかったです。デファクトなGitHubクライアントライブラリが不在な気がするのでここだけどうにかしたいですね…ゴゴゴ おわりに Platform Engineering Kaigi 2024 というイベントで登壇する予定です。プラットフォームエンジニアリングの領域の技術的な取り組みについて喋る予定なので、興味があればぜひ聞きに来てください。このorguの話などおもしろいと思います! 引き続きおもしろいと思う情報発信をしていく予定なので、よければ筆者のXアカウントをフォローしてもらえるとうれしいです! https://x.com/taiki45 Finatextに興味を少しでも持っていただけたら、ぜひテックブログの他の記事や採用情報を覗いてみてください! https://medium.com/media/0e21039193734ed8b6607f59dbc55bbc/href orgu: New OSS to implement GitHub organization-wide workflows was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
Photo by Melanie Hughes on  Unsplash はじめに こんにちは、Finatextグループのナウキャストでデータエンジニアをしているけびん(X: @Kevinrobot34 )です。Snowflake と dbt そして Terraform を使ってデータ分析基盤の構築をするとともに、POSデータのパイプラインの開発・運用を行っています。 POSデータの弊社のパイプラインは Snowflake と dbt で実装されているのですが、POSのトランザクションデータは大きく、差分更新するために incremental model を使っていました。その際に爆発的なjoinによりパフォーマンスの問題が起きてしまいました。今回の記事ではその詳細と対策の話を書いていきます。 incremental model とは dbt は、例えば select 文を書いた hoge.sql というファイルを用意しておくと、そのselect 文を CTAS ( CREATE TABLE AS SELECT ) でラップしたSQLにコンパイルし、DWH上で簡単にELTを実行できるようにしてくれるツールです。 dbt は基本的にはテーブルを毎回洗い替えてしまうので、シンプルで分かりやすい一方、データ量が大きいテーブルだと毎回洗い替えするわけにもいきません。いわゆる差分更新をやりたくなるわけですが、dbt の incremental model を使えば実現できます。 is_incremental() というマクロを使うことで、初回の実行時には where 文を外しデータの全量をスキャンし、二回目以降の実行時には where 文により差分だけスキャンしてその分を更新する、というような仕組みです。 詳細は以下のドキュメントなどを参照してください。 Configure incremental models | dbt Developer Hub また弊社のエンジニアが dbt incremental model と冪等性に関する記事も書いておりますのでご覧ください! DBT Incremental Strategy and Idempotency POSデータの概要 Point Of Salesの略で、「いつ」「どこで」「何が」「いくつ」「いくらで」売れたかというデータです。これらの情報が全国のスーパーマーケットやドラッグストアから集められております。 弊社が取り扱っているPOSデータはヒストリーも長くデータの量が大きいので以下のような incremental model を dbt-snowflake を利用して実装していました。 {{ config( materialized="incremental", unique_key=["data_date"], cluster_by=["data_date"], incremental_strategy="delete+insert", ) }} select data_date::date as data_date, store_code, jan_code, sales::int as sales, quantity::int as quantity, from {{ source("pos", "transaction") }} {%- if is_incremental() %} where data_date >= dateadd( day, {{ var('time_lag') }}, (select max(data_date) as max_data_date from {{ this }}) ) {%- endif %} ある日付の購買データは、その後一定期間は訂正されることがあるため、更新されうる期間の分のデータは削除し、最新のデータをインサートするという delete+insert による incremental model として実装していました。 このSQLモデルを実行すると、当然初回実行時はテーブルを0から作成するので遅いのですが、二回目以降も非常に遅いことが分かりました。 この事象を改善するためには dbt がコンパイルした実際のクエリがどのようなもので、それがどのようにSnowflake上で実行されているかを理解するのが大事でした。 Incremental model の挙動 実際にコンパイルされたSQLと向き合ったり、dbtの実装を観察すると、delete+insert の incremental model の場合には以下のように3ステップで実行されていることがわかりました。 step1 — 差分に対応するテーブルを作成 最初のstepはシンプルで、sqlモデルのselect文をCTASでラップしたものを一時テーブルとして作成します。where文が含まれているので差分に対応したデータのみを含む一時テーブルとなります。 create or replace temporary table pos_transaction__dbt_tmp as ( select * from ( select data_date::date as data_date, store_code, jan_code, sales::int as sales, quantity::int as quantity from source_transaction where data_date >= dateadd( day, -8, (select max(data_date) as max_data_date from pos_transaction) ) ) order by (data_date) ); ※見やすさのためにDB名やスキーマ名を削除したり一部フォーマットしています。今後のSQLも同様です。 step2 — 差分テーブルと元テーブルを突き合わせ削除を実行 step1で用意した差分テーブルと元のテーブルを突き合わせます。このときのwhere文は unique_key の設定に従って実行されます。incremental_strategy が merge の際には unique_key はその名の通りユニークなキーとして利用されるのですが、 delete+insert の際にはただ単にこのstep2での突き合わせの条件となるというのがポイントです。 delete from pos_transaction using pos_transaction__dbt_tmp where ( pos_transaction__dbt_tmp.data_date = pos_transaction.data_date ); step3 — 増分テーブルをインサート step1で作った差分テーブルを元テーブルにインサートします。 insert into pos_transaction (“DATA_DATE”, “STORE_CODE”, “JAN_CODE”, “SALES”, “QUANTITY”) ( select “DATA_DATE”, “STORE_CODE”, “JAN_CODE”, “SALES”, “QUANTITY” from pos_transaction__dbt_tmp ); 実際のパフォーマンスの問題 Snowflake は Query History と Query Profile が非常に便利で、パフォーマンスの問題があったときの調査が非常にしやすいです。 https://medium.com/media/8a4ba4b1fd093f895e5d38d3896c99f3/href 今回も各 Step の Query Profile を眺めていたところ Step2のdeleteのところが以下の通り重いことがわかりました。 Joinにより行数が爆発的に増えていることがQuery Profileで確認できます。テストのために少ないデータでQueryを回していますが実データはもっと量が多いので爆発的なjoinは困ります。 こちらを見てわかる通り、join後の行数が非常に大きくなっています。2つのテーブルで data_date が同じデータは多数存在し、それらのデカルト積でデータが作られてしまっているというようなイメージですね。Snowflake のドキュメントでも「爆発結合(Exploding Joins)」として紹介されています。 クエリプロファイルを使用したクエリの分析 | Snowflake Documentation 元々やりたかったことは、修正される可能性のある範囲の data_date のデータを単に削除することです。 dbt-adapters の実装に deep dive joinの際に where で data_date のみが指定されてしまっているのが根本の原因です。そもそもこのクエリがどうやって生成されているか見てみましょう。dbtのコードとしては このあたり です。 {% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {% if unique_key %} {% if unique_key is sequence and unique_key is not string %} delete from {{target }} using {{ source }} where ( {% for key in unique_key %} {{ source }}.{{ key }} = {{ target }}.{{ key }} {{ "and " if not loop.last}} {% endfor %} {% if incremental_predicates %} {% for predicate in incremental_predicates %} and {{ predicate }} {% endfor %} {% endif %} ); {% else %} delete from {{ target }} where ( {{ unique_key }}) in ( select ({{ unique_key }}) from {{ source }} ) {%- if incremental_predicates %} {% for predicate in incremental_predicates %} and {{ predicate }} {% endfor %} {%- endif -%}; {% endif %} {% endif %} insert into {{ target }} ({{ dest_cols_csv }}) ( select {{ dest_cols_csv }} from {{ source }} ) {%- endmacro %} これを見ると unique_key がシークエンスで渡されていると step2 の where 句が unique_key を直接比較する形になっている一方、 unique_key を文字列で指定するとそのキーを in で比較する形式に変わることが分かります。 そこで以下のようにconfigにおける unique_key の指定の仕方を変更してみました。 -- before {{ config( materialized="incremental", unique_key=["data_date"], cluster_by=["data_date"], incremental_strategy="delete+insert", ) }} -- after {{ config( materialized="incremental", unique_key="data_date", cluster_by=["data_date"], incremental_strategy="delete+insert", ) }} この新しいconfigで再度クエリを実行したところ、step2のデータの削除のクエリが以下のように変更されました。 delete from pos_transaction where (data_date) in ( select (data_date) from pos_transaction__dbt_tmp ); これはまさに「更新されうる期間の分のデータは削除」というやりたかったことであり、また書き方も変わったことで爆発的なjoinもなくなりパフォーマンスも大幅に改善しました! Joinが圧倒的に軽くなっているのがわかります。 まとめ dbt-snowflake delete+insert の incremental model を実装する際には join の爆発に気をつけましょう! dbt は incremental model などの場合、意外と複雑なクエリを生成していたりするので、実際に実行されるクエリと向き合うことが大事です。dbtのコードを直接読むのも面白いのでおすすめです! Snowflake は Query Profile が便利なので、よく見てクエリの最適化に活かすのがおすすめです! 仲間を募集中です! Finatextホールディングスでは一緒に働く仲間を募集中です!様々なエンジニア系のポジションがあるので気軽に覗いてみてください! エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス 気になることがあれば気軽にXで @Kevinrobot34 にご連絡ください! References [CT-3493] [Bug] unique_key list incremental model has performance issues on the delete phase · Issue #150 · dbt-labs/dbt-adapters Improve performance of `delete+insert` incremental strategy by ataft · Pull Request #151 · dbt-labs/dbt-adapters About incremental strategy | dbt Developer Hub dbt-snowflake で delete+insert の incremental model を実装する際には join の爆発に気をつけよう was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
こんにちは、ナウキャストのデータ&AIソリューションチームの 藤井 です。今回は、先日開催した、業務改善を目的とした社内AIアイデアソンのレポートを書いていこうと思います! 開催までの経緯 Finatextグループは、生成AIに関連したソリューションを提供するデータ&AIソリューション事業を立ち上げ、生成AIを次の競争力の源泉にしようとしています。社内でも生成AI活用が進んでいて、社内専用のGPTチャットやSlackと連携した議事録要約システム等がリリースされています。また、全エンジニアがGitHub Copilotを使えるようになっております。 僕も含めたエンジニアは業務において生成AIをフル活用しており、GPTにSQL文を書かせたり、コーディングの際にもGitHub Copilotを活用してます。 しかし、エンジニアが生成AIを活用している一方で、ビジネスサイドでも生成AIをもっと活用する余地があると感じました。新しい技術に対する感度や抵抗感は人や職種によって異なり、社内情報の取り扱いに対する心理的ハードルや「どのように使えばよいかわからない」という理由で躊躇している人もいるのではないかと思います。そこで、「生成AIへの抵抗感をなくすために、アイデアソンを開催したら面白いのではないか!」と思い、Slackのtimes(注:個人が自由に呟けるチャンネル)にそのような旨を投稿したところ、グループCEOの林が「やりなよ!賞金出すよ!」と言ってくださり、開催に至りました。 ⁠開催概要 今回のアイデアソンのテーマはタイトルにもある通り、「生成AIを用いた業務改善」であり、3つの大きな評価観点と3つの賞を用意し、それらに基づいてアイデアや発表内容を考えてもらいました。 ⁠評価観点 評価観点として、 効率化対象の業務が生成AIに適した課題か システム化の実現可能性 効率化効果の大きさ の3つを設定し、これらの要素を発表に盛り込んでもらいました。 運営メンバーで話し合った結果、生成AIを用いたシステムを構築する際に最低限考慮する観点が上記だと考えました。①の観点が欠けていると既存のシステムでの対応で十分、②の観点が欠けていると夢物語、③の観点が欠けていると実装する意味がない、となります。 賞の一覧 賞としては、以下の3つを用意しました。 最優秀賞 ・上記の3つの観点で総合的に最も良いと判断されたチームに授与 ⁠優秀賞 ・上記の3つの観点で総合的に二番目に良いと判断されたチームに授与 イノベーション賞 ・創造的で革新的なアイデアを出したチームに授与 これらの賞を勝ち取るために、各チームにアイデアを考えてもらいました! プログラム構成 今回のアイデアソンは①アイデアソン②予選会③決勝戦④結果発表会の4部構成になっております。それぞれの様子を写真と一緒にお見せいたします! アイデアソン アイデアソンでは、参加者の方々に一箇所に集まってもらい、各チームに分かれて実際のアイデアを考えてもらいました。実際に考えてもらう前に、運営メンバーから生成AIに関する簡単な解説を行った上で、会場内にデータ&AIソリューション事業部の数名をアドバイザーとして配置し、生成AIに触れたことがない人でもスムーズにアイデアを出せるように工夫しました。 オフライン参加の方々で集合写真を撮影しました! 各チームが真剣に話し合っています ⁠予選会 当初の企画段階では予選会は無かったのですが、想定よりもかなり多くの方にご参加いただけたため、急遽予選会を企画しました。ここでは、17チームから決勝戦に進むことができる7チームに絞りました。 ナウキャストCEOの辻中と運営メンバーで評価をしたのですが、どのチームもクオリティが高く、かなり迷いました。正直、抽象的なふわっとしたアイデアが多く出るかと危惧していたのですが、どのチームも業務上のボトルネックを見極めたうえで、生成AIを具体的にどこに用いるのか、そのフィジビリティはあるのかをしっかりと検証しておりました。この時点で、「レベル高すぎるだろ!」と思いました。 各チームが発表している様子 ⁠決勝戦 予選会で勝ち残ったチームに審査員の前で発表してもらいました。審査員は、グループCEOの林、フィンテックソリューション事業責任者の木下、弊社のアマゾン ウェブ サービス(AWS)担当の方々の計4名でした。 実は、企画の過程でAWS様に協賛いただけることになり、AWS担当者の方々に決勝戦の審査員としてご参加いただくとともに、AWS賞をご用意いただけることになっていました。 決勝戦に進んだチームの発表が予選会よりも洗練されており、林から繰り出される厳しい(?)質問にもスムーズに回答していたのが印象的でした。 決勝戦の様子。アイデアソンに参加していないメンバーも見に来てくれました! ユニフォームを着て、チームで一致団結して発表してます! 厳しい質問をするグループCEOの林 ⁠結果発表会 待ちに待った結果発表会は、四半期に一度全社で行われるメガタウンホールの締めで行いました。どのチームが受賞したかは運営メンバーにも知らされておらず、審査員長の林が結果が書かれている封筒を司会である僕に渡し、それを読み上げるというライブ感満載な形で進みました。 結果は以下のとおりです! 最優秀賞&AWS優秀賞 ハイテク忍者 保険業の損害査定業務(保険会社が保険契約者から保険金請求を受けて行う一連の作業)の効率化アイデアを発表してもらいました。約款を検索するRAGや複数メディア(画像等)を扱うためのマルチモーダルを組み合わせたレベルの高いアイデアでした。 優秀賞 荒岡さんを一躍有名にする会 証券業の広告審査や営業考査といったコンプライアンス領域の生成AIでの効率化アイデアを発表してもらいました。プロンプトを工夫していて、業務への深い知識と技術を組み合わせた⁠素晴らしい発表でした イノベーション賞 チームサウナ部 入社後のメンバーの活躍を実現するためのエンゲージメントツールのアイデアを発表してもらいました。Slackの投稿頻度や各人が社会wikiで作成している自己管理シートを分析し、個人に対してアドバイスを行うというユニークなアイデアでした AWS最優秀賞 チームかじゅまる 貸金業の審査業務の効率化アイデアを発表してもらいました。複雑な審査業務をユースケースごとに分割し、それらに対して生成AIでの処理を行うというフローが丁寧に作り込まれていました。また、業務改善効果も緻密に計算されていました 最後に 今回、AIアイデアソンの企画運営を通じて感じたことは3つあります。 1つ目は、Finatextグループのスピード感です。僕が特に何も考えず軽く言ったことが実際に形になるまでがとても早く、それに対してリーダーポジションの方々含めて快くご協力いただいたのが印象的でした。 2つ目は、メンバーのレベルの高さです。上記でも記載している通り、アイデアや発表のレベルがとても高く、とても驚きました。正直、ビジネスサイドでここまで生成AIを活用/理解できている組織はなかなか無いのではと思います。 3つ目は、生成AIの組織浸透に貢献できたことです。結果発表後、参加チームが自主的に自チームのアイデアの実装を進めていたり、活用方法を議論していたりと、生成AIが組織に浸透しつつあることを実感してます。(動作検証のために社内GPTチャットに新しい機能を追加してくれ!という声もあり、実装スピードを上げる必要があるなとも感じてます笑) 以上のように、Finatextの魅力を再確認しながら、更なるレベルアップに貢献することができた、とても楽しい企画運営でした! なお、イノベーション賞を受賞したチームサウナ部の岩本さんと、最優秀賞を受賞したハイテク忍者の高橋さんも、それぞれ記事を公開する予定ですのでぜひお楽しみに! イベントのお知らせ 生成AIやデータ利活用に関するイベントを開催します。ぜひご参加ください! 【LT大会】生成AI・LLMを活用した業務効率化・課題解決について語る会 (2024/06/28 19:00〜) DataOps Night #4 データプロダクト/分析基盤開発の舞台裏 (2024/07/10 19:00〜) 仲間を募集中です! Finatextグループでは一緒に働く仲間を募集中です!様々なエンジニア系のポジションがあるので気軽に覗いてみてください! エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス 生成AIを用いた業務改善アイデアソンを開催しました was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
Background I’m Todd, a data engineer at Nowcast primarily working on data onboarding. In this tech blog I’ll give a brief history of ETL pipeline design at Nowcast, illustrate an issue we encountered between Airflow and DBT’s “Incremental Models”, and introduce the solution that we have developed. ETL in Python Historically at Nowcast, ETL pipelines were written using Python — the pipeline consisted of many Python scripts which applied transformations to data that was stored in AWS S3, Athena, RDBMS etc. We built a docker image containing these scripts, uploaded it to ECR and then invoked ECS tasks from Airflow. These scripts were generally designed to be idempotent, with some partition field such as the date being a parameter — pass in 2024–01–01 and the data from 2024–01–01 would be processed. When calling one of these scripts, the command actually being run would look something like below — with the date parameter being managed by Airflow: python transform_data.py 2024-01-01 --some --other --arguments Airflow Airflow is a scheduling and workflow management tool that has been in use at Nowcast for many years now — it is basically being used for 2 things: 1. task scheduler 2. task dependency management Historically, Airflow runs every day, and passes an ‘execution date’ parameter into a number of different Python scripts which process the data. If there is a problem, or we need to run something historically we just re-run historic tasks on Airflow DAGs. For instance if some data transformation script on 2024–01–01 fails, we can just rerun that script again once we have identified and repaired the issue. Keep note that this is only possible because of the fact that the scripts process one partition at a time, and take in the date as a parameter. ETL in DBT Around the end of 2022 we began to migrate our Python ETL flows to Snowflake, as it results in much faster, cheaper and cleaner pipelines. We decided to use DBT as our pipeline runner — DBT is a layer than sits on top of SQL that includes many features such as DB model definition, templating, dependency management and data regression testing. It has been a very useful tool for building our ETL pipelines quickly and efficiently. In Python each transformation in the pipeline is a script — whereas in DBT it is a templated SQL CTAS query — these are very easy to read compared to a complex script that is sometimes made up of thousands of lines of code in several different files. Best practise in DBT is to use Incremental Models: Incremental Models Incremental models are an efficient way of defining how to (incrementally) add data to our SQL models — consider we have a table that describes credit card transactions — we can make a DBT model (CTAS) that looks something like this: {{ config( materialized="table", ) }} select transaction_id, transaction_date, user_id, store_name_description, transaction_amount from {{ ref('external_table_transaction') }} This will create a table that loads transaction data from the table external_table_transaction. The problem is that every time we rerun this query it will reload the entire table — the more data in the table the slower and more expensive our query becomes — the solution to the problem is to use incremental models: {{ config( materialized="incremental", unique_key=["transaction_id"], incremental_strategy="delete+insert", ) }} select transaction_id, transaction_date, user_id, store_name_description, transaction_amount from {{ ref('external_table_transaction') }} {%- if is_incremental() %} where transaction_date = (select max(transaction_date) + 1 as next_date from {{ this }}) {%- endif %} Here we can see some of the macro/templating functionality that makes DBT so powerful — what we are now basically saying is that we should only load transaction data from external_table_transactionwhere the transaction_date is 1 day greater than the latest data in our table — it is simple and powerful. Instead of dealing with potentially billions of rows of data that grows larger with each update, we now only need to deal with the previously unseen rows — and if we need to, we still have the option of reloading the table in a full refresh. The Problem Incremental models are very appealing — they are mathematically beautiful and in cases where we have to deal with a stream of data they work very well. The problem arises when we need to control which data is being processed — the incremental model is not capable of dealing with rerunning only a specific partition — instead it loads data based on the incremental model’s rule. Perhaps in theory this is not a problem because if the incremental model is run in an ideal environment, all data will be loaded exactly once — but reality is messy — DAGs break, data is delivered late or in some cases not at all, and sometimes we need to re-load historical records. In addition, if the Airflow pipeline fails for whatever reason, the DBT jobs can become de-synced from the Airflow runs. Below is a list of issues related to incremental models that we have experienced at Nowcast since migrating to DBT: • One pipeline had some repairs which were backdated 2 years, we needed to load historical data, which needed to be done in an adhoc way because the (incremental) data pipeline cannot handle historical reruns • Another DAG broke for 3 days due to an upstream issue, no data was loaded for 3 days, and when the DAG ran on day 4, it was loading data from day 1 — in other words it had gone out of sync. • A third pipeline had an upstream skip day (day with missing data), and our incremental model tried to load data by adding `1` to the max date in the data — but this date never showed up, so the data was never loaded, resulting in manual intervention being required. But we can’t simply abandon incremental models — some pipelines have tens of billions of rows to deal with, so writing a query to deal with the table as a bulk will be slow and expensive. Why idempotency and partitioning matter The key issue with incremental models is that they are not idempotent, and cannot be configured to run against a specific partition. The old approach we took to ETL pipelines had a number of idempotent scripts that we could rerun as many times as we wanted. If there was a problem with historic data we could always just regenerate some specific partitions — and because the scripts were idempotent we could run a given day multiple times and it would not cause any problems. Incremental models do not have the ability to rerun specific partitions of the data — instead they treat all data like a stream, only loading unseen data — basically loading data that satisfies a specific rule, rather than a specific partition of the data. As we use Airflow — a scheduling tool — our data pipeline is basically needs to conform with some kind of temporal partitioning — it could be hourly, daily, weekly, monthly — the important part is that Airflow is running on some schedule. If we rerun a historical Airflow task — we expect it to run against the temporal partition corresponding to the job that we have invoked — but an incremental model will only ever ‘look ahead’, instead of being configured against a historic partition, which is what we expect when we run jobs on Airflow. Lets consider 2 Airflow DAGs that run daily — one DAG has jobs that take a date as parameter, and run only that partition when invoked. The other DAG uses incremental models, and processes unseen data when we run it. When both DAGs are running normally — processing previously unseen daily data, they will behave identically, but what happens when we run into a problem when we need to rerun a historic date — for instance 2024–01–01 needs reloading? The partitioned DAG would work as expected — rerunning 2024–01–01, but the incremental model would simply load previously unseen data regardless of which date is being passed into Airflow. As a commenter on the the limits of incrementally puts it: I’ve always thought that partitioning should be a feature of dbt. The nice thing about it is that you can compose models that are idempotent, which is preferable to an incremental strategy In short — when we are using a scheduling tool like Airflow that expects a temporal partition — incremental models do not work well. The Solution The solution is simple — we can use DBT variables — and we also don’t need to fully throw out the functionality of incremental models. We can add one or more variables to explicitly run against one or more partitions: {%- set target_date = var("target_date", "") %} {{ config( materialized="incremental", unique_key=["transaction_id"], incremental_strategy="delete+insert", ) }} select transaction_id, transaction_date, user_id, store_name_description, transaction_amount from {{ ref('external_table_transaction') }} {%- if target_date != "" %} where transaction_date = '{{ target_date }}' {%- else %} {%- if is_incremental() %} where transaction_date = (select max(transaction_date) + 1 as next_date from {{ this }}) {%- endif %} {%- endif %} This adds a new parameter to the DBT model called `target_date`. If `target_date` is undefined then the model will run with incremental behavior — but if the variable is passed in, it will instead run against the specified partition. This way of structuring the models works much better when called from Airflow. In addition, this model has now become idempotent — assuming the source data is the same, we can run the same query with the same parameters and expect to get the same result — whereas with the incremental model, the data that is loaded depends on the contents of the table, and changes that have occured upstream. This solution effectively gives us 3 modes: Bulk, Incremental and Partitioned — so it plays well with both Airflow and DBTs intended incremental strategy if we choose to use it. By running DBT with no arguments like below, it would use an incremental model: dbt run --select my_model Explicitly running a refresh will cause a bulk load: dbt run --select my_model --full-refresh And passing in the target_date parameter we added will cause it to run only against a specific partition: dbt run --select my_model --vars "{target_date : '2024-01-01'}" Now we are back at a command where Airflow is able to control the date parameter that’s being passed in — allowing for a much smoother integration! References The following documents were used when researching this problem: DBT — On the limits of incrementality Medium — Idempotent data pipeline with DBT and Airflow Engineering At Nowcast If you want to know about building data pipelines in DBT, feel free to schedule a casual meeting using the link below — in the ‘message to us’ section you can write ‘I want to talk to Todd’. 【共通】カジュアル面談 - 株式会社Finatextホールディングス Nowcast is currently hiring data engineers! If you are interested, you can apply here: データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス DBT Incremental Strategy and Idempotency was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
こんにちは、プラットフォームチームの Taiki です。 Finatextのプラットフォームチームでは、最近GoだけでなくRustも用いているので、Rust関連のブログ記事やZenn本を書きました。ぜひご覧ください! https://medium.com/media/8e8e799d90c074249d5d00d189ca025d/href https://medium.com/media/d6cc252e92f63973065eed666aabde23/href https://medium.com/media/93da62a7168749b8109f5b7e865c72db/href 特に実用Rust本はたくさんの方に反応いただき、Zenn本のトレンド入りしたり公式アカウントにご紹介いただきました。がんばって書いたので著者として嬉しい限りですありがとうございます! Zenn公式 on Twitter: "📖 こちらの本は無料で読めます 🔥実用Rustアプリケーション開発by @taiki45https://t.co/W4lQ58L83T / Twitter" 📖 こちらの本は無料で読めます 🔥実用Rustアプリケーション開発by @taiki45https://t.co/W4lQ58L83T Finatextに興味を少しでも持っていただけたら、ぜひテックブログの他の記事や採用情報を覗いてみてください!とてもいい会社です!🥳 https://medium.com/media/0e21039193734ed8b6607f59dbc55bbc/href Rust関連のアウトプットの紹介 was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
Photo by Aron Visuals on  Unsplash こんにちは、Finatextの @s_tajima です。 今回は、社内向けに書いた、「システムのエントロピーをコントロールすることの大切さ」という記事を、社外向けにも公開します。「Finatextは、こんな考えのもとにシステムのアーキテクチャを決めているのか」という参考にしていただけると嬉しいです! この記事における「システムのエントロピー」とは、「システムの無秩序さ・乱雑さ」を表現しています。 組織全体で技術スタックが統一されていれば「エントロピーは低い」、バラバラな技術が採用されていれば「エントロピーは高い」と言えます。 システムのエントロピーが低ければ低いほど、環境の把握は簡単で、改修のハードルが低く、自動化等もしやすいです。つまりアジリティが上げやすいのです。 逆に、エントロピーが高くなると、システムの理解に時間がかかり、自動化や改善のハードルや手間が高まり、アジリティは下がります。 巨大で歴史のある金融機関やシステム開発会社と比較されることの多い弊社にとって、アジリティは競争優位性の要です。 そんなアジリティを維持するためにも、基本的にはエントロピーが低い状態を保つことは非常に重要です。 熱力学においては、「エントロピーが自然に低くなることはない」というエントロピー増大の法則がありますが、システムのエントロピーも同じように増大の法則があります。組織の規模拡大、新しい機能やサービスの開発、技術トレンドの変化等は、システムのエントロピーを増大させる要因になります。 特に、成長企業においてはこの増大の勢いは必然的に大きいものになりがちです。 関連する話として、 KISSの原則 (Keep It Simple, Stupid) という有名な格言がありますが、Simple(≒ エントロピーが低い状態)を維持するのは言うほど簡単ではないのです。 システムのエントロピーの増大は避けられませんが、“増大のペース” はある程度コントロールすることができます。つまり、エントロピーの増大を “どれだけ遅らせられるか” というのが重要なテーマになります。 これは、自分が新しい機能を開発したり、新しい技術を採用するときに、 それがどの程度のエントロピー増大をもたらすか エントロピーの増大を減らす方法がないか といった要素を考えましょうということです。 一方で、”基本的には” と書いた通り、エントロピーは低ければ低いほど良いかというと、そうではない場合があります。 エントロピーの増加を避けるために新しい技術を採用できず、生産性の低い古のソフトウェアだけを使い続けることになる エントロピーを低くすることにこだわりすぎるあまり、技術選定の自由度を下げ、エンジニアのモチベーションを下げる というように、エントロピーを低く保てても、それ以外の弊害が発生することはあります。 よって、 エントロピー増大に見合うリターンがあるか エントロピーを低く保てる代わりに他のリスクを生んでいないか を考慮することも大切です。 新しい技術を採用しつつ、エントロピーを低く保つことも不可能ではありません。「新しいシステムを導入するときに、古いシステムを退役させること」はその1つの手法です。 例えば、 「ライブラリAが使いにくいから、ライブラリBを使い始めました。新しいところからライブラリBを使います。」 「ライブラリAが使いにくいので、すべてをライブラリBで置き換えます。」 という2つのパターンだと、後者を選択できれば 新しい技術を使いつつ、エントロピーへの影響も小さく留められます。 「新しいシステムを導入すること」ができる人は実は世の中にそれなりに多くいます。 一方で、「古いシステムの退役もさせられる人」というのは実はそれほど多くないと感じています。 うちで働くエンジニアは、後者も含めてやれる力をつけてほしいと思っています。 また、熱力学におけるエントロピー増大の法則は、「いかなる状況においてもエントロピーが下がらない」というものではなく、「自然には下がらない( ≒ 閉じた系であること)」という前提条件があります。同じように、システムのエントロピーも、「自然には下がらない」だけで、例えばサービスの機能開発を停止してリファクタリングに専念するなどの「特別な対応」によって下げることが可能なケースはあります。しかし、こういった「特別な対応」は事業の成長にブレーキをかけるものであることが多く、できる限り避けるべきものと考えています。 ここまでの話に反して、Finatextグループは根本的なところでは「エントロピーを爆発的に増大させる方針」を採用しています。しかし個人的には、今のところこの方針は「エントロピー増大に見合う大きなメリットがある」と考えています。 1つめの方針は、 “マルチドメイン・マルチプロダクトな会社であること” です。これは、1つの事業で1つのプロダクトを運営する会社と比べると、圧倒的に強くエントロピーを増大させる力が働きます。 そんなリスクを負ってでもこの意思決定をしているお陰で、現在も高い成長を維持できる事業ポートフォリオが作れているというメリットがあります。 システムのエントロピーが低いシングルプロダクトスタートアップが、一定の事業成長の後に踊り場を迎えて苦労している事例にも心当たりがあります。 2つめの方針は、”技術的な意思決定の権限をそれぞれのチームに大きく委譲していること” です。 これにより、当然ながら中央集権的な意思決定と比べてエントロピーが増大しやすくなりますが、 意思決定のボトルネックが生まれるのを防ぎ、エンジニアがプロダクトに対して Jibungoto を感じやすくなるメリットがあると考えています。 そんな方針を持ちつつも、過剰なエントロピーの増大を防ぐために、 Platform Engineeringを採用 したり、意思決定の合理性に関する説明責任を求めるといった構造を作っています。 この話に絡めると、 エントロピーの増大具合が大きければ大きいほどその説明責任は大きくなります。 まとめとしては、システムのエントロピーを意識し、増大のペースをうまくコントロールすることは、会社としての競争優位性を高めることになるので、Finatextではそんな視点を意識した意思決定を心がけているというお話でした。 もし興味をもっていただけた方がいたら、ぜひX ( @s_tajima ) などでご連絡ください! システムのエントロピーをコントロールすることの大切さ was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター
はじめに こんにちは、Finatextグループのナウキャストでデータエンジニアをしているけびん(X: @Kevinrobot34 )です。会社の様々な開発基盤を作るチームに所属していて、データエンジニアとしてパイプラインを開発しつつ、様々なクラウドを便利に利用できる仕組みを作る(CCoE的な)仕事を担当しています。 ところで、皆さんはLLMを利用していますか? 弊社でも生成AIが関連しているプロダクトを日々利用したり、開発したりしています。 https://nowcast.co.jp/news/20240419/ 昨今各種クラウドが生成AIに関するサービスやAPIなどをリリースしていますが、やはり Azure は Azure OpenAI Service を利用できるのでキャッチアップが欠かせなくなっていると考えています。 このような状況を踏まえ、昨年末からガバナンスが効くような形で Azure の環境を整備しました。様々な設定をしたのですが、ここでは特に重要だった以下の6つのポイントにフォーカスし、それぞれどのようなことをしたのか紹介していこうと思います。 Billing の階層の整備 リソース階層の整備 ログをWORMなストレージに集約 Azure Policy による予防的・発見的統制の整備 新規サブスクリプションを作成するためのフローの整備 Azure OpenAI Service の設定 注意点 弊社では Microsoft Customer Agreement (MCA) でMicrosoft と直接従量課金制で契約をしており、これを前提とします。Enterprise Agreement など他の契約の場合でも当てはまる話がほとんどですが、Billing周りの設定などは違い得るのでご注意ください。 Billing の階層の整備 Azure を利用する際に考えるべきな軸は様々なものがありますが、そのうちの一つに「会計・支払いの管理」があります。支払い方法は何か、請求書は誰・どこの会社宛か、などです。 こういった会計に関する取り扱いをするために、Azure には会計に関する設定の階層があり、 Billing Account:契約の単位 Billing Profile:請求書の単位、支払い方法の紐付けはここ Invoice Section:請求書内でコストをグループ化する単位 という3つのスコープがあります。 https://learn.microsoft.com/ja-jp/azure/cost-management-billing/manage/view-all-accounts#microsoft-customer-agreement より Finatextホールディングスには、僕が所属するデータ事業を行う「ナウキャスト」だけでなく、複数の子会社が存在します。それぞれの会社がAzureを利用しており、その会社ごとに支払方法(会社のクレジットカードなど)を設定できるようにしたり、各社の経理担当のメンバーにメールで請求書を連携したりする必要がありました。 そこで以下のように各階層を用意しました。 Billing Account : Finatext Group で一つ Billing Prodile : 会社ごとに作成し、対応する支払方法を設定し、経理担当メンバーへのメールでの請求書連携の設定も行う Invoice Section : 会社ごとに一つだけ作成 今回設定した会計の階層のイメージ 支払い方法は会社ごとに設定したいので “Billing Profile” は会社ごとの作成し、各社のクレカを支払い方法として紐づけるようにしています。 また “Invoice Section” については大きな会社で事業部ごとに支払いなどを分割してみられるようにしたいときに複数作ると良いようですが、現状の規模感だと不要と考え、このような構成にしました。 サブスクリプションがどの Invoice Section に属するかは、サブスクリプション作成時に設定します。 作成後に後から変更することも可能なようですが 、制限もあるようなので、ここで紹介した会計に関する設定は最初に設計をやり切るのが大事になります。 リソース階層の整備 Azure で開発を進めるにあたり、Azureのリソース階層を踏まえ Azure リソースをどのように管理するかも非常に重要です。 https://learn.microsoft.com/ja-jp/azure/cloud-adoption-framework/ready/azure-setup-guide/organize-resources より ガバナンスが効いた開発を進めるためには適切なガードレールを設定することが大事です。後述する通りAzure Policyを利用することで統制をかけておりますが、一方でアジリティを維持するために開発環境では少し弱めの統制にし本番環境では強めの統制を行う、といった調整をしたくなります。 そこで、Management Group を適切に設定することにしました。弊社では AWS をメインで利用しており Organizational Units を適切に設定することによるマルチテナント環境整備が進んでおりまして、これを参考にすることにしました。具体的には、 サービス用環境(Service) or 社内管理用環境(Management) 本番環境(Prod) or 非本番環境(SDLC) という2つの階層を作り、それぞれに適切なAzure Policyを設定することで、セキュリティと開発効率のバランスが取れるようにしました。 例えば Management-SDLC のグループには sandbox 環境のサブスクリプションを置いており、検証を進めやすいように強すぎない統制をかけています。一方 Service-Prod のグループにはサービスの本番環境サブスクリプションが置いてあり安全性を担保するために強めの統制をかけるようにしています。 Management Group と サブスクリプションの階層のイメージ Azure と AWS とではプラクティスも違う部分もあるかもしれませんが、今のところ問題なく運用できております。 Management Group を作成し、後述する Azure Policy と組み合わせて使っていく場合、 Management Group の設定・管理とサブスクリプションの配置は非常に重要になります。 しかしデフォルトの設定ではすべてのユーザーが新しい管理グループを作成することができてしまいます。 リソース階層を保護し 適切なガードレールを運用するために、 “Hierarchy Settings Administrator” などの権限がないと管理グループの設定を変更できないようにする必要があるので注意しましょう。 ログをWORMなストレージに集約 システム操作のトレーサビリティを確保するためにログはとても重要です。ログが満たすべき性質は様々なものがありますが、以下は特に重要ではないでしょうか? ログが改善されていないことを担保する(完全性の担保) “Write Once, Read Many (WORM)” な状態でログを保存する ログを集約しておく 集約しておくことで権限管理(機密性の担保)や、WORMの設定の担保などがしやすい また集約されていることで分析もしやすくなる(可用性の担保) 新しくAzureのセットアップをするにあたり、これらの設定をしました。 まず前提としてログは数年以上にわたり長期間保存しておきたいので、 Azure Storage に保存することにしました。何年保存すべきかは要件によって変わりますが、以下の新井さんの投稿が参考になります。 https://medium.com/media/de08594424c28bf0969c8d664335fd17/href 完全性の担保をするためにWORMにログを保存する方法ですが、 Azure Storage で immutable policy を設定すれば実現可能です。 またログの集約は、ログを保存するためのサブスクリプションを作成し、 Azure Storage はこのサブスクリプションに作成するようにしました。 この際にはいくつか 注意点 があります。 ネットワークの制限がかけられている場合には、マイクロソフトのサービスはバイパスすることを許可する設定が必要です Azure Storage Account と対象のリソースは同じリージョンになくてはなりません これらに注意してログ集約のサブスクリプションと Azure Storage を用意し、各種ログのエクスポート先として指定できるようにしました。以下のように Activity Log や Resource Log を保存していくことができます。 ログの種類によってスキーマやパスの構造は異なるので、ドキュメントを参照しながら分析できる環境は適宜用意する必要があります。弊社ではSnowflakeも利用しているので、これらのデータを外部テーブルなどとして読み込み分析できる環境を作ろうと考えています。 Azure Policy による予防的・発見的統制 クラウドを安全に使うためにはガードレールを適切に設定しておくことが必要不可欠です。ガードレールにも種類があり、主要なものとして「予防的統制」と「発見的統制」があります。 予防的統制 想定される危険なイベント・行動・問題を事前に定義しておき、その発生をできる限り予防するもの ネットワークへの不正アクセスや、意図しないシステムの変更などを防ぐ、第一の防御手段 AWS だと Organization SCPs などが該当 発見的統制 セキュリティに関わるイベントが発生した際に、問題の早期検知と記録を行い、進行中のインシデントに対して関係者にアラートをあげるような統制 システムに影響を及ぼす脅威やリスクの全体的な視認性を高めるのに使用する AWS だと Config などが該当 予防的統制と発見的統制は両方とも同時に行っておくべきものにあります。Azure では Azure Policy を使うことでこれが実現できます。 Azure Policy は様々な効果がありますが、 Deny を使うことで予防的統制を、 Audit や AuditIfNotExists を使うことで発見的統制を行うことができます。 RBACのロールなどと同じく、Azure Policy でも大量な builtin policy が用意されており途方に暮れるほどでした。そこでまず CIS Benchmark を読むことにしました。CIS Benchmark には Azure のサービスごとに様々なベストプラクティスがまとまっています。その内容としてはプラクティスの概要から、それを builtin のAzure Policy で実現する方法まで記載されており、ガードレールを検討する際にまず読むのをオススメします。 Azure のドキュメントにはサービスごとに builtin の Azure Policy がまとまっていたりするので、この辺りも一通り読みました。具体的に設定した Policy は 指定したサービスしか利用できないように予防的統制 指定したサービスは japaneast でしか使えないように予防的統制 Azure Storage や Azure OpenAI Service へのアクセスで HTTPS を利用させる予防的統制 Azure Storage や Azure OpenAI Service で resoure log の出力されることを監視する発見的統制 … などがあります。一部は builtin policy では実現できなかったため custom policy を作成しました。しかし大体似たような builtin policy が用意されていることが多いので、まずは builtin policy をひたすら見てそれらを使えないか模索することが多かったです。 これにより Azure Portal の Policy の画面から以下のように統制の状況確認が簡単にできるようになりました。 https://learn.microsoft.com/ja-jp/azure/governance/policy/assign-policy-portal より Portal から統制の状況の確認がしやすくなりましたが、特に発見的統制は見つかったら早めに確認したいので、今後はPolicyによる監査の結果をSlackに通知したりする仕組みを作れたりすると良いなと考えています。 また Azure Policy には予防的・発見的統制以外にも、様々な機能があり、発見した問題に対して修復まで行うといったことも可能なので、これらについても検討していきたいと思っています。Terraform でのリソース管理との両立のバランスが難しいなと思っていますが、より安全で便利な環境を目指すために Azure Policy を引き続き活用していく予定です。 サブスクリプション作成フローの整備 Azure のサブスクリプションはプロジェクトやプロダクトごとに、また開発の環境(dev/prodなど)ごとに切り分けようと考えてサブスクリプションを複数作成していたのですが、 MCA では最初サブスクリプションを5個しか作れないという制限があるのをご存知でしょうか? ・Azure.com を通じて直接購入した Microsoft 顧客契約には、最大 5 つのサブスクリプションを含めることができます。 ・24 時間ごとに 1 つのサブスクリプションを作成することができます。 https://learn.microsoft.com/ja-jp/azure/cost-management-billing/troubleshoot-subscription/create-subscriptions-deploy-resources より MCAで本格利用する前にはまずこれをサポート経由で上限を上げてもらう必要があります。意外と時間がかかるので注意が必要です。 次に本格的にAzureを利用するにあたり、サブスクリプションの作成フローを整理しました。いろいろ整理した結果、大きく分けて2つのステップにまとまりました。1つ目は実際にサブスクリプションを作成するステップで、2つ目はTerraformで初期のリソースを作成するステップです。それぞれ意識したポイントを紹介します。 まず一つ目の実際にサブスクリプションを作成するステップですが、ここでも権限管理をしっかり行いました。サブスクリプション作成はガバナンスのために一部のメンバーしかできないようにしたいですが、一方でCTOしかできないといった状況は避けたいです。 これは Azure RBACのロールではなく Billing の階層のリソースの IAM の機能を使うことで実現ができました。具体的には Invoice Section の IAM に “ Azure subscription creator ” というロールがあり、これを必要なメンバーにのみ割り当てるようにすれば良いです。 次にTerraformで初期のリソースを作成するステップでの工夫ですが、工数を削減するために適宜モジュールを作成しました。前の節で紹介したように様々なログは一つのサブスクリプションに集約したいので、そのための診断設定などを簡単に作成できるようにしています。また IAM のロールの割り当ての記述も簡単に行えるような工夫も行なっています。 これらの工夫により、数時間もあれば初期設定も含め新しいサブスクリプションを用意することができるようになりました。 Azure OpenAI Service の設定 新しくサブスクリプションを作成しても、そのままでは Azure OpenAI Service を利用することはできません。よく知られているようにマイクロソフトへの利用申請とオプトアウトの申請が必要です。オプトアウト申請については以下の Japan Cognitive Searvices Support Blog が詳しいです。 Azure OpenAI 関連の申請について 自分が10個ほどの新規サブスクリプションを作成した際には、それぞれ Azure OpenAI Service の利用申請:約1週間ほど Azure OpenAI Service のオプトアウト申請:約1週間ほど の追加の時間が必要でした。合計で10日から2週間ほどの追加の時間が必要なイメージです。会社のメンバーにはこれらを踏まえ、新しいサブスクリプションが必要であれば早めに連絡できるように依頼しています。 オプトアウト申請が問題なくできているかを確認するために以下のような Azure Policy を作成し簡単に状況確認ができるようにもしております。便利なのでおすすめです。 { "properties": { "displayName": "policy_ai_service_content_logging", "policyType": "Custom", "mode": "Indexed", "version": "1.0.0", "parameters": { "effect": { "type": "string", "metadata": { "description": "The effect determines what happens when the policy rule is evaluated to match", "displayName": "Effect" }, "allowedValues": [ "Audit", "Deny" ] } }, "policyRule": { "if": { "allOf": [ { "equals": "Microsoft.CognitiveServices/accounts", "field": "type" }, { "equals": "OpenAI", "field": "kind" }, { "count": { "field": "Microsoft.CognitiveServices/accounts/capabilities[*]", "where": { "allOf": [ { "equals": "ContentLogging", "field": "Microsoft.CognitiveServices/accounts/capabilities[*].name" }, { "equals": "false", "field": "Microsoft.CognitiveServices/accounts/capabilities[*].value" } ] } }, "notEquals": 1 } ] }, "then": { "effect": "[parameters('effect')]" } }, "versions": [ "1.0.0" ] }, ... } まとめ これら6つの設定は一つ一つはよく知られているような話かもしれないですが、これらをすべて設定することでガバナンスが効いたAzure環境を作り上げることができたかなと思っています。 しかし、例えば Microsoft Entra ID にまつわる様々な設定など、まだまだ必要なことも残っており、引き続き検証や設定の作業を進めていく予定です。 仲間を募集中です! Finatextホールディングスでは一緒に働く仲間を募集中です!様々なエンジニア系のポジションがあるので気軽に覗いてみてください! エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス データ事業エンジニアポジション の求人一覧 - 株式会社Finatextホールディングス 気になることがあれば気軽にXで @Kevinrobot34 にご連絡ください! ガバナンスが効くAzureの設定をゼロから構築した話 was originally published in The Finatext Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.
アバター