この記事は約5分で読めます。
この記事は1年以上前に書かれたものです。
内容が古い可能性がありますのでご注意ください。
CI部の宮本です。最近、天気が悪い日はもれなく頭痛に悩まされています。鎮痛剤で対応していますが、何か良い方法はないものでしょうかねえ。。
さて、今回は先日のブログ SnowSQL コマンドで Snowflake に接続する の続編です。Webインターフェース、CLIツールと続いて、プログラミング言語からのアクセスを試してみたいと思います。
Snowflake で対応しているコネクタとドライバー
対応している言語、ドライバーは以下の通りです。主要な言語は対応している様です。Rubyとかは無いですが、ODBCドライバーがあるので対応は出来そうです。
- Python用Snoflakeコネクタ
- Spark用Snoflakeコネクタ
- Kafka用Snoflakeコネクタ
- Node.jsドライバー
- Goドライバー
- .NETドライバー
- JDBCドライバー
- ODBCドライバー
参考) コネクタとドライバ
Python用コネクタを試してみる
今回はPython用コネクタを試してみたいと思います。
環境準備、Python用コネクタのインストール
今回の環境は以下の通りです。
- MacOS 10.14.6
- Python 3.8.1
適当にディレクトリを作成し、仮想環境を作成、有効化します。(この辺りはお好みで)
$ mkdir snowflake-python $ cd snowflake-python $ python -m venv .venv $ source .venv/bin/activate
コネクタのインストールは pip で行えます。
$ pip install snowflake-connector-python
インストールを確認するサンプルコード
公式のこちらを参考にSnowflakeへの接続、バージョンの確認を行うコードを実行してみます。
# verify_installation.py import snowflake.connector # Gets the version ctx = snowflake.connector.connect( user='<your_user_name>', password='<your_password>', account='<your_account_name>' ) cs = ctx.cursor() try: cs.execute("SELECT current_version()") one_row = cs.fetchone() print(one_row[0]) finally: cs.close() ctx.close()
<your_user_name>
、<your_password>
、<your_account_name>
はご自身のものに置き換えて下さいね。
早速実行してみましょう!
$ python verify_installation.py 4.19.1
接続出来たようです。私の環境では、Snowflakeのバージョンは 4.19.1
でした。
簡単なクエリを試してみる(1行ずつFetchするパターン)
予め用意されているサンプルデータをクエリしてみます。コードはこんな感じです。
# fetch_per_row.py import os import snowflake.connector snowflake.connector.paramstyle='qmark' # (4) with snowflake.connector.connect( # (1) user=os.environ['SNOWFLAKE_USER'], # (2) password=os.environ['SNOWFLAKE_PASSWORD'], account=os.environ['SNOWFLAKE_ACCOUNT'], database='SNOWFLAKE_SAMPLE_DATA', schema='TPCH_SF1' ) as con: with con.cursor() as cur: # (3) cur.execute("SELECT C_NAME, C_NATIONKEY, C_ACCTBAL FROM CUSTOMER WHERE C_MKTSEGMENT = ? LIMIT 3", ['BUILDING']) # (4) for row in cur: print(type(row)) name, nationkey, acctbal = row print(f'{name=}, {nationkey=}, {acctbal=}')
(1) <a href="https://docs.snowflake.com/ja/user-guide/python-connector-api.html#object-connection" target="_blank" rel="noopener">Connection</a>
は Python の context manager に対応しています。このように with句で囲んであげると、コネクションのクローズ漏れを防ぐことが出来ます。
(2) ユーザー情報を環境変数から読み取るようにしました。
(3) <a href="https://docs.snowflake.com/ja/user-guide/python-connector-api.html#object-cursor" target="_blank" rel="noopener">Cursor</a>
も context manager に対応しています。
(4) SQLを実行してます。ポイントとしては WHERE C_MKTSEGMENT = ?
の部分です。?
はバインド変数と呼ばれるもので、実際のSQLは ?
に BUILDING
が設定されます。バインド変数を使用することで、SQLインジェクション対策になりますので、特にユーザーからの入力を条件とする場合は必ず使用しましょう。
なお、Pythonコネクターでは以下のタイプのバインドをサポートしているようです。(参考
- pyformat
- format
- qmark
- numeric
pyformat
、format
はクライアント側でバインド、qmark
、numeric
はサーバー側でバインドされるようです。今回は何となくサーバー側でバインドする方が良さそう、というのと個人的にしっくりきた qmark
を採用しました。
バインド変数を利用する際は、Python と Snowflake それぞれのデータ型を意識する必要があります。カラム C_MKTSEGMENT
は Snowflake 上は TEXT
ですが、Python 上では str
として扱われます。その他の型マッピングについてはこちらを参考にして下さい。
解説が長くなってしまいました。早速実行してみましょう。
$ python fetch_per_row.py <class 'tuple'> name='Customer#000000001', nationkey=15, acctbal=Decimal('711.56') <class 'tuple'> name='Customer#000000008', nationkey=17, acctbal=Decimal('6819.74') <class 'tuple'> name='Customer#000000011', nationkey=23, acctbal=Decimal('-272.60')
結果は1行ずつ Tuple で返却されるようです。結果に関しても、恐らく先程のマッピング表に従って型がマッピングされるはずです。(ドキュメントが見当たらなかったので推測ですが。。)
簡単なクエリを試してみる(まとめてFetchするパターン)
先程の例はデータを1件ずつFetchするパターンでした。次は対象データをまとめてFetchするパターンを試してみましょう。前者はデータ量が多く、一度にクライアントのメモリにデータを保持出来ない場合、反対に後者はデータ量が少ない場合に使用すると良いでしょう。
# fetch_all.py import os import snowflake.connector snowflake.connector.paramstyle='qmark' with snowflake.connector.connect( user=os.environ['SNOWFLAKE_USER'], password=os.environ['SNOWFLAKE_PASSWORD'], account=os.environ['SNOWFLAKE_ACCOUNT'], database='SNOWFLAKE_SAMPLE_DATA', schema='TPCH_SF1' ) as con: with con.cursor(snowflake.connector.DictCursor) as cur: # (2) cur.execute("SELECT C_NAME, C_NATIONKEY, C_ACCTBAL FROM CUSTOMER WHERE C_MKTSEGMENT = ? LIMIT 3", ['BUILDING']) rows = cur.fetchall() # (1) print(type(rows)) for row in rows: print(type(row)) print(row) print(f'{row["C_NAME"]=}, {row["C_NATIONKEY"]=}, {row["C_ACCTBAL"]=}') # (2)
(1) fetchall()
メソッドを利用することで、検索結果をまとめて取得することが出来ます。戻り値は list で取得できます。
(2) snowflake.connector.DictCursor
と指定することで、Tuple では無くDict で結果を取得することが出来ます。row["列名"]
の様に値を取り出せます。
実行してみましょう。
$ python fetch_all.py <class 'list'> 3 <class 'dict'> {'C_NAME': 'Customer#000000001', 'C_NATIONKEY': 15, 'C_ACCTBAL': Decimal('711.56')} row["C_NAME"]='Customer#000000001', row["C_NATIONKEY"]=15, row["C_ACCTBAL"]=Decimal('711.56') <class 'dict'> {'C_NAME': 'Customer#000000008', 'C_NATIONKEY': 17, 'C_ACCTBAL': Decimal('6819.74')} row["C_NAME"]='Customer#000000008', row["C_NATIONKEY"]=17, row["C_ACCTBAL"]=Decimal('6819.74') <class 'dict'> {'C_NAME': 'Customer#000000011', 'C_NATIONKEY': 23, 'C_ACCTBAL': Decimal('-272.60')} row["C_NAME"]='Customer#000000011', row["C_NATIONKEY"]=23, row["C_ACCTBAL"]=Decimal('-272.60')
結果を list 形式でまとめて取得出来ました。
まとめ
簡単ですが、Pythonコネクタから Snowflake への接続方法についてご紹介しました。このPython コネクタは PEP-249 に準拠しているそうなので、その辺りもおさえておくとより理解が深まりそうです。