diff --git a/README.md b/README.md index 861ea15..8ffc898 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,14 @@ # CMSC624 Final Project +The name of this repository is rather vague, but it contains the source code for +the "From DAGs to Riches: Improving Transaction Throughput in a Deterministic System" paper, which +was produced as, you guessed it, the final project for CMSC624. + +Link to presentation: (insert link here) +Link to paper: (insert link here) + +# Contributors + Pranav Sivaraman Rakrish Dhakal Alex Movsesyan diff --git a/a2_final/.gitignore b/a2_final/.gitignore new file mode 100644 index 0000000..87f7612 --- /dev/null +++ b/a2_final/.gitignore @@ -0,0 +1,10 @@ +build +.deps +bin +obj + +.vscode +.DS_Store +.cache/clangd/index +compile_commands.json +work_steal diff --git a/a2_final/CMakeLists.txt b/a2_final/CMakeLists.txt new file mode 100644 index 0000000..d17c1e4 --- /dev/null +++ b/a2_final/CMakeLists.txt @@ -0,0 +1,23 @@ +cmake_minimum_required(VERSION 3.18) + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") + +project(db_concurrency_control VERSION 0.1.0 LANGUAGES C CXX) + +set(DB_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) +set(DB_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}) + +find_package(Threads REQUIRED) + +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_EXTENSIONS OFF) +set(CMAKE_POSITION_INDEPENDENT_CODE ON) + +enable_testing() + +include(systems) +include(compile) + +include_directories(${DB_SOURCE_DIR}) + +add_subdirectory(src) diff --git a/a2_final/cmake/colorize.cmake b/a2_final/cmake/colorize.cmake new file mode 100644 index 0000000..daabba8 --- /dev/null +++ b/a2_final/cmake/colorize.cmake @@ -0,0 +1,79 @@ +### +# +# @copyright (c) 2009-2014 The University of Tennessee and The University +# of Tennessee Research Foundation. +# All rights reserved. +# @copyright (c) 2012-2014 Inria. All rights reserved. +# @copyright (c) 2012-2014 Bordeaux INP, CNRS (LaBRI UMR 5800), Inria, Univ. Bordeaux. All rights reserved. 
#
###
#
#  @file ColorizeMessage.cmake
#
#  @project ECRC
#  ECRC is a software package provided by:
#     Inria Bordeaux - Sud-Ouest,
#     Univ. of Tennessee,
#     King Abdullah University of Science and Technology
#     Univ. of California Berkeley,
#     Univ. of Colorado Denver.
#
#  @version 0.9.0
#  @author Cedric Castagnede
#  @author Emmanuel Agullo
#  @author Mathieu Faverge
#  @author Florent Pruvost
#  @date 13-07-2012
#
###

# This file overrides the built-in message() command. Overriding a command
# makes the previous definition available as _message(); if this file were
# processed a second time, _message() would become the override itself and
# every message() call would recurse forever. The guard makes a repeated
# include() a no-op.
include_guard(GLOBAL)

# Set some colors
# ANSI escape sequences; left undefined on WIN32 so that every colour variable
# expands to the empty string there and messages are printed uncoloured.
if(NOT WIN32)
  string(ASCII 27 Esc)
  set(ColourReset "${Esc}[m")
  set(ColourBold  "${Esc}[1m")
  set(Red         "${Esc}[31m")
  set(Green       "${Esc}[32m")
  set(Yellow      "${Esc}[33m")
  set(Blue        "${Esc}[34m")
  set(Magenta     "${Esc}[35m")
  set(Cyan        "${Esc}[36m")
  set(White       "${Esc}[37m")
  set(BoldRed     "${Esc}[1;31m")
  set(BoldGreen   "${Esc}[1;32m")
  set(BoldYellow  "${Esc}[1;33m")
  set(BoldBlue    "${Esc}[1;34m")
  set(BoldMagenta "${Esc}[1;35m")
  set(BoldCyan    "${Esc}[1;36m")
  set(BoldWhite   "${Esc}[1;37m")
endif()

# Colorize cmake messages during configure
#
# message([<mode>] <text>...)
#
# Drop-in replacement for the built-in message():
#   FATAL_ERROR / SEND_ERROR -> bold red
#   WARNING                  -> bold yellow
#   AUTHOR_WARNING           -> yellow
#   STATUS                   -> green
#   any other known mode     -> forwarded to the built-in with its mode intact
#   no mode                  -> printed as a plain notice
# As in the original implementation, the text arguments are joined with a
# single space before being handed to the built-in command.
function(message)
  # No arguments at all: let the built-in report the usage error.
  if(ARGC EQUAL 0)
    _message()
    return()
  endif()

  # Read the first argument through ARGV0 instead of list(GET ARGV 0 ...):
  # for message("") ARGV is an empty list and list(GET) would abort.
  set(MessageType "${ARGV0}")

  if(MessageType MATCHES "^(FATAL_ERROR|SEND_ERROR)$")
    set(colour "${BoldRed}")
  elseif(MessageType STREQUAL "WARNING")
    set(colour "${BoldYellow}")
  elseif(MessageType STREQUAL "AUTHOR_WARNING")
    set(colour "${Yellow}")
  elseif(MessageType STREQUAL "STATUS")
    set(colour "${Green}")
  elseif(MessageType MATCHES
         "^(NOTICE|DEPRECATION|VERBOSE|DEBUG|TRACE|CHECK_START|CHECK_PASS|CHECK_FAIL)$")
    # Modes that have no colour assigned: keep the mode keyword a keyword so
    # log-level filtering and check reporting keep working, instead of
    # printing e.g. "VERBOSE some text" as a plain notice.
    list(REMOVE_AT ARGV 0)
    string(REPLACE ";" " " ARGV_STR "${ARGV}")
    _message(${MessageType} "${ARGV_STR}")
    return()
  else()
    # No mode keyword: the first argument is already part of the text.
    string(REPLACE ";" " " ARGV_STR "${ARGV}")
    # A bold colour code such as "${Esc}[1;31m" passed unquoted is split at
    # its ';' and re-joined above with a space; restore the ';'. Only do this
    # when Esc is defined, otherwise any literal "[1 " would be rewritten.
    if(NOT "${Esc}" STREQUAL "")
      string(REPLACE "${Esc}[1 " "${Esc}[1;" ARGV_STR "${ARGV_STR}")
    endif()
    _message("${ARGV_STR}")
    return()
  endif()

  # Coloured modes: strip the keyword, join the text, wrap it in the colour.
  list(REMOVE_AT ARGV 0)
  string(REPLACE ";" " " ARGV_STR "${ARGV}")
  _message(${MessageType} "${colour}${ARGV_STR}${ColourReset}")
endfunction()

##
## @end file
ColorizeMessage.cmake +## \ No newline at end of file diff --git a/a2_final/cmake/compile.cmake b/a2_final/cmake/compile.cmake new file mode 100644 index 0000000..162c9f4 --- /dev/null +++ b/a2_final/cmake/compile.cmake @@ -0,0 +1,19 @@ +add_compile_options("-Wall") + +##### +# Change the default build type from Debug to Release, while still +# supporting overriding the build type. +# +# The CACHE STRING logic here and elsewhere is needed to force CMake +# to pay attention to the value of these variables. +if(NOT CMAKE_BUILD_TYPE) + message(STATUS "No build type specified; defaulting to CMAKE_BUILD_TYPE=Release.") + set(CMAKE_BUILD_TYPE Release CACHE STRING + "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel." + FORCE) +else(NOT CMAKE_BUILD_TYPE) + if(CMAKE_BUILD_TYPE STREQUAL "Debug") + message(STATUS "Build type: Debug. Performance will be terrible!") + message(STATUS "Add -DCMAKE_BUILD_TYPE=Release to the CMake command line to get an optimized build.") + endif(CMAKE_BUILD_TYPE STREQUAL "Debug") +endif(NOT CMAKE_BUILD_TYPE) diff --git a/a2_final/cmake/systems.cmake b/a2_final/cmake/systems.cmake new file mode 100644 index 0000000..faf6ac6 --- /dev/null +++ b/a2_final/cmake/systems.cmake @@ -0,0 +1,54 @@ +INCLUDE(colorize) # colorize and highlight message + +IF(WIN32) + SET(HOST_SYSTEM "win32") +ELSE(WIN32) + IF(APPLE) + EXEC_PROGRAM (sw_vers ARGS -productVersion OUTPUT_VARIABLE MACOSX_VERSION) + STRING(REGEX MATCH "[0-9]+.[0-9]+" VERSION "${MACOSX_VERSION}") + SET(MACOS_VERSION ${VERSION}) + SET(HOST_SYSTEM "macosx") + IF(NOT DEFINED ENV{MACOSX_DEPLOYMENT_TARGET}) + # Set cache variable - end user may change this during ccmake or cmake-gui configure. + SET(CMAKE_OSX_DEPLOYMENT_TARGET ${MACOS_VERSION} CACHE STRING + "Minimum OS X version to target for deployment (at runtime); newer APIs weak linked. 
Set to empty string for default value.") + ENDIF() + set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security") + ELSE(APPLE) + IF(EXISTS "/etc/issue") + FILE(READ "/etc/issue" LINUX_ISSUE) + IF(LINUX_ISSUE MATCHES "CentOS") + SET(HOST_SYSTEM "centos") + ELSEIF(LINUX_ISSUE MATCHES "Debian") + SET(HOST_SYSTEM "debian") + ELSEIF(LINUX_ISSUE MATCHES "Ubuntu") + SET(HOST_SYSTEM "ubuntu") + ELSEIF(LINUX_ISSUE MATCHES "Red Hat") + SET(HOST_SYSTEM "redhat") + ELSEIF(LINUX_ISSUE MATCHES "Fedora") + SET(HOST_SYSTEM "fedora") + ENDIF() + ENDIF(EXISTS "/etc/issue") + + IF(EXISTS "/etc/redhat-release") + FILE(READ "/etc/redhat-release" LINUX_ISSUE) + IF(LINUX_ISSUE MATCHES "CentOS") + SET(HOST_SYSTEM "centos") + ENDIF() + ENDIF(EXISTS "/etc/redhat-release") + + IF(NOT HOST_SYSTEM) + SET(HOST_SYSTEM ${CMAKE_SYSTEM_NAME}) + ENDIF() + set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-Wno-unused-but-set-variable -Wno-maybe-uninitialized") + ENDIF(APPLE) +ENDIF(WIN32) + +# query number of logical cores +CMAKE_HOST_SYSTEM_INFORMATION(RESULT CPU_CORES QUERY NUMBER_OF_LOGICAL_CORES) + +MARK_AS_ADVANCED(HOST_SYSTEM CPU_CORES) + +MESSAGE(STATUS "CMSC624 Assignment 2: Understanding Locking, OCC and MVCC") +MESSAGE(STATUS "Found host system: ${HOST_SYSTEM}") +MESSAGE(STATUS "Found host system's CPU: ${CPU_CORES} cores") diff --git a/a2_final/cmsc624-a2-sol-latex-template/Makefile b/a2_final/cmsc624-a2-sol-latex-template/Makefile new file mode 100644 index 0000000..86da174 --- /dev/null +++ b/a2_final/cmsc624-a2-sol-latex-template/Makefile @@ -0,0 +1,122 @@ +############################################################################### +# Project Configuration + +TARGET = $(shell basename $(PWD)).pdf +MAIN = main + +##### + +TARGET_BASE = $(basename $(TARGET)) + +################################################################################ +# Command Configuration + +PDFLATEX = pdflatex +BIBTEX = bibtex +EPS_TO_PDF = epstopdf +OBJ_TO_EPS = tgif -print -eps -color -stdout 
+GNUPLOT = gnuplot +SPELLCHECK = aspell -t list + +################################################################################ +# Gathering Information + +TEX_MAIN := $(MAIN).tex + +TEX_ALL := $(shell search=$(TEX_MAIN); all=; \ + while [ -n "$$search" ] ; do \ + all="$$all $$search"; \ + search=`egrep "^[^%]*\\input" $$search | \ + sed -En 's/.*input[^\{]*\{(.+)\}/\1.tex/p'`; \ + done; \ + echo "$$all") + +GFX_ALL := $(shell for t in $(TEX_ALL); do \ + cat $$t | \ + egrep '^[^%]*\\includegraphics' | \ + sed -En 's/.*includegraphics(\[.+\])?\{([^\{]*)\}.*/\2/p'; \ + done) + +GFX_FILES := $(shell for g in $(GFX_ALL); do \ + ls $$g.* | \ + egrep "(\.obj|\.eps)$$" | \ + sed -E 's/\.[^ ]+/\.pdf/g'; \ + done) + +GNUPLOT_FILES := $(shell for g in $(GFX_ALL); do \ + test -e $$g.p && \ + test ! -e $$g.eps -a ! -e $$g.obj && \ + echo $$g.pdf; \ + done) + +BIB_FILES := $(shell cat $(TEX_MAIN) | \ + egrep '^[^%]*\\bibliography\{' | \ + sed -E -e 's/.*\{([^\{]+)\}.*/\1/g' \ + -e 's/([^,\{\}]+)/\1.bib/g' \ + -e 's/,/ /g') + +################################################################################ +# Command Rules + +all: $(TARGET) + +$(TARGET): $(TEX_ALL) $(GFX_FILES) $(GNUPLOT_FILES) $(BIB_FILES) + $(PDFLATEX) $(TEX_MAIN) File Rules + +%.pdf: %.eps + $(EPS_TO_PDF) $< + +%.pdf: %.obj + $(OBJ_TO_EPS) $< | \ + $(EPS_TO_PDF) -f -o=$@ + +%.pdf: %.p + $(GNUPLOT) $< | \ + $(EPS_TO_PDF) -f -o=$@ + diff --git a/a2_final/cmsc624-a2-sol-latex-template/main.tex b/a2_final/cmsc624-a2-sol-latex-template/main.tex new file mode 100644 index 0000000..8251c29 --- /dev/null +++ b/a2_final/cmsc624-a2-sol-latex-template/main.tex @@ -0,0 +1,235 @@ +\documentclass[11pt]{article} +\usepackage{fullpage} +\usepackage{amsmath} +\usepackage{graphicx} +\usepackage{tabularx} +\usepackage{float} +\usepackage{hyperref} +\usepackage[usenames,dvipsnames]{xcolor} +\usepackage[normalem]{ulem} + +\setlength{\topmargin}{0in} % top of paper to head (less one inch) +\setlength{\headheight}{0in} % height 
of the head +\setlength{\headsep}{0in} % head to the top of the body +\setlength{\textheight}{8.75in} % height of the body +\setlength{\oddsidemargin}{0mm} % left edge of paper to body (less one inch) +\setlength{\evensidemargin}{0mm} % ditto, even pages +\setlength{\textwidth}{6.5in} % width of body +\setlength{\topskip}{0in} % top of body to bottom of first line of text +\setlength{\footskip}{0.50in} % bottom of text to bottom of foot + +\newtheorem{theorem}{Theorem} +\newtheorem{corollary}{Corollary} +\newtheorem{lemma}{Lemma} +\newtheorem{observation}{Observation} +\newtheorem{definition}{Definition} +\newtheorem{fact}{Fact} +\newcommand{\proof}{\vspace*{-1ex} \noindent {\bf Proof: }} +\newcommand{\proofsketch}{\vspace*{-1ex} \noindent {\bf Proof Sketch: }} +\newcommand{\qed}{\hfill\rule{2mm}{2mm}} +\newcommand{\ceiling}[1]{{\left\lceil{#1}\right\rceil}} +\newcommand{\floor}[1]{{\left\lfloor{#1}\right\rfloor}} +\newcommand{\paren}[1]{\left({#1}\right)} +\newcommand{\braces}[1]{\left\{{#1}\right\}} +\newcommand{\brackets}[1]{\left[{#1}\right]} +\newcommand{\Prob}{{\rm Prob}} +\newcommand{\prob}{{\rm Prob}} +\newcommand{\host}[1]{\tt \small {#1}} + +\newcommand{\header}[3]{ + \pagestyle{plain} + \noindent + \begin{center} + \framebox{ + \vbox{ + \hbox to 6.28in { {\bf CMSC 624 Database System Architecture and Implementation +\hfill Spring 2024} } + \vspace{4mm} + \hbox to 6.28in { {\Large \hfill #1 \hfill} } + \vspace{4mm} + \hbox to 6.28in { {\sl #2 \hfill #3} } + } + } + + \end{center} + \vspace*{4mm} +} + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% Per-author comment commands +% NOTE: To remove them, build with 'make finished' which will generate a new +% output file with the suffix '-final.pdf' + +\ifdefined\isFinalized + +\newcommand{\note}[1]{} +\newcommand{\mnote}[1]{} + +\newcommand{\pooja}[1]{} +\newcommand{\dna}[1]{} +\newcommand{\answer}[1]{} + +\else + +\newcommand{\note}[1]{{\color{green}{\it Note: #1}}} 
+\newcommand{\mnote}[1]{\marginpar{{\color{red}{\it\ #1 \ \ }}}} + +\newcommand{\pooja}[1]{{\color{green}{\it Gang - #1}}} +\newcommand{\dna}[1]{{\color{purple}{\it DNA - #1}}} +\newcommand{\answer}[1]{{\color{black}\texttt{{a: - #1}}}} + +\fi + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +\begin{document} + +\addtolength{\baselineskip}{-.01\baselineskip} + +\header{Programming Assignment 2}{Assigned: February 23}{Due: March 8, 11:59:59 PM} + +\newcommand{\spc}{\sqcup} +\newcommand{\vers}{spring-2024-624} + +\section{Description} + +In this assignment you will be implementing six concurrency control schemes: + +\begin{itemize} +\item Two versions of locking schemes, both of which implement variations of the standard two-phase locking algorithm we discussed in class. +\item A version of OCC very similar to the serial-validation version described in the OCC paper you read for class. +\item A version of OCC somewhat similar to the parallel-validation version described in the OCC paper. +\item A version of MVCC timestamp ordering that is a simplified version of the PostgreSQL scheme we studied in class. +\item A serializable version of the MVCC scheme that is a simplified version of PostgreSQL's SSI scheme. +\end{itemize} + +\section{Requirements} + +\begin{enumerate} +% + \item Your code must be submitted as a series of commits that are pushed to +the origin/master branch of your private Git repository. We consider your latest commit +prior to the due date/time to represent your submission. +% + \item The directory for your project must be located at the root of your Git repository. +% + \item You are not allowed to work in teams or to copy code from any source. + \item You are not allowed to discuss your analysis with anyone else. +% + \item Please submit (\{your\_name\}\_a2\_sol.pdf) to the assignment 2 link on ELMS. You do not need to submit any code, since we can access your repositories. 
+\end{enumerate} + +\section{Part 5} + +\subsection{Carpe datum (0 points)} + +When you finish the coding part of this assignment, we will run your code on our test server, and commit the results back to you. + +\subsection{Simulations are doomed to succeed. (4 points)} + +Transaction durations are accomplished simply by forcing the thread executing each transaction to run in a busy loop for approximately the amount of time specified. This is supposed to simulate transaction logic --- e.g. the application might run some proprietary customer scoring function to estimate the total value of the customer after reading in the customer record from the database. Please list \textbf{at least two weaknesses} of this simulation --- i.e. give two reasons why performance of the different concurrency control schemes you experimented with for this assignment would change relative to each other if we ran actual application code instead of just simulating it with a busy loop. + +\answer{your answer here...} +% your answer +\vspace{10mm} + +\subsection{Locking manager (6 points)} + +\begin{enumerate} +\item Is deadlock possible in your locking implementation? Why or why not? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\item Most 2PL systems wait until they access data before they request a lock. But this implementation requests all locks before executing any transaction code. What is a performance-related disadvantage of our implementation in this assignment of requesting all locks in advance? What is a client-usability disadvantage of requesting all locks in advance? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} +\end{enumerate} + +\subsection{OCCam's Razor (6 points)} + +The OCC with serial validation is simpler than OCC with parallel validation. + +\begin{enumerate} +\item How did the two algorithms compare with each other in this simulation? Why do you think that is the case? 
 \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\item How does this compare to the OCC paper that we read for class? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\item What is the biggest reason for the difference between your results and what you expected after reading the OCC paper? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} +\end{enumerate} + +If you did not follow the given pseudocode for OCC with parallel validation, give your pseudocode and argue why it is better. + +\subsection{OCC vs. Locking B (7 points)} + +If your code is correct, you probably found that the relative performance of OCC and Locking B was different from the tradeoffs we discussed in class. In fact, you might be quite surprised by your results. Please describe the two biggest differences between the relative performance of OCC vs. Locking B relative to what we discussed in class, and explain why the theory doesn't match the practice for this codebase for each of these two surprises. + +\subsection{MVCC vs. OCC/Locking (6 points)} + +\begin{enumerate} +\item For the read-write tests, MVCC performs worse than OCC and Locking. Why? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\item MVCC even sometimes does worse than serial. Why? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\item Yet for the mixed read-only/read-write experiment it performs the best, even though it wasn't the best for either read-only or read-write. Why? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\end{enumerate} + +If you wrote your own version, please explain why it's better than the ones presented here. + + +\subsection{MVCC pseudocode (4 points)} + +\begin{enumerate} +\item Why did our MVCC pseudocode request read locks before each read? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\item In particular, what would happen if you didn't acquire these read locks? 
 \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\item How long do these locks have to be held? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\item How does the MVCC-SSI pseudocode guarantee serializability? \\ +\answer{your answer here...} +% your answer +\vspace{10mm} + +\end{enumerate} + +\subsection{Mixed transaction lengths (8 points)} + +Take a close look at your high contention read/write results. When transaction lengths are uniform, one of the concurrency control schemes you implemented is best. However, for mixed transaction lengths (the fourth column in the results), you will probably see a different concurrency control scheme as the winner! Please explain why the results changed for mixed transaction lengths. + +\answer{your answer here...} +% your answer +\vspace{10mm} + +\end{document} + diff --git a/a2_final/db.pftrace b/a2_final/db.pftrace new file mode 100644 index 0000000..97a0664 Binary files /dev/null and b/a2_final/db.pftrace differ diff --git a/a2_final/output.txt b/a2_final/output.txt new file mode 100644 index 0000000..e969a72 --- /dev/null +++ b/a2_final/output.txt @@ -0,0 +1,103 @@ + ------------------------------------------------------------------- + Average Transaction Duration + ------------------------------------------------------------------- + 0.1ms 1ms 10ms (0.1ms, 1ms, 10ms) + ------------------------------------------------------------------- + Low contention Read only (5 records) + ------------------------------------------------------------------- + Serial 6313.64 685.113 80.3091 218.333 + Locking A 61779.1 7023.37 767.145 2177.67 + Locking B 57497.1 7013.83 763.848 2184.98 + OCC 61041.3 6990.28 767.465 2176.91 + OCC-P 59046.3 6979.77 777.341 2346.17 + MVCC 52127.9 6842.07 773.609 2255.84 + Calvin 49631.2 6908.55 775.621 2365.7 + Calvin_I 62502.6 7114.15 777.667 2208.8 + + Low contention Read only (30 records) + ------------------------------------------------------------------- + Serial 
6362.16 733.099 76.5669 238.367 + Locking A 33486.9 6767.38 768.894 2187.82 + Locking B 30057.6 6619.49 757.015 2291.36 + OCC 52873.7 6789.52 758.044 2218.23 + OCC-P 47550 6827.73 774.005 2352.38 + MVCC 29755.6 6610.29 770.066 2284.79 + Calvin 30208 6708.56 778.358 2263.26 + Calvin_I 49857.8 7015.77 775.237 2276.95 + + High contention Read only (5 records) + ------------------------------------------------------------------- + Serial 6533.83 696.912 79.5509 237.414 + Locking A 19231.6 2775.61 314.605 617.182 + Locking B 63547.7 7071.99 765.73 2250.21 + OCC 60617.1 6978.53 767.674 2222.98 + OCC-P 55931.1 6984.07 774.68 2323.05 + MVCC 60764.9 7004.81 774.551 2284.43 + Calvin 55069.9 7052.71 778.26 2366.17 + Calvin_I 61507.5 7072.87 778.564 2367.15 + + High contention Read only (30 records) + ------------------------------------------------------------------- + Serial 6167.82 717.895 80.5363 218.753 + Locking A 4248.91 686.413 87.9072 239.431 + Locking B 56060.7 6963.99 770.102 2311.56 + OCC 55545.7 6901.03 764.023 2285.88 + OCC-P 49578.6 6909.33 770.914 2319.84 + MVCC 47701.6 6649.12 754.794 2254.67 + Calvin 36848.5 6865.1 775.915 2270.72 + Calvin_I 47184.1 7031.64 780.226 2200.85 + + Low contention read-write (5 records) + ------------------------------------------------------------------- + Serial 5518.22 689.275 77.8896 231.573 + Locking A 48847.4 6468.86 652.563 1929.77 + Locking B 49441.3 6834.35 760.01 2102.29 + OCC 55304 6258.31 701.676 2172.76 + OCC-P 46418.7 5541.55 748.62 2395.52 + MVCC 37226.4 5932.7 769.837 2326.69 + Calvin 52865.1 6994.41 773.542 2293.76 + Calvin_I 58051 6923.14 772.343 2309.05 + + Low contention read-write (10 records) + ------------------------------------------------------------------- + Serial 6618.44 702.852 78.994 237.47 + Locking A 53508.3 6774 765.874 2269.97 + Locking B 54511.1 7010.7 761.892 2191.58 + OCC 59593.3 7040.93 751.302 2214.78 + OCC-P 56212.6 6846.18 771.228 2207.09 + MVCC 36932 6664.64 745.806 2283.06 + Calvin 
44358.4 6621.14 762.247 2247.41 + Calvin_I 55016.4 6820.03 762.852 2143.3 + + High contention read-write (5 records) + ------------------------------------------------------------------- + Serial 6723 715.211 82.2509 241.427 + Locking A 14221.4 2335.92 288.563 775.02 + Locking B 18471 2729.64 283.535 593.739 + OCC 61561.4 7035.69 765.862 2199.1 + OCC-P 54380.8 6921.56 765.339 2241.02 + MVCC 25807.6 1954.46 450.427 540.566 + Calvin 54263.2 6980.73 773.375 2205.76 + Calvin_I 24595.4 3019.24 320.8 633.946 + + High contention read-write (10 records) + ------------------------------------------------------------------- + Serial 7424.06 782.098 77.7501 237.008 + Locking A 8427.41 1039.34 128.171 308.804 + Locking B 7955.51 1113.16 143.877 358.452 + OCC 56568.1 6979.34 760.904 2209.88 + OCC-P 46433.9 6582.25 759.159 2262.38 + MVCC 17151.9 1369.63 173.687 356.568 + Calvin 46452.7 6852.74 772.802 2283.09 + Calvin_I 12025.9 1299.03 138.089 365.976 + + High contention mixed read only/read-write + ------------------------------------------------------------------- + Serial 8996.61 1035.65 101.01 301.373 + Locking A 5093.6 944.327 111.944 327.471 + Locking B 21099.3 4051.01 478.15 828.36 + OCC 33541.9 4080.74 487.084 910.444 + OCC-P 38406.8 4557.94 528.102 976.781 + MVCC 57002.6 7929.36 926.128 2762.65 + Calvin 20907.9 4474.24 471.702 838.858 + Calvin_I 31050.2 4667.61 498.342 868.454 diff --git a/a2_final/run.sh b/a2_final/run.sh new file mode 100644 index 0000000..fe09afc --- /dev/null +++ b/a2_final/run.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +#SBATCH -A m2404 +#SBATCH --time=30:00 +#SBATCH --nodes=1 +#SBATCH --constraint=cpu +#SBATCH --qos regular + +srun -n1 -c 8 --cpu-bind-cores ./work_steal/src/txn_processor_test diff --git a/a2_final/src/CMakeLists.txt b/a2_final/src/CMakeLists.txt new file mode 100644 index 0000000..51cc550 --- /dev/null +++ b/a2_final/src/CMakeLists.txt @@ -0,0 +1,29 @@ + +add_library(txn STATIC + txn/storage.cc + txn/mvcc_storage.cc + txn/txn.cc + 
txn/txn_processor.cc + txn/lock_manager.cc + txn/calvin.cc +) +target_link_libraries(txn PUBLIC Threads::Threads) + +add_executable(lock_manager_test + txn/lock_manager_test.cc +) +target_link_libraries(lock_manager_test PUBLIC txn) + +add_executable(txn_processor_test + txn/txn_processor_test.cc +) +target_link_libraries(txn_processor_test PUBLIC txn) + +add_executable(txn_types_test + txn/txn_types_test.cc +) +target_link_libraries(txn_types_test PUBLIC txn) + +add_test(NAME lock_manager_test COMMAND lock_manager_test) +add_test(NAME txn_processor_test COMMAND txn_processor_test) +add_test(NAME txn_types_test COMMAND txn_types_test) diff --git a/a2_final/src/txn/calvin.cc b/a2_final/src/txn/calvin.cc new file mode 100644 index 0000000..d8d840f --- /dev/null +++ b/a2_final/src/txn/calvin.cc @@ -0,0 +1,515 @@ +#include "txn_processor.h" +#include "utils/common.h" +#include +#include +#include +#include + +#include "lock_manager.h" + +/*********************************************** + * Calvin Continuous Execution -- Global Locks * + ***********************************************/ +void TxnProcessor::RunCalvinContScheduler() { + Txn *txn; + + std::unordered_map> shared_holders; + std::unordered_map last_excl; + + while (!stopped_) { + if (txn_requests_.Pop(&txn)) { + + adj_list_lock.lock(); + indegree_lock.lock(); + + adj_list[txn] = {}; + + // Print the adj_list in one go so the lines aren't interleaved + + // Don't add to indegree hashmap right away because if indegree == 0, + // we want to add it to the threadpool right away + int ind = 0; + + // Loop through writeset + for (const Key &key : txn->writeset_) { + // Add an edge between the current txn and all shared holders + if (shared_holders.contains(key)) { + for (auto conflicting_txn : shared_holders[key]) { + if (conflicting_txn != txn && + conflicting_txn->Status() == INCOMPLETE && + !adj_list[conflicting_txn].contains(txn)) { + adj_list[conflicting_txn].insert(txn); + ind++; + } + } + 
shared_holders[key].clear(); + } + + if (last_excl.contains(key) && last_excl[key] != txn) { + last_excl[key]->neighbors_mutex.lock(); + + if (last_excl[key]->Status() != COMMITTED && + last_excl[key]->Status() != ABORTED) { + // We came in before CalvinExecutorFunc took "snapshot" of + // neighbors + txn->indegree++; + last_excl[key]->neighbors.insert(txn); + } + + last_excl[key]->neighbors_mutex.unlock(); + } + + last_excl[key] = txn; + } + + // Loop through readset + // auto merged_sets = txn->readset_; + // merged_sets.insert(txn->writeset_.begin(), txn->writeset_.end()); + for (const Key &key : txn->readset_) { + // Add to shared holders + if (!shared_holders.contains(key)) { + shared_holders[key] = {}; + } + shared_holders[key].insert(txn); + + // If the last_excl txn is not the current txn, add an edge + if (last_excl.contains(key) && last_excl[key] != txn && + last_excl[key]->Status() == INCOMPLETE && + !adj_list[last_excl[key]].contains(txn)) { + adj_list[last_excl[key]].insert(txn); + ind++; + } + } + + // If current transaction's indegree is 0, add it to the threadpool + if (ind == 0) { + tp_.AddTask([this, txn]() { this->CalvinContExecutorFunc(txn); }); + } else { + // Otherwise, add it to the indegree hashmap + indegree[txn] = ind; + } + indegree_lock.unlock(); + adj_list_lock.unlock(); + } + } +} + +void TxnProcessor::CalvinContExecutorFunc(Txn *txn) { + // Execute txn. + ExecuteTxn(txn); + + // Commit/abort txn according to program logic's commit/abort decision. + // Note: we do this within the worker thread instead of returning + // back to the scheduler thread. + + if (txn->Status() == COMPLETED_C) { + ApplyWrites(txn); + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + } else if (txn->Status() == COMPLETED_A) { + txn->status_ = ABORTED; + } else { + // Invalid TxnStatus! 
+ DIE("Completed Txn has invalid TxnStatus: " << txn->Status()); + } + + // Update indegrees of neighbors + // If any has indegree 0, add them back to the queue + // if (adj_list.find(txn) != adj_list.end()) { + // std::shared_lock + // adj_list_shared_lock(adj_list_lock); + // std::shared_lock + // indegree_shared_lock(indegree_lock); + + adj_list_lock.lock(); + indegree_lock.lock(); + auto neighbors = adj_list[txn]; + for (auto nei : neighbors) { + indegree[nei]--; + if (indegree[nei] == 0) { + tp_.AddTask([this, nei]() { this->CalvinContExecutorFunc(nei); }); + } + } + + adj_list_lock.unlock(); + indegree_lock.unlock(); + + // Return result to client. + txn_results_.Push(txn); +} + +/*********************************************** + * Calvin Continuous Execution -- Indiv Locks * + ***********************************************/ +void TxnProcessor::RunCalvinContIndivScheduler() { + Txn *txn; + + std::unordered_map> shared_holders; + std::unordered_map last_excl; + + while (!stopped_) { + while (txn_requests_.Pop(&txn)) { + std::vector sorted_keys; + sorted_keys.reserve(txn->readset_.size() + txn->writeset_.size()); + sorted_keys.insert(sorted_keys.end(), txn->readset_.begin(), + txn->readset_.end()); + sorted_keys.insert(sorted_keys.end(), txn->writeset_.begin(), + txn->writeset_.end()); + std::sort(sorted_keys.begin(), sorted_keys.end()); + + txn->indegree_mutex.lock(); + txn->indegree = 0; + txn->neighbors.clear(); + + // Handle writeset + for (const Key &key : txn->writeset_) { + if (shared_holders.contains(key)) { + for (auto conflicting_txn : shared_holders[key]) { + if (conflicting_txn != txn) { + if (conflicting_txn->neighbors_mutex.try_lock()) { + + if (conflicting_txn->Status() != COMMITTED && + conflicting_txn->Status() != ABORTED && + !conflicting_txn->neighbors.contains(txn)) { + // We came in before CalvinExecutorFunc took "snapshot" of + // neighbors + txn->indegree++; + conflicting_txn->neighbors.insert(txn); + } + + 
conflicting_txn->neighbors_mutex.unlock(); + } + } + } + shared_holders[key].clear(); + + } + + else if (last_excl.contains(key) && last_excl[key] != txn) { + // if (last_excl[key]->neighbors_mutex.try_lock()) { + if (last_excl[key]->neighbors_mutex.try_lock()) { + + if (last_excl[key]->Status() != COMMITTED && + last_excl[key]->Status() != ABORTED && + !last_excl[key]->neighbors.contains(txn)) { + // We came in before CalvinExecutorFunc took "snapshot" of + // neighbors + txn->indegree++; + last_excl[key]->neighbors.insert(txn); + } + + last_excl[key]->neighbors_mutex.unlock(); + } + } + + last_excl[key] = txn; + } + + // Handle readset + for (const Key &key : txn->readset_) { + if (!shared_holders.contains(key)) { + shared_holders[key] = {}; + } + shared_holders[key].insert(txn); + + if (last_excl.contains(key) && last_excl[key] != txn) { + if (last_excl[key]->neighbors_mutex.try_lock()) { + + if (last_excl[key]->Status() != COMMITTED && + last_excl[key]->Status() != ABORTED && + !last_excl[key]->neighbors.contains(txn)) { + // We came in before CalvinExecutorFunc took "snapshot" of + // neighbors + txn->indegree++; + last_excl[key]->neighbors.insert(txn); + } + + last_excl[key]->neighbors_mutex.unlock(); + } + } + } + + // If current transaction's indegree is 0, add it to the threadpool + if (txn->indegree == 0) { + tp_.AddTask([this, txn]() { this->CalvinContIndivExecutorFunc(txn); }); + } + txn->indegree_mutex.unlock(); + } + } +} + +void TxnProcessor::CalvinContIndivExecutorFunc(Txn *txn) { + // Execute txn. + // printf("Starting to execute txn %d\n", txn->unique_id_); + ExecuteTxn(txn); + + // Commit/abort txn according to program logic's commit/abort decision. + // Note: we do this within the worker thread instead of returning + // back to the scheduler thread. 
+ if (txn->Status() == COMPLETED_C) { + ApplyWrites(txn); + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + } else if (txn->Status() == COMPLETED_A) { + txn->status_ = ABORTED; + } else { + // Invalid TxnStatus! + DIE("Completed Txn has invalid TxnStatus: " << txn->Status()); + } + + // Update indegrees of neighbors + // if (txn->neighbors_mutex.try_lock()) { + txn->neighbors_mutex.lock(); + std::vector sorted_neighbors(txn->neighbors.begin(), + txn->neighbors.end()); + // std::sort(sorted_neighbors.begin(), sorted_neighbors.end()); + + for (Txn *nei : sorted_neighbors) { + // test with trylock here + nei->indegree_mutex.lock(); + nei->indegree--; + if (nei->indegree == 0) { + tp_.AddTask([this, nei]() { this->CalvinContIndivExecutorFunc(nei); }); + } + nei->indegree_mutex.unlock(); + } + + // Return result to client. + txn_results_.Push(txn); + txn->neighbors_mutex.unlock(); +} + +/*********************************************** + * Calvin Epoch Execution * + ***********************************************/ + +void TxnProcessor::RunCalvinEpochSequencer() { + Txn *txn; + // save time of last epoch for calvin sequencer + auto last_epoch_time = std::chrono::high_resolution_clock::now(); + // set up current epoch + Epoch *current_epoch = new Epoch(); + while (!stopped_) { + // Add the txn to the epoch. 
+ if (txn_requests_.Pop(&txn)) { + current_epoch->push(txn); + } + + // check if we need to close the epoch + auto curr_time = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast( + curr_time - last_epoch_time); + if (duration.count() > 5) { + // new epoch is out of scope + last_epoch_time = curr_time; + + // make new epoch if last epoch has anything in it + if (!current_epoch->empty()) { + epoch_queue.Push(current_epoch); + current_epoch = new Epoch(); + } + } + } +} + +void *TxnProcessor::calvin_sequencer_helper(void *arg) { + reinterpret_cast(arg)->RunCalvinEpochSequencer(); + return NULL; +} + +void TxnProcessor::RunCalvinEpochScheduler() { + + // Start Calvin Sequencer + pthread_create(&calvin_sequencer_thread, NULL, calvin_sequencer_helper, + reinterpret_cast(this)); + // Start Calvin Epoch Executor + // pthread_create(&calvin_epoch_executor_thread, NULL, + // calvin_epoch_executor_helper, reinterpret_cast(this)); + // for(int i = 0; i < THREAD_COUNT; i++) { + // tp_.AddTask([this]() { this->CalvinEpochExecutorLMAO(); }); + // } + + Epoch *curr_epoch; + EpochDag *dag; + while (!stopped_) { + // Get the next epoch + if (epoch_queue.Pop(&curr_epoch)) { + // Create new DAG for this epoch + std::unordered_map> shared_holders; + std::unordered_map last_excl; + + dag = (EpochDag *)malloc(sizeof(EpochDag)); + + std::unordered_map> *adj_list = + new std::unordered_map>(); + std::unordered_map> *indegree = + new std::unordered_map>(); + std::queue *root_txns = new std::queue(); + + Txn *txn; + while (!curr_epoch->empty()) { + txn = curr_epoch->front(); + curr_epoch->pop(); + adj_list->emplace(txn, std::unordered_set()); + indegree->emplace(txn, 0); + + // Loop through readset + for (const Key &key : txn->readset_) { + // Add to shared holders + if (!shared_holders.contains(key)) { + shared_holders[key] = std::unordered_set(); + } + shared_holders[key].insert(txn); + + // If the last_excl txn is not the current txn, add an edge + + 
// add an edge between the last_excl txn and the current txn + // if we read a key that was last written by last_excl txn + if (last_excl.contains(key) && + !adj_list->at(last_excl[key]).contains(txn)) { + adj_list->at(last_excl[key]).insert(txn); + indegree->at(txn)++; + } + } + // Loop through writeset + for (const Key &key : txn->writeset_) { + // Add an edge between the current txn and all shared holders + if (shared_holders.contains(key)) { + for (auto conflicting_txn : shared_holders[key]) { + if (!adj_list->at(conflicting_txn).contains(txn)) { + adj_list->at(conflicting_txn).insert(txn); + indegree->at(txn)++; + } + } + shared_holders[key].clear(); + } + last_excl[key] = txn; + } + + // set as root if indegree of 0 + if (indegree->at(txn) == 0) { + root_txns->push(txn); + } + } + // finalize new epoch dag + dag->adj_list = adj_list; + dag->indegree = indegree; + dag->root_txns = root_txns; + // dag->indegree_locks = indegree_locks; + + // push dag to queue for executor to read + // epoch_dag_queue.Push(dag); + CalvinExecuteSingleEpoch(dag); + } + } +} + +void TxnProcessor::CalvinEpochExecutor() { + num_txns_left_in_epoch = 0; + while (!stopped_) { + if (epoch_dag_queue.Pop(¤t_epoch_dag)) { + if (num_txns_left_in_epoch != 0) { + std::cout << "Num transactions in epoch: " << num_txns_left_in_epoch + << std::endl; + std::cout << "UH OH--------------------------------UH OH" << std::endl; + } + num_txns_left_in_epoch = current_epoch_dag->adj_list->size(); + Txn *txn; + std::queue *root_txns = current_epoch_dag->root_txns; + + // add all root txns to threadpool + while (!root_txns->empty()) { + txn = root_txns->front(); + root_txns->pop(); + tp_.AddTask([this, txn]() { this->CalvinEpochExecutorFunc(txn); }); + } + + // wait for epoch to end executing + int sleep_duration = 1; // in microseconds + while (num_txns_left_in_epoch > 0) { + usleep(sleep_duration); + // Back off exponentially. 
+ if (sleep_duration < 32) + sleep_duration *= 2; + } + delete current_epoch_dag->adj_list; + delete current_epoch_dag->indegree; + delete current_epoch_dag->root_txns; + free(current_epoch_dag); + current_epoch_dag = NULL; + } + } +} + +void TxnProcessor::CalvinExecuteSingleEpoch(EpochDag *epoch_dag) { + num_txns_left_in_epoch = 0; + // if (num_txns_left_in_epoch != 0) { + // std::cout << "Num transactions in epoch: " << num_txns_left_in_epoch + // << std::endl; + // std::cout << "UH OH--------------------------------UH OH" << std::endl; + // } + current_epoch_dag = epoch_dag; + num_txns_left_in_epoch = current_epoch_dag->adj_list->size(); + Txn *txn; + std::queue *root_txns = current_epoch_dag->root_txns; + + // add all root txns to threadpool + while (!root_txns->empty()) { + txn = root_txns->front(); + root_txns->pop(); + tp_.AddTask([this, txn]() { this->CalvinEpochExecutorFunc(txn); }); + } + + // wait for epoch to end executing + int sleep_duration = 1; // in microseconds + while (num_txns_left_in_epoch > 0) { + usleep(sleep_duration); + // Back off exponentially. + if (sleep_duration < 32) + sleep_duration *= 2; + } + delete current_epoch_dag->adj_list; + delete current_epoch_dag->indegree; + delete current_epoch_dag->root_txns; + free(current_epoch_dag); + current_epoch_dag = NULL; +} + +void *TxnProcessor::calvin_epoch_executor_helper(void *arg) { + reinterpret_cast(arg)->CalvinEpochExecutor(); + return NULL; +} + +void TxnProcessor::CalvinEpochExecutorFunc(Txn *txn) { + ExecuteTxn(txn); + + // Commit/abort txn according to program logic's commit/abort decision. + // Note: we do this within the worker thread instead of returning + // back to the scheduler thread. + if (txn->Status() == COMPLETED_C) { + ApplyWrites(txn); + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + } else if (txn->Status() == COMPLETED_A) { + txn->status_ = ABORTED; + } else { + // Invalid TxnStatus! 
+ DIE("Completed Txn has invalid TxnStatus: " << txn->Status()); + } + + // Update indegrees of neighbors + // If any has indegree 0, add them back to the queue + auto neighbors = current_epoch_dag->adj_list->at(txn); + for (Txn *blocked_txn : neighbors) { + if (current_epoch_dag->indegree->at(blocked_txn)-- == 1) { + tp_.AddTask([this, blocked_txn]() { + this->CalvinEpochExecutorFunc(blocked_txn); + }); + } + } + num_txns_left_in_epoch--; + + // Return result to client. + txn_results_.Push(txn); +} diff --git a/a2_final/src/txn/lock_manager.cc b/a2_final/src/txn/lock_manager.cc new file mode 100644 index 0000000..0974bd3 --- /dev/null +++ b/a2_final/src/txn/lock_manager.cc @@ -0,0 +1,262 @@ +#include "lock_manager.h" + +LockManagerA::LockManagerA(std::deque *ready_txns) { + // // printf("creating new lock manager?\n"); + ready_txns_ = ready_txns; +} + +bool LockManagerA::WriteLock(Txn *txn, const Key &key) { + // + // Implement this method! + + // get the queue for the key + if (lock_table_.find(key) == lock_table_.end()) { + // key isn't ins the table yet + lock_table_[key] = new std::deque(); + } + std::deque *lr_queue = lock_table_[key]; + + // calculate how many transactions were already present in queue + int num_before = lr_queue->size(); + + // create and add lock request for this transaction + lr_queue->push_back(LockRequest(LockMode::EXCLUSIVE, txn)); + + // APPARENTLY WE DONT ADD TO THE READY LIST????? + if (num_before > 0) { + // add transaction to waiting list + + if (txn_waits_.find(txn) == txn_waits_.end()) { + // txn was not in the map + // add transaction to map + txn_waits_[txn] = num_before; + } else { + // add transaction to map + txn_waits_[txn] += num_before; + } + } + return num_before == 0; +} + +bool LockManagerA::ReadLock(Txn *txn, const Key &key) { + // Since Part 1A implements ONLY exclusive locks, calls to ReadLock can + // simply use the same logic as 'WriteLock'. 
+ return WriteLock(txn, key); +} + +void LockManagerA::Release(Txn *txn, const Key &key) { + // + // Implement this method! + + if (lock_table_.find(key) == lock_table_.end()) { + // key not in lock / can't actually release + return; + } + // get queue for the key + std::deque *lr_queue = lock_table_[key]; + + // find location of this transaction in the queue + size_t txn_index = lr_queue->size(); + for (size_t i = 0; i < lr_queue->size(); i++) { + if (lr_queue->at(i).txn_ == txn) { + txn_index = i; + break; + } + } + if (txn_index == lr_queue->size()) { + // this transaction doesn't have a lock on this key + return; + } + + // remove this transaction from the queue + lr_queue->erase(lr_queue->begin() + txn_index); + + // update following elements + for (size_t i = txn_index; i < lr_queue->size(); i++) { + // get transaction waiting for us to release a lock + Txn *update_txn = lr_queue->at(i).txn_; + // update number of transactions before it + txn_waits_[update_txn]--; + // if the transaction is no longer waiting for a lock + // remove it from txn_waits_ and add to read_txns_ + if (txn_waits_[update_txn] == 0) { + ready_txns_->push_back(update_txn); + txn_waits_.erase(update_txn); + } + } +} + +// NOTE: The owners input vector is NOT assumed to be empty. +LockMode LockManagerA::Status(const Key &key, std::vector *owners) { + // + // Implement this method! + owners->clear(); + + if (lock_table_.find(key) == lock_table_.end() || + lock_table_[key]->size() == 0) { + // the key isn't in the lock table or empty + return LockMode::UNLOCKED; + } else { + owners->push_back(lock_table_[key]->at(0).txn_); + return LockMode::EXCLUSIVE; + } +} + +LockManagerB::LockManagerB(std::deque *ready_txns) { + ready_txns_ = ready_txns; +} + +bool LockManagerB::WriteLock(Txn *txn, const Key &key) { + // printf("write locking key: %d\n", key); + + // + // Implement this method! 
+ + // first let's get the correct Lock Request Queue + if (lock_table_.find(key) == lock_table_.end()) { + // queue doesn't exist yet + lock_table_[key] = new std::deque(); + } + + std::deque *lr_queue = lock_table_[key]; + + size_t num_before = lr_queue->size(); + + // add lock request to queue + lr_queue->push_back(LockRequest(LockMode::EXCLUSIVE, txn)); + + if (num_before > 0) { + // can't get lock yet + // add txn to waitlist + if (txn_waits_.find(txn) == txn_waits_.end()) { + txn_waits_[txn] = 0; + } + txn_waits_[txn] += num_before; + } + + return num_before == 0; +} + +bool LockManagerB::ReadLock(Txn *txn, const Key &key) { + // + // Implement this method! + // printf("read locking key: %d\n", key); + + if (lock_table_.find(key) == lock_table_.end()) { + // queue doesn't exist yet + lock_table_[key] = new std::deque(); + } + + // first let's get the correct Lock Request Queue + std::deque *lr_queue = lock_table_[key]; + + int num_exclusive_before = 0; + // check for exclusive transactions before this one + for (size_t i = 0; i < lr_queue->size(); i++) { + if (lr_queue->at(i).mode_ == LockMode::EXCLUSIVE) { + num_exclusive_before++; + } + } + + // add lock request to queue + lr_queue->push_back(LockRequest(LockMode::SHARED, txn)); + + if (num_exclusive_before > 0) { + // can't get lock yet + // add txn to waitlist + if (txn_waits_.find(txn) == txn_waits_.end()) { + txn_waits_[txn] = 0; + } + txn_waits_[txn] += num_exclusive_before; + } + + return num_exclusive_before == 0; +} + +void LockManagerB::Release(Txn *txn, const Key &key) { + // Implement this method! 
+ + if (lock_table_.find(key) == lock_table_.end()) { + // don't need to do anything if the lock doesn't exist on the key + return; + } + + std::deque *lr_queue = lock_table_[key]; + LockMode released_type; + // find location of this transaction in the queue + size_t txn_index = lr_queue->size(); + for (size_t i = 0; i < lr_queue->size(); i++) { + if (lr_queue->at(i).txn_ == txn) { + txn_index = i; + released_type = lr_queue->at(i).mode_; + break; + } + } + + if (txn_index == lr_queue->size()) { + // this transaction doesn't have a lock on the key + return; + } + + // remove this transaction from the queue + lr_queue->erase(lr_queue->begin() + txn_index); + + // update following elements + for (size_t i = txn_index; i < lr_queue->size(); i++) { + + // get transaction waiting for us to release a lock + LockMode update_type = lr_queue->at(i).mode_; + Txn *update_txn = lr_queue->at(i).txn_; + + // update number of transactions before it if necessary + // shared locks only count exclusives before them + // exlusive lock count all before them + if (released_type == LockMode::EXCLUSIVE || + update_type == LockMode::EXCLUSIVE) { + if (txn_waits_.find(update_txn) == txn_waits_.end()) { + printf("Shouldn't be here in release lock :(\n"); + // + } + txn_waits_[update_txn]--; + // if the transaction is no longer waiting for a lock + // remove it from txn_waits_ and add to read_txns_ + if (txn_waits_[update_txn] == 0) { + ready_txns_->push_back(update_txn); + txn_waits_.erase(update_txn); + } + } + } +} + +// NOTE: The owners input vector is NOT assumed to be empty. +LockMode LockManagerB::Status(const Key &key, std::vector *owners) { + // Implement this method! 
+ if (lock_table_.find(key) == lock_table_.end()) { + return UNLOCKED; + } + + owners->clear(); + + std::deque *lr_queue = lock_table_[key]; + + LockMode mode; + + if (lr_queue->size() == 0) { + // empty queue + mode = LockMode::UNLOCKED; + } else if (lr_queue->at(0).mode_ == LockMode::EXCLUSIVE) { + mode = LockMode::EXCLUSIVE; + owners->push_back(lr_queue->at(0).txn_); + } else { + mode = LockMode::SHARED; + for (size_t i = 0; i < lr_queue->size(); i++) { + LockRequest lr = lr_queue->at(i); + if (lr.mode_ == LockMode::EXCLUSIVE) { + break; + } else { + owners->push_back(lr.txn_); + } + } + } + return mode; +} diff --git a/a2_final/src/txn/lock_manager.h b/a2_final/src/txn/lock_manager.h new file mode 100644 index 0000000..6bd4f77 --- /dev/null +++ b/a2_final/src/txn/lock_manager.h @@ -0,0 +1,127 @@ +// Interface for lock managers in the system. + +#ifndef _LOCK_MANAGER_H_ +#define _LOCK_MANAGER_H_ + +#include +#include +#include +#include + +#include "utils/common.h" + +class Txn; + +// This interface supports locks being held in both read/shared and +// write/exclusive modes. +enum LockMode { + UNLOCKED = 0, + SHARED = 1, + EXCLUSIVE = 2, +}; + +class LockManager { +public: + virtual ~LockManager() {} + // Attempts to grant a read lock to the specified transaction, enqueueing + // request in lock table. Returns true if lock is immediately granted, else + // returns false. + // + // Requires: Neither ReadLock nor WriteLock has previously been called with + // this txn and key. + virtual bool ReadLock(Txn *txn, const Key &key) = 0; + + // Attempts to grant a write lock to the specified transaction, enqueueing + // request in lock table. Returns true if lock is immediately granted, else + // returns false. + // + // Requires: Neither ReadLock nor WriteLock has previously been called with + // this txn and key. 
+ virtual bool WriteLock(Txn *txn, const Key &key) = 0; + + // Releases lock held by 'txn' on 'key', or cancels any pending request for + // a lock on 'key' by 'txn'. If 'txn' held an EXCLUSIVE lock on 'key' (or was + // the sole holder of a SHARED lock on 'key'), then the next request(s) in the + // request queue is granted. If the granted request(s) corresponds to a + // transaction that has now acquired ALL of its locks, that transaction is + // appended to the 'ready_txns_' queue. + // + // IMPORTANT NOTE: In order to know WHEN a transaction is ready to run, you + // may need to track its lock acquisition progress during the lock request + // process. + // (Hint: Use 'LockManager::txn_waits_' defined below.) + virtual void Release(Txn *txn, const Key &key) = 0; + + // Sets '*owners' to contain the txn IDs of all txns holding the lock, and + // returns the current LockMode of the lock: UNLOCKED if it is not currently + // held, SHARED or EXCLUSIVE if it is, depending on the current state. + virtual LockMode Status(const Key &key, std::vector *owners) = 0; + +protected: + // The LockManager's lock table tracks all lock requests. For a given key, if + // 'lock_table_' contains a nonempty deque, then the item with that key is + // locked and either: + // + // (a) first element in the deque specifies the owner if that item is a + // request for an EXCLUSIVE lock, or + // + // (b) a SHARED lock is held by all elements of the longest prefix of the + // deque containing only SHARED lock requests. + // + // For example, if lock_table_["key1"] points to a deque containing + // + // (&Txn1, SHARED), (&Txn2, SHARED), (&Txn3, EXCLUSIVE), (&Txn4, SHARED) + // + // then Txn1 and Txn2 currently hold a SHARED lock on the record with key + // "key1". Only when they BOTH release their locks will Txn3 acquire its + // exclusive lock on the record. 
(Note that since Txn4 comes after Txn3, it + // cannot acquire a lock until after Txn3 has released its lock, so it cannot + // share the lock with Txn1 and Txn2.) + // + // As a second example, if lock_table_["key1"] points to a deque containing + // + // (&Txn1, EXCLUSIVE), (&Txn2, SHARED), (&Txn3, SHARED), (Txn4, EXCLUSIVE) + // + // then Txn1 currently holds an EXCLUSIVE lock on "key1". When Txn1 releases + // its lock, Txn2 and Txn3 will simultaneously acquire SHARED locks on "key1". + struct LockRequest { + LockRequest(LockMode m, Txn *t) : txn_(t), mode_(m) {} + Txn *txn_; // Pointer to txn requesting the lock. + LockMode mode_; // Specifies whether this is a read or write lock request. + }; + std::unordered_map *> lock_table_; + + // Queue of pointers to transactions that: + // (a) were previously blocked on acquiring at least one lock, and + // (b) have now acquired all locks that they have requested. + std::deque *ready_txns_; + + // Tracks all txns still waiting on acquiring at least one lock. Entries in + // 'txn_waits_' are invalided by any call to Release() with the entry's + // txn. + std::unordered_map txn_waits_; +}; + +// Version of the LockManager implementing ONLY exclusive locks. +class LockManagerA : public LockManager { +public: + explicit LockManagerA(std::deque *ready_txns); + inline virtual ~LockManagerA() {} + virtual bool ReadLock(Txn *txn, const Key &key); + virtual bool WriteLock(Txn *txn, const Key &key); + virtual void Release(Txn *txn, const Key &key); + virtual LockMode Status(const Key &key, std::vector *owners); +}; + +// Version of the LockManager implementing both shared and exclusive locks. 
+class LockManagerB : public LockManager { +public: + explicit LockManagerB(std::deque *ready_txns); + inline virtual ~LockManagerB() {} + virtual bool ReadLock(Txn *txn, const Key &key); + virtual bool WriteLock(Txn *txn, const Key &key); + virtual void Release(Txn *txn, const Key &key); + virtual LockMode Status(const Key &key, std::vector *owners); +}; + +#endif // _LOCK_MANAGER_H_ diff --git a/a2_final/src/txn/lock_manager_test.cc b/a2_final/src/txn/lock_manager_test.cc new file mode 100644 index 0000000..1c1453a --- /dev/null +++ b/a2_final/src/txn/lock_manager_test.cc @@ -0,0 +1,189 @@ +#include "lock_manager.h" + +#include +#include + +#include "utils/testing.h" + +TEST(LockManagerA_SimpleLocking) { + std::deque ready_txns; + LockManagerA lm(&ready_txns); + std::vector owners; + + Txn *t1 = reinterpret_cast(1); + Txn *t2 = reinterpret_cast(2); + Txn *t3 = reinterpret_cast(3); + + // Txn 1 acquires read lock. + lm.ReadLock(t1, 101); + ready_txns.push_back(t1); // Txn 1 is ready. + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t1, owners[0]); + EXPECT_EQ(1, ready_txns.size()); + EXPECT_EQ(t1, ready_txns.at(0)); + + // Txn 2 requests write lock. Not granted. + lm.WriteLock(t2, 101); + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t1, owners[0]); + EXPECT_EQ(1, ready_txns.size()); + + // Txn 3 requests read lock. Not granted. + lm.ReadLock(t3, 101); + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t1, owners[0]); + EXPECT_EQ(1, ready_txns.size()); + + // Txn 1 releases lock. Txn 2 is granted write lock. + lm.Release(t1, 101); + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t2, owners[0]); + EXPECT_EQ(2, ready_txns.size()); + EXPECT_EQ(t2, ready_txns.at(1)); + + // Txn 2 releases lock. Txn 3 is granted read lock. 
+ lm.Release(t2, 101); + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t3, owners[0]); + EXPECT_EQ(3, ready_txns.size()); + EXPECT_EQ(t3, ready_txns.at(2)); + + END; +} + +TEST(LockManagerA_LocksReleasedOutOfOrder) { + std::deque ready_txns; + LockManagerA lm(&ready_txns); + std::vector owners; + + Txn *t1 = reinterpret_cast(1); + Txn *t2 = reinterpret_cast(2); + Txn *t3 = reinterpret_cast(3); + Txn *t4 = reinterpret_cast(4); + + lm.ReadLock(t1, 101); // Txn 1 acquires read lock. + ready_txns.push_back(t1); // Txn 1 is ready. + lm.WriteLock(t2, 101); // Txn 2 requests write lock. Not granted. + lm.ReadLock(t3, 101); // Txn 3 requests read lock. Not granted. + lm.ReadLock(t4, 101); // Txn 4 requests read lock. Not granted. + + lm.Release(t2, 101); // Txn 2 cancels write lock request. + + // Txn 1 should now have a read lock and Txns 3 and 4 should be next in line. + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t1, owners[0]); + + // Txn 1 releases lock. Txn 3 is granted read lock. + lm.Release(t1, 101); + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t3, owners[0]); + EXPECT_EQ(2, ready_txns.size()); + EXPECT_EQ(t3, ready_txns.at(1)); + + // Txn 3 releases lock. Txn 4 is granted read lock. + lm.Release(t3, 101); + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t4, owners[0]); + EXPECT_EQ(3, ready_txns.size()); + EXPECT_EQ(t4, ready_txns.at(2)); + + END; +} + +TEST(LockManagerB_SimpleLocking) { + std::deque ready_txns; + LockManagerB lm(&ready_txns); + std::vector owners; + + Txn *t1 = reinterpret_cast(1); + Txn *t2 = reinterpret_cast(2); + Txn *t3 = reinterpret_cast(3); + + // Txn 1 acquires read lock. + lm.ReadLock(t1, 101); + ready_txns.push_back(t1); // Txn 1 is ready. 
+ EXPECT_EQ(SHARED, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t1, owners[0]); + EXPECT_EQ(1, ready_txns.size()); + EXPECT_EQ(t1, ready_txns.at(0)); + + // Txn 2 requests write lock. Not granted. + lm.WriteLock(t2, 101); + EXPECT_EQ(SHARED, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t1, owners[0]); + EXPECT_EQ(1, ready_txns.size()); + + // Txn 3 requests read lock. Not granted. + lm.ReadLock(t3, 101); + EXPECT_EQ(SHARED, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t1, owners[0]); + EXPECT_EQ(1, ready_txns.size()); + + // Txn 1 releases lock. Txn 2 is granted write lock. + lm.Release(t1, 101); + EXPECT_EQ(EXCLUSIVE, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t2, owners[0]); + EXPECT_EQ(2, ready_txns.size()); + EXPECT_EQ(t2, ready_txns.at(1)); + + // Txn 2 releases lock. Txn 3 is granted read lock. + lm.Release(t2, 101); + EXPECT_EQ(SHARED, lm.Status(101, &owners)); + EXPECT_EQ(1, owners.size()); + EXPECT_EQ(t3, owners[0]); + EXPECT_EQ(3, ready_txns.size()); + EXPECT_EQ(t3, ready_txns.at(2)); + + END; +} + +TEST(LockManagerB_LocksReleasedOutOfOrder) { + std::deque ready_txns; + LockManagerB lm(&ready_txns); + std::vector owners; + + Txn *t1 = reinterpret_cast(1); + Txn *t2 = reinterpret_cast(2); + Txn *t3 = reinterpret_cast(3); + Txn *t4 = reinterpret_cast(4); + + lm.ReadLock(t1, 101); // Txn 1 acquires read lock. + ready_txns.push_back(t1); // Txn 1 is ready. + lm.WriteLock(t2, 101); // Txn 2 requests write lock. Not granted. + lm.ReadLock(t3, 101); // Txn 3 requests read lock. Not granted. + lm.ReadLock(t4, 101); // Txn 4 requests read lock. Not granted. + + lm.Release(t2, 101); // Txn 2 cancels write lock request. + + // Txns 1, 3 and 4 should now have a shared lock. 
+ EXPECT_EQ(SHARED, lm.Status(101, &owners)); + EXPECT_EQ(3, owners.size()); + EXPECT_EQ(t1, owners[0]); + EXPECT_EQ(t3, owners[1]); + EXPECT_EQ(t4, owners[2]); + EXPECT_EQ(3, ready_txns.size()); + EXPECT_EQ(t1, ready_txns.at(0)); + EXPECT_EQ(t3, ready_txns.at(1)); + EXPECT_EQ(t4, ready_txns.at(2)); + + END; +} + +int main(int argc, char **argv) { + LockManagerA_SimpleLocking(); + LockManagerA_LocksReleasedOutOfOrder(); + LockManagerB_SimpleLocking(); + LockManagerB_LocksReleasedOutOfOrder(); +} diff --git a/a2_final/src/txn/mvcc_storage.cc b/a2_final/src/txn/mvcc_storage.cc new file mode 100644 index 0000000..c52a030 --- /dev/null +++ b/a2_final/src/txn/mvcc_storage.cc @@ -0,0 +1,119 @@ +#include "mvcc_storage.h" + +// Init the storage +void MVCCStorage::InitStorage() { + for (int i = 0; i < 1000000; i++) { + Write(i, 0, 0); + std::mutex *key_mutex = new std::mutex(); + mutexs_[i] = key_mutex; + } +} + +// Free memory. +MVCCStorage::~MVCCStorage() { + for (auto it = mvcc_data_.begin(); it != mvcc_data_.end(); ++it) { + delete it->second; + } + + mvcc_data_.clear(); + + for (auto it = mutexs_.begin(); it != mutexs_.end(); ++it) { + delete it->second; + } + + mutexs_.clear(); +} + +// Lock the key to protect its version_list. Remember to lock the key when you +// read/update the version_list +void MVCCStorage::Lock(Key key) { mutexs_[key]->lock(); } + +// Unlock the key. +void MVCCStorage::Unlock(Key key) { mutexs_[key]->unlock(); } + +// MVCC Read +// If there exists a record for the specified key, sets '*result' equal to +// the value associated with the key and returns true, else returns false; +// The third parameter is the txn_unique_id(txn timestamp), which is used for +// MVCC. +bool MVCCStorage::Read(Key key, Value *result, int txn_unique_id) { + // + // Implement this method! + + // Hint: Iterate the version_lists and return the version whose write + // timestamp (version_id) is the largest write timestamp less than or equal to + // txn_unique_id. 
+ + // Check if the key exists in mvcc_data_ + if (mvcc_data_.count(key) == 0) { + return false; + } + + for (auto version : *mvcc_data_[key]) { + // Return the first version whose version_id is less than or equal to + // txn_unique_id This assumes that the version list is sorted in descending + // order + if (version->version_id_ <= txn_unique_id) { + *result = version->value_; + version->max_read_id_ = txn_unique_id; + return true; + } + } + + return false; +} + +// Check whether the txn executed on the latest version of the key. +bool MVCCStorage::CheckKey(Key key, int txn_unique_id) { + // + // Implement this method! + + // Hint: Before all writes are applied (and SSI reads are validated), we need + // to make sure that each key was accessed safely based on MVCC timestamp + // ordering protocol. This method only checks one key, so you should call this + // method for each key (as necessary). Return true if this key passes the + // check, return false if not. Note that you don't have to call Lock(key) in + // this method, just call Lock(key) before you call this method and call + // Unlock(key) afterward. + + // If key doesn't exist, or if deque is empty for some reason, return false + if (mvcc_data_.count(key) == 0 || mvcc_data_[key]->empty()) { + return false; + } + + // Assuming that the version list is sorted in descending order + return txn_unique_id >= mvcc_data_[key]->front()->version_id_; +} + +// MVCC Write, call this method only if CheckWrite return true. +// Inserts a new version with key and value +// The third parameter is the txn_unique_id(txn timestamp), which is used for +// MVCC. +void MVCCStorage::Write(Key key, Value value, int txn_unique_id) { + // + // Implement this method! + + // Hint: Insert a new version (malloc a Version and specify its + // value/version_id/max_read_id) into the version_lists. Note that + // InitStorage() also calls this method to init storage. 
Note that you don't + // have to call Lock(key) in this method, just call Lock(key) before you call + // this method and call Unlock(key) afterward. Note that the performance would + // be much better if you organize the versions in decreasing order. + + auto version = new Version(); + version->value_ = value; + version->version_id_ = txn_unique_id; + version->max_read_id_ = 0; + + if (mvcc_data_.count(key) == 0) { + mvcc_data_[key] = new std::deque(); + } + + // Insert the new version in descending order + auto it = mvcc_data_[key]->begin(); + while (it != mvcc_data_[key]->end() && (*it)->version_id_ > txn_unique_id) { + ++it; + } + mvcc_data_[key]->insert(it, version); +} + diff --git a/a2_final/src/txn/mvcc_storage.h b/a2_final/src/txn/mvcc_storage.h new file mode 100644 index 0000000..ae55ce0 --- /dev/null +++ b/a2_final/src/txn/mvcc_storage.h @@ -0,0 +1,58 @@ +#ifndef _MVCC_STORAGE_H_ +#define _MVCC_STORAGE_H_ + +#include "storage.h" +#include +#include + +// MVCC 'version' structure +struct Version { + Value value_; // The value of this version + int max_read_id_; // Largest timestamp of a transaction that read the version + int version_id_; // Timestamp of the transaction that created(wrote) the + // version +}; + +// MVCC storage +class MVCCStorage : public Storage { +public: + // If there exists a record for the specified key, sets '*result' equal to + // the value associated with the key and returns true, else returns false; + // The third parameter is the txn_unique_id(txn timestamp), which is used for + // MVCC. + virtual bool Read(Key key, Value *result, int txn_unique_id = 0); + + // Inserts a new version with key and value + // The third parameter is the txn_unique_id(txn timestamp), which is used for + // MVCC. + virtual void Write(Key key, Value value, int txn_unique_id = 0); + + // Returns the timestamp at which the record with the specified key was last + // updated (returns 0 if the record has never been updated). This is used for + // OCC. 
+ virtual double Timestamp(Key key) { return 0; } + // Init storage + virtual void InitStorage(); + + // Lock the version_list of key + virtual void Lock(Key key); + + // Unlock the version_list of key + virtual void Unlock(Key key); + + // Check whether the txn executed on the latest version of the key. + virtual bool CheckKey(Key key, int txn_unique_id); + + virtual ~MVCCStorage(); + +private: + friend class TxnProcessor; + + // Storage for MVCC, each key has a linklist of versions + std::unordered_map *> mvcc_data_; + + // Mutexs for each key + std::unordered_map mutexs_; +}; + +#endif // _MVCC_STORAGE_H_ diff --git a/a2_final/src/txn/storage.cc b/a2_final/src/txn/storage.cc new file mode 100644 index 0000000..920c6ae --- /dev/null +++ b/a2_final/src/txn/storage.cc @@ -0,0 +1,29 @@ +#include "storage.h" + +bool Storage::Read(Key key, Value *result, int txn_unique_id) { + if (data_.count(key)) { + *result = data_[key]; + return true; + } else { + return false; + } +} + +// Write value and timestamps +void Storage::Write(Key key, Value value, int txn_unique_id) { + data_[key] = value; + timestamps_[key] = GetTime(); +} + +double Storage::Timestamp(Key key) { + if (timestamps_.count(key) == 0) + return 0; + return timestamps_[key]; +} + +// Init the storage +void Storage::InitStorage() { + for (int i = 0; i < 1000000; i++) { + Write(i, 0, 0); + } +} diff --git a/a2_final/src/txn/storage.h b/a2_final/src/txn/storage.h new file mode 100644 index 0000000..c0e7e3e --- /dev/null +++ b/a2_final/src/txn/storage.h @@ -0,0 +1,46 @@ +#ifndef _STORAGE_H_ +#define _STORAGE_H_ + +#include + +#include "txn.h" + +class Storage { +public: + // If there exists a record for the specified key, sets '*result' equal to + // the value associated with the key and returns true, else returns false; + // Note that the third parameter is only used for MVCC, the default vaule is + // 0. 
+ virtual bool Read(Key key, Value *result, int txn_unique_id = 0); + + // Inserts the record , replacing any previous record with the + // same key. + // Note that the third parameter is only used for MVCC, the default vaule is + // 0. + virtual void Write(Key key, Value value, int txn_unique_id = 0); + + // Returns the timestamp at which the record with the specified key was last + // updated (returns 0 if the record has never been updated). This is used for + // OCC. + virtual double Timestamp(Key key); + + // Init storage + virtual void InitStorage(); + + virtual ~Storage() {} + // The following methods are only used for MVCC + virtual void Lock(Key key) {} + virtual void Unlock(Key key) {} + virtual bool CheckWrite(Key key, int txn_unique_id) { return true; } + +private: + friend class TxnProcessor; + + // Collection of pairs. Use this for single-version storage + std::unordered_map data_; + + // Timestamps at which each key was last updated. + std::unordered_map timestamps_; +}; + +#endif // _STORAGE_H_ diff --git a/a2_final/src/txn/txn.cc b/a2_final/src/txn/txn.cc new file mode 100644 index 0000000..5a1a923 --- /dev/null +++ b/a2_final/src/txn/txn.cc @@ -0,0 +1,56 @@ +#include "txn.h" + +bool Txn::Read(const Key &key, Value *value) { + // Check that key is in readset/writeset. + if (readset_.count(key) == 0 && writeset_.count(key) == 0) + DIE("Invalid read (key not in readset or writeset)."); + + // Reads have no effect if we have already aborted or committed. + if (status_ != INCOMPLETE) + return false; + + // 'reads_' has already been populated by TxnProcessor, so it should contain + // the target value iff the record appears in the database. + if (reads_.count(key)) { + *value = reads_[key]; + return true; + } else { + return false; + } +} + +void Txn::Write(const Key &key, const Value &value) { + // Check that key is in writeset. 
+ if (writeset_.count(key) == 0) + DIE("Invalid write to key " << key << " (writeset)."); + + // Writes have no effect if we have already aborted or committed. + if (status_ != INCOMPLETE) + return; + + // Set key-value pair in write buffer. + writes_[key] = value; + + // Also set key-value pair in read results in case txn logic requires the + // record to be re-read. + reads_[key] = value; +} + +void Txn::CheckReadWriteSets() { + for (std::set::iterator it = writeset_.begin(); it != writeset_.end(); + ++it) { + if (readset_.count(*it) > 0) { + DIE("Overlapping read/write sets\n."); + } + } +} + +void Txn::CopyTxnInternals(Txn *txn) const { + txn->readset_ = std::set(this->readset_); + txn->writeset_ = std::set(this->writeset_); + txn->reads_ = std::map(this->reads_); + txn->writes_ = std::map(this->writes_); + txn->status_ = this->status_; + txn->unique_id_ = this->unique_id_; + txn->occ_start_idx_ = this->occ_start_idx_; +} diff --git a/a2_final/src/txn/txn.h b/a2_final/src/txn/txn.h new file mode 100644 index 0000000..051aec1 --- /dev/null +++ b/a2_final/src/txn/txn.h @@ -0,0 +1,108 @@ +#ifndef _TXN_H_ +#define _TXN_H_ + +#include +#include +#include +#include + +#include "utils/common.h" + +// Txns can have five distinct status values: +enum TxnStatus { + INCOMPLETE = 0, // Not yet executed + COMPLETED_C = 1, // Executed (with commit vote) + COMPLETED_A = 2, // Executed (with abort vote) + COMMITTED = 3, // Committed + ABORTED = 4, // Aborted +}; + +class Txn { +public: + // Commit vote defauls to false. Only by calling "commit" + Txn() : status_(INCOMPLETE) {} + virtual ~Txn() {} + virtual Txn *clone() const = 0; // Virtual constructor (copying) + + // Method containing all the transaction's method logic. + virtual void Run() = 0; + + // Returns the Txn's current execution status. + TxnStatus Status() { return status_; } + // Checks for overlap in read and write sets. If any key appears in both, + // an error occurs. 
+ void CheckReadWriteSets(); + +protected: + // Copies the internals of this txn into a given transaction (i.e. + // the readset, writeset, and so forth). Be sure to modify this method + // to copy any new data structures you create. + void CopyTxnInternals(Txn *txn) const; + + friend class TxnProcessor; + + // Method to be used inside 'Execute()' function when reading records from + // the database. If record corresponding with specified 'key' exists, sets + // '*value' equal to the record value and returns true, else returns false. + // + // Requires: key appears in readset or writeset + // + // Note: Can ONLY be called from inside the 'Execute()' function. + bool Read(const Key &key, Value *value); + + // Method to be used inside 'Execute()' function when writing records to + // the database. + // + // Requires: key appears in writeset + // + // Note: Can ONLY be called from inside the 'Execute()' function. + void Write(const Key &key, const Value &value); + +// Macro to be used inside 'Execute()' function when deciding to COMMIT. +// +// Note: Can ONLY be called from inside the 'Execute()' function. +#define COMMIT \ + do { \ + status_ = COMPLETED_C; \ + return; \ + } while (0) + +// Macro to be used inside 'Execute()' function when deciding to ABORT. +// +// Note: Can ONLY be called from inside the 'Execute()' function. +#define ABORT \ + do { \ + status_ = COMPLETED_A; \ + return; \ + } while (0) + + // Set of all keys that may need to be read in order to execute the + // transaction. + std::set readset_; + + // Set of all keys that may be updated when executing the transaction. + std::set writeset_; + + // Results of reads performed by the transaction. + std::map reads_; + + // Key, Value pairs WRITTEN by the transaction. + std::map writes_; + + // Transaction's current execution status. + TxnStatus status_; + + // Unique, monotonically increasing transaction ID, assigned by TxnProcessor. + uint64 unique_id_; + + // Start index (used for OCC). 
+ int64_t occ_start_idx_; + + std::unordered_set neighbors; + std::shared_mutex neighbors_mutex; + + std::mutex indegree_mutex; + int indegree; +}; + +#endif // _TXN_H_ diff --git a/a2_final/src/txn/txn_processor.cc b/a2_final/src/txn/txn_processor.cc new file mode 100644 index 0000000..52aa997 --- /dev/null +++ b/a2_final/src/txn/txn_processor.cc @@ -0,0 +1,596 @@ +#include +#include +#include +#include +#include + +#include "lock_manager.h" +#include "txn_processor.h" + +// Thread & queue counts for StaticThreadPool initialization. +#define THREAD_COUNT 8 + +using std::set; + +TxnProcessor::TxnProcessor(CCMode mode) + : mode_(mode), tp_(THREAD_COUNT), next_unique_id_(1) { + if (mode_ == LOCKING_EXCLUSIVE_ONLY) + lm_ = new LockManagerA(&ready_txns_); + else if (mode_ == LOCKING) + lm_ = new LockManagerB(&ready_txns_); + + // Create the storage + if (mode_ == MVCC || mode_ == MVCC_SSI) { + storage_ = new MVCCStorage(); + } else { + storage_ = new Storage(); + } + + storage_->InitStorage(); + stopped_ = false; + scheduler_thread_ = std::thread{&TxnProcessor::RunScheduler, this}; +} + +TxnProcessor::~TxnProcessor() { + // Wait for the scheduler thread to join back before destroying the object and + // its thread pool. + if (mode_ == CALVIN_EPOCH) { + pthread_join(calvin_sequencer_thread, NULL); + } + stopped_ = true; + scheduler_thread_.join(); + + if (mode_ == LOCKING_EXCLUSIVE_ONLY || mode_ == LOCKING) + delete lm_; + + delete storage_; +} + +void TxnProcessor::NewTxnRequest(Txn *txn) { + // Atomically assign the txn a new number and add it to the incoming txn + // requests queue. + txn->unique_id_ = next_unique_id_++; + txn_requests_.Push(txn); +} + +Txn *TxnProcessor::GetTxnResult() { + Txn *txn; + while (!txn_results_.Pop(&txn)) { + // No result yet. Wait a bit before trying again (to reduce contention on + // atomic queues). 
+ usleep(1); + } + return txn; +} + +void TxnProcessor::RunScheduler() { + switch (mode_) { + case SERIAL: + RunSerialScheduler(); + break; + case LOCKING: + RunLockingScheduler(); + break; + case LOCKING_EXCLUSIVE_ONLY: + RunLockingScheduler(); + break; + case OCC: + RunOCCScheduler(); + break; + case P_OCC: + RunOCCParallelScheduler(); + break; + case MVCC: + RunMVCCScheduler(); + break; + case CALVIN: + RunCalvinContScheduler(); + break; + case CALVIN_I: + RunCalvinContIndivScheduler(); + break; + case CALVIN_EPOCH: + RunCalvinEpochScheduler(); + break; + } +} + +void TxnProcessor::RunSerialScheduler() { + Txn *txn; + while (!stopped_) { + // Get next txn request. + if (txn_requests_.Pop(&txn)) { + // Execute txn. + ExecuteTxn(txn); + + // Commit/abort txn according to program logic's commit/abort decision. + if (txn->Status() == COMPLETED_C) { + ApplyWrites(txn); + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + } else if (txn->Status() == COMPLETED_A) { + txn->status_ = ABORTED; + } else { + // Invalid TxnStatus! + DIE("Completed Txn has invalid TxnStatus: " << txn->Status()); + } + + // Return result to client. + txn_results_.Push(txn); + } + } +} + +void TxnProcessor::RunLockingScheduler() { + Txn *txn; + while (!stopped_) { + // Start processing the next incoming transaction request. + if (txn_requests_.Pop(&txn)) { + bool blocked = false; + // Request read locks. + for (std::set::iterator it = txn->readset_.begin(); + it != txn->readset_.end(); ++it) { + if (!lm_->ReadLock(txn, *it)) { + blocked = true; + } + } + + // Request write locks. + for (std::set::iterator it = txn->writeset_.begin(); + it != txn->writeset_.end(); ++it) { + if (!lm_->WriteLock(txn, *it)) { + blocked = true; + } + } + + // If all read and write locks were immediately acquired, this txn is + // ready to be executed. + if (blocked == false) { + ready_txns_.push_back(txn); + } + } + + // Process and commit all transactions that have finished running. 
+ while (completed_txns_.Pop(&txn)) { + // Commit/abort txn according to program logic's commit/abort decision. + if (txn->Status() == COMPLETED_C) { + ApplyWrites(txn); + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + } else if (txn->Status() == COMPLETED_A) { + txn->status_ = ABORTED; + } else { + // Invalid TxnStatus! + DIE("Completed Txn has invalid TxnStatus: " << txn->Status()); + } + + // Release read locks. + for (std::set::iterator it = txn->readset_.begin(); + it != txn->readset_.end(); ++it) { + lm_->Release(txn, *it); + } + // Release write locks. + for (std::set::iterator it = txn->writeset_.begin(); + it != txn->writeset_.end(); ++it) { + lm_->Release(txn, *it); + } + + // Return result to client. + txn_results_.Push(txn); + } + + // Start executing all transactions that have newly acquired all their + // locks. + while (ready_txns_.size()) { + // Get next ready txn from the queue. + txn = ready_txns_.front(); + ready_txns_.pop_front(); + + // Start txn running in its own thread. + tp_.AddTask([this, txn]() { this->ExecuteTxn(txn); }); + } + } +} + +void TxnProcessor::ExecuteTxn(Txn *txn) { + // Get the current commited transaction index for the further validation. + txn->occ_start_idx_ = committed_txns_.Size(); + + // Read everything in from readset. + for (std::set::iterator it = txn->readset_.begin(); + it != txn->readset_.end(); ++it) { + // Save each read result iff record exists in storage. + Value result; + if (storage_->Read(*it, &result)) + txn->reads_[*it] = result; + } + + // Also read everything in from writeset. + for (std::set::iterator it = txn->writeset_.begin(); + it != txn->writeset_.end(); ++it) { + // Save each read result iff record exists in storage. + Value result; + if (storage_->Read(*it, &result)) + txn->reads_[*it] = result; + } + + // Execute txn's program logic. + txn->Run(); + + // Hand the txn back to the RunScheduler thread. 
+ completed_txns_.Push(txn); +} + +void TxnProcessor::ApplyWrites(Txn *txn) { + // Write buffered writes out to storage. + for (std::map::iterator it = txn->writes_.begin(); + it != txn->writes_.end(); ++it) { + storage_->Write(it->first, it->second, txn->unique_id_); + } +} + +void TxnProcessor::RunOCCScheduler() { + Txn *txn; + while (!stopped_) { + // Get the next new txn request (if one is pending) + if (txn_requests_.Pop(&txn)) { + // Pass it to an execution thread + tp_.AddTask([this, txn]() { this->ExecuteTxn(txn); }); + } + + // Dealing with a finished transaction + while (completed_txns_.Pop(&txn)) { + // Validation phase + // Use the data structure in `txn_processor` class to check overlap with + // each record whose key appears in the txn's read and write sets + bool valid = true; + + // Check for overlap with newly committed transactions + // after the txn's occ_start_idx_ + for (int i = txn->occ_start_idx_ + 1; i < committed_txns_.Size(); i++) { + Txn *t = committed_txns_[i]; + + // check if write_set of t intersects with read_set of txn + for (auto key : txn->readset_) { + if (t->writeset_.find(key) != t->writeset_.end()) { + valid = false; + break; + } + } + } + + // If validation failed, cleanup txn and completely restart it + if (!valid) { + // Cleanup txn + txn->reads_.clear(); + txn->writes_.clear(); + txn->status_ = INCOMPLETE; + + // Restart txn + std::scoped_lock lock{mutex_}; + txn->unique_id_ = next_unique_id_++; + txn_requests_.Push(txn); + } else { + // Apply all writes + ApplyWrites(txn); + + // Mark transaction as committed + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + + // Update relevant data structure + txn_results_.Push(txn); + } + } + } +} + +void TxnProcessor::ExecuteTxnParallel(Txn *txn) { + txn->occ_start_idx_ = committed_txns_.Size(); + + // Perform "read phase" of transaction + // Read everything in from readset. 
+ for (std::set::iterator it = txn->readset_.begin(); + it != txn->readset_.end(); ++it) { + // Save each read result iff record exists in storage. + Value result; + if (storage_->Read(*it, &result)) + txn->reads_[*it] = result; + } + + // Also read everything in from writeset. + for (std::set::iterator it = txn->writeset_.begin(); + it != txn->writeset_.end(); ++it) { + // Save each read result iff record exists in storage. + Value result; + if (storage_->Read(*it, &result)) + txn->reads_[*it] = result; + } + + // Execute txn's program logic. + txn->Run(); + + // Start of critical section + // Make a copy of the active set + std::unique_lock lock{active_set_mutex_}; + auto finish_active = active_set_.GetSet(); + // Add this txn to the active set + active_set_.Insert(txn); + // End of critical section + lock.unlock(); + + // Validation phase + // Use the data structure in `txn_processor` class to check overlap with + // each record whose key appears in the txn's read and write sets + bool valid = true; + + // NOTE: This is not in the pseudocode in the project description + // Check for overlap with newly committed transactions + // after the txn's occ_start_idx_ + for (int i = txn->occ_start_idx_ + 1; i < committed_txns_.Size(); i++) { + Txn *t = committed_txns_[i]; + + // check if write_set of t intersects with read_set of txn + for (auto key : txn->readset_) { + if (t->writeset_.find(key) != t->writeset_.end()) { + valid = false; + break; + } + } + } + + // Check overlap with each record whose key appears in the txn's read and + // write sets NOTE: we only run this if the txn hasn't been invalidated by the + // previous check NOTE: this is the only validation implemented in the + // pseudocode in the project description + if (valid) { + for (auto t : finish_active) { + // if txn's write set intersects with t's write sets + for (auto key : txn->writeset_) { + if (t->writeset_.find(key) != t->writeset_.end()) { + valid = false; + break; + } + } + + // if txn's read 
set intersects with t's write sets + for (auto key : txn->readset_) { + if (t->writeset_.find(key) != t->writeset_.end()) { + valid = false; + break; + } + } + } + } + + // If validation failed, cleanup txn and completely restart it + if (!valid) { + // Remove this txn from the active set + std::unique_lock active_set_lock{active_set_mutex_}; + active_set_.Erase(txn); + active_set_lock.unlock(); + + // Cleanup txn + txn->reads_.clear(); + txn->writes_.clear(); + txn->status_ = INCOMPLETE; + + // Restart txn + std::scoped_lock lock{mutex_}; + txn->unique_id_ = next_unique_id_++; + txn_requests_.Push(txn); + } else { + // Apply all writes + ApplyWrites(txn); + + // Remove this txn from the active set + std::unique_lock active_set_lock{active_set_mutex_}; + active_set_.Erase(txn); + active_set_lock.unlock(); + + // Mark transaction as committed + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + + // Update relevant data structure + txn_results_.Push(txn); + } +} + +void TxnProcessor::RunOCCParallelScheduler() { + Txn *txn; + while (!stopped_) { + // Get the next new transaction request (if one is pending) and pass it to + // an execution thread that executes the txn logic *and also* does the + // validation and write phases. + if (txn_requests_.Pop(&txn)) { + tp_.AddTask([this, txn]() { this->ExecuteTxnParallel(txn); }); + } + } +} + +set set_union(const std::set &s1, const set &s2) { + std::set result = s1; + result.insert(s2.begin(), s2.end()); + return result; +} + +void TxnProcessor::MVCCExecuteTxn(Txn *txn) { + // Read all necessary data for this transaction from storage + // (Note that unlike the version of MVCC from class, you should lock the key + // before each read) + + // Read everything in from readset and writeset. + for (auto key : set_union(txn->readset_, txn->writeset_)) { + // Lock the key + storage_->Lock(key); + + // Save each read result iff record exists in storage. 
+ Value result; + if (storage_->Read(key, &result, txn->unique_id_)) + txn->reads_[key] = result; + + // Unlock the key + storage_->Unlock(key); + } + + // Execute txn's program logic. + txn->Run(); + + // Acquire all locks for keys in the write_set_ + for (auto key : txn->writeset_) { + storage_->Lock(key); + } + + // Call MVCCStorage::CheckWrite method to check all keys in the write_set_ + bool checkPassed = true; + for (auto key : txn->writeset_) { + if (!((MVCCStorage *)storage_)->CheckKey(key, txn->unique_id_)) { + checkPassed = false; + break; + } + } + + // If each key passed the check + if (checkPassed) { + // Apply the writes + ApplyWrites(txn); + + // Release all locks for keys in the write_set_ + for (auto key : txn->writeset_) { + storage_->Unlock(key); + } + + // Mark transaction as committed + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + + // Update relevant data structure + txn_results_.Push(txn); + } else { // At least one key failed the check + // Release all locks for keys in the write_set_ + for (auto key : txn->writeset_) { + storage_->Unlock(key); + } + + // Cleanup txn + txn->reads_.clear(); + txn->writes_.clear(); + txn->status_ = INCOMPLETE; + + // Restart txn -- same as OCC + std::scoped_lock lock{mutex_}; + txn->unique_id_ = next_unique_id_++; + txn_requests_.Push(txn); + } +} + +void TxnProcessor::RunMVCCScheduler() { + // + // Implement this method! + + // Hint:Pop a txn from txn_requests_, and pass it to a thread to execute. + // Note that you may need to create another execute method, like + // TxnProcessor::MVCCExecuteTxn. + + Txn *txn; + while (!stopped_) { + // Get the next new transaction request (if one is pending) and pass it to + // an execution thread that executes the txn logic *and also* does the + // validation and write phases. 
+ if (txn_requests_.Pop(&txn)) { + tp_.AddTask([this, txn]() { this->MVCCExecuteTxn(txn); }); + } + } +} + +void TxnProcessor::MVCCSSIExecuteTxn(Txn *txn) { + // Read all necessary data for this transaction from storage + // (Note that unlike the version of MVCC from class, you should lock the key + // before each read) + + // Read everything in from readset and writeset. + for (auto key : set_union(txn->readset_, txn->writeset_)) { + // Lock the key + storage_->Lock(key); + + // Save each read result iff record exists in storage. + Value result; + if (storage_->Read(key, &result, txn->unique_id_)) + txn->reads_[key] = result; + + // Unlock the key + storage_->Unlock(key); + } + + // Execute txn's program logic. + txn->Run(); + + // THIS IS DIFFERENT FROM MVCCExecuteTxn: we lock write_set AND read_set + // Acquire all locks for keys in the read_set_ and write_set_ + // (Lock any overlapping key only once.) + for (auto key : set_union(txn->writeset_, txn->readset_)) { + storage_->Lock(key); + } + + // Call MVCCStorage::CheckWrite method to check all keys in the write_set_ + bool checkPassed = true; + for (auto key : txn->writeset_) { + if (!((MVCCStorage *)storage_)->CheckKey(key, txn->unique_id_)) { + checkPassed = false; + break; + } + } + + // If each key passed the check + if (checkPassed) { + // Apply the writes + ApplyWrites(txn); + + // Release all locks for ALL keys (read_set_ and write_set_) + for (auto key : set_union(txn->writeset_, txn->readset_)) { + storage_->Unlock(key); + } + + // Mark transaction as committed + committed_txns_.Push(txn); + txn->status_ = COMMITTED; + + // Update relevant data structure + txn_results_.Push(txn); + } else { // At least one key failed the check + // Release all locks for ALL keys (read_set_ and write_set_) + for (auto key : set_union(txn->writeset_, txn->readset_)) { + storage_->Unlock(key); + } + + // Cleanup txn + txn->reads_.clear(); + txn->writes_.clear(); + txn->status_ = INCOMPLETE; + + // Restart txn -- same as 
OCC + std::scoped_lock lock{mutex_}; + txn->unique_id_ = next_unique_id_++; + txn_requests_.Push(txn); + } +} + +void TxnProcessor::RunMVCCSSIScheduler() { + // + // Implement this method! + + // Hint:Pop a txn from txn_requests_, and pass it to a thread to execute. + // Note that you may need to create another execute method, like + // TxnProcessor::MVCCSSIExecuteTxn. + + Txn *txn; + while (!stopped_) { + // Get the next new transaction request (if one is pending) and pass it to + // an execution thread that executes the txn logic *and also* does the + // validation and write phases. + if (txn_requests_.Pop(&txn)) { + tp_.AddTask([this, txn]() { this->MVCCSSIExecuteTxn(txn); }); + } + } +} diff --git a/a2_final/src/txn/txn_processor.h b/a2_final/src/txn/txn_processor.h new file mode 100644 index 0000000..f5fd4e1 --- /dev/null +++ b/a2_final/src/txn/txn_processor.h @@ -0,0 +1,228 @@ +#ifndef _TXN_PROCESSOR_H_ +#define _TXN_PROCESSOR_H_ + +#include +#include +#include +#include + +#include "lock_manager.h" +#include "mvcc_storage.h" +#include "storage.h" +#include "txn.h" +#include "utils/atomic.h" +#include "utils/common.h" +#include "utils/pool.h" +#include "utils/static_thread_pool.h" + +// The TxnProcessor supports five different execution modes, corresponding to +// the four parts of assignment 2, plus a simple serial (non-concurrent) mode. +enum CCMode { + SERIAL = 0, // Serial transaction execution (no concurrency) + LOCKING_EXCLUSIVE_ONLY, // Part 1A + LOCKING, // Part 1B + OCC, // Part 2 + P_OCC, // Part 3 + MVCC, // Part 4 + CALVIN, + CALVIN_I, + CALVIN_EPOCH, + MVCC_SSI, // Part 5 +}; + +// Returns a human-readable string naming of the providing mode. +std::string ModeToString(CCMode mode); + +class TxnProcessor { +public: + // The TxnProcessor's constructor starts the TxnProcessor running in the + // background. 
+ explicit TxnProcessor(CCMode mode); + + // The TxnProcessor's destructor stops all background threads and deallocates + // all objects currently owned by the TxnProcessor, except for Txn objects. + ~TxnProcessor(); + + // Registers a new txn request to be executed by the TxnProcessor. + // Ownership of '*txn' is transfered to the TxnProcessor. + void NewTxnRequest(Txn *txn); + + // Returns a pointer to the next COMMITTED or ABORTED Txn. The caller takes + // ownership of the returned Txn. + Txn *GetTxnResult(); + + // Main loop implementing all concurrency control/thread scheduling. + void RunScheduler(); + + static void *StartScheduler(void *arg); + +private: + // Serial validation + bool SerialValidate(Txn *txn); + + // Parallel executtion/validation for OCC + void ExecuteTxnParallel(Txn *txn); + + // Serial version of scheduler. + void RunSerialScheduler(); + + // Locking version of scheduler. + void RunLockingScheduler(); + + // OCC version of scheduler. + void RunOCCScheduler(); + + // OCC version of scheduler with parallel validation. + void RunOCCParallelScheduler(); + + // MVCC version of scheduler. + void RunMVCCScheduler(); + + // MVCC SSI version of scheduler. + void RunMVCCSSIScheduler(); + + // Performs all reads required to execute the transaction, then executes the + // transaction logic. + void ExecuteTxn(Txn *txn); + + // Applies all writes performed by '*txn' to 'storage_'. + // + // Requires: txn->Status() is COMPLETED_C. + void ApplyWrites(Txn *txn); + + // The following functions are for MVCC. + void MVCCExecuteTxn(Txn *txn); + + // The following functions are for MVCC_SSI. + + void MVCCSSIExecuteTxn(Txn *txn); + + void MVCCSSICheckReads(Txn *txn); + + // The following functions are for MVCC & MVCC_SSI. + bool MVCCCheckWrites(Txn *txn); + + void MVCCLockWriteKeys(Txn *txn); + + void MVCCUnlockWriteKeys(Txn *txn); + + // Concurrency control mechanism the TxnProcessor is currently using. 
+ CCMode mode_; + + // Thread pool managing all threads used by TxnProcessor. + StaticThreadPool tp_; + + // Data storage used for all modes. + Storage *storage_; + + // Next valid unique_id, and a mutex to guard incoming txn requests. + std::atomic next_unique_id_; + std::mutex mutex_; + + // Queue of incoming transaction requests. + AtomicQueue txn_requests_; + + // Queue of txns that have acquired all locks and are ready to be executed. + // + // Does not need to be atomic because RunScheduler is the only thread that + // will ever access this queue. + std::deque ready_txns_; + + // Queue of completed (but not yet committed/aborted) transactions. + AtomicQueue completed_txns_; + + // Vector of committed transactions that are used to check any overlap + // during OCC validation phase. + AtomicVector committed_txns_; + + // Queue of transaction results (already committed or aborted) to be returned + // to client. + AtomicQueue txn_results_; + + // Set of transactions that are currently in the process of parallel + // validation. + AtomicSet active_set_; + + // Used it for critical section in parallel occ. + std::mutex active_set_mutex_; + + // Lock Manager used for LOCKING concurrency implementations. + LockManager *lm_; + + // Used for stopping the continuous loop that runs in the scheduler thread + std::atomic stopped_; + + // Gives us access to the scheduler thread so that we can wait for it to join + // later. 
+ std::thread scheduler_thread_; + + // ===================== START OF CALVIN ===================== + + /*********************************************** + * Calvin Continuous Execution -- Global Locks * + ***********************************************/ + + // putting calvin sequencer as public for pthread + void RunCalvinEpochSequencer(); + + // putting calvin epoch executor as public for pthread + void CalvinEpochExecutor(); + + std::unordered_map> adj_list; + std::unordered_map> + indegree; // indegree needs to be atomic + std::queue *root_txns; + + std::shared_mutex adj_list_lock; + std::shared_mutex indegree_lock; + + void RunCalvinContScheduler(); + void CalvinContExecutorFunc(Txn *txn); + + /*********************************************** + * Calvin Continuous Execution -- Indiv Locks * + ***********************************************/ + void RunCalvinContIndivScheduler(); + void CalvinContIndivExecutorFunc(Txn *txn); + + /*********************************************** + * Calvin Epoch Execution * + ***********************************************/ + // 1) Sequencer + + // thread for calvin sequencer + pthread_t calvin_sequencer_thread; + // thread for calvin sequencer + pthread_t calvin_epoch_executor_thread; + // defining epoch for ease of use + typedef std::queue Epoch; + // queue of epochs for calvin scheduler + AtomicQueue epoch_queue; + + // helper function for pthreads + static void *calvin_sequencer_helper(void *arg); + + // 2) Scheduler + struct EpochDag { + std::unordered_map> *adj_list; + std::unordered_map> *indegree; + std::queue *root_txns; + }; + EpochDag *current_epoch_dag; + AtomicQueue epoch_dag_queue; + std::atomic num_txns_left_in_epoch; + pthread_cond_t epoch_finished_cond; + pthread_mutex_t epoch_finished_mutex; + void RunCalvinEpochScheduler(); + + // 3) Executor + // helper function to call calvin epoch executor in pthread + static void *calvin_epoch_executor_helper(void *arg); + + void CalvinEpochExecutorFunc(Txn *txn); + void 
CalvinExecuteSingleEpoch(EpochDag *epoch_dag); + + // ===================== END OF CALVIN ======================= +}; + +#endif // _TXN_PROCESSOR_H_ diff --git a/a2_final/src/txn/txn_processor_test.cc b/a2_final/src/txn/txn_processor_test.cc new file mode 100644 index 0000000..3c787d9 --- /dev/null +++ b/a2_final/src/txn/txn_processor_test.cc @@ -0,0 +1,384 @@ +#include +#include + +#include "txn/txn_types.h" +#include "txn_processor.h" + +// Returns a human-readable string naming of the providing mode. +std::string ModeToString(CCMode mode) { + switch (mode) { + case SERIAL: + return " Serial "; + case LOCKING_EXCLUSIVE_ONLY: + return " Locking A"; + case LOCKING: + return " Locking B"; + case OCC: + return " OCC "; + case P_OCC: + return " OCC-P "; + case MVCC: + return " MVCC "; + case CALVIN: + return " Calvin "; + case CALVIN_I: + return " Calvin_I "; + case CALVIN_EPOCH: + return " Calvin_E "; + default: + return "INVALID MODE"; + } +} + +class LoadGen { +public: + virtual ~LoadGen() {} + virtual Txn *NewTxn() = 0; +}; + +class RMWLoadGen : public LoadGen { +public: + RMWLoadGen(int dbsize, int rsetsize, int wsetsize, double wait_time) + : dbsize_(dbsize), rsetsize_(rsetsize), wsetsize_(wsetsize), + wait_time_(wait_time) {} + + virtual Txn *NewTxn() { + return new RMW(dbsize_, rsetsize_, wsetsize_, wait_time_); + } + +private: + int dbsize_; + int rsetsize_; + int wsetsize_; + double wait_time_; +}; + +class RMWLoadGen2 : public LoadGen { +public: + RMWLoadGen2(int dbsize, int rsetsize, int wsetsize, double wait_time) + : dbsize_(dbsize), rsetsize_(rsetsize), wsetsize_(wsetsize), + wait_time_(wait_time) {} + + virtual Txn *NewTxn() { + // 80% of transactions are READ only transactions and run for the full + // transaction duration. The rest are very fast (< 0.1ms), high-contention + // updates. 
+ if (rand() % 100 < 80) + return new RMW(dbsize_, rsetsize_, 0, wait_time_); + else + return new RMW(dbsize_, 0, wsetsize_, 0); + } + +private: + int dbsize_; + int rsetsize_; + int wsetsize_; + double wait_time_; +}; + +class RMWDynLoadGen : public LoadGen { +public: + RMWDynLoadGen(int dbsize, int rsetsize, int wsetsize, + std::vector wait_times) + : dbsize_(dbsize), rsetsize_(rsetsize), wsetsize_(wsetsize) { + wait_times_ = wait_times; + } + + virtual Txn *NewTxn() { + // Mix transactions with different time durations (wait_times_) + if (rand() % 100 < 30) + return new RMW(dbsize_, rsetsize_, wsetsize_, wait_times_[0]); + else if (rand() % 100 < 60) + return new RMW(dbsize_, rsetsize_, wsetsize_, wait_times_[1]); + else + return new RMW(dbsize_, rsetsize_, wsetsize_, wait_times_[2]); + } + +private: + int dbsize_; + int rsetsize_; + int wsetsize_; + std::vector wait_times_; +}; + +class RMWDynLoadGen2 : public LoadGen { +public: + RMWDynLoadGen2(int dbsize, int rsetsize, int wsetsize, + std::vector wait_times) + : dbsize_(dbsize), rsetsize_(rsetsize), wsetsize_(wsetsize) { + wait_times_ = wait_times; + } + + virtual Txn *NewTxn() { + // 80% of transactions are READ only transactions and run for the different + // transaction duration. The rest are very fast (< 0.1ms), high-contention + // updates. + if (rand() % 100 < 80) { + // Mix transactions with different time durations (wait_times_) + if (rand() % 100 < 30) + return new RMW(dbsize_, rsetsize_, 0, wait_times_[0]); + else if (rand() % 100 < 60) + return new RMW(dbsize_, rsetsize_, 0, wait_times_[1]); + else + return new RMW(dbsize_, rsetsize_, 0, wait_times_[2]); + } else { + return new RMW(dbsize_, 0, wsetsize_, 0); + } + } + +private: + int dbsize_; + int rsetsize_; + int wsetsize_; + std::vector wait_times_; +}; + +void Benchmark(const std::vector &lg) { + // Number of transaction requests that can be active at any given time. + int active_txns = 100; + std::deque doneTxns; + + // For each MODE... 
+ for (CCMode mode = SERIAL; mode <= CALVIN_I; + mode = static_cast(mode + 1)) { + // Print out mode name. + std::cout << ModeToString(mode) << std::flush; + + // For each experiment, run 2 times and get the average. + for (uint32 exp = 0; exp < lg.size(); exp++) { + double throughput[2]; + for (uint32 round = 0; round < 2; round++) { + int txn_count = 0; + + // Create TxnProcessor in next mode. + TxnProcessor *p = new TxnProcessor(mode); + + // Record start time. + double start = GetTime(); + + // Start specified number of txns running. + for (int i = 0; i < active_txns; i++) + p->NewTxnRequest(lg[exp]->NewTxn()); + + // Keep 100 active txns at all times for the first full second. + while (GetTime() < start + 0.5) { + Txn *txn = p->GetTxnResult(); + doneTxns.push_back(txn); + txn_count++; + p->NewTxnRequest(lg[exp]->NewTxn()); + } + + // Wait for all of them to finish. + for (int i = 0; i < active_txns; i++) { + Txn *txn = p->GetTxnResult(); + doneTxns.push_back(txn); + txn_count++; + } + + // Record end time. 
+ double end = GetTime(); + + throughput[round] = txn_count / (end - start); + + for (auto it = doneTxns.begin(); it != doneTxns.end(); ++it) { + delete *it; + } + + doneTxns.clear(); + delete p; + } + + // Print throughput + std::cout << "\t" << (throughput[0] + throughput[1]) / 2 << "\t" + << std::flush; + } + + std::cout << std::endl; + } +} + +int main(int argc, char **argv) { + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + std::cout << "\t\t Average Transaction Duration" << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + std::cout << "\t\t0.1ms\t\t1ms\t\t10ms\t\t(0.1ms, 1ms, 10ms)" << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + + std::vector lg; + + std::cout << "\t\t Low contention Read only (5 records)" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen(1000000, 5, 0, 0.0001)); + lg.push_back(new RMWLoadGen(1000000, 5, 0, 0.001)); + lg.push_back(new RMWLoadGen(1000000, 5, 0, 0.01)); + lg.push_back(new RMWDynLoadGen(1000000, 5, 0, {0.0001, 0.001, 0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + std::cout << "\t\t Low contention Read only (30 records)" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen(1000000, 30, 0, 0.0001)); + lg.push_back(new RMWLoadGen(1000000, 30, 0, 0.001)); + lg.push_back(new RMWLoadGen(1000000, 30, 0, 0.01)); + lg.push_back(new RMWDynLoadGen(1000000, 30, 0, {0.0001, 0.001, 0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + std::cout << "\t\t High 
contention Read only (5 records)" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen(100, 5, 0, 0.0001)); + lg.push_back(new RMWLoadGen(100, 5, 0, 0.001)); + lg.push_back(new RMWLoadGen(100, 5, 0, 0.01)); + lg.push_back(new RMWDynLoadGen(100, 5, 0, {0.0001, 0.001, 0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + std::cout << "\t\t High contention Read only (30 records)" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen(100, 30, 0, 0.0001)); + lg.push_back(new RMWLoadGen(100, 30, 0, 0.001)); + lg.push_back(new RMWLoadGen(100, 30, 0, 0.01)); + lg.push_back(new RMWDynLoadGen(100, 30, 0, {0.0001, 0.001, 0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + std::cout << "\t\t Low contention read-write (5 records)" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen(1000000, 0, 5, 0.0001)); + lg.push_back(new RMWLoadGen(1000000, 0, 5, 0.001)); + lg.push_back(new RMWLoadGen(1000000, 0, 5, 0.01)); + lg.push_back(new RMWDynLoadGen(1000000, 0, 5, {0.0001, 0.001, 0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + std::cout << "\t\t Low contention read-write (10 records)" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen(1000000, 0, 10, 0.0001)); + lg.push_back(new RMWLoadGen(1000000, 0, 10, 0.001)); + lg.push_back(new RMWLoadGen(1000000, 0, 10, 0.01)); + lg.push_back(new RMWDynLoadGen(1000000, 0, 10, {0.0001, 0.001, 
0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + std::cout << "\t\t High contention read-write (5 records)" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen(100, 0, 5, 0.0001)); + lg.push_back(new RMWLoadGen(100, 0, 5, 0.001)); + lg.push_back(new RMWLoadGen(100, 0, 5, 0.01)); + lg.push_back(new RMWDynLoadGen(100, 0, 5, {0.0001, 0.001, 0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + std::cout << "\t\t High contention read-write (10 records)" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen(100, 0, 10, 0.0001)); + lg.push_back(new RMWLoadGen(100, 0, 10, 0.001)); + lg.push_back(new RMWLoadGen(100, 0, 10, 0.01)); + lg.push_back(new RMWDynLoadGen(100, 0, 10, {0.0001, 0.001, 0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + // 80% of transactions are READ only transactions and run for the full + // transaction duration. The rest are very fast (< 0.1ms), high-contention + // updates. 
+ std::cout << "\t\t High contention mixed read only/read-write" + << std::endl; + std::cout + << "\t\t----------------------------------------------------------------" + "---" + << std::endl; + lg.push_back(new RMWLoadGen2(50, 30, 10, 0.0001)); + lg.push_back(new RMWLoadGen2(50, 30, 10, 0.001)); + lg.push_back(new RMWLoadGen2(50, 30, 10, 0.01)); + lg.push_back(new RMWDynLoadGen2(50, 30, 10, {0.0001, 0.001, 0.01})); + + Benchmark(lg); + std::cout << std::endl; + + for (uint32 i = 0; i < lg.size(); i++) + delete lg[i]; + lg.clear(); + + return 0; +} diff --git a/a2_final/src/txn/txn_types.h b/a2_final/src/txn/txn_types.h new file mode 100644 index 0000000..9d7c76b --- /dev/null +++ b/a2_final/src/txn/txn_types.h @@ -0,0 +1,152 @@ +#ifndef _TXN_TYPES_H_ +#define _TXN_TYPES_H_ + +#include +#include +#include + +#include "txn.h" + +// Immediately commits. +class Noop : public Txn { +public: + Noop() {} + virtual void Run() { COMMIT; } + Noop *clone() const { // Virtual constructor (copying) + Noop *clone = new Noop(); + this->CopyTxnInternals(clone); + return clone; + } +}; + +// Reads all keys in the map 'm', if all results correspond to the values in +// the provided map, commits, else aborts. +class Expect : public Txn { +public: + Expect(const std::map &m) : m_(m) { + for (std::map::iterator it = m_.begin(); it != m_.end(); ++it) + readset_.insert(it->first); + } + + Expect *clone() const { // Virtual constructor (copying) + Expect *clone = new Expect(std::map(m_)); + this->CopyTxnInternals(clone); + return clone; + } + + virtual void Run() { + Value result; + for (std::map::iterator it = m_.begin(); it != m_.end(); ++it) { + if (!Read(it->first, &result) || result != it->second) { + ABORT; + } + } + COMMIT; + } + +private: + std::map m_; +}; + +// Inserts all pairs in the map 'm'. 
+class Put : public Txn { +public: + Put(const std::map &m) : m_(m) { + for (std::map::iterator it = m_.begin(); it != m_.end(); ++it) + writeset_.insert(it->first); + } + + Put *clone() const { // Virtual constructor (copying) + Put *clone = new Put(std::map(m_)); + this->CopyTxnInternals(clone); + return clone; + } + + virtual void Run() { + for (std::map::iterator it = m_.begin(); it != m_.end(); ++it) + Write(it->first, it->second); + COMMIT; + } + +private: + std::map m_; +}; + +// Read-modify-write transaction. +class RMW : public Txn { +public: + explicit RMW(double time = 0) : time_(time) {} + RMW(const std::set &writeset, double time = 0) : time_(time) { + writeset_ = writeset; + } + RMW(const std::set &readset, const std::set &writeset, + double time = 0) + : time_(time) { + readset_ = readset; + writeset_ = writeset; + } + + // Constructor with randomized read/write sets + RMW(int dbsize, int readsetsize, int writesetsize, double time = 0) + : time_(time) { + // Make sure we can find enough unique keys. + DCHECK(dbsize >= readsetsize + writesetsize); + + // Find readsetsize unique read keys. + for (int i = 0; i < readsetsize; i++) { + Key key; + do { + key = rand() % dbsize; + } while (readset_.count(key)); + readset_.insert(key); + } + + // Find writesetsize unique write keys. + for (int i = 0; i < writesetsize; i++) { + Key key; + do { + key = rand() % dbsize; + } while (readset_.count(key) || writeset_.count(key)); + writeset_.insert(key); + } + } + + RMW *clone() const { // Virtual constructor (copying) + RMW *clone = new RMW(time_); + this->CopyTxnInternals(clone); + return clone; + } + + virtual void Run() { + Value result; + // Read everything in readset. + for (std::set::iterator it = readset_.begin(); it != readset_.end(); + ++it) + Read(*it, &result); + + // Run while loop to simulate the txn logic(duration is time_). 
+ double begin = GetTime(); + while (GetTime() - begin < time_) { + for (int i = 0; i < 1000; i++) { + int x = 100; + x = x + 2; + x = x * x; + } + } + + // Increment length of everything in writeset. + for (std::set::iterator it = writeset_.begin(); it != writeset_.end(); + ++it) { + result = 0; + Read(*it, &result); + Write(*it, result + 1); + } + + COMMIT; + } + +private: + double time_; +}; + +#endif // _TXN_TYPES_H_ diff --git a/a2_final/src/txn/txn_types_test.cc b/a2_final/src/txn/txn_types_test.cc new file mode 100644 index 0000000..746453c --- /dev/null +++ b/a2_final/src/txn/txn_types_test.cc @@ -0,0 +1,76 @@ +#include "txn.h" + +#include + +#include "txn_processor.h" +#include "txn_types.h" +#include "utils/testing.h" + +TEST(NoopTest) { + TxnProcessor p(SERIAL); + + Txn *t = new Noop(); + EXPECT_EQ(INCOMPLETE, t->Status()); + + p.NewTxnRequest(t); + p.GetTxnResult(); + + EXPECT_EQ(COMMITTED, t->Status()); + delete t; + + END; +} + +TEST(PutTest) { + TxnProcessor p(SERIAL); + Txn *t; + + std::map m1 = {{1, 2}}; + p.NewTxnRequest(new Put(m1)); + delete p.GetTxnResult(); + + std::map m2 = {{0, 2}}; + p.NewTxnRequest(new Expect(m2)); // Should abort (no key '0' exists) + t = p.GetTxnResult(); + EXPECT_EQ(ABORTED, t->Status()); + delete t; + + std::map m3 = {{1, 1}}; + p.NewTxnRequest(new Expect(m3)); // Should abort (wrong value for key) + t = p.GetTxnResult(); + EXPECT_EQ(ABORTED, t->Status()); + delete t; + + std::map m4 = {{1, 2}}; + p.NewTxnRequest(new Expect(m4)); // Should commit + t = p.GetTxnResult(); + EXPECT_EQ(COMMITTED, t->Status()); + delete t; + + END; +} + +TEST(PutMultipleTest) { + TxnProcessor p(SERIAL); + Txn *t; + + std::map m; + for (int i = 0; i < 1000; i++) + m[i] = i * i; + + p.NewTxnRequest(new Put(m)); + delete p.GetTxnResult(); + + p.NewTxnRequest(new Expect(m)); + t = p.GetTxnResult(); + EXPECT_EQ(COMMITTED, t->Status()); + delete t; + + END; +} + +int main(int argc, char **argv) { + NoopTest(); + PutTest(); + PutMultipleTest(); +} 
diff --git a/a2_final/src/utils/atomic.h b/a2_final/src/utils/atomic.h new file mode 100644 index 0000000..e4208f4 --- /dev/null +++ b/a2_final/src/utils/atomic.h @@ -0,0 +1,209 @@ +#ifndef _DB_UTILS_ATOMIC_H_ +#define _DB_UTILS_ATOMIC_H_ + +#include +#include +#include +#include +#include +#include +#include + +/// @class AtomicMap +/// +/// Atomically readable, atomically mutable unordered associative container. +/// Implemented as a std::unordered_map guarded by a pthread rwlock. +/// Supports CRUD operations only. Iterators are NOT supported. +template class AtomicMap { +public: + AtomicMap() {} + // Returns the number of key-value pairs currently stored in the map. + int Size() { + std::shared_lock lock{mutex_}; + return map_.size(); + } + + // Returns true if the map contains a pair with key equal to 'key'. + bool Contains(const K &key) { + std::shared_lock lock{mutex_}; + return map_.contains(key); + } + + // If the map contains a pair with key 'key', sets '*value' equal to the + // associated value and returns true, else returns false. + bool Lookup(const K &key, V *value) { + std::shared_lock lock{mutex_}; + if (map_.contains(key)) { + *value = map_[key]; + return true; + } else { + return false; + } + } + + // Atomically inserts the pair (key, value) into the map (clobbering any + // previous pair with key equal to 'key'. + void Insert(const K &key, const V &value) { + std::scoped_lock lock{mutex_}; + map_[key] = value; + } + + // Synonym for 'Insert(key, value)'. + void Set(const K &key, const V &value) { Insert(key, value); } + // Atomically erases any pair with key 'key' from the map. + void Erase(const K &key) { + std::scoped_lock lock{mutex_}; + map_.erase(key); + } + +private: + std::unordered_map map_; + std::shared_mutex mutex_; +}; + +/// @class AtomicSet +/// +/// Atomically readable, atomically mutable container. +/// Implemented as a std::set guarded by a pthread rwlock. +/// Supports CRUD operations only. Iterators are NOT supported. 
+template class AtomicSet { +public: + AtomicSet() {} + // Returns the number of key-value pairs currently stored in the map. + int Size() { + std::shared_lock lock{mutex_}; + return set_.size(); + } + + // Returns true if the set contains V value. + bool Contains(const V &value) { + std::shared_lock lock{mutex_}; + return set_.contains(value); + } + + // Atomically inserts the value into the set. + void Insert(const V &value) { + std::scoped_lock lock{mutex_}; + set_.insert(value); + } + + // Atomically erases the object value from the set. + void Erase(const V &value) { + std::scoped_lock lock{mutex_}; + set_.erase(value); + } + + V GetFirst() { + std::scoped_lock lock{mutex_}; + V first = *(set_.begin()); + return first; + } + + // Returns a copy of the underlying set. + std::set GetSet() { + std::shared_lock lock{mutex_}; + return {set_}; + } + +private: + std::set set_; + std::shared_mutex mutex_; +}; + +/// @class AtomicQueue +/// +/// Queue with atomic push and pop operations. +/// +/// @TODO(alex): This should use lower-contention synchronization. +template class AtomicQueue { +public: + AtomicQueue() { mutex_ = std::make_unique(); } + // Returns the number of elements currently in the queue. + int Size() { + std::scoped_lock lock{*mutex_}; + int size = queue_.size(); + return size; + } + + // Atomically pushes 'item' onto the queue. + void Push(const T &item) { + std::scoped_lock lock{*mutex_}; + queue_.push(item); + } + + // If the queue is non-empty, (atomically) sets '*result' equal to the front + // element, pops the front element from the queue, and returns true, + // otherwise returns false. + bool Pop(T *result) { + std::scoped_lock lock{*mutex_}; + if (!queue_.empty()) { + *result = queue_.front(); + queue_.pop(); + return true; + } else { + return false; + } + } + + // If mutex is immediately acquired, pushes and returns true, else immediately + // returns false. 
+ bool PushNonBlocking(const T &item) { + std::unique_lock lock{*mutex_, std::try_to_lock}; + if (lock) { + queue_.push(item); + return true; + } else { + return false; + } + } + + // If mutex is immediately acquired AND queue is nonempty, pops and returns + // true, else returns false. + bool PopNonBlocking(T *result) { + std::unique_lock lock{*mutex_, std::try_to_lock}; + if (lock && !queue_.empty()) { + *result = queue_.front(); + queue_.pop(); + return true; + } else { + return false; + } + } + +private: + std::queue queue_; + std::unique_ptr mutex_; +}; + +template class AtomicVector { +public: + AtomicVector() {} + // Returns the number of elements currently stored in the vector. + int Size() { + std::shared_lock lock{mutex_}; + int size = vec_.size(); + return size; + } + + // Atomically accesses the value associated with the id. + T &operator[](int id) { + std::shared_lock lock{mutex_}; + T &value = vec_[id]; + return value; + } + + // Atomically inserts the value into the vector. + void Push(const T &value) { + std::scoped_lock lock{mutex_}; + vec_.push_back(value); + } + + // CMSC 624: TODO(students) + // Feel free to add more methods as needed. 
+ +private: + std::vector vec_; + std::shared_mutex mutex_; +}; + +#endif // _DB_UTILS_ATOMIC_H_ diff --git a/a2_final/src/utils/atomic_queue.h b/a2_final/src/utils/atomic_queue.h new file mode 100644 index 0000000..de0e47b --- /dev/null +++ b/a2_final/src/utils/atomic_queue.h @@ -0,0 +1,49 @@ +#include +#include + +template class atomic_queue { +private: + std::queue queue_; + std::mutex mutex_; + +public: + bool pop(T &f) { + std::scoped_lock lock{mutex_}; + if (std::empty(queue_)) + return false; + + f = std::move(queue_.front()); + queue_.pop(); + + return true; + } + + bool push(T &&f) { + std::scoped_lock lock{mutex_}; + queue_.emplace(f); + return true; + } + + bool try_push(T &&f) { + std::unique_lock lock{mutex_, std::try_to_lock}; + + if (!lock) + return false; + + queue_.emplace(f); + return true; + } + + bool try_pop(T &f) { + std::unique_lock lock{mutex_, std::try_to_lock}; + + if (!lock || std::empty(queue_)) { + return false; + } + + f = std::move(queue_.front()); + queue_.pop(); + + return true; + } +}; diff --git a/a2_final/src/utils/common.h b/a2_final/src/utils/common.h new file mode 100644 index 0000000..cd4ab02 --- /dev/null +++ b/a2_final/src/utils/common.h @@ -0,0 +1,70 @@ +#ifndef _COMMON_H_ +#define _COMMON_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// debug mode +#define DEBUG true + +// assert if in debug mode +#define DCHECK(ARG) \ + if (DEBUG) { \ + assert(ARG); \ + } + +// print message and die +#define DIE(MSG) \ + do { \ + std::cerr << __FILE__ << ":" << __LINE__ << ": " << MSG << std::endl; \ + exit(1); \ + } while (0); + +// Abbreviated signed int types. +typedef int8_t int8; +typedef int16_t int16; +typedef int32_t int32; +typedef int64_t int64; + +// Abbreviated unsigned int types. 
+typedef uint8_t uint8; +typedef uint16_t uint16; +typedef uint32_t uint32; +typedef uint64_t uint64; + +// Key and value types +typedef uint64 Key; +typedef uint64 Value; + +// Returns the number of seconds since midnight according to local system time, +// to the nearest microsecond. +static inline double GetTime() { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec + tv.tv_usec / 1e6; +} + +// Returns a random double in [0, max] (flat distribution). +static inline double RandomDouble(double max) { + return max * (static_cast(rand()) / static_cast(RAND_MAX)); +} + +// Sleep for 'duration' seconds. +static inline void Sleep(double duration) { usleep(1000000 * duration); } +// Returns a human-readable string representation of an int. +static inline std::string IntToString(int n) { + char s[64]; + snprintf(s, sizeof(s), "%d", n); + return std::string(s); +} + +// Converts a human-readable numeric string to an int. +static inline int StringToInt(const std::string &s) { return atoi(s.c_str()); } +#endif // _COMMON_H_ diff --git a/a2_final/src/utils/pool.h b/a2_final/src/utils/pool.h new file mode 100644 index 0000000..80c3f26 --- /dev/null +++ b/a2_final/src/utils/pool.h @@ -0,0 +1,69 @@ +#include +#include +#include + +#include "atomic_queue.h" + +using task = std::function; + +#define TRY_FACTOR 10 + +class thread_pool { +private: + std::atomic counter_{}; + std::atomic stopped_ = false; + unsigned num_threads_{}; + std::vector> queues_; + std::vector threads_; + + void run(unsigned tid) { + while (!stopped_) { + task f; + + // Check Our Queue + if (!queues_[tid].pop(f)) { + for (unsigned i = 0; i < num_threads_; i++) { + // Try and Steal Other work + if (queues_[i].try_pop(f)) { + break; + } + } + } + + if (f) + f(); + } + } + +public: + thread_pool(unsigned num_threads) + : num_threads_(num_threads), queues_(num_threads), threads_(num_threads) { + for (unsigned i = 0; i < num_threads; i++) { + threads_[i] = std::thread{&thread_pool::run, this, i}; 
+ } + } + + ~thread_pool() { + stopped_ = true; + for (unsigned i = 0; i < num_threads_; i++) { + threads_[i].join(); + } + } + + void AddTask(task &&f) { + unsigned tid = counter_++; + + // Try to Assign Round Robin + if (!queues_[tid % num_threads_].try_push(std::forward(f))) { + for (unsigned i = 0; i < num_threads_; i++) { + // Try and Push to another queue + if (queues_[i % num_threads_].try_push(std::forward(f))) { + return; + } + } + + // Otherwise push in order + queues_[tid % num_threads_].push(std::forward(f)); + } + } +}; diff --git a/a2_final/src/utils/static_thread_pool.h b/a2_final/src/utils/static_thread_pool.h new file mode 100644 index 0000000..0d7e05f --- /dev/null +++ b/a2_final/src/utils/static_thread_pool.h @@ -0,0 +1,86 @@ +#ifndef _DB_UTILS_STATIC_THREAD_POOL_H_ +#define _DB_UTILS_STATIC_THREAD_POOL_H_ + +#include +#include +#include +#include +#include + +#include "assert.h" +#include "stdlib.h" +#include "utils/atomic.h" +#include "utils/thread_pool.h" + +class StaticThreadPool : public ThreadPool { +public: + StaticThreadPool(int nthreads) : thread_count_(nthreads), stopped_(false) { + threads_.resize(nthreads); + queues_.resize(nthreads); + + for (int i = 0; i < nthreads; i++) { + threads_[i] = std::thread{&StaticThreadPool::RunThread, this, i}; + } + } + ~StaticThreadPool() { + stopped_ = true; + for (int i = 0; i < thread_count_; i++) { + threads_[i].join(); + } + } + + bool Active() { return !stopped_; } + + virtual void AddTask(Task &&task) { + assert(!stopped_); + while (!queues_[rand() % thread_count_].PushNonBlocking( + std::forward(task))) { + } + } + + virtual void AddTask(const Task &task) { + assert(!stopped_); + while (!queues_[rand() % thread_count_].PushNonBlocking(task)) { + } + } + + virtual int ThreadCount() { return thread_count_; } + +private: + // Function executed by each pthread. 
+ void RunThread(int queue_id) { + Task task; + int sleep_duration = 1; // in microseconds + while (true) { + if (this->queues_[queue_id].PopNonBlocking(&task)) { + task(); + // Reset backoff. + sleep_duration = 1; + } else { + usleep(sleep_duration); + // Back off exponentially. + if (sleep_duration < 32) + sleep_duration *= 2; + } + + if (this->stopped_.load(std::memory_order_relaxed)) { + // Go through ALL queues looking for a remaining task. + while (this->queues_[queue_id].Pop(&task)) { + task(); + } + + break; + } + } + } + + int thread_count_; + std::vector threads_; + + // Task queues. + std::vector> queues_; + + std::atomic stopped_; +}; + +#endif // _DB_UTILS_STATIC_THREAD_POOL_H_ diff --git a/a2_final/src/utils/testing.h b/a2_final/src/utils/testing.h new file mode 100644 index 0000000..751630b --- /dev/null +++ b/a2_final/src/utils/testing.h @@ -0,0 +1,70 @@ +#ifndef __TESTING_H_ +#define __TESTING_H_ + +#include +#include + +// Global variable tracking whether current test has failed. 
+bool __failed_; + +#define WARN(MSG) printf("%s:%d: %s\n", __FILE__, __LINE__, MSG) +#define CHECK(T, MSG) \ + do { \ + if (!(T)) { \ + __failed_ = true; \ + WARN(MSG); \ + } \ + } while (0) + +#define LINE std::cout << "[ " << __FUNCTION__ << " ] " + +#define EXPECT_TRUE(T) \ + do { \ + if (!(T)) { \ + __failed_ = true; \ + std::cout << "EXPECT_TRUE(" << #T << ") failed at " << __FILE__ << ":" \ + << __LINE__ << "\n"; \ + } \ + } while (0) + +#define EXPECT_FALSE(T) \ + do { \ + if (T) { \ + __failed_ = true; \ + std::cout << "EXPECT_FALSE(" << #T << ") failed at " << __FILE__ << ":" \ + << __LINE__ << "\n"; \ + } \ + } while (0) + +#define EXPECT_EQ(A, B) \ + do { \ + if ((A) != (B)) { \ + __failed_ = true; \ + std::cout << "EXPECT_EQ(" << #A << ", " << #B \ + << ") \033[1;31mfailed\033[0m at " << __FILE__ << ":" \ + << __LINE__ << "\n" \ + << "Expected:\n" \ + << A << "\n" \ + << "Actual:\n" \ + << B << "\n" \ + << std::flush; \ + } \ + } while (0) + +#define TEST(TESTNAME) \ + void TESTNAME() { \ + __failed_ = false; \ + LINE << "\033[1;32mBEGIN\033[0m\n"; \ + do + +#define END \ + if (__failed_) { \ + LINE << "\033[1;31mFAIL\033[0m\n" << std::flush; \ + } else { \ + LINE << "\033[1;32mPASS\033[0m\n" << std::flush; \ + } \ + } \ + while (0) \ + ; + +#endif // __TESTING_H_ diff --git a/a2_final/src/utils/thread_pool.h b/a2_final/src/utils/thread_pool.h new file mode 100644 index 0000000..6288327 --- /dev/null +++ b/a2_final/src/utils/thread_pool.h @@ -0,0 +1,21 @@ +#ifndef _DB_UTILS_THREAD_POOL_H_ +#define _DB_UTILS_THREAD_POOL_H_ + +#include + +class ThreadPool { +public: + using Task = std::function; + + virtual ~ThreadPool() {} + // Causes 'task' to be scheduled for background by a thread in the threadpool. + virtual void AddTask(Task &&task) = 0; + + virtual void AddTask(const Task &task) = 0; + + // Returns the number of active physical pthreads currently consituting the + // threadpool. 
+ virtual int ThreadCount() = 0; +}; + +#endif // _DB_UTILS_THREAD_POOL_H_