Pythonで実装する場合のCloud Functions(Cloud Run関数)のディレクトリ構成

Google Cloud(GCP)環境のCloud Functions(Cloud Run関数)をPythonで実装する場合のディレクトリ構成を考えてみました。

試し試しやってるので、使い勝手悪かったら都度更新していく予定です。

ディレクトリ構成

とりあえず、今考えているディレクトリ構成

  • main.py
  • service.py
  • config(ディレクトリ)
    • cloudbuild.yaml
    • requirements.txt
    • environment.py
    • environment(ディレクトリ)
      • <環境名>.yaml
  • common
    • bigquery_common.py
    • storage_common.py
    • util_common.py

main.py

リクエストがあった場合に、最初に呼び出されるmain関数を持つ

基本的にserviceの関数を呼び出すのみで、シンプルに全体が見えるようにする。

例)

import requests
import functions_framework

from service import Response_param, Functions


# HTTPトリガ
@functions_framework.http
def main(request: requests) -> Response_param:
    """
    メイン処理
    Return:
        result True(正常終了)、False(エラー発生)
    """

    # パラメータ取得
    # request_json = request.get_json(silent=True)
    funcs = Functions("https://xxx.xxx/")
    res = funcs.call_api()

    res: Response_param = {}
    res['result'] = True
    res['message'] = 'test'
    print(f"res:{res}")
    return res

service.py

機能に必要な処理を主に記述するファイル

リクエストパラメータ、レスポンスパラメータの型などはクラス、コレクション型などで定義しておく。

例)

import requests
from typing import TypedDict
from dataclasses import dataclass

from util.util import Util_data, Util

# リクエストパラメータ
@dataclass
class Request_param(self, param1: str, param2: int):
    param1 = param1

# 返却値の型
class Response_param(TypedDict):
    result: bool
    message: str

class Functions:
    def __init__(self, url: str):
        self.url = url

    def validate():
         if self.〜 is None:
            return

    @Util.retry_api
    def call_api(self, url: str) -> bool:
        headers = {
            "Content-Type": "application/json",
        }
        res = requests.post(url, headers=headers)
        res_text = res.text
        # res.raise_for_status()
        util_data = Util_data()
        util_data.logger.info(f"APIリクエスト:{url}")
        util_data.logger.info(f"res_text:{res_text}")
        return True

config / cloudbuild.yaml

Cloud Buildsを使用してデプロイする場合に必要なyamlファイル

cloudbuild.yamlについては、こちらを参考にしてください。

config / requirements.txt

必要なライブラリを定義

config / environment.py

環境変数の読み込みをするファイル

例)

import os

from dataclasses import dataclass


@dataclass
class Env:
    """
    環境変数保持用データクラス
    """
    # 環境変数
    GCP_PROJECT = os.getenv("PROJECT_ID")
    # DB接続情報
    SECRET_DB_INFO = os.getenv("DB_USER_ID")
    SECRET_DB_INFO = os.getenv("DB_PASSWORD")
    SECRET_DB_INFO = os.getenv("DB_HOST")
    SECRET_DB_INFO = os.getenv("DB_USER_ID")

config / environment / 〜

各環境(開発環境、本番環境など)の環境変数を定義

common / bigquery_common.py

BigQuery関連でよく使用する処理を共通関数として記述

BigQuery APIについては公式ドキュメント等も参照ください。

例)

from google.cloud import bigquery

from common.util_common import Util_data


class BigQueryCommon:
    """
    BigQuery関連の処理をまとめたクラス
    """
    def __init__(self, project_id: str):
        """
        初期化

        Args:
            project_id (str): _description_
        """
        # BigQuery Clientオブジェクト
        self.client = bigquery.Client(project=project_id)

    def get_table_refference(self, dataset_id: str, table_id: str) -> bigquery.TableReference:
        """
        テーブル情報を取得

        Args:
            dataset_id (str): データセットID
            table_id (str): テーブルID

        Returns:
            bigquery.TableReference: テーブル情報
        """
        # フルテーブルID
        full_table_id = f"{dataset_id}.{table_id}"
        # table referrence(5回リトライ、60秒タイムアウト)
        table_ref = self.client.get_table(full_table_id, 5)
        return table_ref

    def execute_query(self, query: str) -> bool:
        """
        クエリを実行する

        Args:
            query (str): クエリ

        Returns:
            bool: 実行結果(True:正常修了、False:エラー)
        """
        result = True
        try:
            self.client.query_and_wait(query, retry=5)
        except Exception as e:
            result = False
            Util_data.logger.error(f"Error:{e}")

        return result

    def extract_csv(self, csv_uri: str, dataset_id: str, table_id: str) -> bool:
        """
        クエリの結果をCSV出力(Google Storage)

        Args:
            csv_uri (str): 出力先のCSVファイル名
            dataset_id (str): BigQueryテーブルのデータセットID
            table_id (str): BigQueryテーブルのテーブルID

        Returns:
            bool: _description_
        """
        query = f"EXPORT DATA OPTIONS(uri='{csv_uri}', format='CSV', \
            overwrite=true, header=false, field_delimiter=',' ) AS \
            SELECT * FROM {dataset_id}.{table_id} ORDER BY id;"
        result = self.execute_query(query=query)
        return result

common / storage_common.py

Google Cloud Storage関連でよく使用する処理を共通関数として記述

例)

※そのうち追記します。

common / util_common.py

その他、よく使う共通処理を記述

例)

import time
import logging

import google.cloud.logging
from functools import wraps

from sqlalchemy import create_engine, pool
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import sessionmaker

from config.environment import Env


class Util:
    def __init__(self):
        self.logger = self._set_logger()
        self.db_session = self.set_db_session()

    def _set_logger(self):
        # loggerの設定
        logger = logging.getLogger()
        # Cloud Logging ハンドラを logger に接続
        logging_client = google.cloud.logging.Client()
        logging_client.setup_logging()

    def set_db_session(database):
        """
        MYSQL接続
        :param string database:
        :return:
        """
        # SecretManagerからDB接続情報を取得
        engine = create_engine(
            URL.create(
                drivername="mysql+pymysql",
                username=Env.DB_USER_NAME,
                password=Env.DB_PASSWORD,
                host=Env.DB_HOST,
                database=database,
            ),
            poolclass=pool.NullPool
        )

        # Cloud SQLへの接続セッション作成
        Session = sessionmaker(bind=engine, autocommit=False, autoflush=False,)
        session = Session()
        return session

    def call_function(self, *args, **kwargs):
        """
        functionを呼び出す際に呼出前後にログを出力する共通関数
        param: dict 呼出引数
        """
        def _decorator(func):
            @wraps(func)
            def _exec_call_function(*args, **kwargs):
                self.logger.info(f"start function:{func}, params:{args}")
                res = func(*args, **kwargs)
                self.logger.info(f"finish function:{func}, return:{res}")
            return _exec_call_function
        return _decorator

    def retry_execute(func):
        """
        リトライ処理するための共通関数
        param: max_retries 試行回数
        param: retry_delay 再試行までの秒数
        """
        @wraps(func) 
        def _retry_decorator(self, *args, **kwargs):
            retry_count: int = 0
            max_retries: int = 3
            retry_delay: int = 5
            while retry_count < max_retries:
                try:
                    self.logger.info(f"{retry_count+1}回目の実行開始({func.__name__}) {args}, {kwargs}")
                    res = func(*args, **kwargs)
                    self.logger.info(f"{retry_count+1}回目の実行終了 response:{res}")
                    # if res.status_code == 200:
                    return res
                except Exception as e:
                    # ログ出力
                    self.logger.warning(f"{retry_count+1}回目の実行 Exception Message: {e}")
                # カウンタインクリメント
                retry_count += 1
                # delay
                time.sleep(retry_delay)
        return _retry_decorator

コメントする