diff --git a/SOLUTION_OVERVIEW.md b/SOLUTION_OVERVIEW.md
new file mode 100644
index 0000000..273de10
--- /dev/null
+++ b/SOLUTION_OVERVIEW.md
@@ -0,0 +1,78 @@
+# Solution Implementation Overview
+
+**Project:** Stock Analysis Data Engineering Solution
+
+## Solution Architecture
+
+### **Core Components**
+
+#### **1. Data Processing Layer**
+- **`StockAnalyzer` Class** (`scripts/stock_analyzer_job.py`)
+  - Main processing engine for the financial calculations
+  - Handles distributed data processing using PySpark
+  - Implements all four business objectives
+
+#### **2. Infrastructure as Code**
+- **CloudFormation Template** (`stack.yml`)
+  - Complete infrastructure definition
+  - Glue jobs, S3 buckets, IAM roles, databases, tables
+
+#### **3. Testing & Validation**
+- **`LocalStockAnalyzer` Class** (`tests/test_local.py`)
+  - Local testing framework
+  - Validates business logic before cloud deployment
+
+## Business Objectives Implementation
+
+### **Objective 1: Average Daily Returns**
+```python
+def objective1_average_daily_return(self, df: DataFrame) -> DataFrame:
+    # Formula: ((current_price - previous_price) / previous_price) * 100
+    # Self-join to get the previous day's price, then average across all stocks
+```
+**Purpose:** Market trend analysis for portfolio managers
+
+### **Objective 2: Highest Worth Stock**
+```python
+def objective2_highest_worth_stock(self, df: DataFrame) -> DataFrame:
+    # Formula: closing_price * volume
+    # Identifies the stock with the highest average daily dollar volume
+```
+**Purpose:** Liquidity analysis for trading strategies
+
+### **Objective 3: Most Volatile Stock**
+```python
+def objective3_most_volatile_stock(self, df: DataFrame) -> DataFrame:
+    # Formula: STDDEV(daily_returns) * SQRT(252)
+    # Annualized volatility for risk assessment
+```
+**Purpose:** Risk management and portfolio optimization
+
+### **Objective 4: Top 30-Day Returns**
+```python
+def objective4_top_30day_returns(self, df: DataFrame) -> DataFrame:
+    # Formula: ((current_price - price_30_days_ago) / price_30_days_ago) * 100
+    # Top three ticker-date combinations by 30-day return
+```
+**Purpose:** Investment opportunity identification
+
+## Key Classes and Functions
+
+### **StockAnalyzer Class (Main Processing Engine)**
+```python
+class StockAnalyzer:
+    def __init__(self, glue_context: GlueContext, bucket_name: str)
+    def load_stock_data(self, input_path: str) -> DataFrame
+    def objective1_average_daily_return(self, df: DataFrame) -> DataFrame
+    def objective2_highest_worth_stock(self, df: DataFrame) -> DataFrame
+    def objective3_most_volatile_stock(self, df: DataFrame) -> DataFrame
+    def objective4_top_30day_returns(self, df: DataFrame) -> DataFrame
+    def run_all_objectives(self, df: DataFrame) -> dict
+    def save_results_to_s3(self, results: dict)
+```
+
+**Key Features:**
+- **Distributed Processing:** Leverages PySpark for scalability
+- **SQL-Based Logic:** Uses Spark SQL queries with self-joins
+- **Error Handling:** Logging and exception handling around data loading and writes
+- **Modular Design:** Each objective is a separate, testable method
diff --git a/create-update-stack.sh b/create-update-stack.sh
index 7f70923..560eca8 100755
--- a/create-update-stack.sh
+++ b/create-update-stack.sh
@@ -43,7 +43,8 @@ then
     echo "Creating a new stack: $stack"
     aws cloudformation create-stack --stack-name "$stack" \
         --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
-        --template-body file://"$stack_yml"
+        --template-body file://"$stack_yml" \
+        --parameters ParameterKey=StackName,ParameterValue="$STACK_NAME"
 
     # Wait for
the stack creation to complete echo "Waiting for stack creation to complete: $stack" @@ -54,7 +55,8 @@ else echo "Updating the stack: $stack" aws cloudformation update-stack --stack-name "$stack" \ --capabilities CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \ - --template-body file://"$stack_yml" + --template-body file://"$stack_yml" \ + --parameters ParameterKey=StackName,ParameterValue="$STACK_NAME" # Wait for the stack update to complete echo "Waiting for stack update to complete: $stack" diff --git a/infra/deploy_infrastructure.py b/infra/deploy_infrastructure.py new file mode 100644 index 0000000..1bbc43f --- /dev/null +++ b/infra/deploy_infrastructure.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 +""" +Infrastructure deployment script for stock analysis project. +This script handles file uploads that are not managed by CloudFormation. +""" + +import os +import boto3 +from dotenv import load_dotenv +import logging + +class InfrastructureDeployer: + def __init__(self): + # Load environment variables from .env file + load_dotenv() + + self.stack_name = os.getenv('STACK_NAME') + self.region = os.getenv('AWS_DEFAULT_REGION', 'eu-central-1') + + if not self.stack_name: + raise ValueError("STACK_NAME not found in .env file") + + self.bucket_name = f'data-engineer-assignment-{self.stack_name.lower()}' + + # Initialize AWS clients + self.s3_client = boto3.client( + 's3', + region_name=self.region, + aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'), + aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY') + ) + + self.cloudformation = boto3.client( + 'cloudformation', + region_name=self.region, + aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'), + aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY') + ) + + self.logger = self._setup_logger() + + def _setup_logger(self) -> logging.Logger: + logging.basicConfig(level=logging.INFO) + return logging.getLogger(__name__) + + def wait_for_stack_completion(self): + """Wait for CloudFormation stack to be ready""" + try: + self.logger.info(f"Waiting for stack {self.stack_name} to be ready...") + + # Check if stack exists and is in a complete state + waiter = self.cloudformation.get_waiter('stack_create_complete') + try: + waiter.wait(StackName=self.stack_name) + self.logger.info("Stack creation completed") + except Exception: + # If create waiter fails, try update waiter + waiter = self.cloudformation.get_waiter('stack_update_complete') + waiter.wait(StackName=self.stack_name) + self.logger.info("Stack update completed") + + except Exception as e: + self.logger.error(f"Error waiting for stack: {str(e)}") + raise + + def upload_glue_script(self): + """Upload the Glue job script to S3""" + local_file = "scripts/stock_analyzer_job.py" + s3_key = "scripts/stock_analyzer_job.py" + + try: + if os.path.exists(local_file): + self.logger.info(f"Uploading {local_file} to s3://{self.bucket_name}/{s3_key}") + self.s3_client.upload_file(local_file, self.bucket_name, s3_key) + self.logger.info("Glue script uploaded successfully") + else: + self.logger.error(f"Glue script {local_file} not found") + raise FileNotFoundError(f"Glue script {local_file} not found") + + except Exception as e: + self.logger.error(f"Error uploading Glue script: {str(e)}") + raise + + def upload_stock_data(self): + """Upload the stock data CSV file to S3""" + local_file = "stocks_data.csv" + s3_key = "input/stocks_data.csv" + + try: + if os.path.exists(local_file): + self.logger.info(f"Uploading {local_file} to s3://{self.bucket_name}/{s3_key}") + self.s3_client.upload_file(local_file, 
self.bucket_name, s3_key) + self.logger.info("Stock data uploaded successfully") + else: + self.logger.error(f"Stock data file {local_file} not found") + raise FileNotFoundError(f"Stock data file {local_file} not found") + + except Exception as e: + self.logger.error(f"Error uploading stock data: {str(e)}") + raise + + def get_stack_outputs(self): + """Get CloudFormation stack outputs""" + try: + response = self.cloudformation.describe_stacks(StackName=self.stack_name) + stack = response['Stacks'][0] + + outputs = {} + if 'Outputs' in stack: + for output in stack['Outputs']: + outputs[output['OutputKey']] = output['OutputValue'] + + return outputs + + except Exception as e: + self.logger.error(f"Error getting stack outputs: {str(e)}") + raise + + def deploy_post_cloudformation_resources(self): + """Deploy resources that need to be created after CloudFormation stack""" + self.logger.info("=== Post-CloudFormation Deployment ===") + + # Wait for stack to be ready + self.wait_for_stack_completion() + + # Upload files to S3 + self.upload_glue_script() + self.upload_stock_data() + + # Get stack outputs + outputs = self.get_stack_outputs() + + self.logger.info("=== Deployment Summary ===") + for key, value in outputs.items(): + self.logger.info(f"{key}: {value}") + + self.logger.info(f"S3 Bucket: {self.bucket_name}") + self.logger.info("Files uploaded successfully!") + + self.logger.info("=== Next Steps ===") + self.logger.info(f"1. Go to AWS Glue Console") + self.logger.info(f"2. Find job: {outputs.get('GlueJobName', 'N/A')}") + self.logger.info(f"3. Run the job to process the data") + self.logger.info(f"4. Query results in Athena using database: {outputs.get('GlueDatabaseName', 'N/A')}") + +def main(): + """Main deployment function""" + print("=== Infrastructure Post-Deployment Script ===") + + try: + deployer = InfrastructureDeployer() + deployer.deploy_post_cloudformation_resources() + + except Exception as e: + print(f"Deployment failed: {str(e)}") + print("\nMake sure:") + print("1. Your .env file is configured correctly") + print("2. CloudFormation stack has been deployed using create-update-stack.sh") + print("3. 
AWS credentials are valid") + exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..824304f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +pyspark==3.5.0 +py4j==0.10.9.7 +findspark==2.0.1 +boto3==1.34.0 +botocore==1.34.0 +python-dotenv==1.0.0 +setuptools==70.0.0 \ No newline at end of file diff --git a/scripts/stock_analyzer_job.py b/scripts/stock_analyzer_job.py new file mode 100644 index 0000000..492e409 --- /dev/null +++ b/scripts/stock_analyzer_job.py @@ -0,0 +1,205 @@ +import sys + +from awsglue.transforms import * +from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext +from awsglue.context import GlueContext +from awsglue.job import Job +from pyspark.sql import DataFrame +import logging + +class StockAnalyzer: + def __init__(self, glue_context: GlueContext, bucket_name: str): + self.glue_context = glue_context + self.spark = glue_context.spark_session + self.bucket_name = bucket_name + self.logger = self._setup_logger() + + def _setup_logger(self) -> logging.Logger: + logging.basicConfig(level=logging.INFO) + return logging.getLogger(__name__) + + + def load_stock_data(self, input_path: str) -> DataFrame: + """Load stock data from S3""" + try: + self.logger.info(f"Loading stock data from: {input_path}") + + df = self.spark.read \ + .option("header", "true") \ + .option("inferSchema", "true") \ + .option("dateFormat", "yyyy-MM-dd") \ + .csv(input_path) + + df = df.withColumn("Date", df["Date"].cast("date")) + + self.logger.info(f"Successfully loaded {df.count()} rows") + return df + + except Exception as e: + self.logger.error(f"Error loading data: {str(e)}") + raise + + def objective1_average_daily_return(self, df: DataFrame) -> DataFrame: + """ + Objective 1: Compute the average daily return of all stocks for every date + Formula: Daily Return = ((Current Price - Previous Price) / Previous Price) * 100 + This calculates the percentage change from previous day's closing price + Then averages all stocks' daily returns for each date + """ + self.logger.info("Computing objective 1: Average daily return") + + df.createOrReplaceTempView("stocks_data") + sql_query = """ + SELECT + t1.Date, + AVG(((t1.close - t2.close) / t2.close) * 100) as average_return + FROM stocks_data t1 + JOIN stocks_data t2 + ON t1.ticker = t2.ticker + AND t2.Date = DATE_SUB(t1.Date, 1) + GROUP BY t1.Date + ORDER BY t1.Date + """ + result = self.spark.sql(sql_query) + return result + + def objective2_highest_worth_stock(self, df: DataFrame) -> DataFrame: + """ + Objective 2: Which stock was traded with the highest worth (closing price * volume) on average + Formula: Trading Worth = Closing Price * Volume + This represents the total dollar value traded for that stock on a given day + Then calculate the average trading worth across all days for each stock + """ + self.logger.info("Computing objective 2: Highest worth stock") + + df.createOrReplaceTempView("stocks_data") + sql_query = """ + SELECT + ticker, + AVG(close * volume) as value + FROM stocks_data + GROUP BY ticker + ORDER BY value DESC + LIMIT 1 + """ + result = self.spark.sql(sql_query) + return result + + def objective3_most_volatile_stock(self, df: DataFrame) -> DataFrame: + """ + Objective 3: Which stock was the most volatile (annualized standard deviation of daily returns) + Formula: Daily Return = ((Current Price - Previous Price) / Previous Price) * 100 + Volatility = Standard Deviation of Daily Returns + 
Annualized Volatility = Daily Volatility * SQRT(252) + 252 = typical number of trading days in a year + SQRT(252) ≈ 15.87, scales daily volatility to annual volatility + """ + self.logger.info("Computing objective 3: Most volatile stock") + + df.createOrReplaceTempView("stocks_data") + sql_query = """ + SELECT + t1.ticker, + STDDEV(((t1.close - t2.close) / t2.close) * 100) * SQRT(252) as standard_deviation + FROM stocks_data t1 + JOIN stocks_data t2 + ON t1.ticker = t2.ticker + AND t2.Date = DATE_SUB(t1.Date, 1) + GROUP BY t1.ticker + ORDER BY standard_deviation DESC + LIMIT 1 + """ + result = self.spark.sql(sql_query) + return result + + def objective4_top_30day_returns(self, df: DataFrame) -> DataFrame: + """ + Objective 4: Top three 30-day return dates (% increase compared to 30 days prior) + Formula: 30-Day Return = ((Current Price - Price 30 Days Ago) / Price 30 Days Ago) * 100 + This calculates the percentage change over a 30-day period + We find the top 3 ticker-date combinations with highest 30-day returns + """ + self.logger.info("Computing objective 4: Top three 30-day returns") + + df.createOrReplaceTempView("stocks_data") + sql_query = """ + SELECT + t1.ticker, + t1.Date, + ((t1.close - t2.close) / t2.close) * 100 as return_30_day + FROM stocks_data t1 + JOIN stocks_data t2 + ON t1.ticker = t2.ticker + AND t2.Date = DATE_SUB(t1.Date, 30) + ORDER BY return_30_day DESC + LIMIT 3 + """ + result = self.spark.sql(sql_query) + return result + + def run_all_objectives(self, df: DataFrame) -> dict: + """Run all objectives and return results""" + self.logger.info("Running all objectives") + + results = {} + results['objective1'] = self.objective1_average_daily_return(df) + results['objective2'] = self.objective2_highest_worth_stock(df) + results['objective3'] = self.objective3_most_volatile_stock(df) + results['objective4'] = self.objective4_top_30day_returns(df) + + return results + + def save_results_to_s3(self, results: dict): + """Save all results to S3""" + for objective, df_result in results.items(): + output_path = f"s3://{self.bucket_name}/results/{objective}/" + + self.logger.info(f"Saving {objective} results to {output_path}") + + df_result.coalesce(1) \ + .write \ + .mode("overwrite") \ + .option("header", "true") \ + .csv(output_path) + + self.logger.info(f"Successfully saved {objective} results") + +def main(): + # Get job arguments + args = getResolvedOptions(sys.argv, ['JOB_NAME', 'bucket-name']) + + # Initialize Spark and Glue contexts + sc = SparkContext() + glueContext = GlueContext(sc) + job = Job(glueContext) + job.init(args['JOB_NAME'], args) + + # Get bucket name from arguments + bucket_name = args['bucket_name'] + + # Initialize analyzer + analyzer = StockAnalyzer(glueContext, bucket_name) + + try: + # Load stock data from S3 + input_path = f"s3://{bucket_name}/input/stocks_data.csv" + df = analyzer.load_stock_data(input_path) + + # Run all analysis objectives + results = analyzer.run_all_objectives(df) + + # Save results to S3 + analyzer.save_results_to_s3(results) + + analyzer.logger.info("Stock analysis job completed successfully") + + except Exception as e: + analyzer.logger.error(f"Job failed: {str(e)}") + raise + + finally: + job.commit() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/stack.yml b/stack.yml index fb3dac9..8a2d404 100644 --- a/stack.yml +++ b/stack.yml @@ -1,48 +1,246 @@ -AWSTemplateFormatVersion: "2010-09-09" -Description: CloudFormation template to create a Lambda function that prints "Hello, World!". 
+AWSTemplateFormatVersion: '2010-09-09' +Description: 'Stock Analysis Infrastructure - Complete IaC Solution' + +Parameters: + StackName: + Type: String + Description: Name of the stack for resource naming + Default: data-engineer-assignment Resources: - LambdaExecutionRole: - Type: "AWS::IAM::Role" - Properties: - AssumeRolePolicyDocument: - Version: "2012-10-17" - Statement: - - Effect: "Allow" - Principal: - Service: - - "lambda.amazonaws.com" - Action: "sts:AssumeRole" - Policies: - - PolicyName: "LambdaExecutionPolicy" - PolicyDocument: - Version: "2012-10-17" - Statement: - - Effect: "Allow" - Action: - - "logs:CreateLogGroup" - - "logs:CreateLogStream" - - "logs:PutLogEvents" - Resource: "arn:aws:logs:*:*:*" - - HelloWorldLambda: - Type: "AWS::Lambda::Function" - Properties: - FunctionName: "HelloWorldFunction" - Handler: "index.handler" - Role: !GetAtt LambdaExecutionRole.Arn - Runtime: "python3.9" - Code: - ZipFile: | - def handler(event, context): - print("Hello, World!") - Timeout: 30 - MemorySize: 128 + # S3 Bucket for data storage + DataBucket: + Type: AWS::S3::Bucket + Properties: + BucketName: !Sub 'data-engineer-assignment-${StackName}' + VersioningConfiguration: + Status: Enabled + PublicAccessBlockConfiguration: + BlockPublicAcls: true + BlockPublicPolicy: true + IgnorePublicAcls: true + RestrictPublicBuckets: true + + # IAM Role for Glue Job + GlueJobRole: + Type: AWS::IAM::Role + Properties: + RoleName: !Sub '${StackName}-GlueJobRole' + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: glue.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole + Policies: + - PolicyName: S3Access + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + - s3:DeleteObject + - s3:ListBucket + Resource: + - !GetAtt DataBucket.Arn + - !Sub '${DataBucket.Arn}/*' + - Effect: Allow + Action: + - logs:CreateLogGroup + - logs:CreateLogStream + - logs:PutLogEvents + Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*' + + # IAM Role for Glue Crawler + GlueCrawlerRole: + Type: AWS::IAM::Role + Properties: + RoleName: !Sub '${StackName}-GlueCrawlerRole' + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: glue.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole + Policies: + - PolicyName: S3Access + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + - s3:ListBucket + Resource: + - !GetAtt DataBucket.Arn + - !Sub '${DataBucket.Arn}/*' + + # Glue Database + StockAnalysisDatabase: + Type: AWS::Glue::Database + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseInput: + Name: !Sub '${StackName}-stocks-analysis-db' + Description: 'Database for stock analysis results' + + # Glue Tables for each objective + Objective1Table: + Type: AWS::Glue::Table + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseName: !Ref StockAnalysisDatabase + TableInput: + Name: objective1_average_daily_return + TableType: EXTERNAL_TABLE + StorageDescriptor: + Columns: + - Name: date + Type: date + - Name: average_return + Type: double + Location: !Sub 's3://${DataBucket}/results/objective1/' + InputFormat: org.apache.hadoop.mapred.TextInputFormat + OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + SerdeInfo: + 
SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Parameters: + field.delim: ',' + skip.header.line.count: '1' + + Objective2Table: + Type: AWS::Glue::Table + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseName: !Ref StockAnalysisDatabase + TableInput: + Name: objective2_highest_worth_stock + TableType: EXTERNAL_TABLE + StorageDescriptor: + Columns: + - Name: ticker + Type: string + - Name: value + Type: double + Location: !Sub 's3://${DataBucket}/results/objective2/' + InputFormat: org.apache.hadoop.mapred.TextInputFormat + OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + SerdeInfo: + SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Parameters: + field.delim: ',' + skip.header.line.count: '1' + + Objective3Table: + Type: AWS::Glue::Table + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseName: !Ref StockAnalysisDatabase + TableInput: + Name: objective3_most_volatile_stock + TableType: EXTERNAL_TABLE + StorageDescriptor: + Columns: + - Name: ticker + Type: string + - Name: standard_deviation + Type: double + Location: !Sub 's3://${DataBucket}/results/objective3/' + InputFormat: org.apache.hadoop.mapred.TextInputFormat + OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + SerdeInfo: + SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Parameters: + field.delim: ',' + skip.header.line.count: '1' + + Objective4Table: + Type: AWS::Glue::Table + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseName: !Ref StockAnalysisDatabase + TableInput: + Name: objective4_top_30day_returns + TableType: EXTERNAL_TABLE + StorageDescriptor: + Columns: + - Name: ticker + Type: string + - Name: date + Type: date + - Name: return_30_day + Type: double + Location: !Sub 's3://${DataBucket}/results/objective4/' + InputFormat: org.apache.hadoop.mapred.TextInputFormat + OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + SerdeInfo: + SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Parameters: + field.delim: ',' + skip.header.line.count: '1' + + # Glue Job + StockAnalysisGlueJob: + Type: AWS::Glue::Job + Properties: + Name: !Sub '${StackName}-stock-analysis-job' + Role: !Ref GlueJobRole + Command: + Name: glueetl + ScriptLocation: !Sub 's3://${DataBucket}/scripts/stock_analyzer_job.py' + PythonVersion: '3' + DefaultArguments: + '--job-language': python + '--enable-metrics': '' + '--enable-continuous-cloudwatch-log': 'true' + '--bucket-name': !Ref DataBucket + MaxRetries: 1 + Timeout: 60 + GlueVersion: '3.0' + + # Glue Crawler + ResultsCrawler: + Type: AWS::Glue::Crawler + Properties: + Name: !Sub '${StackName}-results-crawler' + Role: !Ref GlueCrawlerRole + DatabaseName: !Ref StockAnalysisDatabase + Targets: + S3Targets: + - Path: !Sub 's3://${DataBucket}/results/' + Schedule: + ScheduleExpression: 'cron(0 2 * * ? 
*)' # Daily at 2 AM Outputs: - LambdaFunctionName: - Description: "Name of the Lambda function created" - Value: !Ref HelloWorldLambda - LambdaExecutionRoleArn: - Description: "ARN of the Lambda execution role" - Value: !GetAtt LambdaExecutionRole.Arn + S3BucketName: + Description: 'Name of the S3 bucket for data storage' + Value: !Ref DataBucket + Export: + Name: !Sub '${AWS::StackName}-S3Bucket' + + GlueJobName: + Description: 'Name of the Glue job' + Value: !Ref StockAnalysisGlueJob + Export: + Name: !Sub '${AWS::StackName}-GlueJob' + + GlueDatabaseName: + Description: 'Name of the Glue database' + Value: !Ref StockAnalysisDatabase + Export: + Name: !Sub '${AWS::StackName}-GlueDatabase' + + GlueCrawlerName: + Description: 'Name of the Glue crawler' + Value: !Ref ResultsCrawler + Export: + Name: !Sub '${AWS::StackName}-GlueCrawler' diff --git a/tests/test_local.py b/tests/test_local.py new file mode 100644 index 0000000..b188194 --- /dev/null +++ b/tests/test_local.py @@ -0,0 +1,210 @@ +import os +import findspark +findspark.init() + +from pyspark.sql import SparkSession +from pyspark.sql import DataFrame +import logging + +class MockGlueContext: + """Mock GlueContext for local testing""" + def __init__(self, spark): + self.spark_session = spark + +class LocalStockAnalyzer: + """Local version of StockAnalyzer for testing without AWS Glue dependencies""" + def __init__(self, spark_session: SparkSession, bucket_name: str = "test-bucket"): + self.spark = spark_session + self.bucket_name = bucket_name + self.logger = self._setup_logger() + + def _setup_logger(self) -> logging.Logger: + logging.basicConfig(level=logging.INFO) + return logging.getLogger(__name__) + + def load_stock_data(self, input_path: str) -> DataFrame: + """Load stock data from local file""" + try: + self.logger.info(f"Loading stock data from: {input_path}") + + df = self.spark.read \ + .option("header", "true") \ + .option("inferSchema", "true") \ + .option("dateFormat", "yyyy-MM-dd") \ + .csv(input_path) + + df = df.withColumn("Date", df["Date"].cast("date")) + + self.logger.info(f"Successfully loaded {df.count()} rows") + return df + + except Exception as e: + self.logger.error(f"Error loading data: {str(e)}") + raise + + def objective1_average_daily_return(self, df: DataFrame) -> DataFrame: + """Compute the average daily return of all stocks for every date""" + self.logger.info("Computing objective 1: Average daily return") + + df.createOrReplaceTempView("stocks_data") + sql_query = """ + SELECT + t1.Date, + AVG(((t1.close - t2.close) / t2.close) * 100) as average_return + FROM stocks_data t1 + JOIN stocks_data t2 + ON t1.ticker = t2.ticker + AND t2.Date = DATE_SUB(t1.Date, 1) + GROUP BY t1.Date + ORDER BY t1.Date + """ + result = self.spark.sql(sql_query) + return result + + def objective2_highest_worth_stock(self, df: DataFrame) -> DataFrame: + """Find stock with highest worth (closing price * volume) on average""" + self.logger.info("Computing objective 2: Highest worth stock") + + df.createOrReplaceTempView("stocks_data") + sql_query = """ + SELECT + ticker, + AVG(close * volume) as value + FROM stocks_data + GROUP BY ticker + ORDER BY value DESC + LIMIT 1 + """ + result = self.spark.sql(sql_query) + return result + + def objective3_most_volatile_stock(self, df: DataFrame) -> DataFrame: + """Find most volatile stock (annualized standard deviation of daily returns)""" + self.logger.info("Computing objective 3: Most volatile stock") + + df.createOrReplaceTempView("stocks_data") + sql_query = """ + SELECT + t1.ticker, 
+ STDDEV(((t1.close - t2.close) / t2.close) * 100) * SQRT(252) as standard_deviation + FROM stocks_data t1 + JOIN stocks_data t2 + ON t1.ticker = t2.ticker + AND t2.Date = DATE_SUB(t1.Date, 1) + GROUP BY t1.ticker + ORDER BY standard_deviation DESC + LIMIT 1 + """ + result = self.spark.sql(sql_query) + return result + + def objective4_top_30day_returns(self, df: DataFrame) -> DataFrame: + """Find top three 30-day return dates""" + self.logger.info("Computing objective 4: Top three 30-day returns") + + df.createOrReplaceTempView("stocks_data") + sql_query = """ + SELECT + t1.ticker, + t1.Date, + ((t1.close - t2.close) / t2.close) * 100 as return_30_day + FROM stocks_data t1 + JOIN stocks_data t2 + ON t1.ticker = t2.ticker + AND t2.Date = DATE_SUB(t1.Date, 30) + ORDER BY return_30_day DESC + LIMIT 3 + """ + result = self.spark.sql(sql_query) + return result + + def run_all_objectives(self, df: DataFrame) -> dict: + """Run all objectives and return results""" + self.logger.info("Running all objectives") + + results = {} + results['objective1'] = self.objective1_average_daily_return(df) + results['objective2'] = self.objective2_highest_worth_stock(df) + results['objective3'] = self.objective3_most_volatile_stock(df) + results['objective4'] = self.objective4_top_30day_returns(df) + + return results + + def save_results_locally(self, results: dict, output_dir: str = "tests/local_results"): + """Save all results to local CSV files""" + os.makedirs(output_dir, exist_ok=True) + + for objective, df_result in results.items(): + output_path = os.path.join(output_dir, f"{objective}.csv") + + self.logger.info(f"Saving {objective} results to {output_path}") + + # Save directly using Spark + temp_path = os.path.join(output_dir, f"{objective}_tmp") + + # Write as a single CSV (coalesce to 1 partition) + ( + df_result + .coalesce(1) + .write + .option("header", "true") + .mode("overwrite") + .csv(temp_path) + ) + + # Move the single CSV file to the final desired name + import shutil, glob + + part_file = glob.glob(os.path.join(temp_path, "part-*.csv"))[0] + shutil.move(part_file, output_path) + shutil.rmtree(temp_path) + + self.logger.info(f"Successfully saved {objective} results to {output_path}") + + +def main(): + """Test the stock analysis locally""" + print("=== Starting Local Stock Analysis Test ===") + + # Create Spark session + spark = SparkSession.builder \ + .appName("LocalStockAnalysisTest") \ + .config("spark.sql.adaptive.enabled", "true") \ + .getOrCreate() + + try: + # Initialize analyzer + analyzer = LocalStockAnalyzer(spark) + + # Load stock data + df = analyzer.load_stock_data("stocks_data.csv") + + print(f"\n=== Data Overview ===") + df.printSchema() + print(f"Total rows: {df.count()}") + df.show(5) + + # Run all analysis objectives + results = analyzer.run_all_objectives(df) + + # Display results + print(f"\n=== Analysis Results ===") + for objective, result_df in results.items(): + print(f"\n--- {objective.upper()} ---") + result_df.show() + + # Save results locally + analyzer.save_results_locally(results) + + print(f"\n=== Test Completed Successfully ===") + print("Results saved to 'tests/local_results/' folder") + + except Exception as e: + print(f"Test failed: {str(e)}") + raise + + finally: + spark.stop() + +if __name__ == "__main__": + main() \ No newline at end of file
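
Reviewer note on the daily-return queries: objectives 1 and 3 pair each row with the previous *calendar* day via `t2.Date = DATE_SUB(t1.Date, 1)`, so if the CSV contains only trading days, Mondays and post-holiday sessions never find a match and silently drop out of the aggregation. Below is a minimal, hypothetical PySpark sketch (not part of this PR; the column names `ticker`, `Date`, `close` are assumed to match the job's SQL) showing a `LAG`-based alternative that compares against the previous *trading* day instead:

```python
# Sketch only: LAG-based daily returns over the per-ticker date order,
# so the "previous" row is the previous trading day rather than the
# previous calendar day. Assumes columns ticker / Date / close as in the job.
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.appName("daily-return-sketch").getOrCreate()

rows = [
    ("AAA", "2024-01-05", 100.0),  # Friday
    ("AAA", "2024-01-08", 102.0),  # Monday: no weekend rows in between
    ("AAA", "2024-01-09", 101.0),
]
df = (spark.createDataFrame(rows, ["ticker", "Date", "close"])
      .withColumn("Date", F.to_date("Date")))

w = Window.partitionBy("ticker").orderBy("Date")
returns = (df
           .withColumn("prev_close", F.lag("close").over(w))
           .withColumn("daily_return",
                       (F.col("close") - F.col("prev_close"))
                       / F.col("prev_close") * 100))

# Average across all tickers per date, mirroring objective 1's output shape.
(returns.groupBy("Date")
        .agg(F.avg("daily_return").alias("average_return"))
        .orderBy("Date")
        .show())
```

If something like this were adopted, the same window could also feed objective 3's volatility input; objective 4's 30-day comparison is date-based and would still need its own lookup. Whether the gap matters at all depends on how `stocks_data.csv` is laid out, so this is offered only as an optional follow-up, not a required change to the PR.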