BigQuery

BigQuery

BigQuery を Python から使う

https://cloud.google.com/bigquery/docs/pandas-gbq-migration

pandas.read_gbq()

小さいクエリの結果をデータフレームとして受け取るのに便利

ライブラリ pandas_gbq をインストールしておく。

import pandas

query = """
        SELECT
          CONCAT(
            'https://stackoverflow.com/questions/',
            CAST(id as STRING)) as url,
          view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE tags like '%google-bigquery%'
        ORDER BY view_count DESC
        LIMIT 10"""
        

df = pandas.read_gbq(query, project_id='myproject')


# 大きいデータをダウンロードするときは use_bqstorage_api=True
df = pandas.read_gbq(query, project_id='myproject', dialect='standard', use_bqstorage_api=True)

google.cloud.bigquery

環境変数 GOOGLE_APPLICATION_CREDENTIALS

事前に環境変数 GOOGLE_APPLICATION_CREDENTIALS にクレデンシャル情報を保存したjsonファイルへのパスを格納しておく。

クレデンシャル情報を保存したjsonファイルは pandas.read_gbq() を実行したときに以下に保存される。

  • Windows
    • %APPDATA%\pandas_gbq\bigquery_credentials.dat
  • Linux/Mac/Unix
    • ~/.config/pandas_gbq/bigquery_credentials.dat

RStudio の retuculate で Python を使うときWindowsで定義した環境変数 GOOGLE_APPLICATION_CREDENTIALS を認識してくれない。そのため retuculate で実行している Python の中で環境変数を定義する。

import os
# Set environment variables
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'C:/Users/tsuda/AppData/Roaming/pandas_gbq/bigquery_credentials.dat'
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client(project='world-fishing-827')

query_job = client.query(
    """
    SELECT
      CONCAT(
        'https://stackoverflow.com/questions/',
        CAST(id as STRING)) as url,
      view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE tags like '%google-bigquery%'
    ORDER BY view_count DESC
    LIMIT 10"""
)

results = query_job.result()  # Waits for job to complete.

# save as dataframe
df = client.query(sql).to_dataframe()

# データのサイズが大きい時は create_bqstorage_client=True
df = client.query(sql).to_dataframe(create_bqstorage_client=True)

データフレームをBigQueryへアップロードする

pandas-gbq

データフレームをCSVに変換して送信する。ネスとしたデータには対応しない

table_id = 'my_dataset.new_table'

df.to_gbq(table_id)

google-cloud-bigquery

pyarrow ライブラリに依存している。データフレームをparquetに変換して送信する。ネストしたデータにも対応。


from google.cloud import bigquery
import pandas

df = pandas.DataFrame(
    {
        'my_string': ['a', 'b', 'c'],
        'my_int64': [1, 2, 3],
        'my_float64': [4.0, 5.0, 6.0],
        'my_timestamp': [
            pandas.Timestamp("1998-09-04T16:03:14"),
            pandas.Timestamp("2010-09-13T12:03:45"),
            pandas.Timestamp("2015-10-02T16:00:00")
        ],
    }
)
client = bigquery.Client()
table_id = 'my_dataset.new_table'
# Since string columns use the "object" dtype, pass in a (partial) schema
# to ensure the correct BigQuery data type.
# 部分的にカラムの値の型を指定している
job_config = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("my_string", "STRING"),
])

# アップロードジョブを定義
job = client.load_table_from_dataframe(
    df, table_id, job_config=job_config
)

# ジョブの実行
job.result()

csv をアップロードする

from google.cloud import bigquery
client = bigquery.Client()
filename = '/home/ec2-user/ml-25m/movies.csv'
dataset_id = 'cmbqdataset'
table_id = 'bqtable_via_python'

dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
job_config.autodetect = True

with open(filename, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

job.result()  # Waits for table load to complete.

print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset_id, table_id))

Runquery

def RunQueryPandas(query, credentials): client = bigquery.Client(credentials=credentials) query_job = client.query(query) results = query_job.result() return(results)

def RunQueryBq(query, destination_table):

command = f'bq query -n 10 --replace  --allow_large_results --use_legacy_sql=false \
            "{query}"'

#--destination_table={destination_table}
print(command)
os.system(command)

def RunQueryGCP(query): client = bigquery.Client(project='world-fishing-827’) query_job = client.query(query) results = query_job.result() return(results)

Check if the table exists

def table_exists(dataset_table): dataset = dataset_table.split('.')[-2] table = dataset_table.split('.')[-1]

q = '''SELECT size_bytes FROM {dataset}.__TABLES__ WHERE table_id='{table}' '''.format(dataset=dataset, table=table)
print(q)
df = pd.read_gbq(q, project_id='world-fishing-827',progress_bar_type=None)
if(len(df))==0:
    return False
else:
    return True

Create date partitioned table if not exist

def make_date_parititoned_if_doesnt_exist(dataset_table): if not table_exists(dataset_table): dataset_table =
convert_period_between_project_and_dataset_to_colon(dataset_table) command = “bq mk –time_partitioning_type=DAY {}".format(dataset_table) print(command) os.system(command)

def convert_period_between_project_and_dataset_to_colon(table): t = table.split(".") if len(t) == 2: return table if len(t) == 3: return f”{t[0]}:{t[1]}.{t[2]}”





# try gcp API

```{python}

# project = client.project
# dataset_ref = bigquery.DatasetReference(project, 'my_dataset')



# OUTPUT
dataset_id_full = f'{project}.{dataset}'

# データセットへのリファレンス
dataset_ref = bigquery.Dataset(dataset_id_full)

# 作成したいテーブルへのリファレンス
table_ref = dataset_ref.table(table)

# テーブルオブジェクト
table_obj = bigquery.Table(table_ref)

# 日付でパーティションに指定する
table_obj.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="date"  # name of column to use for partitioning
)

# パーティションに使うカラムについてはスキーマを定義する必要がある
schema = [bigquery.SchemaField("date", "DATE")]


# テーブルを作成
table_obj = client.create_table(table_obj, schema)

tables.insert
table_ref





# データセット内へ新しいテーブルへのリファレンス
table_ref = dataset.table(table)

# Configure the query job.
job_config = bigquery.QueryJobConfig()

# Set the destination table to the table reference created above.
job_config.destination = table_ref

# Run the query.
query_job = client.query(query, job_config=job_config)
query_job.result()  # Waits for the query to finish