데이터 엔지니어( 실습 정리 )

카프카, 스파크 스트리밍,하둡 던파 실시간 대시보드 총정리

세용용용용 2023. 10. 6. 22:30

API 데이터를 주기적으로 Kafka 토픽에 전송하려면 Kafka 프로듀서를 작성하고 이를 스케줄러로 실행하여 주기적으로 API에서 데이터를 가져와야 합니다.

 

먼저 python-pip부터 설치 >>> sudo apt install python3-pip

confluent-kafka-python 라이브러리를 설치 >>> pip install confluent-kafka

 

1) Python 스크립트를 작성하여 API 데이터를 Kafka 토픽에 전송


vim donpa_test.py

from confluent_kafka import Producer
import requests
import json
import time
import pandas as pd  # pandas 라이브러리 임포트

# 이전 데이터를 추적하기 위한 변수 초기화
last_data = None

# Kafka 설정
kafka_config = {
    'bootstrap.servers': 'kafka01:9092,kafka02:9092,kafka03:9092',
    'client.id': 'api-producer'
}

# Kafka 프로듀서 생성
producer = Producer(kafka_config)

# API 엔드포인트 및 파라미터 설정
api_url = "https://api.neople.co.kr/df/auction-sold"
params = {
    "itemName": "균열의 단편",
    "wordType": "match",
    "wordShort": "false",
    "limit": 1,
    "apikey": "JUG54ELPbKttanbis2VPFNqC9LJOM7v4"
}

# 주기적으로 API 호출 및 데이터를 Kafka 토픽에 전송
while True:
    try:
        response = requests.get(api_url, params=params)
        data = response.json()
        data = data.get('rows')
        data = [{"soldDate": row["soldDate"], "itemName": row["itemName"], "count": row["count"], "unitPrice": row["unitPrice"]} for row in data]
        data = data[0]
        # 이전 데이터와 중복되지 않는 경우에만 데이터를 전송
        if data != last_data:
            last_data = data
            message = json.dumps(data, ensure_ascii=False).encode('utf-8')

            # Kafka 토픽에 데이터 전송
            producer.produce("donpa1", key="api_data", value=message)
            # 전송한 메시지 확인
            producer.poll(0)

            print("API 데이터를 Kafka 토픽에 전송했습니다.")

    except Exception as e:
        print(f"오류 발생: {str(e)}")

    # 5분(300초) 대기
    time.sleep(10)

 

스크립트 실행 >>> python3 donpa_test.py

 

 

2) 토픽에 들어갔는지 확인(kafka02)

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka:9092,kafka:9092 --topic donpa1 --from-beginning

오우야쓰 성공!!!

 

3) 토픽에 데이터가 들어갈때마다 스파크 스트리밍을 통해 hdfs데이터 저장

카프카 hosts설정해주기

sudo vi /etc/hosts

127.0.0.1 localhost
172.31.41.230 nn1
172.31.43.215 nn2
172.31.44.88 dn1
172.31.33.238 dn2
172.31.37.126 dn3

172.31.46.129 kafka01
172.31.34.181 kafka02
172.31.36.66 kafka03


# The following lines are desirable for IPv6 capable hosts
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
ff02::3 ip6-allhosts

 

 

1) 스파크 토픽의 데이터를 콘솔에 띄우기

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

sc = SparkSession.builder \
     .master('yarn') \
     .appName('donpa_spark_stream') \
     .getOrCreate()

sc.sparkContext.setLogLevel('INFO')

# read stream
df =  sc.readStream.format('kafka') \
      .option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9091,kafka03:9092") \
      .option("subscribe", "donpa1") \
      .option("startingOffsets", "earliest") \
      .load()

# JSON 스키마 정의
schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

# value 컬럼을 JSON 파싱
df = df.selectExpr("CAST(value AS STRING) as json")
df = df.select(from_json(df.json, schema).alias("data"))


# display console
query2 = df.select("data.*").writeStream \
         .outputMode("append") \
         .format('console') \
         .foreachBatch(lambda batch_df, batch_id: batch_df.show() if batch_id != 0 else None) \
         .option('truncate', False) \
         .start()

query2.awaitTermination()

 

스파크 애플리케이션 실행

spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 donpa_spark_stream1.py

3.5.0 >> 나의 스파크 버전 명시

 

콘솔 출력 확인

 

 

2) 스파크 토픽의 데이터를 hdfs에 저장하기

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

sc = SparkSession.builder \
     .master('yarn') \
     .appName('donpa_spark_stream') \
     .getOrCreate()


sc.sparkContext.setLogLevel('INFO')

# read stream
df   = sc.readStream.format('kafka') \
      .option("kafka.bootstrap.servers", "kafka01:9092,kafka02:9091,kafka03:9092") \
      .option("subscribe", "donpa1") \
      .option("startingOffsets", "earliest") \
      .load()

# JSON 스키마 정의
schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

# value 컬럼을 JSON 파싱
df = df.selectExpr("CAST(value AS STRING) as json")
df = df.select(from_json(df.json, schema).alias("data"))

# Define a function to process each batch
def process_batch(batch_df, batch_id):
    if batch_id == 0:
        # Skip processing for batch ID 0
        pass
    else:
        # Your processing logic for other batch IDs
        batch_df.write.option("header", "true") \
                .format("csv") \
                .mode("append") \
                .save("/donpa1_spark_stream")

# Write stream - HDFS using foreachBatch
query2 = df.select("data.*").writeStream \
            .foreachBatch(process_batch) \
            .outputMode("append") \
            .option("checkpointLocation", "/check") \
            .start()


query2.awaitTermination()

 

스파크 애플리케이션 실행

spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 spark_test.py

 >> 나의 스파크 버전 명시(2.12:3.5.0)

 

hdfs dfs -ls /donpa1_spark_stream >>> 저장되었는지 확인

 

 

 

3) hdfs최근 저장 파일 콘솔에 출력해보기

from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder \
        .master('yarn') \
        .appName('show_df') \
        .getOrCreate()


import subprocess

# HDFS 디렉토리 경로
hdfs_directory = "/donpa1_spark_stream"
# HDFS 디렉토리 내 파일 목록 가져오기
hdfs_ls_command = f"hdfs dfs -ls {hdfs_directory}"
result = subprocess.run(hdfs_ls_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# 가져온 파일 목록을 줄 단위로 분할
file_lines = result.stdout.strip().split('\n')
# 파일 목록을 시간 역순으로 정렬
sorted_files = sorted(file_lines, key=lambda line: line.split()[-2], reverse=True)
# 가장 최신 파일 가져오기 (첫 번째 파일)
latest_file = sorted_files[2]
# 파일 경로 파싱
latest_file_path = latest_file.split()[-1]

# HDFS 경로를 사용하여 Spark DataFrame을 읽습니다.
latest_df = spark.read.option('header', 'true').csv(f'hdfs://{latest_file_path}')

# DataFrame을 사용하여 원하는 작업을 수행할 수 있습니다.
latest_df.show()

spark.stop()

spark-submit --master yarn donpa_test.py >>> 스파크 애플리케이션 실행

 

출력화면

 

4) hdfs에 저장하는 스파크 에플리케이션 백그라운드 실행

spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 /home/ubuntu/donpa_spark_stream/spark_test.py & >>> 스파크 에플리케이션 백그라운드 실행 명령어

 

nohup spark-submit --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 /home/ubuntu/donpa_spark_stream/spark_test.py > /dev/null 2>&1 & >>> 표준출력 무시, 백그라운드 실행 명령어

 

가장 최근 저장된 csv출력해보기

spark-submit --master yarn /home/ubuntu/spark_save_rdb/donpa_test.py

 

1) 백그라운드 스파크 애플리케이션 종료하는법

ps aux | grep spark_test.py >>> pid확인

 

kill <PID> >>> 종료

kill 23888 >>> 종료 시키기

 

5) 최종적으로 최근 10개의 csv를가져와서 데이터 마트인 RDB에 넣어보자

1) 최근 10개 데이터 가져와 merge시켜 데이터프레임으로 출력시켜보자

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
# SparkSession 생성
spark = SparkSession.builder \
        .master('yarn') \
        .appName('show_df') \
        .getOrCreate()


import subprocess

# HDFS 디렉토리 경로
hdfs_directory = "/donpa1_spark_stream"
# HDFS 디렉토리 내 파일 목록 가져오기
hdfs_ls_command = f"hdfs dfs -ls {hdfs_directory}"
result = subprocess.run(hdfs_ls_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# 가져온 파일 목록을 줄 단위로 분할
file_lines = result.stdout.strip().split('\n')
# 파일 목록을 시간 역순으로 정렬
sorted_files = sorted(file_lines, key=lambda line: line.split()[-2], reverse=True)

# 스키마 정의
schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

# 빈 데이터 프레임 생성
result_df = spark.createDataFrame([], schema)

index_num = 2
hdfs_path_list = []
for i in range(10):
    # 파일 가져오기
    file = sorted_files[index_num]
    # 파일 경로 파싱
    file_path = file.split()[-1]
    # hdfs경로 리스트에 append
    hdfs_path_list.append(file_path)
    # index_num 증감
    index_num += 1


for i in hdfs_path_list:
    df = spark.read.option('header', 'true').csv(f'hdfs://{i}')
    result_df = df.union(result_df)
    
# DataFrame을 사용하여 원하는 작업을 수행할 수 있습니다.
result_df.show()

spark.stop()

스파크 애플이케이션 실행 >>> spark-submit --master yarn donpa_test.py

 

2) 최근 10개의 데이터 프레임을 데이터 마트인 RDB에 넣어보자

마리아DB 설치할 인스턴스 생성해주자

# 시스템 패키지 정보를 업데이트
sudo apt update

# 마리아 db설치
sudo apt install mariadb-server

# MariaDB 서버를 시작합니다
sudo systemctl start mariadb

# 부팅 시 자동으로 시작하도록 MariaDB를 활성화
sudo systemctl enable mariadb

# MariaDB에 root 사용자로 로그인
sudo mysql -u root

# root 사용자의 비밀번호를 설정
ALTER USER 'root'@'localhost' IDENTIFIED BY '<new_password>';

# 권한을 새로 고침
FLUSH PRIVILEGES;

# MariaDB에서 데이터베이스를 생성
CREATE DATABASE donpa_datamart1;

# 해당 데이터베이스 사용
use donpa_datamart1

# 데이터베이스를 삭제
DROP DATABASE donpa_datamart1;

 

마리아DB에서 모든 통신을 허용

sudo vim /etc/mysql/mariadb.conf.d/50-server.cnf

0.0.0.0 으로 변경

마리아DB 서비스를 재시작하여 변경 사항을 적용 >>> sudo systemctl restart mariadb

마리아DB 3306포트에서 실행중인지 확인 >>> sudo ss -tuln | grep 3306

 

nn1에서 접속 테스트 해보기

sudo apt install mariadb-client-core-10.3 >>> 마리아db 클라이언트 설치



마리아db에서
mysql -u root -p >> 접속후
SELECT host, user FROM mysql.user; >> 호스트 접근권한 확인

# 172.31.41.230 접근권한 설정
GRANT ALL PRIVILEGES ON *.* TO 'root'@'172.31.41.230' IDENTIFIED BY '1234' WITH GRANT OPTION

FLUSH PRIVILEGES; >>> 변경사항 적용


nn1에서 접속해보기
mysql -h 172.31.39.99 -u root -p

성공

 

1) mariadb-java-client 설치

mkdir mariadb_jdbc

sudo apt-get install wget
wget https://downloads.mariadb.com/Connectors/java/connector-java-3.1.0/mariadb-java-client-3.1.0.jar

 

스파크 jars에 복사

sudo cp /home/ubuntu/mariadb_jdbc/mariadb-java-client-3.1.0.jar /usr/local/spark/jars/

 

잘 복사 되었는지 확인

ls /usr/local/spark/jars/mariadb-java-client-3.1.0.jar

 

2) 데이터 프레임 마리아db에 저장하는 코드

from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder \
    .appName('Write to MaríaDB') \
    .getOrCreate()

# MaríaDB 연결 정보 설정
mariadb_url = "jdbc:mysql://172.31.41.219:3306/donpa_datamart1"
mariadb_properties = {
    "user": "root",
    "password": "1234",
    "driver": "org.mariadb.jdbc.Driver"
}

# 데이터프레임 생성 (이 부분은 여러분의 데이터프레임 생성 코드로 대체하세요)
# 아래는 예시로 빈 데이터프레임을 생성하는 부분입니다.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

data = [(None, "Item1", 7, 5.0),
        (None, "Item2", 9, 10.0)]

df = spark.createDataFrame(data, schema)

df.show()
# MaríaDB에 데이터프레임 쓰기
df.write \
    .jdbc(url=mariadb_url, table="your_table_name", mode="overwrite", properties=mariadb_properties)

# 스파크 애플리케이션 종료
spark.stop()

스파크 애플리케이션 실행 >>> spark-submit --master yarn --deploy-mode client --jars /home/ubuntu/mariadb_jdbc/mariadb-java-client-3.1.0.jar mariadb_save_test.py

 

이슈 발생 이슈발생 ㅠㅠㅠ

mariadb-java-client 2.7.0으로 해야 호환이 되는 문제 발견!!!!

다시 깔아주고 해당 버전으로 실행하자

 

 

 

최종적으로 최근 30분 거래데이터 마리아db에 넣는 코드

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

spark = SparkSession.builder \
        .master('yarn') \
        .appName('show_df') \
        .getOrCreate()

# MaríaDB 연결 정보 설정
mariadb_url = "jdbc:mysql://172.31.41.219:3306/donpa_datamart1"
mariadb_properties = {
    "user": "root",
    "password": "1234",
    "driver": "org.mariadb.jdbc.Driver"
}

import subprocess
import datetime

# 현재 시간과 30분 뺀 시간 계산
current_time = datetime.datetime.now()
time_threshold = current_time - datetime.timedelta(minutes=30)

# HDFS 디렉토리 경로
hdfs_directory = "/donpa1_spark_stream"
# HDFS 디렉토리 내 파일 목록 가져오기
hdfs_ls_command = f"hdfs dfs -ls {hdfs_directory}"
result = subprocess.run(hdfs_ls_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

# 가져온 파일 목록을 줄 단위로 분할
file_lines = result.stdout.strip().split('\n')
# 파일 목록을 시간 역순으로 정렬
sorted_files = sorted(file_lines, key=lambda line: line.split()[-2], reverse=True)[1:]


# 스키마 정의
schema = StructType([
    StructField("soldDate", TimestampType(), True),
    StructField("itemName", StringType(), True),
    StructField("count", IntegerType(), True),
    StructField("unitPrice", DoubleType(), True)
])

# 빈 데이터 프레임 생성
result_df = spark.createDataFrame([], schema)

index_num = 2
hdfs_path_list = []
while index_num < len(sorted_files):
    # 파일 가져오기
    file = sorted_files[index_num]
    
    # 파일 생성시간 추출
    file_time_str = file.split()[-3] + ' ' + file.split()[-2]
    file_time = datetime.datetime.strptime(file_time_str, '%Y-%m-%d %H:%M')
    
    if file_time >= time_threshold:
        # 파일 경로 파싱
        file_path = file.split()[-1]
        hdfs_path_list.append(file_path)
        # index_num 증감
        index_num += 1
    else:
        break

for i in hdfs_path_list:
    df = spark.read.option('header', 'true').csv(f'hdfs://{i}')
    result_df = df.union(result_df)

# DataFrame을 사용하여 원하는 작업을 수행할 수 있습니다.
result_df.show()

# MaríaDB에 데이터프레임 쓰기
result_df.write \
    .jdbc(url=mariadb_url, table="your_table_name", mode="overwrite", properties=mariadb_properties)

# 스파크 애플리케이션 종료
spark.stop()

spark-submit --master yarn --deploy-mode client --jars /usr/local/spark/jars/mariadb-java-client-2.7.0.jar mariadb_save_test.py >>> 스파크 애플리케이션 실행 해보자

 

마리아db에서 확인

select * from your_table_name;

좋다 좋다 드디어 해결 버전 호환 문제였음 ㅎㅎㅎ

 

이제 해당 코드를 airflow를 사용하여 워크 플로우 설정하자

6) 주기적으로 최근30분 거래 데이터 가져와 데이터 마트인 RDB를 최신화 시켜주자(airflow 워크플로우 사용)

- 아파치 에어플로우는 워크플로우 자동화 및 스케줄링 도구

 

python3 -m venv airflow-env # 새 가상 환경 생성

source airflow-env/bin/activate # 새 가상 환경 활성화

 

1. airflow를 설치후 실행, 이후 웹ui에 접속해 작업을 정의

(1) pip install apache-airflow >>> airflow 설치

버전 호환 맞춰주기

pip install werkzeug==2.2.2

pip install cachelib==0.9.0

 

(2) echo 'export PATH="$PATH:$HOME/.local/bin"' >> ~/.bashrc : airflow 명령이 설치된 경로가 PATH 환경 변수에 추가,

추가하지 않으면 나중에 airflow 명령을 사용할때 경로를 입력해야하는 불편함 초래...

source ~/.bashrc >> 변경사항 적용

 

(3) airflow db init : airflow 초기화

 

(4) cd airflow >>> vim airflow.cfg 

9898포트로 변경해주자

timezone두 서울로 바꿔주자

false로 수정

rm -rf ./airflow.db

airflow db init

 

(5) airflow users create --username wntpdyd0218 --firstname syoung --lastname ju --role Admin --email sy02229@gmail.com --password 1234
사용자 이름 : wntpdyd0218

비밀번호 : 1234

 

(6) airflow webserver -D >>> 에어플로우 웹서버 백그라운드 실행후

http://13.208.214.61:9898 접속

 

2. DAG 정의 파일을 작성

(1) vi airflow.cfg 

dag디렉토리명 위치 확인후 해당위치에 mkdir dags디렉토리 생성해주기

 

cd dags >>> 해당 위치에서 파일 작성!!

 

donpa_item01.py

from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
import pytz

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 5),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'timezone': 'Asia/seoul',
}

dag = DAG(
    'spark_submit_dag1',
    default_args=default_args,
    schedule_interval='*/30 * * * *',  # 5분마다 실행
    catchup=False,  # 과거 작업은 실행하지 않음
)

#  실행
mariadb_submit_task = BashOperator(
    task_id='mariadb_update',
    bash_command='/usr/local/spark/bin/spark-submit --master yarn --deploy-mode client --jars /usr/local/spark/jars/mariadb-java-client-2.7.0.jar /home/ubuntu/dbmart_save/mariadb_save_donpa01.py',
    dag=dag,
)


# Task 간 의존성 설정
# mariadb_submit_task >> mariadb_submin_tast1
~

 

airflow scheduler -D : scheduler 백그라운드 실행하기

airflow 웹사이트에 들어가보면(실행시켜주자)

 

데이터 마트인 mariadb가 최신화 되었는지 확인

성공성공

 

이제 30분마다 최신의 던파 아이템 거래 데이터로 데이터 마트가 최신화 될것이다!!!

최종적으로 bi툴을 선정해 30분마다 거래 데이터를 대시보드로 구현해보자

 

 

7) 데이터 마트에 들어간 10개의 데이터를 실시간 bi툴로 시각화 해보자

태블로는 여태 두번정도 써보았으니 새로운 대시보드를 사용하고 싶었다.

Redash를 사용해 던파 데이터를 시각화 대시보드로 구현해보자!!

 

1) 먼저 도커를 설치해주자

# 패키지 업데이트를 수행
sudo apt-get update

# Docker 패키지 관련 소프트웨어 및 의존성 패키지를 설치
sudo apt-get install -y apt-transport-https ca-certificates curl software-properties-common

# Docker 공식 GPG 키를 추가
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg

# Docker 저장소를 시스템에 추가
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

sudo apt-get update
# Docker CE를 설치
sudo apt-get install -y docker-ce

# Docker 서비스를 시작
sudo systemctl start docker
sudo systemctl enable docker

# 현재 사용자를 Docker 그룹에 추가하여 루트 권한 없이 Docker를 사용할 수 있도록함
sudo usermod -aG docker $USER

 

2) Redash 설치 및 실행

#패키지 업데이트
sudo apt update

#git 설치
sudo apt install git

#redash setup clone
git clone https://github.com/getredash/setup.git

cd setup/
#setup 스크립트 실행 권한
sudo chmod +x setup.sh

#스크립트 실행
./setup.sh

 

docker ps -a >>> 시스템에 있는 모든 Docker 컨테이너를 나열

 

http://외부ip:5000 >>> redash접속

 

로그인 화면

 

3) Redash 기본 개념

오픈 소스 비즈니스 인텔리전스 도구로, 데이터 시각화 및 대시보드 작성, 데이터 쿼리 및 다양한 데이터 소스와의 통합을 지원

Redash를 사용하여 데이터를 시각화하고 실시간 대시보드를 만들 수 있음

 

1. 대시보드 : 작성한 쿼리를 테이블, 차트 형태로 등록하여 종합적인 시각화를 구현

2. 데이터소스 : 대시보드에 시각화할 데이터의 저장소(기본적으로 db등이 있음)

3. 쿼리 : db와 똑같이 쿼리라고 생각하면됨 데이터에서 필요한 값을 뽑아내는 최종 결과값을 테이블, 차트 형태로 표현

4. Alert(알림) : 생성한 쿼리의 값에 조건을 걸어 만족하면 알림을 보내준다. 트리거 느낌????

 

4) Redash db연결

1. 우측 상단 가운데 모양 클릭

 

2. + New Data Source클릭

 

3. mysql 선택

 

4. 데이터 소스 등록

등록후 data sources목록을 보면 등록완료

이제 db연결을 완료했으니 시각화할 쿼리를 만들어 보자

 

5) Create Query

create >>> query클릭

 

1. 쿼리 실행 및 등록

해당 창에서 왼쪽은 데이터 소스를 선택할수는 창과 맨위의 new query는 쿼리의 이름이다.클릭해서 변경가능오른쪽은 쿼리를 작성하고 execute버튼으로 퀴리가 실행됨 마지막으로 save는 저장하는것

 

테스트로 테이블의 모든 데이터를 출력해보자 >>> select * from your_table_name;

 

+ New VisualiZation 클릭하여 차트를 만들어주자

최근 가격 변동을 시각화

SAVE를 누르면 하단 테이블 옆에 추가된다

쿼리와 chart 작업후 내용 저장을 위해 save를 해준다

 

최종적으로 생성쿼리를 대시보드에 등록하기 위해 Publish버튼을 클릭

 

2. 작성한 쿼리를 대시보드에 등록해 모니터링 해보자

create >>> Dashboard 클릭

 

대시보드 이름 작성후 save

 

아래는 대시보드 화면이다. 쿼리 등록을 위해 add widget버튼 클릭

 

쿼리선택후 원하는 visualization선택하자

 

 

선택후 add to Dashboard 클릭

 

대시보드에 차트가 삽입된것을 확인

 

대시보드 변경사항 적용을 위해 상단의 Done Editing클릭

 

최종적으로 상단의 Publish를 클릭해 대시보드를 Publish시킴

 

총 거래 건수를 확인하는 쿼리 하나를 더 추가해보자

 

쿼리 등록 ( 이미지를 추가하기위해 빈 컬럼 추가)

SELECT '' AS image, itemName, SUM(count) AS total_count FROM your_table_name GROUP BY itemName;

 

visualization Type를 테이블로하고 image컬럼을 이미지 링크를 달아준다

작성후 쿼리를 저장해주자

 

 

이제 기존 대시보드에 쿼리를 추가하기위해 edit클릭

 

추가후 대시보드 저장