78 changes: 78 additions & 0 deletions SOLUTION_OVERVIEW.md
@@ -0,0 +1,78 @@
# Solution Implementation Overview

**Project:** Stock Analysis Data Engineering Solution

## Solution Architecture

### **Core Components Created:**

#### **1. Data Processing Layer**
- **`StockAnalyzer` Class** (`scripts/stock_analyzer_job.py`)
- Main processing engine for financial calculations
- Handles distributed data processing using PySpark
- Implements all 4 business objectives

#### **2. Infrastructure as Code**
- **CloudFormation Template** (`stack.yml`)
- Complete infrastructure definition
- Glue jobs, S3 buckets, IAM roles, databases, tables

#### **3. Testing & Validation**
- **`LocalStockAnalyzer` Class** (`test_local.py`)
- Local testing framework
- Validates business logic before cloud deployment

## Business Objectives Implementation

### **Objective 1: Average Daily Returns**
```python
def objective1_average_daily_return(self, df: DataFrame) -> DataFrame:
    # Formula: ((current_price - previous_price) / previous_price) * 100
    # Self-join to get the previous day's price, then average across all stocks
```
**Purpose:** Market trend analysis for portfolio managers
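
For illustration, a minimal standalone window-function sketch of the same calculation (the job itself uses a SQL self-join); the column names `ticker`, `trade_date`, `closing_price` and the tiny sample frame are assumptions, not the actual schema:

```python
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.appName("daily-returns-sketch").getOrCreate()

# Hypothetical schema; the real input comes from load_stock_data()
df = spark.createDataFrame(
    [("AAPL", "2024-01-02", 185.0),
     ("AAPL", "2024-01-03", 184.0),
     ("AAPL", "2024-01-04", 186.5)],
    ["ticker", "trade_date", "closing_price"],
)

w = Window.partitionBy("ticker").orderBy("trade_date")

returns = (
    df.withColumn("prev_close", F.lag("closing_price").over(w))
      .withColumn("daily_return_pct",
                  (F.col("closing_price") - F.col("prev_close"))
                  / F.col("prev_close") * 100)
      .where(F.col("prev_close").isNotNull())
)

# Average daily return per date across all stocks
returns.groupBy("trade_date").agg(
    F.avg("daily_return_pct").alias("avg_daily_return")
).show()
```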

### **Objective 2: Highest Worth Stock**
```python
def objective2_highest_worth_stock(self, df: DataFrame) -> DataFrame:
    # Formula: closing_price * volume
    # Identifies the most liquid stock by dollar volume
```
**Purpose:** Liquidity analysis for trading strategies
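
A sketch of the dollar-volume ranking, extending the previous example's frame with a hypothetical `volume` column; whether the job sums worth across all days or takes a single day's maximum isn't shown here, so the `sum` aggregation is an assumption:

```python
from pyspark.sql import functions as F

# Hypothetical columns: ticker, closing_price, volume
worth = (
    df.withColumn("dollar_volume", F.col("closing_price") * F.col("volume"))
      .groupBy("ticker")
      .agg(F.sum("dollar_volume").alias("total_worth"))  # aggregation choice is an assumption
      .orderBy(F.desc("total_worth"))
      .limit(1)
)
```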

### **Objective 3: Most Volatile Stock**
```python
def objective3_most_volatile_stock(self, df: DataFrame) -> DataFrame:
    # Formula: STDDEV(daily_returns) * SQRT(252)
    # Annualized volatility (252 trading days/year) for risk assessment
```
**Purpose:** Risk management and portfolio optimization
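
A sketch of the annualization step, building on the `returns` frame from the Objective 1 example; the factor √252 reflects the usual count of US trading days per year:

```python
import math

from pyspark.sql import functions as F

# Per-ticker annualized volatility; most volatile ticker first
volatility = (
    returns.groupBy("ticker")
           .agg((F.stddev("daily_return_pct") * math.sqrt(252))
                .alias("annualized_volatility"))
           .orderBy(F.desc("annualized_volatility"))
           .limit(1)
)
```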

### **Objective 4: Top 30-Day Returns**
```python
def objective4_top_30day_returns(self, df: DataFrame) -> DataFrame:
    # Formula: ((current_price - price_30_days_ago) / price_30_days_ago) * 100
    # Best-performing ticker-date combinations
```
**Purpose:** Investment opportunity identification
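
A sketch of the 30-day lookback; here a 30-row `lag` stands in for "price 30 days ago", which assumes one row per ticker per trading day (a calendar-date self-join would also work), and the top-10 cutoff is an assumption:

```python
from pyspark.sql import Window, functions as F

w = Window.partitionBy("ticker").orderBy("trade_date")

top_30day = (
    df.withColumn("price_30_ago", F.lag("closing_price", 30).over(w))
      .withColumn("return_30d_pct",
                  (F.col("closing_price") - F.col("price_30_ago"))
                  / F.col("price_30_ago") * 100)
      .where(F.col("price_30_ago").isNotNull())
      .orderBy(F.desc("return_30d_pct"))
      .select("ticker", "trade_date", "return_30d_pct")
      .limit(10)
)
```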

## Key Classes and Functions

### **StockAnalyzer Class (Main Processing Engine)**
```python
class StockAnalyzer:
    def __init__(self, glue_context: GlueContext, bucket_name: str)
    def load_stock_data(self, input_path: str) -> DataFrame
    def objective1_average_daily_return(self, df: DataFrame) -> DataFrame
    def objective2_highest_worth_stock(self, df: DataFrame) -> DataFrame
    def objective3_most_volatile_stock(self, df: DataFrame) -> DataFrame
    def objective4_top_30day_returns(self, df: DataFrame) -> DataFrame
    def run_all_objectives(self, df: DataFrame) -> dict
    def save_results_to_s3(self, results: dict)
```

**Key Features:**
- **Distributed Processing:** Leverages PySpark for scalability
- **SQL-Based Logic:** Uses optimized SQL queries with self-joins
- **Error Handling:** Comprehensive logging and exception management
- **Modular Design:** Each objective is a separate, testable method
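
For context, a hypothetical driver showing how the class would be wired into the Glue job entrypoint; the argument names and S3 paths are assumptions (the actual entrypoint lives in `scripts/stock_analyzer_job.py`):

```python
import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Hypothetical job arguments; the real job may name them differently
args = getResolvedOptions(sys.argv, ["JOB_NAME", "bucket_name"])
glue_context = GlueContext(SparkContext.getOrCreate())

analyzer = StockAnalyzer(glue_context, args["bucket_name"])
df = analyzer.load_stock_data(f"s3://{args['bucket_name']}/input/stocks_data.csv")
results = analyzer.run_all_objectives(df)
analyzer.save_results_to_s3(results)
```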
6 changes: 4 additions & 2 deletions create-update-stack.sh
@@ -43,7 +43,8 @@ then
echo "Creating a new stack: $stack"
aws cloudformation create-stack --stack-name "$stack" \
--capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
-    --template-body file://"$stack_yml"
+    --template-body file://"$stack_yml" \
+    --parameters ParameterKey=StackName,ParameterValue="$STACK_NAME"

# Wait for the stack creation to complete
echo "Waiting for stack creation to complete: $stack"
@@ -54,7 +55,8 @@ else
echo "Updating the stack: $stack"
aws cloudformation update-stack --stack-name "$stack" \
--capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
-    --template-body file://"$stack_yml"
+    --template-body file://"$stack_yml" \
+    --parameters ParameterKey=StackName,ParameterValue="$STACK_NAME"

# Wait for the stack update to complete
echo "Waiting for stack update to complete: $stack"
163 changes: 163 additions & 0 deletions infra/deploy_infrastructure.py
@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
Infrastructure deployment script for stock analysis project.
This script handles file uploads that are not managed by CloudFormation.
"""

import logging
import os
import sys

import boto3
from dotenv import load_dotenv


class InfrastructureDeployer:
    def __init__(self):
        # Load environment variables from .env file
        load_dotenv()

        self.stack_name = os.getenv('STACK_NAME')
        self.region = os.getenv('AWS_DEFAULT_REGION', 'eu-central-1')

        if not self.stack_name:
            raise ValueError("STACK_NAME not found in .env file")

        self.bucket_name = f'data-engineer-assignment-{self.stack_name.lower()}'

        # Initialize AWS clients
        self.s3_client = boto3.client(
            's3',
            region_name=self.region,
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
        )

        self.cloudformation = boto3.client(
            'cloudformation',
            region_name=self.region,
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
        )

        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        logging.basicConfig(level=logging.INFO)
        return logging.getLogger(__name__)

    def wait_for_stack_completion(self):
        """Wait for the CloudFormation stack to be ready."""
        try:
            self.logger.info(f"Waiting for stack {self.stack_name} to be ready...")

            # Check whether the stack exists and is in a complete state
            waiter = self.cloudformation.get_waiter('stack_create_complete')
            try:
                waiter.wait(StackName=self.stack_name)
                self.logger.info("Stack creation completed")
            except Exception:
                # If the create waiter fails, fall back to the update waiter
                waiter = self.cloudformation.get_waiter('stack_update_complete')
                waiter.wait(StackName=self.stack_name)
                self.logger.info("Stack update completed")

        except Exception as e:
            self.logger.error(f"Error waiting for stack: {str(e)}")
            raise

    def upload_glue_script(self):
        """Upload the Glue job script to S3."""
        local_file = "scripts/stock_analyzer_job.py"
        s3_key = "scripts/stock_analyzer_job.py"

        try:
            if os.path.exists(local_file):
                self.logger.info(f"Uploading {local_file} to s3://{self.bucket_name}/{s3_key}")
                self.s3_client.upload_file(local_file, self.bucket_name, s3_key)
                self.logger.info("Glue script uploaded successfully")
            else:
                self.logger.error(f"Glue script {local_file} not found")
                raise FileNotFoundError(f"Glue script {local_file} not found")

        except Exception as e:
            self.logger.error(f"Error uploading Glue script: {str(e)}")
            raise

    def upload_stock_data(self):
        """Upload the stock data CSV file to S3."""
        local_file = "stocks_data.csv"
        s3_key = "input/stocks_data.csv"

        try:
            if os.path.exists(local_file):
                self.logger.info(f"Uploading {local_file} to s3://{self.bucket_name}/{s3_key}")
                self.s3_client.upload_file(local_file, self.bucket_name, s3_key)
                self.logger.info("Stock data uploaded successfully")
            else:
                self.logger.error(f"Stock data file {local_file} not found")
                raise FileNotFoundError(f"Stock data file {local_file} not found")

        except Exception as e:
            self.logger.error(f"Error uploading stock data: {str(e)}")
            raise

    def get_stack_outputs(self):
        """Get CloudFormation stack outputs."""
        try:
            response = self.cloudformation.describe_stacks(StackName=self.stack_name)
            stack = response['Stacks'][0]

            outputs = {}
            if 'Outputs' in stack:
                for output in stack['Outputs']:
                    outputs[output['OutputKey']] = output['OutputValue']

            return outputs

        except Exception as e:
            self.logger.error(f"Error getting stack outputs: {str(e)}")
            raise

    def deploy_post_cloudformation_resources(self):
        """Deploy resources that must be created after the CloudFormation stack."""
        self.logger.info("=== Post-CloudFormation Deployment ===")

        # Wait for the stack to be ready
        self.wait_for_stack_completion()

        # Upload files to S3
        self.upload_glue_script()
        self.upload_stock_data()

        # Get stack outputs
        outputs = self.get_stack_outputs()

        self.logger.info("=== Deployment Summary ===")
        for key, value in outputs.items():
            self.logger.info(f"{key}: {value}")

        self.logger.info(f"S3 Bucket: {self.bucket_name}")
        self.logger.info("Files uploaded successfully!")

        self.logger.info("=== Next Steps ===")
        self.logger.info("1. Go to the AWS Glue Console")
        self.logger.info(f"2. Find job: {outputs.get('GlueJobName', 'N/A')}")
        self.logger.info("3. Run the job to process the data")
        self.logger.info(f"4. Query results in Athena using database: {outputs.get('GlueDatabaseName', 'N/A')}")


def main():
    """Main deployment function."""
    print("=== Infrastructure Post-Deployment Script ===")

    try:
        deployer = InfrastructureDeployer()
        deployer.deploy_post_cloudformation_resources()

    except Exception as e:
        print(f"Deployment failed: {str(e)}")
        print("\nMake sure:")
        print("1. Your .env file is configured correctly")
        print("2. The CloudFormation stack has been deployed using create-update-stack.sh")
        print("3. AWS credentials are valid")
        sys.exit(1)


if __name__ == "__main__":
    main()
7 changes: 7 additions & 0 deletions requirements.txt
@@ -0,0 +1,7 @@
pyspark==3.5.0
py4j==0.10.9.7
findspark==2.0.1
boto3==1.34.0
botocore==1.34.0
python-dotenv==1.0.0
setuptools==70.0.0