Google Cloud(GCP)環境のCloud Functions(Cloud Run関数)をPythonで実装する場合のディレクトリ構成を考えてみました。
試し試しやってるので、使い勝手悪かったら都度更新していく予定です。
ディレクトリ構成
とりあえず、今考えているディレクトリ構成
- main.py
- service.py
- config(ディレクトリ)
- cloudbuild.yaml
- requirements.txt
- environment.py
- environment(ディレクトリ)
- <環境名>.yaml
- common
- bigquery_common.py
- storage_common.py
- util_common.py
main.py
リクエストがあった場合に、最初に呼び出されるmain関数を持つ
基本的にserviceの関数を呼び出すのみで、シンプルに全体が見えるようにする。
例)
import requests
import functions_framework
from service import Response_param, Functions
# HTTPトリガ
@functions_framework.http
def main(request: requests) -> Response_param:
"""
メイン処理
Return:
result True(正常終了)、False(エラー発生)
"""
# パラメータ取得
# request_json = request.get_json(silent=True)
funcs = Functions("https://xxx.xxx/")
res = funcs.call_api()
res: Response_param = {}
res['result'] = True
res['message'] = 'test'
print(f"res:{res}")
return res
service.py
機能に必要な処理を主に記述するファイル
リクエストパラメータ、レスポンスパラメータの型などはクラス、コレクション型などで定義しておく。
例)
import requests
from typing import TypedDict
from dataclasses import dataclass
from util.util import Util_data, Util
# リクエストパラメータ
@dataclass
class Request_param(self, param1: str, param2: int):
param1 = param1
# 返却値の型
class Response_param(TypedDict):
result: bool
message: str
class Functions:
def __init__(self, url: str):
self.url = url
def validate():
if self.〜 is None:
return
@Util.retry_api
def call_api(self, url: str) -> bool:
headers = {
"Content-Type": "application/json",
}
res = requests.post(url, headers=headers)
res_text = res.text
# res.raise_for_status()
util_data = Util_data()
util_data.logger.info(f"APIリクエスト:{url}")
util_data.logger.info(f"res_text:{res_text}")
return True
config / cloudbuild.yaml
Cloud Buildsを使用してデプロイする場合に必要なyamlファイル
cloudbuild.yamlについては、こちらを参考にしてください。
config / requirements.txt
必要なライブラリを定義
config / environment.py
環境変数の読み込みをするファイル
例)
import os
from dataclasses import dataclass
@dataclass
class Env:
"""
環境変数保持用データクラス
"""
# 環境変数
GCP_PROJECT = os.getenv("PROJECT_ID")
# DB接続情報
SECRET_DB_INFO = os.getenv("DB_USER_ID")
SECRET_DB_INFO = os.getenv("DB_PASSWORD")
SECRET_DB_INFO = os.getenv("DB_HOST")
SECRET_DB_INFO = os.getenv("DB_USER_ID")
config / environment / 〜
各環境(開発環境、本番環境など)の環境変数を定義
common / bigquery_common.py
BigQuery関連でよく使用する処理を共通関数として記述
BigQuery APIについては公式ドキュメント等も参照ください。
例)
from google.cloud import bigquery
from common.util_common import Util_data
class BigQueryCommon:
"""
BigQuery関連の処理をまとめたクラス
"""
def __init__(self, project_id: str):
"""
初期化
Args:
project_id (str): _description_
"""
# BigQuery Clientオブジェクト
self.client = bigquery.Client(project=project_id)
def get_table_refference(self, dataset_id: str, table_id: str) -> bigquery.TableReference:
"""
テーブル情報を取得
Args:
dataset_id (str): データセットID
table_id (str): テーブルID
Returns:
bigquery.TableReference: テーブル情報
"""
# フルテーブルID
full_table_id = f"{dataset_id}.{table_id}"
# table referrence(5回リトライ、60秒タイムアウト)
table_ref = self.client.get_table(full_table_id, 5)
return table_ref
def execute_query(self, query: str) -> bool:
"""
クエリを実行する
Args:
query (str): クエリ
Returns:
bool: 実行結果(True:正常修了、False:エラー)
"""
result = True
try:
self.client.query_and_wait(query, retry=5)
except Exception as e:
result = False
Util_data.logger.error(f"Error:{e}")
return result
def extract_csv(self, csv_uri: str, dataset_id: str, table_id: str) -> bool:
"""
クエリの結果をCSV出力(Google Storage)
Args:
csv_uri (str): 出力先のCSVファイル名
dataset_id (str): BigQueryテーブルのデータセットID
table_id (str): BigQueryテーブルのテーブルID
Returns:
bool: _description_
"""
query = f"EXPORT DATA OPTIONS(uri='{csv_uri}', format='CSV', \
overwrite=true, header=false, field_delimiter=',' ) AS \
SELECT * FROM {dataset_id}.{table_id} ORDER BY id;"
result = self.execute_query(query=query)
return result
common / storage_common.py
Google Cloud Storage関連でよく使用する処理を共通関数として記述
例)
※そのうち追記します。
common / util_common.py
その他、よく使う共通処理を記述
例)
import time
import logging
import google.cloud.logging
from functools import wraps
from sqlalchemy import create_engine, pool
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import sessionmaker
from config.environment import Env
class Util:
def __init__(self):
self.logger = self._set_logger()
self.db_session = self.set_db_session()
def _set_logger(self):
# loggerの設定
logger = logging.getLogger()
# Cloud Logging ハンドラを logger に接続
logging_client = google.cloud.logging.Client()
logging_client.setup_logging()
def set_db_session(database):
"""
MYSQL接続
:param string database:
:return:
"""
# SecretManagerからDB接続情報を取得
engine = create_engine(
URL.create(
drivername="mysql+pymysql",
username=Env.DB_USER_NAME,
password=Env.DB_PASSWORD,
host=Env.DB_HOST,
database=database,
),
poolclass=pool.NullPool
)
# Cloud SQLへの接続セッション作成
Session = sessionmaker(bind=engine, autocommit=False, autoflush=False,)
session = Session()
return session
def call_function(self, *args, **kwargs):
"""
functionを呼び出す際に呼出前後にログを出力する共通関数
param: dict 呼出引数
"""
def _decorator(func):
@wraps(func)
def _exec_call_function(*args, **kwargs):
self.logger.info(f"start function:{func}, params:{args}")
res = func(*args, **kwargs)
self.logger.info(f"finish function:{func}, return:{res}")
return _exec_call_function
return _decorator
def retry_execute(func):
"""
リトライ処理するための共通関数
param: max_retries 試行回数
param: retry_delay 再試行までの秒数
"""
@wraps(func)
def _retry_decorator(self, *args, **kwargs):
retry_count: int = 0
max_retries: int = 3
retry_delay: int = 5
while retry_count < max_retries:
try:
self.logger.info(f"{retry_count+1}回目の実行開始({func.__name__}) {args}, {kwargs}")
res = func(*args, **kwargs)
self.logger.info(f"{retry_count+1}回目の実行終了 response:{res}")
# if res.status_code == 200:
return res
except Exception as e:
# ログ出力
self.logger.warning(f"{retry_count+1}回目の実行 Exception Message: {e}")
# カウンタインクリメント
retry_count += 1
# delay
time.sleep(retry_delay)
return _retry_decorator