❏ テスト内容


・キーワード(都道府県名)で元データから抽出し、キーワード毎に集計を行い、その結果を出力テーブルにINSERTする

・キーワード(東京、埼玉、千葉)毎に上記を繰り返し実行する

❏ テストデータの準備


IMAロール

【GlueのIAMロールを生成】

 AWS > GlueのIAMロールを生成 と同様

 ・ロール名:AWSGlueServiceRoleDefaultCM

S3

【テストデータの用意】

 1. S3 > バケットを作成  の順にクリック。以下を設定して [バケットを作成]

 ・バケット名:demo-sagemaker-testdatas

 ・AWS リージョン:ap-northeast-1

 ・他は全てデフォルト

 2. バケット「demo-sagemaker-testdatas」にテストデータを格納すえる

 ・s3://demo-sagemaker-testdatas/profile_data/

 ・s3://demo-sagemaker-testdatas/pref_tokyo_capital/

 ・s3://demo-sagemaker-testdatas/pref_chiba_local/

 ・s3://demo-sagemaker-testdatas/pref_saitama_local/


Glue

【Glueクローラの追加】

 1. Glue > 左側メニューのチュートリアル下のクローラの追加 >クローラの追加 の順にクリック

 2. 以下を設定して [次へ]

 ・クローラの名前:demo-sagemaker

 3. 以下を設定して [次へ]

 ・Crawler source type:Data stores(デフォルト)

 ・Repeat crawls of S3 data stores:Crawl all folders(デフォルト)

 4. 以下を設定して [次へ]

 ・データストアの選択:S3

 ・接続:未選択

 ・クロールするデータの場所:自分のアカウントで指定されたパス(デフォルト)

 ・インクルードパス:s3://demo-sagemaker-testdatas/profile_data/ ※配下のファイル全てにするため末尾に/を付与

 5. 以下を設定して [次へ]

 ・別のデータストアの追加:”はい”を選択すると上記「4. 」へ戻るので、上記【データの用意】>「2. 」で用意した全てを設定する

 6. 以下を設定して [次へ]

 ・IAM ロール:既存の IAM ロールを選択して「AWSGlueServiceRoleDefaultCM」を設定

 7. 以下を設定して [次へ]

 ・頻度:オンデマンドで実行

 8. [データベースの追加]をクリックして以下を設定して [次へ]

 ・データベース名:demo_sagemaker_db

 9.  [完了]

 10. クローラ一覧でdemo-sagemakerを選択して[クローラの実行]をクリック

 11. クローラ完了後、 左側メニューのチュートリアル下のテーブルの確認をクリックして、一覧に全テーブルが作成されている事を確認

 12. AthenaでDatabaseに「demo_sagemaker_db」があること、Tablesに全テーブルがある事を確認

 

 Glueクローラの追加でハマった事

 テストデータには次の仕様が必要らしい

 ・1行目にカラム名を入れる

 ・1カラムだけではエラーになる


Athena

【OUTPUTテーブルのCREATE TABLE】

 例 rescount_tokyoの場合

CREATE EXTERNAL TABLE IF NOT EXISTS demo_sagemaker_db.rescount_tokyo (
  pref string,
  cnt_people bigint,
  cnt_days bigint
)
PARTITIONED BY (yyyymmdd string)
LOCATION 's3://demo-sagemaker-testdatas/rescount_tokyo/'; MSCK REPAIR TABLE rescount_tokyo;

❏ SageMalerでのテスト手順


SageMaker

【ノートブックインスタンスの作成】

 AWS > ノートブックインスタンスの作成と同様

 ノートブックインスタンス名:demo-sagemaker

 ・新しいIAMロール:AmazonSageMaker-ExecutionRole-20220212T143930


セキュリティ認証情報

【アクセスキーの作成】

 1. 右上のアカウント > マイセキュリティ資格情報 > アクセスキー > 新しいアクセスキーの作成 の順にクリック

 2. アクセスキー IDとシークレットアクセスキーをエディターにコピペ

SageMaker

【JupyterでPython3の起動】

 1. 一覧から作成したインスタンスを選択

 2. アクション > Jupyterを開く > ブラウザの別ウィンドウでノートブックのインターフェイスが開く

 3. 右上の[NEW] から [Terminal] を選択 

 4. aws認証情報の更新を行う

$ aws configure
AWS Access Key ID [None]: *************************
AWS Secret Access Key [None]: **********************************
Default region name [ap-northeast-1]: ap-northeast-1
Default output format [None]: json

 5.  右上の[NEW] からAnacondaのPython3系環境 [conda_python3]を選択 

 6.  左上のJupyterの右側の「Untitled」をクリックして以下を設定して[Rename]

 ・Rename Notebook:demo_insert_table


【Python3を実行】

 ・後述のソースを入力して[Run]、または [Kernel ] > [Restart and Run ALL]

 

 Python3の実行でハマった事

 ・queries = queries_str.split(';')[0:-1]

  最初は[-1]としておりリスト取得ができなかった。[0:-1]に変更したらできた

 ・def exec_queries(queries_str, conn, print_query=True):

  INSERTを実行すると権限エラーが発生。IAMロールにAmazonAthenaFullAccessとAmazonSageMakerFullAccessを追加しても変わらず。

  Terminalでaws configureを実施しら解決した。追加したポリシーは2件とも関係なかった

[ error msg ]
Insufficient permissions to execute the query.
SageMaker is not authorized to perform: glue:GetPartition on resource
because no identity-based policy allows the glue:GetPartition action .

 

【あとかたづけ(アクセスキーの削除)※重要

 1. Terminalで削除

$ rm -rf ~/.aws

 2. 右上のアカウント > マイセキュリティ資格情報 > アクセスキー > アクションの削除 の順にクリック

❏ ソース

import pandas as pd
import numpy as np
import pickle
import csv
import re
import os

import boto3
import datetime
import pytz
import s3fs

from datetime import datetime as dt
from datetime import timedelta
from dateutil.relativedelta import relativedelta

!pip install --upgrade pip
!pip install openpyxl
!pip install PyAthena

import pyathena
import itertools

print("===process end===")

#--------------------------------------------------------------------
#全体の実行時間を測る
grand_start_time = dt.now()
print(grand_start_time)

#--------------------------------------------------------------------
#ロールの取得
from sagemaker import get_execution_role
role = get_execution_role()
print(role)

#regionの取得
region = boto3.Session().region_name
print(region)

#athenaに接続
from pyathena import connect
conn = connect(s3_staging_dir='s3://demo-sagemaker-testdatas/athena/result/ ',
         region_name=region)
print(conn)

#--------------------------------------------------------------------
#パラメータ設定

#Athena
db_name = 'demo_sagemaker_db'

#S3
bucket_name="demo-sagemaker-testdatas"

#集計対象期間
start_day = '2022-01-01'
end_day = '2022-01-07'

#イベント名(一つづつ指定する)
pref_names = ['tokyo','chiba','saitama']

print("count for event:{}".format(len(pref_names)))

#--------------------------------------------------------------------
#日付のリスト作成
date_list_2 = [date for date in pd.date_range(start_day, end_day, freq='D')]

#月曜と日曜の日付取得
date_list_s = [date for date in date_list_2 if date.isoweekday()==1]
date_list_e = [date + timedelta(days=6) for date in date_list_2 if date.isoweekday()==1]

#代入用リスト作成
start_yyyymm = [date.strftime('%Y%m%d') for date in date_list_s]
end_yyyymm = [date.strftime('%Y%m%d') for date in date_list_e]
print('\nstart_yyyymm:\n',start_yyyymm)
print('----------')
print('end_yyyymm:\n',end_yyyymm)

yyyymmdd = dt.now(pytz.timezone('Asia/Tokyo')).strftime("%Y%m%d")

#--------------------------------------------------------------------
#マッチング条件の制御準備
dic_is_perfect = {}

for pref_name in pref_names:
  if pref_name == 'tokyo':
    dic_is_perfect[pref_name] = 1
 else:
   dic_is_perfect[pref_name] = 0

print(dic_is_perfect)

#--------------------------------------------------------------------
#完全一致の検索数集計

def q_into_search_count_perfect(prefecture_table_name, insert_table_name, start_date_number, end_date_number):

    q_into_search_count = '''

    INSERT INTO {insert_table}
    WITH table1 as (
    SELECT
      pref
    FROM
      {kw_table}
    ),
    table2 as ( --データ抽出
    SELECT
      pref,
      yyyymmdd
    FROM
      demo_sagemaker_db.profile_data a 
    WHERE
      yyyymmdd between {start_date} and {end_date} -- 任意の指定期間
      AND
      pref in (select pref from table1)
    )

    --対象データ抽出
    SELECT
      pref,
      count(pref) as cnt_people,
      count(distinct yyyymmdd) as cnt_days,
      '{start_date}' as yyyymmdd
    FROM
      table2
    GROUP BY
      pref
    ;

    SELECT
      count(*) AS count
    FROM
      {insert_table}
    WHERE
      yyyymmdd = '{start_date}'
    ;

    '''.format(
      insert_table=insert_table_name,
      kw_table=prefecture_table_name,
      start_date=start_date_number,
      end_date=end_date_number
    )

    df_result=exec_queries(q_into_search_count, conn)

    return df_result[1]

#--------------------------------------------------------------------
#リスト取得

def q_select_profile_data_list(select_table_name, start_date_number, end_date_number):

    q_select_profile_data = '''

    SELECT
      pref,
      ROUND( AVG(age), 1 ) AS age
    FROM
      {select_table}
    WHERE
      yyyymmdd between {start_date} and {end_date} -- 任意の指定期間
    GROUP BY
      pref
    ;

    '''.format(
      select_table=select_table_name,
      start_date=start_date_number,
      end_date=end_date_number
    )

    df_result=exec_queries(q_select_profile_data, conn)

    return df_result[0]

#--------------------------------------------------------------------
#複数クエリ実行関数
def exec_queries(queries_str, conn, print_query=True):
  queries = queries_str.split(';')[0:-1]
  dfs = []
  for query in queries:
    if print_query:
      print('-----\nExec query\n-----\n{}'.format(query))
      print('...', end='')
    try:
      df = pd.read_sql(query, conn)
    except Exception as e:
      print(e)
    else:
      dfs.append(df)
    print('ok')
  return dfs

#--------------------------------------------------------------------
#ログ文字列追加関数
logs = ''
def add_log(new, title='Other'):
  global logs
  logs += '----------\n-- {}\n----------\n'.format(title)
  logs += new
  logs += '\n\n\n'

#S3のパス生成(fnを指定しない場合はフォルダーパス)
def get_s3_path(bucket, paths, fn=''):
   return 's3://' + bucket + '/' + '/'.join(paths) + '/' +fn

#追加の期間
"""
start_yyyymm += ['20210118']
end_yyyymm += ['20210124']

print("count for event:{}, {}".format(len(pref_names), pref_names))
print("count for start_yyyymm:{}".format(len(start_yyyymm)))

calc_number1=list(range(0,6,1))
"""

#対象イベントの制御
"""
pref_names=['hoge']
print("{}".format(pref_names))
"""

calc_number=list(range(0,1,1))
print(calc_number)


#時間別に全イベントを回す計算
count_list=calc_number

#ループ(テーマ毎)
for pref_name in pref_names:

    print("\n==={}: start===".format(pref_name))
    
    logs = ''
    
    #開始時間を測る
    strat_time = dt.now(pytz.timezone('Asia/Tokyo'))
    print(strat_time)
    
    #パラメータに修正
    prefecture_table_name= db_name+'.pref_'+pref_name+'_'+['local' if dic_is_perfect[pref_name]==0 else 'capital'][0]
    insert_table_name=db_name+'.rescount_'+pref_name
    print("prefecture_table_name: {}".format(prefecture_table_name))
    print("insert_table_name    : {}".format(inser_table_name))
    
    start_date_number=start_yyyymm[calc_number]
    end_date_number=end_yyyymm[calc_number]
    print("start_date_number-end_date_number:{}-{}".format(start_date_number, end_date_number))
    
    #関数の実行
    sql_row_count = q_into_search_count_perfect(prefecture_table_name, insert_table_name, start_date_number, end_date_number)
        
    #終了時間を測る
    end_time = dt.now(pytz.timezone('Asia/Tokyo'))
    print(end_time)
    print("processing_time:{}".format(end_time-strat_time))
    print("count:{}".format(sql_row_count))
    
    #ログの用意
    log_str=" start_time={}\n end_time:{}\n processing_time:{}\n row_count:{}".format(strat_time, end_time, end_time-strat_time, sql_row_count)
    add_log(log_str, 'Calc for {}:{}'.format(pref_name, start_yyyymm[0]))
    
    #カレントディレクトリー確認
    current_path = !pwd
    print("current_path:{}".format(current_path))
    
    #ログファイルをjupyter上に書き出す
    f = open('./logss/calc_log_{}_{}.txt'.format(pref_name, yyyymmdd), 'a')  # w:上書き a:追加
    f.write(logs)
    f.close()
    
    print("==={}:calc endding===\n".format(pref_name))

    # ファイルアウトプットの確認
    !ls -l ./logs/

    #ログファイルをS3に書き出し
    log_path = get_s3_path(bucket_name, ['logs', pref_name], 'logs_{}_{}.txt'.format(pref_name, yyyymmdd))
    
    fs = s3fs.S3FileSystem()
    
    with fs.open(log_path, 'wb') as f:
      f.write(logs.encode())
    
    print(logs)

for pref_name in pref_names:

    #リスト取得
    select_table_name=db_name+'.profile_data'
    ql_row_datas = q_select_profile_data_list(select_table_name, start_date_number, end_date_number)
    ql_row_datas
        
    #データーフレームをCSV出力
    out_path = 'result/'+pref_name
    out_name = pref_name+'.csv'
    print("{}/{}".format(out_path, out_name))

    os.makedirs(out_path, exist_ok=True)  
    ql_row_datas.to_csv('{}/{}'.format(out_path, out_name))

本日日付をyyyymmddで取得

yyyymmdd = dt.now(pytz.timezone('Asia/Tokyo')).strftime("%Y%m%d")
print(yyyymmdd)
------------------------------
20220214

文字列の連結

pref_names = ['tokyo','chiba','saitama']
result = '/'.join(pref_names)
print(result)
------------------------------
tokyo/chiba/saitama

SageMakerのカレントディレクトリーの確認

current_path = !pwd
print("current_path:{}".format(current_path))
------------------------------ current_path:['/home/ec2-user/SageMaker']

Teminalからの確認

sh-4.2$ ls /home/ec2-user/SageMaker
demo_insert_table.ipynb  logs  lost+found  test.ipynb

ログファイルをjupyter上に書き出す

log_str=" start_time={}\n end_time:{}".format(strat_time, end_time)
add_log(log_str, 'Calc for {}:{}'.format(pref_name, start_yyyymm[0]))

f = open('./logss/calc_log_{}_{}.txt'.format(pref_name, yyyymmdd), 'a')  # w:上書き a:追加
f.write(logs)
f.close()    

Teminalからの確認

sh-4.2$ ls -l /home/ec2-user/SageMaker/logs/
total 4
-rw-rw-r-- 1 ec2-user ec2-user 184 Feb 14 10:35 log_tokyo_20220214.txt

sh-4.2$ cat /home/ec2-user/SageMaker/logs/log_tokyo_20220214.txt
----------
-- Calc for tokyo:20220103
----------
 start_time=2022-02-14 19:35:22.975045+09:00
 end_time:2022-02-14 19:35:22.975109+09:00

ログファイルをS3に書き出し

def get_s3_path(bucket, paths, fn=''):
   return 's3://' + bucket + '/' + '/'.join(paths) + '/' +fn

log_path = get_s3_path(bucket_name, ['logs', pref_name], 'logs_{}_{}.txt'.format(pref_name, yyyymmdd))

fs = s3fs.S3FileSystem()

with fs.open(log_path, 'wb') as f:
  f.write(logs.encode())

Teminalからの確認

sh-4.2$ aws s3 ls s3://demo-sagemaker-testdatas/logs/ --recursive
2022-02-14 11:45:02          0 logs/
2022-02-19 07:45:44        184 logs/tokyo/logs_tokyo_20220219.txt

S3上のファイルをデーターフレームに格納

df = pd.read_csv('s3://demo-sagemaker-testdatas/profile_data/profile_data.txt')
df.head(3)


データーフレームをCSV出力

#リスト取得
select_table_name=db_name+'.profile_data'
ql_row_datas = q_select_profile_data_list(select_table_name, start_date_number, end_date_number)
ql_row_datas
    
#データーフレームをCSV出力
out_path = 'result/'+pref_name
out_name = pref_name+'.csv'
print("{}/{}".format(out_path, out_name))

os.makedirs(out_path, exist_ok=True)  
ql_row_datas.to_csv('{}/{}'.format(out_path, out_name))

 

Teminalからの確認

sh-4.2$ ls -l result/tokyo/tokyo.csv
-rw-rw-r-- 1 ec2-user ec2-user 51 Feb 20 03:19 result/tokyo/tokyo.csv

sh-4.2$ cat result/tokyo/tokyo.csv
,pref,age
0,tokyo,39.3
1,chiba,41.3
2,saitama,33.8

データーフレームをS3へ出力

for pref_name in pref_names:

    #リスト取得
    select_table_name=db_name+'.profile_data'
    ql_row_datas = q_select_profile_data_list(select_table_name, start_date_number, end_date_number)
    ql_row_datas
        
    #データーフレームをS3へ出力
    out_to_s3 = ql_row_datas.to_csv(None, index=False).encode()
    
    fs = s3fs.S3FileSystem()
    
    res_path = get_s3_path(bucket_name, ['result', pref_name], 'res_{}_{}.txt'.format(pref_name, yyyymmdd))

    with fs.open(res_path, 'wb') as f:
      f.write(out_to_s3)

 

Teminalからの確認

sh-4.2$ aws s3 cp s3://demo-sagemaker-testdatas/result/tokyo/res_tokyo_20220220.txt .
download: s3://demo-sagemaker-testdatas/result/tokyo/res_tokyo_20220220.txt to ./res_tokyo_20220220.txt

sh-4.2$ cat res_tokyo_20220220.txt
pref,age
tokyo,39.3
chiba,41.3
saitama,33.8

❏ DataFrame


データフレームの生成

data = {
    '名前' :['田中', '山田', '高橋'],
    '役割' : ['営業部長', '広報部', '技術責任者'],
    '身長' : [178, 173, 169]
    }

#columns引数:列の順序を指定
df = pd.DataFrame(data, columns=["名前", "役割", "身長"])

#カラムの名称を変更
df.columns = ["Name", "Position", "height"]
print(df)

#条件による行の抽出
print(df.query('height >= 170 and height < 180'))

❏ ループ


日単位でループ

import pandas as pd
customer_master = pd.read_csv('customer_master.csv')
customer_master.head()

20220101

20220102


複数日単位でループ

import datetime

st_date1 = datetime.datetime(2022, 1, 1)
ed_date1 = datetime.datetime(2022, 1, 7)
st_date2 = st_date1

while st_date2 <= ed_date1:
    
    ed_date2 = st_date2 + datetime.timedelta(days=2)
    ed_date2 = ed_date2 if ed_date2 < ed_date1 else ed_date1
    
    start_date = st_date2.strftime('%Y%m%d')
    end_date = ed_date2.strftime('%Y%m%d')
    print("start_date - end_date : {} - {}".format(start_date, end_date))
    
    st_date2 = ed_date2 + datetime.timedelta(days=1)

start_date - end_date : 20220101 - 20220103

start_date - end_date : 20220104 - 20220106

start_date - end_date : 20220107 - 20220107


月単位でループ

import datetime
import calendar

def get_last_date(year, month):
    return datetime.datetime(year, month, calendar.monthrange(year, month)[1])

st_date1 = datetime.datetime(2020, 11, 1)
ed_date1 = datetime.datetime(2021, 2, 7)
st_date2 = st_date1

while st_date2 <= ed_date1:
    
    # 月末算出
    ed_date2 = get_last_date(st_date2.year, st_date2.month)
    
    start_date = st_date2.strftime('%Y%m%d')
    end_date = ed_date2.strftime('%Y%m%d')
    print("start_date - end_date : {} - {}".format(start_date, end_date))
    
    st_date2 = ed_date2 + datetime.timedelta(days=1)

start_date - end_date : 20201101 - 20201130

start_date - end_date : 20201201 - 20201231

start_date - end_date : 20210101 - 20210131

start_date - end_date : 20210201 - 20210228

❏ リンク集