Keunyoung Yoon

Data Engineer

Data Scientist

Data Analyst

Blog Post

Streamlining Data Processing

09/26/2024 Portfolio

Project Overview

This project aims to build an automated data pipeline for collecting, processing, storing, and analyzing financial market data. By efficiently managing real-time market trends and stock time series data, the pipeline facilitates the extraction of key analytical insights to support strategic decision-making.

Github Repositories

https://github.com/youngyoony/Spark-ETL-Project

Key Components

  • Docker Compose Configuration (docker-compose.yml)
  • Airflow DAG Configuration (spark-jobs.py)
  • Data Collection Script (fetch_data.py)
  • Configuration File (config.py)
  • Data Processing Script (process_data.py)
  • Data Analysis Script (analyze_data.py)

Docker Compose Configuration (docker-compose.yml)

Docker Compose is utilized to containerize various services required for the project, ensuring easy deployment and management. The primary services include:

  • Spark: Handles large-scale data processing.
  • Elasticsearch: Optimizes data storage and search capabilities.
  • Kibana: Provides data visualization for Elasticsearch.
  • Airflow: Manages workflow scheduling and orchestration.

version: '3'

x-spark-common: &spark-common
  image: bitnami/spark:latest
  volumes:
    - ./jobs:/opt/bitnami/spark/jobs
    - ./data:/opt/bitnami/spark/data
    - ./resources:/opt/bitnami/spark/resources
  networks:
    - default-network

x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  env_file:
    - airflow.env
  volumes:
    - ./jobs:/opt/airflow/jobs
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./data:/opt/bitnami/spark/data
    - ./resources:/opt/bitnami/spark/resources
  depends_on:
    - postgres
  networks:
    - default-network

services:
  spark-master:
    <<: *spark-common
    hostname: spark-master
    command: bin/spark-class org.apache.spark.deploy.master.Master
    expose:
      - "7077"
    ports:
      - "9090:8080"
      - "7077:7077"
      - "4444:4040"

  spark-worker-1:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077

  spark-worker-2:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077

  #Jupyter notebook
  jupyter-spark:
    image: minz95/de2024:jupyter
    #build:
    #  dockerfile: jupyter/Dockerfile
    networks:
      - default-network
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./data:/home/jovyan/data
      - ./jobs:/home/jovyan/jobs
      - ./resources:/home/jovyan/resources
    environment:
      - JUPYTER_TOKEN=password

  es:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.4.3
    networks:
      - default-network
    environment:
      - node.name=es
      - discovery.type=single-node
      - discovery.seed_hosts=es
      - xpack.security.enabled=false
      - xpack.security.enrollment.enabled=false
      - xpack.security.http.ssl.enabled=false
      - xpack.security.transport.ssl.enabled=false
      - cluster.routing.allocation.disk.threshold_enabled=false
      - ELASTIC_PASSWORD=password
    mem_limit: 1073741824
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es-data:/usr/share/es/data
    ports:
      - 9200:9200

  kibana:
    image: docker.elastic.co/kibana/kibana:8.4.3
    networks:
      - default-network
    environment:
      - SERVERNAME=kibana
      - ELASTICSEARCH_HOSTS=http://es:9200
      - ELASTICSEARCH_USERNAME=kibana
      - ELASTICSEARCH_PASSWORD=password
    ports:
      - 5601:5601
    depends_on:
      - es

  postgres:
    image: postgres:14.0
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    networks:
      - default-network

  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8081:8080"
    depends_on:
      - scheduler

  scheduler:
    <<: *airflow-common
    command: bash -c "airflow db init && airflow db migrate && airflow users create --username airflow --firstname airflow --lastname airflow --role Admin --email airflow@gmail.com --password airflow && airflow scheduler"

networks:
  default-network:

Key Configuration Details:

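  • Network Setup: All services join a single user-defined network, default-network, so containers can reach one another by service name (for example, spark://spark-master:7077 and http://es:9200).
  • Volume Mounting: The Spark and Airflow containers both mount the local ./data directory at /opt/bitnami/spark/data, so files written by one task are visible to the next and persist on the host.
  • Port Mapping: Service ports are published to the host for external access. Ports that would collide are remapped: the Spark master UI (container port 8080) is published on 9090 and the Airflow webserver (also 8080) on 8081.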
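
To make the port mappings concrete, the short Python sketch below splits a few of the "host:container" entries from the Compose file above and builds the host-side URL for each service. The PORT_MAPPINGS dictionary and the split_mapping and host_url helpers are illustrative names that are not part of the repository, and the URLs assume the stack runs on localhost.

```python
# Host-side port mappings copied from docker-compose.yml ("host:container").
# The dictionary keys are descriptive labels chosen for this sketch.
PORT_MAPPINGS = {
    "spark-master UI": "9090:8080",
    "airflow webserver": "8081:8080",
    "jupyter": "8888:8888",
    "elasticsearch": "9200:9200",
    "kibana": "5601:5601",
}


def split_mapping(mapping):
    """Split a 'host:container' string into a (host_port, container_port) tuple."""
    host_port, container_port = mapping.split(":")
    return int(host_port), int(container_port)


def host_url(mapping, host="localhost"):
    """Build the URL a browser on the host machine would use (host name assumed)."""
    host_port, _ = split_mapping(mapping)
    return f"http://{host}:{host_port}"


if __name__ == "__main__":
    for name, mapping in PORT_MAPPINGS.items():
        print(f"{name}: {host_url(mapping)}")
```

The point of the sketch is that the number before the colon is what you type on the host, while the number after it is what other containers on default-network use.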

Airflow DAG Configuration (spark-jobs.py)

Apache Airflow is employed to define and manage the workflow, scheduling the Spark jobs to run automatically at specified intervals.

from airflow import DAG
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is the deprecated Airflow 1.x import path
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 4, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'spark_data_pipeline',
    default_args=default_args,
    schedule_interval='0 1 * * *',
)

fetch_data = BashOperator(
    task_id='fetch_data',
    bash_command='python /usr/local/airflow/dags/fetch_data.py',
    dag=dag,
)

process_data = BashOperator(
    task_id='process_data',
    bash_command='python /usr/local/airflow/dags/process_data.py',
    dag=dag,
)

analyze_data = BashOperator(
    task_id='analyze_data',
    bash_command='python /usr/local/airflow/dags/analyze_data.py',
    dag=dag,
)

fetch_data >> process_data >> analyze_data

Key Configuration Details:

  • Scheduling: The DAG is set to execute daily at 1 AM (schedule_interval='0 1 * * *').
  • Task Definition: Three primary tasks (fetch_data, process_data, and analyze_data) are defined using BashOperator to run their respective Python scripts.
  • Task Dependencies: The workflow ensures that data fetching occurs before processing, which in turn precedes analysis.
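
As a rough illustration of what the cron expression '0 1 * * *' means, the sketch below computes the next 01:00 trigger time after a given moment using only the standard library. The next_daily_run function is a hypothetical helper written for this post, not an Airflow API, and it ignores time zones (Airflow evaluates the expression in its configured default time zone, which is UTC unless changed).

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour=1, minute=0):
    """Return the first time strictly after `now` that matches 'minute hour * * *'."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed (or is exactly now), so move to tomorrow.
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    print(next_daily_run(datetime(2024, 4, 1, 0, 30)))
    print(next_daily_run(datetime(2024, 4, 1, 12, 0)))
```

For anything beyond a fixed daily time, a real cron parser is the better choice; this only covers the single schedule used in the DAG.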

Data Collection Script (fetch_data.py)

This script collects real-time financial market trends and stock time series data from external APIs using the requests library and saves the data as JSON files.

import requests
import json
import os
from config import MARKET_TRENDS_FILE, STOCK_TIME_SERIES_FILE

def fetch_market_trends():
    url = "https://real-time-finance-data.p.rapidapi.com/market-trends"
    querystring = {"trend_type":"MARKET_INDEXES","country":"us","language":"en"}
    headers = {
        "x-rapidapi-key": os.environ["RAPIDAPI_KEY"],  # read from an environment variable (name assumed) instead of hard-coding the secret
        "x-rapidapi-host": "real-time-finance-data.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring)
    response.raise_for_status()
    return response.json()

def fetch_stock_time_series():
    url = "https://real-time-finance-data.p.rapidapi.com/stock-time-series"
    querystring = {"symbol":"AAPL:NASDAQ","period":"1D","language":"en"}
    headers = {
        "x-rapidapi-key": os.environ["RAPIDAPI_KEY"],  # read from an environment variable (name assumed) instead of hard-coding the secret
        "x-rapidapi-host": "real-time-finance-data.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring)
    response.raise_for_status()
    return response.json()

def save_data_to_file(data, filename):
    os.makedirs(os.path.dirname(filename), exist_ok=True)  # Create directory if it doesn't exist
    with open(filename, 'w') as f:
        json.dump(data, f)

if __name__ == "__main__":
    market_trends = fetch_market_trends()
    save_data_to_file(market_trends["data"]["trends"], '/opt/bitnami/spark/data/market_trends.json')

    stock_time_series = fetch_stock_time_series()
    save_data_to_file(stock_time_series["data"], '/opt/bitnami/spark/data/stock_time_series.json')

Key Functionalities:

  • Data Retrieval: The fetch_market_trends and fetch_stock_time_series functions fetch market trends and stock time series data, respectively, from the API.
  • Data Storage: The save_data_to_file function saves the fetched data into specified JSON files, ensuring the necessary directories are created.
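
The storage step can be sketched in isolation as a save-and-reload round trip. The version below is a minimal variant, not the script's own code: save_json and load_json are names chosen for this example, and it adds one guard, because os.path.dirname returns an empty string for a bare filename and os.makedirs("") raises an error.

```python
import json
import os
import tempfile


def save_json(data, filename):
    """Write `data` as JSON, creating the parent directory when there is one."""
    directory = os.path.dirname(filename)
    if directory:  # empty for a bare filename; os.makedirs("") would raise
        os.makedirs(directory, exist_ok=True)
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f)


def load_json(filename):
    """Read a JSON file back into Python objects."""
    with open(filename, encoding="utf-8") as f:
        return json.load(f)


if __name__ == "__main__":
    # Made-up sample record, written to a throwaway directory.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "nested", "dir", "sample.json")
        save_json([{"symbol": "EXAMPLE", "price": 1.0}], path)
        print(load_json(path))
```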

Configuration File (config.py)

This file centralizes configuration settings, including the Elasticsearch connection details and file paths, so they can be reused across scripts.

# Elasticsearch
ES_HOST = "es"
ES_PORT = 9200

# File paths
MARKET_TRENDS_FILE = "jobs/data/market_trends.json"
STOCK_TIME_SERIES_FILE = "jobs/data/stock_time_series.json"
ANALYSIS_MARKET_TRENDS_FILE = "jobs/data/analysis_market_trends.csv"
ANALYSIS_STOCK_TIME_SERIES_FILE = "jobs/data/analysis_stock_time_series.csv"

Key Configuration Items:

  • Elasticsearch Connection: Defines ES_HOST and ES_PORT for connecting to the Elasticsearch cluster.
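  • File Paths: Specifies paths for collected data and analysis results so that scripts can share one definition. The scripts shown in this post still hard-code paths under /opt/bitnami/spark/data rather than using these constants.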
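
One possible extension, sketched below, is to let environment variables override the connection defaults so the same code runs inside and outside the Compose network. This is an assumption about how the config could evolve, not what config.py does today: load_es_settings is a hypothetical helper, and the variable names ES_HOST and ES_PORT are simply reused from the constants above.

```python
import os


def load_es_settings(env=None):
    """Return Elasticsearch settings, falling back to the defaults from config.py.

    `env` is any mapping of variable names to strings; it defaults to os.environ.
    """
    env = os.environ if env is None else env
    host = env.get("ES_HOST", "es")
    port = int(env.get("ES_PORT", "9200"))
    return {"host": host, "port": port, "url": f"http://{host}:{port}"}


if __name__ == "__main__":
    # With no overrides this resolves to the service name used inside Docker.
    print(load_es_settings({}))
    # From the host machine, the published port on localhost would be used instead.
    print(load_es_settings({"ES_HOST": "localhost"}))
```

Passing the mapping in as a parameter keeps the function easy to test without touching the real environment.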

Data Processing Script (process_data.py)

This script uses Apache Spark to preprocess the collected data and store the processed results in Elasticsearch.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, monotonically_increasing_id, row_number
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.window import Window
from config import ES_HOST, ES_PORT

def load_data(spark, file_path):
    return spark.read.json(file_path)

def process_market_trends(data):
    processed_data = data.select(
        col("symbol"),
        col("name"),
        col("price"),
        col("change"),
        col("change_percent")
    )
    return processed_data

def process_stock_time_series(data):
    # Create additional column for price change percentage and analyze volume
    spark = data.sparkSession
    data = data.withColumn("price_change_percent",
                           (col("price") - col("previous_close")) / col("previous_close") * 100)

    array_df = data.selectExpr("array(time_series.*) as arr", "*") \
                   .drop("time_series")

    flatten_df = array_df.select(
        explode("arr").alias("ts"),
        col("*")
    ).drop("arr")

    schema1 = StructType([
        StructField("col", StringType(), True)
    ])

    cols = [(c,) for c in data.select('time_series.*').columns]
    column_df = spark.createDataFrame(cols, schema1)

    ws = Window.orderBy("index")
    df1_with_index = column_df.withColumn("index", monotonically_increasing_id()) \
                               .withColumn("idx", row_number().over(ws)) \
                               .drop("index")

    df2_with_index = flatten_df.withColumn("index", monotonically_increasing_id()) \
                                .withColumn("idx", row_number().over(ws)) \
                                .drop("index")

    combined_df = df1_with_index.join(df2_with_index, "idx").drop("idx")

    processed_data = combined_df.select(
        col("symbol"),
        col("ts.price").alias("price"),
        col("previous_close"),
        col("ts.change").alias("change"),
        col("ts.change_percent").alias("change_percent"),
        col("price_change_percent"),
        col("ts.volume").alias("volume"),
        col("col").alias("time_series"),
    )

    return processed_data

def save_to_elasticsearch(df, index_name, mapping_id="symbol"):
    es_write_conf = {
        "es.nodes": ES_HOST,
        "es.port": str(ES_PORT),
        "es.resource": index_name,
        "es.mapping.id": mapping_id
    }
    df.write.format("org.elasticsearch.spark.sql").options(**es_write_conf).mode("overwrite").save()

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("FinanceDataProcessing") \
        .getOrCreate()

    print("=========== Processing (Market trends) ===================")
    market_trends = load_data(spark, "/opt/bitnami/spark/data/market_trends.json")
    processed_market_trends = process_market_trends(market_trends)
    save_to_elasticsearch(processed_market_trends, "market_trends")

    print("=========== Processing (stock time series) ===================")
    stock_time_series = load_data(spark, "/opt/bitnami/spark/data/stock_time_series.json")
    processed_stock_time_series = process_stock_time_series(stock_time_series)
    save_to_elasticsearch(processed_stock_time_series, "stock_time_series", mapping_id="time_series")

Key Functionalities:

  • Data Loading: The load_data function reads JSON files using Spark.
  • Data Preprocessing:
    • Market Trends: Selects relevant columns such as symbol, name, price, change, and change percentage.
    • Stock Time Series: Calculates price change percentage, explodes nested time series data, and restructures the DataFrame for analysis.
  • Data Storage: The save_to_elasticsearch function writes the processed data to Elasticsearch indices and uses es.mapping.id to choose the document ID: symbol for market trends and the time_series timestamp column for the stock series.
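
The reshaping done by process_stock_time_series is easier to follow on a small example. The sketch below reproduces the same idea in plain Python rather than Spark: one payload whose time_series field maps timestamps to data points becomes one row per timestamp. The payload shape is inferred from the columns the Spark code selects, the flatten_time_series helper is hypothetical, and the sample values are made up.

```python
def flatten_time_series(payload):
    """Turn one stock payload into a list of per-timestamp rows."""
    previous_close = payload["previous_close"]
    # Same formula as in process_stock_time_series: change relative to previous close.
    price_change_percent = (payload["price"] - previous_close) / previous_close * 100

    rows = []
    for timestamp, point in payload["time_series"].items():
        rows.append({
            "symbol": payload["symbol"],
            "price": point["price"],
            "previous_close": previous_close,
            "change": point["change"],
            "change_percent": point["change_percent"],
            "price_change_percent": price_change_percent,
            "volume": point["volume"],
            "time_series": timestamp,  # the timestamp key becomes its own column
        })
    return rows


# Made-up sample payload (assumed shape, not real market data).
SAMPLE = {
    "symbol": "EXAMPLE:NASDAQ",
    "price": 110.0,
    "previous_close": 100.0,
    "time_series": {
        "2024-01-02 09:30:00": {"price": 101.0, "change": 1.0, "change_percent": 0.01, "volume": 500},
        "2024-01-02 09:31:00": {"price": 102.0, "change": 2.0, "change_percent": 0.02, "volume": 700},
    },
}

if __name__ == "__main__":
    for row in flatten_time_series(SAMPLE):
        print(row)
```

Because each row carries its timestamp in the time_series column, that column can serve as the document ID when the rows are written to Elasticsearch.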

Data Analysis Script (analyze_data.py)

This script retrieves data from Elasticsearch, performs analytical operations, and saves the results as CSV files.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev
from config import ES_HOST, ES_PORT

def load_data_from_elasticsearch(spark, index_name, mapping_id="symbol"):
    es_read_conf = {
        "es.nodes": ES_HOST,
        "es.port": str(ES_PORT),
        "es.resource": index_name,
        "es.read.metadata": "true",
        "es.mapping.id": mapping_id
    }
    return spark.read.format("org.elasticsearch.spark.sql").options(**es_read_conf).load()

def analyze_market_trends(data):
    # Top gainers
    top_gainers = data.orderBy(col("change_percent").desc()).limit(5)
    # Top losers
    top_losers = data.orderBy(col("change_percent").asc()).limit(5)
    return top_gainers, top_losers

def analyze_stock_time_series(data):
    # Price change percentage analysis
    data = data.withColumn("price_change_percent",
                           (col("price") - col("previous_close")) / col("previous_close") * 100)

    price_analysis = data.groupBy("symbol").agg(
        avg("price_change_percent").alias("average_price_change_percent"),
        stddev("price_change_percent").alias("stddev_price_change_percent")
    )

    # Volume analysis
    volume_analysis = data.groupBy("symbol").agg(
        avg("volume").alias("average_volume"),
        stddev("volume").alias("stddev_volume")
    )

    return price_analysis, volume_analysis

def save_analysis_result(df, file_path):
    # Remove '_metadata' column if present
    if '_metadata' in df.columns:
        df = df.drop('_metadata')
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(file_path)

if __name__ == "__main__":
    spark = SparkSession.builder.appName("FinanceDataAnalysis").getOrCreate()

    # Market Trends Analysis
    market_trends = load_data_from_elasticsearch(spark, "market_trends")
    top_gainers, top_losers = analyze_market_trends(market_trends)
    save_analysis_result(top_gainers, "/opt/bitnami/spark/data/top_gainers.csv")
    save_analysis_result(top_losers, "/opt/bitnami/spark/data/top_losers.csv")

    # Stock Time Series Analysis
    stock_time_series = load_data_from_elasticsearch(spark, "stock_time_series")
    price_analysis, volume_analysis = analyze_stock_time_series(stock_time_series)
    save_analysis_result(price_analysis, "/opt/bitnami/spark/data/price_analysis.csv")
    save_analysis_result(volume_analysis, "/opt/bitnami/spark/data/volume_analysis.csv")

Key Functionalities:

  • Data Retrieval: The load_data_from_elasticsearch function fetches data from specified Elasticsearch indices.
  • Market Trends Analysis: Identifies the top 5 gainers and losers based on change percentage.
  • Stock Time Series Analysis: Calculates average and standard deviation for price change percentages and trading volumes.
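  • Result Storage: Writes each result as CSV with a header row, dropping the _metadata column when present. Because Spark writes a directory per output path, each .csv path holds a single part file after coalesce(1).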
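
The two analyses above can be mirrored in a few lines of plain Python, which is handy for checking Spark output on a small sample. This is a sketch under stated assumptions: top_movers and summarize are illustrative helpers, rows are plain dictionaries, and statistics.stdev is used because it computes the sample standard deviation, as Spark's stddev does.

```python
from statistics import mean, stdev


def top_movers(rows, n=5):
    """Return (gainers, losers): the n rows with the highest and lowest change_percent."""
    gainers = sorted(rows, key=lambda r: r["change_percent"], reverse=True)[:n]
    losers = sorted(rows, key=lambda r: r["change_percent"])[:n]
    return gainers, losers


def summarize(values):
    """Average and sample standard deviation; needs at least two values."""
    return {"average": mean(values), "stddev": stdev(values)}


if __name__ == "__main__":
    # Made-up rows for illustration only.
    rows = [
        {"symbol": "A", "change_percent": 0.5},
        {"symbol": "B", "change_percent": -1.2},
        {"symbol": "C", "change_percent": 2.0},
    ]
    gainers, losers = top_movers(rows, n=2)
    print([r["symbol"] for r in gainers], [r["symbol"] for r in losers])
    print(summarize([1.0, 2.0, 3.0]))
```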

Technology Stack

  • Docker & Docker Compose: Facilitates containerized deployment and management of services.
  • Apache Airflow: Orchestrates workflow scheduling and task dependencies.
  • Apache Spark: Enables large-scale data processing and analysis.
  • Elasticsearch: Provides efficient data storage and search capabilities.
  • Kibana: Assists in visualizing data stored in Elasticsearch.
  • Python: Scripts for data collection, processing, and analysis.

Project Advantages

  • Automated Workflow: Airflow automates the entire data pipeline, ensuring reliable operation without manual intervention.
  • Scalability: Leveraging Spark and Elasticsearch allows the pipeline to handle large volumes of data efficiently.
  • Flexible Deployment: Docker Compose facilitates easy deployment and scaling across various environments.
  • Comprehensive Analysis: Integrates data collection, storage, and analysis into a unified pipeline, ensuring consistent data management and insightful analysis.
  • Modular Code Structure: Separation of functionalities into distinct scripts enhances code readability and maintainability.

Data Collection (fetch_data.py)

This script fetches real-time market trends and specific stock time series data from external financial APIs using the requests library. The collected data is stored as JSON files for subsequent processing.

def fetch_market_trends():
    url = "https://real-time-finance-data.p.rapidapi.com/market-trends"
    querystring = {"trend_type":"MARKET_INDEXES","country":"us","language":"en"}
    headers = {
        "x-rapidapi-key": "!!!MY API KEY!!!_Censored",
        "x-rapidapi-host": "real-time-finance-data.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring)
    response.raise_for_status()
    return response.json()

Docker Setup (docker-compose.yml)

The Docker Compose file defines and configures all necessary services, ensuring they operate within a unified network and have the necessary dependencies.

services:
  spark:
    image: bitnami/spark:latest
    ports:
      - "8080:8080"
    volumes:
      - ./data:/opt/bitnami/spark/data
    networks:
      - spark-network
  # Other service definitions omitted for brevity

Workflow Definition (spark-jobs.py)

The Airflow DAG automates the execution of data collection, processing, and analysis tasks. Each task is defined independently and executed in sequence to maintain the integrity of the data pipeline.

fetch_data = BashOperator(
    task_id='fetch_data',
    bash_command='python /usr/local/airflow/dags/fetch_data.py',
    dag=dag,
)

process_data = BashOperator(
    task_id='process_data',
    bash_command='python /usr/local/airflow/dags/process_data.py',
    dag=dag,
)

analyze_data = BashOperator(
    task_id='analyze_data',
    bash_command='python /usr/local/airflow/dags/analyze_data.py',
    dag=dag,
)

Conclusion

This project establishes a robust and automated data pipeline that encompasses data collection, processing, storage, and analysis for financial markets. By integrating Docker and Airflow for deployment and orchestration, Spark and Elasticsearch for data handling, and Python for scripting, the pipeline ensures efficient management and insightful analysis of large-scale financial data. This unified system not only enhances data reliability and scalability but also provides actionable insights to support strategic financial decision-making.
