Home [Analytics on AWS] Serve with Lambda
Post
Cancel

[Analytics on AWS] Serve with Lambda

이 모듈에서는 매우 구체적인 사용 사례 예제로 Lambda 함수를 생성 할 것입니다. 우리가 작성할 람다 함수는 Athena가 S3의 processsed data에서 Hits 별 Top 5 Popular Songs를 쿼리하고 가져 오는 코드를 호스팅합니다.

image

1. Lambda 함수 생성

Lambda 콘솔 https://us-east-1.console.aws.amazon.com/lambda/home?region=us-east-1#/functions로 이동합니다.

image

리전이 버지니아 북부로 설정되어 있는지 확인하고 함수 생성을 누릅니다.

image

함수 이름은 Analyticsworkshop_top5Songs를 입력하고 나머지는 위와 동일하게 설정한 뒤 함수를 생성합니다.

2. Lambda 함수 작성

이 섹션에서는 방금 만든 람다 함수에 대한 코드를 제공합니다. boto3를 사용하여 Athena 클라이언트에 액세스합니다.

image

아까 만든 함수 Analyticsworkshop_top5Songs에 들어가 아래로 스크롤하면 코드 소스 란이 있습니다. 여기 적혀있는 코드를 다음과 같이 변경합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import boto3
import time
import os

# Environment Variables
DATABASE = os.environ['DATABASE']
TABLE = os.environ['TABLE']
# Top X Constant
TOPX = 5
# S3 Constant
S3_OUTPUT = f's3://{os.environ["BUCKET_NAME"]}/query_results/'
# Number of Retries
RETRY_COUNT = 10

def lambda_handler(event, context):
    client = boto3.client('athena')
    # query variable with two environment variables and a constant
    query = f"""
        SELECT track_name as \"Track Name\", 
                artist_name as \"Artist Name\",
                count(1) as \"Hits\" 
        FROM {DATABASE}.{TABLE} 
        GROUP BY 1,2 
        ORDER BY 3 DESC
        LIMIT {TOPX};
    """
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={ 'Database': DATABASE },
        ResultConfiguration={'OutputLocation': S3_OUTPUT}
    )
    query_execution_id = response['QueryExecutionId']
    # Get Execution Status
    for i in range(0, RETRY_COUNT):
        # Get Query Execution
        query_status = client.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        exec_status = query_status['QueryExecution']['Status']['State']
        if exec_status == 'SUCCEEDED':
            print(f'Status: {exec_status}')
            break
        elif exec_status == 'FAILED':
            raise Exception(f'STATUS: {exec_status}')
        else:
            print(f'STATUS: {exec_status}')
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')
    # Get Query Results
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result['ResultSet']['Rows'])
    # Function can return results to your application or service
    # return result['ResultSet']['Rows']

3. Environment Variables

위에서 코드를 작성해 변경한 내용을 환경 변수의 변경하는 것만으로도 동일하게 적용할 수 있습니다.

image

구성 - 환경 변수 탭으로 이동하고 편집을 클릭합니다.

image

위와 같이 다음 세 개의 환경변수를 만든 후 저장을 누릅니다.

  • Key: DATABASE, Value: analyticsworkshopdb

  • Key: TABLE, Value: processed_data

  • Key: BUCKET_NAME, Value: yourname-analytics-workshop-bucket

image

구성 - 일반 구성으로 이동합니다.

image

제한 시간을 10초로 변경한 뒤 저장을 누릅니다.

4. Execution Role

구성 - 권한으로 이동합니다.

image

실행 역할 아래의 파란 글씨를 눌러 IAM 콘솔탭을 엽니다.

image

권한 추가 - 정책 연결을 눌러 AmazonS3FullAccess와 AmazonAthenaFullAccess를 추가합니다. 모두 추가했다면 IAM 콘솔탭을 닫습니다.

5. 테스트 이벤트 구성

image

함수 코드 섹선에서 Deploy를 눌러 함수를 배포합니다.

이제 새로 생성된 람다 함수의 실행 결과를 확인하기 위해 Dummy 테스트 이벤트를 구성해 보겠습니다.

image

Test를 클릭합니다.

image

이벤트 이름을 Test로 설정하고 나머지는 그대로 둔 뒤 저장을 누릅니다. 이제 다시 Test를 클릭해 봅시다. Execution Result 섹션에서 json 형식의 출력을 볼 수 있습니다.

image

길어서 잘렸지만 이런 형태입니다.

6. Athena를 통한 확인

Athena 콘솔 https://us-east-1.console.aws.amazon.com/athena/home?region=us-east-1#/query로 이동합니다.

1
2
3
4
5
6
7
SELECT track_name as "Track Name",
    artist_name as "Artist Name",
    count(1) as "Hits" 
FROM analyticsworkshopdb.processed_data 
GROUP BY 1,2 
ORDER BY 3 DESC 
LIMIT 5;

image

Database는 analyticsworkshopdb를 택하고 위 코드를 넣은 쿼리를 실행하면 위와 같은 결과를 얻을 수 있습니다. 위에서 Him & I가 1등이었는데 같은 결과가 나왔습니다. 위에는 잘려있지만 나머지 부분도 같음을 확인하였습니다.

This post is licensed under CC BY 4.0 by the author.