Streamlining Data Processing
Project Overview
This project aims to build an automated data pipeline for collecting, processing, storing, and analyzing financial market data. By efficiently managing real-time market trends and stock time series data, the pipeline facilitates the extraction of key analytical insights to support strategic decision-making.
GitHub Repository
https://github.com/youngyoony/Spark-ETL-Project
Key Components
- Docker Compose Configuration (`docker-compose.yml`)
- Airflow DAG Configuration (`spark-jobs.py`)
- Data Collection Script (`fetch_data.py`)
- Configuration File (`config.py`)
- Data Processing Script (`process_data.py`)
- Data Analysis Script (`analyze_data.py`)
Docker Compose Configuration (`docker-compose.yml`)
Docker Compose is utilized to containerize various services required for the project, ensuring easy deployment and management. The primary services include:
- Spark: Handles large-scale data processing.
- Elasticsearch: Optimizes data storage and search capabilities.
- Kibana: Provides data visualization for Elasticsearch.
- Airflow: Manages workflow scheduling and orchestration.
```yaml
version: '3'

x-spark-common: &spark-common
  image: bitnami/spark:latest
  volumes:
    - ./jobs:/opt/bitnami/spark/jobs
    - ./data:/opt/bitnami/spark/data
    - ./resources:/opt/bitnami/spark/resources
  networks:
    - default-network

x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  env_file:
    - airflow.env
  volumes:
    - ./jobs:/opt/airflow/jobs
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./data:/opt/bitnami/spark/data
    - ./resources:/opt/bitnami/spark/resources
  depends_on:
    - postgres
  networks:
    - default-network

services:
  spark-master:
    <<: *spark-common
    hostname: spark-master
    command: bin/spark-class org.apache.spark.deploy.master.Master
    expose:
      - "7077"
    ports:
      - "9090:8080"
      - "7077:7077"
      - "4444:4040"

  spark-worker-1:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077

  spark-worker-2:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077

  # Jupyter notebook
  jupyter-spark:
    image: minz95/de2024:jupyter
    # build:
    #   dockerfile: jupyter/Dockerfile
    networks:
      - default-network
    ports:
      - "8888:8888"
      - "4040-4080:4040-4080"
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./data:/home/jovyan/data
      - ./jobs:/home/jovyan/jobs
      - ./resources:/home/jovyan/resources
    environment:
      - JUPYTER_TOKEN=password

  es:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.4.3
    networks:
      - default-network
    environment:
      - node.name=es
      - discovery.type=single-node
      - discovery.seed_hosts=es
      - xpack.security.enabled=false
      - xpack.security.enrollment.enabled=false
      - xpack.security.http.ssl.enabled=false
      - xpack.security.transport.ssl.enabled=false
      - cluster.routing.allocation.disk.threshold_enabled=false
      - ELASTIC_PASSWORD=password
    mem_limit: 1073741824
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es-data:/usr/share/es/data
    ports:
      - 9200:9200

  kibana:
    image: docker.elastic.co/kibana/kibana:8.4.3
    networks:
      - default-network
    environment:
      - SERVERNAME=kibana
      - ELASTICSEARCH_HOSTS=http://es:9200
      - ELASTICSEARCH_USERNAME=kibana
      - ELASTICSEARCH_PASSWORD=password
    ports:
      - 5601:5601
    depends_on:
      - es

  postgres:
    image: postgres:14.0
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    networks:
      - default-network

  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8081:8080"
    depends_on:
      - scheduler

  scheduler:
    <<: *airflow-common
    command: bash -c "airflow db init && airflow db migrate && airflow users create --username airflow --firstname airflow --lastname airflow --role Admin --email airflow@gmail.com --password airflow && airflow scheduler"

networks:
  default-network:
```
Key Configuration Details:
- Network Setup: All services are attached to a common `default-network`, allowing seamless communication between containers.
- Volume Mounting: The Spark services mount the local `./data` directory (among others) so collected data persists across container restarts.
- Port Mapping: Each service's default port is mapped to the host to enable external access; a quick way to verify these mappings is sketched below.
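As a sanity check on those port mappings, the hedged sketch below (not part of the repository) probes each host-mapped endpoint from the host machine. The hosts and ports are taken from the `docker-compose.yml` above and may need adjusting for other environments.

```python
# Minimal sketch: verify the host-mapped ports from docker-compose.yml respond.
# Assumes the stack is running on the local host; adjust URLs if it is not.
import requests

SERVICES = {
    "Spark master UI": "http://localhost:9090",
    "Elasticsearch": "http://localhost:9200",
    "Kibana": "http://localhost:5601",
    "Airflow webserver": "http://localhost:8081",
}

for name, url in SERVICES.items():
    try:
        status = requests.get(url, timeout=5).status_code
        print(f"{name}: HTTP {status}")
    except requests.RequestException as exc:
        print(f"{name}: unreachable ({exc})")
```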
Airflow DAG Configuration (`spark-jobs.py`)
Apache Airflow is employed to define and manage the workflow, scheduling the Spark jobs to run automatically at specified intervals.
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 4, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'spark_data_pipeline',
    default_args=default_args,
    schedule_interval='0 1 * * *',
)

fetch_data = BashOperator(
    task_id='fetch_data',
    bash_command='python /usr/local/airflow/dags/fetch_data.py',
    dag=dag,
)

process_data = BashOperator(
    task_id='process_data',
    bash_command='python /usr/local/airflow/dags/process_data.py',
    dag=dag,
)

analyze_data = BashOperator(
    task_id='analyze_data',
    bash_command='python /usr/local/airflow/dags/analyze_data.py',
    dag=dag,
)

fetch_data >> process_data >> analyze_data
```
Key Configuration Details:
- Scheduling: The DAG is set to execute daily at 1 AM (`schedule_interval='0 1 * * *'`).
- Task Definition: Three primary tasks, `fetch_data`, `process_data`, and `analyze_data`, are defined using `BashOperator` to run the respective Python scripts.
- Task Dependencies: The `>>` chaining ensures that data fetching occurs before processing, which in turn precedes analysis (an equivalent context-manager formulation is sketched below).
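For readers who prefer Airflow's context-manager style, the illustrative sketch below expresses the same schedule and fetch-process-analyze chain. It is not the project's `spark-jobs.py`; only the task IDs, script paths, and cron expression are taken from it.

```python
# Illustrative sketch only: the same pipeline expressed with a DAG context manager.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    "spark_data_pipeline",
    default_args={"owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=5)},
    start_date=datetime(2024, 4, 1),
    schedule_interval="0 1 * * *",  # daily at 01:00
) as dag:
    tasks = [
        BashOperator(
            task_id=task_id,
            bash_command=f"python /usr/local/airflow/dags/{task_id}.py",
        )
        for task_id in ("fetch_data", "process_data", "analyze_data")
    ]
    # Chain fetch -> process -> analyze, matching the original `>>` ordering.
    tasks[0] >> tasks[1] >> tasks[2]
```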
Data Collection Script (`fetch_data.py`)
This script collects real-time financial market trends and stock time series data from external APIs using the `requests` library and saves the data as JSON files.
```python
import requests
import json
import os

from config import MARKET_TRENDS_FILE, STOCK_TIME_SERIES_FILE


def fetch_market_trends():
    url = "https://real-time-finance-data.p.rapidapi.com/market-trends"
    querystring = {"trend_type": "MARKET_INDEXES", "country": "us", "language": "en"}
    headers = {
        "x-rapidapi-key": "7c8a2c6a79msh04674dc52dbd72dp125f50jsn20be7524e794",
        "x-rapidapi-host": "real-time-finance-data.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring)
    response.raise_for_status()
    return response.json()


def fetch_stock_time_series():
    url = "https://real-time-finance-data.p.rapidapi.com/stock-time-series"
    querystring = {"symbol": "AAPL:NASDAQ", "period": "1D", "language": "en"}
    headers = {
        "x-rapidapi-key": "7c8a2c6a79msh04674dc52dbd72dp125f50jsn20be7524e794",
        "x-rapidapi-host": "real-time-finance-data.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring)
    response.raise_for_status()
    return response.json()


def save_data_to_file(data, filename):
    os.makedirs(os.path.dirname(filename), exist_ok=True)  # Create directory if it doesn't exist
    with open(filename, 'w') as f:
        json.dump(data, f)


if __name__ == "__main__":
    market_trends = fetch_market_trends()
    save_data_to_file(market_trends["data"]["trends"], '/opt/bitnami/spark/data/market_trends.json')

    stock_time_series = fetch_stock_time_series()
    save_data_to_file(stock_time_series["data"], '/opt/bitnami/spark/data/stock_time_series.json')
```
Key Functionalities:
- Data Retrieval: The `fetch_market_trends` and `fetch_stock_time_series` functions fetch market trends and stock time series data, respectively, from the API.
- Data Storage: The `save_data_to_file` function saves the fetched data into the specified JSON files, creating the necessary directories if they do not exist (a sketch below shows how the imported `config.py` paths could be reused, together with a simple retry wrapper).
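One detail worth noting: the script imports `MARKET_TRENDS_FILE` and `STOCK_TIME_SERIES_FILE` from `config.py` but writes to hard-coded container paths in its `__main__` block. The hedged sketch below shows how those config constants could be reused and a transient-error retry added; `fetch_with_retry` is a hypothetical helper, not part of the repository.

```python
# Illustrative sketch: reuse the config.py paths and add a simple retry.
import time

import requests

from config import MARKET_TRENDS_FILE, STOCK_TIME_SERIES_FILE
from fetch_data import fetch_market_trends, fetch_stock_time_series, save_data_to_file


def fetch_with_retry(fetch_fn, attempts=3, backoff_seconds=5):
    """Call an API fetch function, retrying on transient HTTP/network errors."""
    for attempt in range(1, attempts + 1):
        try:
            return fetch_fn()
        except requests.RequestException:
            if attempt == attempts:
                raise
            time.sleep(backoff_seconds * attempt)


if __name__ == "__main__":
    trends = fetch_with_retry(fetch_market_trends)
    save_data_to_file(trends["data"]["trends"], MARKET_TRENDS_FILE)

    series = fetch_with_retry(fetch_stock_time_series)
    save_data_to_file(series["data"], STOCK_TIME_SERIES_FILE)
```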
Configuration File (`config.py`)
Centralizes configuration settings to enhance code reusability and maintainability, including Elasticsearch connection details and file paths.
```python
# Elasticsearch
ES_HOST = "es"
ES_PORT = 9200

# File paths
MARKET_TRENDS_FILE = "jobs/data/market_trends.json"
STOCK_TIME_SERIES_FILE = "jobs/data/stock_time_series.json"
ANALYSIS_MARKET_TRENDS_FILE = "jobs/data/analysis_market_trends.csv"
ANALYSIS_STOCK_TIME_SERIES_FILE = "jobs/data/analysis_stock_time_series.csv"
```
Key Configuration Items:
- Elasticsearch Connection: Defines `ES_HOST` and `ES_PORT` for connecting to the Elasticsearch cluster.
- File Paths: Specifies paths for storing collected data and analysis results, ensuring consistent references across scripts (an optional environment-variable override is sketched below).
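A possible refinement, not part of the repository, is to let environment variables override these defaults so the same scripts resolve the Elasticsearch host correctly both inside the Docker network (`es`) and from the host machine. A minimal sketch:

```python
# Illustrative sketch only: config values with environment-variable overrides.
import os

ES_HOST = os.getenv("ES_HOST", "es")          # "es" inside the Compose network
ES_PORT = int(os.getenv("ES_PORT", "9200"))

MARKET_TRENDS_FILE = os.getenv("MARKET_TRENDS_FILE", "jobs/data/market_trends.json")
STOCK_TIME_SERIES_FILE = os.getenv("STOCK_TIME_SERIES_FILE", "jobs/data/stock_time_series.json")
```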
Data Processing Script (`process_data.py`)
Utilizes Apache Spark to preprocess the collected data and store the processed results in Elasticsearch.
```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, monotonically_increasing_id, row_number
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.window import Window

from config import ES_HOST, ES_PORT


def load_data(spark, file_path):
    return spark.read.json(file_path)


def process_market_trends(data):
    processed_data = data.select(
        col("symbol"),
        col("name"),
        col("price"),
        col("change"),
        col("change_percent")
    )
    return processed_data


def process_stock_time_series(data):
    # Create additional column for price change percentage and analyze volume
    spark = data.sparkSession
    data = data.withColumn("price_change_percent",
                           (col("price") - col("previous_close")) / col("previous_close") * 100)

    array_df = data.selectExpr("array(time_series.*) as arr", "*") \
        .drop("time_series")
    flatten_df = array_df.select(
        explode("arr").alias("ts"),
        col("*")
    ).drop("arr")

    schema1 = StructType([
        StructField("col", StringType(), True)
    ])
    cols = [(c,) for c in data.select('time_series.*').columns]
    column_df = spark.createDataFrame(cols, schema1)

    ws = Window.orderBy("index")
    df1_with_index = column_df.withColumn("index", monotonically_increasing_id()) \
        .withColumn("idx", row_number().over(ws)) \
        .drop("index")
    df2_with_index = flatten_df.withColumn("index", monotonically_increasing_id()) \
        .withColumn("idx", row_number().over(ws)) \
        .drop("index")
    combined_df = df1_with_index.join(df2_with_index, "idx").drop("idx")

    processed_data = combined_df.select(
        col("symbol"),
        col("ts.price").alias("price"),
        col("previous_close"),
        col("ts.change").alias("change"),
        col("ts.change_percent").alias("change_percent"),
        col("price_change_percent"),
        col("ts.volume").alias("volume"),
        col("col").alias("time_series"),
    )
    return processed_data


def save_to_elasticsearch(df, index_name, mapping_id="symbol"):
    es_write_conf = {
        "es.nodes": ES_HOST,
        "es.port": str(ES_PORT),
        "es.resource": index_name,
        "es.mapping.id": mapping_id
    }
    df.write.format("org.elasticsearch.spark.sql").options(**es_write_conf).mode("overwrite").save()


if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("FinanceDataProcessing") \
        .getOrCreate()

    print("=========== Processing (Market trends) ===================")
    market_trends = load_data(spark, "/opt/bitnami/spark/data/market_trends.json")
    processed_market_trends = process_market_trends(market_trends)
    save_to_elasticsearch(processed_market_trends, "market_trends")

    print("=========== Processing (stock time series) ===================")
    stock_time_series = load_data(spark, "/opt/bitnami/spark/data/stock_time_series.json")
    processed_stock_time_series = process_stock_time_series(stock_time_series)
    save_to_elasticsearch(processed_stock_time_series, "stock_time_series", mapping_id="time_series")
```
Key Functionalities:
- Data Loading: The `load_data` function reads JSON files using Spark.
- Data Preprocessing:
  - Market Trends: Selects relevant columns such as symbol, name, price, change, and change percentage.
  - Stock Time Series: Calculates the price change percentage, explodes the nested time series data, and restructures the DataFrame for analysis (the explode pattern is illustrated in the standalone sketch below).
- Data Storage: The `save_to_elasticsearch` function writes the processed data to Elasticsearch indices, specifying a unique mapping ID for consistency.
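The nested time-series handling is the densest part of the script. The self-contained sketch below reproduces just the explode pattern on made-up sample rows; note that the real payload keeps ticks as fields of a struct, which the script first packs into an array with `array(time_series.*)`, whereas this sample starts from an array directly.

```python
# Self-contained sketch of the explode pattern used in process_stock_time_series,
# run on made-up sample data rather than the project's real API payload.
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.appName("ExplodeSketch").getOrCreate()

sample = spark.createDataFrame([
    Row(symbol="AAPL:NASDAQ",
        time_series=[Row(price=190.1, volume=1200), Row(price=190.4, volume=900)]),
])

# explode() turns the array of time-series structs into one row per tick,
# after which struct fields are addressed as ts.price, ts.volume, etc.
flattened = (
    sample.select(col("symbol"), explode("time_series").alias("ts"))
          .select("symbol", col("ts.price").alias("price"), col("ts.volume").alias("volume"))
)
flattened.show()
```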
Data Analysis Script (`analyze_data.py`)
This script retrieves data from Elasticsearch, performs analytical operations, and saves the results as CSV files.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev

from config import ES_HOST, ES_PORT


def load_data_from_elasticsearch(spark, index_name, mapping_id="symbol"):
    es_read_conf = {
        "es.nodes": ES_HOST,
        "es.port": str(ES_PORT),
        "es.resource": index_name,
        "es.read.metadata": "true",
        "es.mapping.id": mapping_id
    }
    return spark.read.format("org.elasticsearch.spark.sql").options(**es_read_conf).load()


def analyze_market_trends(data):
    # Top gainers
    top_gainers = data.orderBy(col("change_percent").desc()).limit(5)
    # Top losers
    top_losers = data.orderBy(col("change_percent").asc()).limit(5)
    return top_gainers, top_losers


def analyze_stock_time_series(data):
    # Price change percentage analysis
    data = data.withColumn("price_change_percent",
                           (col("price") - col("previous_close")) / col("previous_close") * 100)
    price_analysis = data.groupBy("symbol").agg(
        avg("price_change_percent").alias("average_price_change_percent"),
        stddev("price_change_percent").alias("stddev_price_change_percent")
    )
    # Volume analysis
    volume_analysis = data.groupBy("symbol").agg(
        avg("volume").alias("average_volume"),
        stddev("volume").alias("stddev_volume")
    )
    return price_analysis, volume_analysis


def save_analysis_result(df, file_path):
    # Remove '_metadata' column if present
    if '_metadata' in df.columns:
        df = df.drop('_metadata')
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(file_path)


if __name__ == "__main__":
    spark = SparkSession.builder.appName("FinanceDataAnalysis").getOrCreate()

    # Market Trends Analysis
    market_trends = load_data_from_elasticsearch(spark, "market_trends")
    top_gainers, top_losers = analyze_market_trends(market_trends)
    save_analysis_result(top_gainers, "/opt/bitnami/spark/data/top_gainers.csv")
    save_analysis_result(top_losers, "/opt/bitnami/spark/data/top_losers.csv")

    # Stock Time Series Analysis
    stock_time_series = load_data_from_elasticsearch(spark, "stock_time_series")
    price_analysis, volume_analysis = analyze_stock_time_series(stock_time_series)
    save_analysis_result(price_analysis, "/opt/bitnami/spark/data/price_analysis.csv")
    save_analysis_result(volume_analysis, "/opt/bitnami/spark/data/volume_analysis.csv")
```
Key Functionalities:
- Data Retrieval: The `load_data_from_elasticsearch` function fetches data from the specified Elasticsearch indices.
- Market Trends Analysis: Identifies the top 5 gainers and losers by change percentage (see the standalone sketch below).
- Stock Time Series Analysis: Calculates the average and standard deviation of price change percentages and trading volumes.
- Result Storage: Saves the analytical results as CSV files, dropping any Elasticsearch metadata columns first.
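To see the ranking logic in isolation, the sketch below calls `analyze_market_trends` on a handful of made-up rows instead of the Elasticsearch index; it assumes `analyze_data.py` is importable on the PYTHONPATH.

```python
# Self-contained sketch of the gainers/losers ranking on made-up rows,
# reusing analyze_market_trends from analyze_data.py.
from pyspark.sql import SparkSession

from analyze_data import analyze_market_trends

spark = SparkSession.builder.appName("AnalysisSketch").getOrCreate()

sample = spark.createDataFrame(
    [("AAPL", 1.8), ("MSFT", -0.6), ("NVDA", 3.2), ("TSLA", -2.1), ("AMZN", 0.4), ("META", 2.5)],
    ["symbol", "change_percent"],
)

top_gainers, top_losers = analyze_market_trends(sample)
top_gainers.show()   # highest change_percent first
top_losers.show()    # lowest change_percent first
```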
Technology Stack
- Docker & Docker Compose: Facilitates containerized deployment and management of services.
- Apache Airflow: Orchestrates workflow scheduling and task dependencies.
- Apache Spark: Enables large-scale data processing and analysis.
- Elasticsearch: Provides efficient data storage and search capabilities.
- Kibana: Assists in visualizing data stored in Elasticsearch.
- Python: Implements the data collection, processing, and analysis scripts.
Project Advantages
- Automated Workflow: Airflow automates the entire data pipeline, ensuring reliable operation without manual intervention.
- Scalability: Leveraging Spark and Elasticsearch allows the pipeline to handle large volumes of data efficiently.
- Flexible Deployment: Docker Compose facilitates easy deployment and scaling across various environments.
- Comprehensive Analysis: Integrates data collection, storage, and analysis into a unified pipeline, ensuring consistent data management and insightful analysis.
- Modular Code Structure: Separation of functionalities into distinct scripts enhances code readability and maintainability.
Data Collection (`fetch_data.py`)
This script fetches real-time market trends and specific stock time series data from external financial APIs using the `requests` library. The collected data is stored as JSON files for subsequent processing.
```python
def fetch_market_trends():
    url = "https://real-time-finance-data.p.rapidapi.com/market-trends"
    querystring = {"trend_type": "MARKET_INDEXES", "country": "us", "language": "en"}
    headers = {
        "x-rapidapi-key": "!!!MY API KEY!!!_Censored",
        "x-rapidapi-host": "real-time-finance-data.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring)
    response.raise_for_status()
    return response.json()
```
Docker Setup (`docker-compose.yml`)
The Docker Compose file defines and configures all necessary services, ensuring they operate within a unified network and have the necessary dependencies.
```yaml
services:
  spark:
    image: bitnami/spark:latest
    ports:
      - "8080:8080"
    volumes:
      - ./data:/opt/bitnami/spark/data
    networks:
      - spark-network
  # Other service definitions omitted for brevity
```
Workflow Definition (`spark-jobs.py`)
The Airflow DAG automates the execution of data collection, processing, and analysis tasks. Each task is defined independently and executed in sequence to maintain the integrity of the data pipeline.
```python
fetch_data = BashOperator(
    task_id='fetch_data',
    bash_command='python /usr/local/airflow/dags/fetch_data.py',
    dag=dag,
)

process_data = BashOperator(
    task_id='process_data',
    bash_command='python /usr/local/airflow/dags/process_data.py',
    dag=dag,
)

analyze_data = BashOperator(
    task_id='analyze_data',
    bash_command='python /usr/local/airflow/dags/analyze_data.py',
    dag=dag,
)
```
Conclusion
This project establishes a robust and automated data pipeline that encompasses data collection, processing, storage, and analysis for financial markets. By integrating Docker and Airflow for deployment and orchestration, Spark and Elasticsearch for data handling, and Python for scripting, the pipeline ensures efficient management and insightful analysis of large-scale financial data. This unified system not only enhances data reliability and scalability but also provides actionable insights to support strategic financial decision-making.