From 350ce1ce888d2be6d7ecc9ff1395348b4355f123 Mon Sep 17 00:00:00 2001 From: caniro Date: Mon, 16 Jun 2025 14:01:51 +0900 Subject: [PATCH 01/12] =?UTF-8?q?fix:=20=EC=B2=B4=EA=B2=B0=20=EC=84=B1?= =?UTF-8?q?=EB=8A=A5=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=A0=95=EC=83=81?= =?UTF-8?q?=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../trade/application/TradeFlowService.java | 13 ++ .../application/TradeExecuteLoadTest.java | 183 ++++++++++++------ 2 files changed, 136 insertions(+), 60 deletions(-) diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java index f0880bbb..cf629b06 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java @@ -6,9 +6,11 @@ import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; import java.util.Optional; +import java.util.concurrent.CountDownLatch; @Slf4j @RequiredArgsConstructor @@ -19,6 +21,13 @@ public class TradeFlowService { private final TradeExecutor tradeExecutor; private final WaitingOrdersManager waitingOrdersManager; + private CountDownLatch testLatch; // 테스트용 후크 + + @Profile("trade-load-test") + public void setTestLatch(CountDownLatch latch) { + this.testLatch = latch; + } + public void execMatchAndTrade(String ticker) { WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); // TODO : peek() 해온 Order 객체들을 lock -> 체결 도중 취소 방지 @@ -28,6 +37,10 @@ public void execMatchAndTrade(String ticker) { while (continueProcessing) { try { tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker); + if (testLatch != null) { + testLatch.countDown(); + } + tradePair = tradeMatcher.matchOrders(waitingOrders); continueProcessing = tradePair.isPresent(); } catch (TradeZeroOrderException e) { diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java index 733a3a38..51f96f2b 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java @@ -19,12 +19,13 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -@ActiveProfiles({"dev", "it", "h2-mem"}) -@Disabled +@ActiveProfiles({"dev", "it", "h2-mem", "trade-load-test"}) +//@Disabled @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @SpringBootTest class TradeExecuteLoadTest { @@ -38,6 +39,9 @@ class TradeExecuteLoadTest { @Autowired TradeRepository tradeRepository; + @Autowired + private TradeFlowService tradeFlowService; + @Autowired SellOrderRepository sellOrderRepository; @@ -50,9 +54,17 @@ class TradeExecuteLoadTest { @Order(1) @Test void warmUp() throws InterruptedException { + System.out.println("Starting warmUp"); + runSingleTest(1000); + runSingleTest(1000); runSingleTest(1000); + runSingleTest(1000); + runSingleTest(1000); + runSingleTest(10000); + runSingleTest(10000); runSingleTest(10000); runSingleTest(50000); + System.out.println("Finished warmUp"); } @BeforeEach @@ -63,13 +75,14 @@ void setUp() { sellOrderRepository.deleteAll(); buyOrderRepository.deleteAll(); } - @DisplayName("매수, 매도 각 1000건에 대한 처리 성능을 10회 진행한다.") + + @DisplayName("매수, 매도 각 1000건에 대한 처리 성능을 30회 진행한다.") @Order(2) @Test void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { // given int orderCount = 1000; - int repeatCount = 10; + int repeatCount = 30; List executionTimes = new ArrayList<>(); List queueInsertTimes = new ArrayList<>(); @@ -85,56 +98,89 @@ void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { printStatistics(queueInsertTimes, executionTimes, orderCount); } - @DisplayName("매수, 매도 각 10000건에 대한 처리 성능을 10회 진행한다.") - @Order(3) - @Test - void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { - // given - int orderCount = 10000; - int repeatCount = 10; - List executionTimes = new ArrayList<>(); - List queueInsertTimes = new ArrayList<>(); - - // when - for (int i = 0; i < repeatCount; ++i) { - long[] times = runSingleTest(orderCount); - queueInsertTimes.add(times[0]); - executionTimes.add(times[1]); - System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); - } - - // 통계 출력 - printStatistics(queueInsertTimes, executionTimes, orderCount); - } +// @DisplayName("매수, 매도 각 1000건에 대한 처리 성능을 10회 진행한다.") +// @Order(2) +// @Test +// void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { +// // given +// int orderCount = 10000; +// int repeatCount = 10; +// List executionTimes = new ArrayList<>(); +// List queueInsertTimes = new ArrayList<>(); +// +// // when +// for (int i = 0; i < repeatCount; ++i) { +// long[] times = runSingleTest(orderCount); +// queueInsertTimes.add(times[0]); +// executionTimes.add(times[1]); +// System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); +// } +// +// // 통계 출력 +// printStatistics(queueInsertTimes, executionTimes, orderCount); +// } +// +// +// @DisplayName("매수, 매도 각 10000건에 대한 처리 성능을 10회 진행한다.") +// @Order(3) +// @Test +// void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { +// // given +// int orderCount = 10000; +// int repeatCount = 10; +// List executionTimes = new ArrayList<>(); +// List queueInsertTimes = new ArrayList<>(); +// +// // when +// for (int i = 0; i < repeatCount; ++i) { +// long[] times = runSingleTest(orderCount); +// queueInsertTimes.add(times[0]); +// executionTimes.add(times[1]); +// System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); +// } +// +// // 통계 출력 +// printStatistics(queueInsertTimes, executionTimes, orderCount); +// } +// +// @DisplayName("매수, 매도 각 100000건에 대한 처리 성능을 10회 진행한다.") +// @Order(4) +// @Test +// void basicLoadTestWith100000OrdersEachSide() throws InterruptedException { +// // given +// int orderCount = 100000; +// int repeatCount = 10; +// List executionTimes = new ArrayList<>(); +// List queueInsertTimes = new ArrayList<>(); +// +// // when +// for (int i = 0; i < repeatCount; ++i) { +// long[] times = runSingleTest(orderCount); +// queueInsertTimes.add(times[0]); +// executionTimes.add(times[1]); +// System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); +// } +// +// // 통계 출력 +// printStatistics(queueInsertTimes, executionTimes, orderCount); +// } - @DisplayName("매수, 매도 각 100000건에 대한 처리 성능을 10회 진행한다.") - @Order(4) - @Test - void basicLoadTestWith100000OrdersEachSide() throws InterruptedException { - // given - int orderCount = 100000; - int repeatCount = 10; - List executionTimes = new ArrayList<>(); - List queueInsertTimes = new ArrayList<>(); + private long[] runSingleTest(int orderCount) throws InterruptedException { + WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); - // when - for (int i = 0; i < repeatCount; ++i) { - long[] times = runSingleTest(orderCount); - queueInsertTimes.add(times[0]); - executionTimes.add(times[1]); - System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); - } + // 큐와 DB 초기화 + waitingOrders.clearAllQueues(); + tradeRepository.deleteAll(); + sellOrderRepository.deleteAll(); + buyOrderRepository.deleteAll(); - // 통계 출력 - printStatistics(queueInsertTimes, executionTimes, orderCount); - } + CountDownLatch latch = new CountDownLatch(orderCount); + tradeFlowService.setTestLatch(latch); - private long[] runSingleTest(int orderCount) throws InterruptedException { - WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); long testStart = System.nanoTime(); // 주문 생성 및 큐 삽입 - ExecutorService executor = Executors.newFixedThreadPool(10); + ExecutorService executor = Executors.newFixedThreadPool(20); for (int i = 0; i < orderCount; i++) { executor.submit(() -> { SellOrder limitSellOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, LocalDateTime.now(), true); @@ -146,31 +192,48 @@ private long[] runSingleTest(int orderCount) throws InterruptedException { // 큐 삽입 완료 대기 executor.shutdown(); - boolean queueTerminated = executor.awaitTermination(10, TimeUnit.SECONDS); + boolean queueTerminated = executor.awaitTermination(30, TimeUnit.SECONDS); long queueInsertEnd = System.nanoTime(); long queueInsertTime = (queueInsertEnd - testStart) / 1_000_000; - // 단일 이벤트 발행 (체결 시작) + // 단일 이벤트 발행 (체결 시작, 큐에는 안 넣음) long eventStart = System.nanoTime(); SellOrder dummyOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, LocalDateTime.now(), true); eventPublisher.publishEvent(new OrderInsertedToQueue(dummyOrder)); + + + + PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); + PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); + + + // 체결 완료 대기 +// int pollCount = 100; +// for (int i = 0; i < pollCount; i++) { +// if (buyOrderPriorityQueueStore.isEmpty() && sellOrderPriorityQueueStore.isEmpty()) { +// break; +// } +// Thread.sleep(50); +// } + + boolean completed = latch.await(60, TimeUnit.SECONDS); long eventEnd = System.nanoTime(); long executionTime = (eventEnd - eventStart) / 1_000_000; + + // CountDownLatch 초기화 + tradeFlowService.setTestLatch(null); + // 결과 출력 - if (tradeRepository.findAll().size() != orderCount) { - PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); - PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); - System.out.print("체결 종료 - 체결내역[: " + tradeRepository.findAll().size() + "건]"); + long tradeCount = tradeRepository.count(); +// if (tradeCount != orderCount) { + System.out.print("체결 종료 - 체결내역[: " + tradeCount + "건]"); System.out.println("잔여 주문[매도 " + sellOrderPriorityQueueStore.size() + "건, 매수 " + buyOrderPriorityQueueStore.size() + "건]"); - } - - // 큐와 DB 초기화 - waitingOrders.clearAllQueues(); - tradeRepository.deleteAll(); - sellOrderRepository.deleteAll(); - buyOrderRepository.deleteAll(); + if (tradeCount != orderCount || !completed) { + System.out.println("경고: 예상 체결 건수(" + orderCount + "건)와 실제(" + tradeCount + "건) 불일치 또는 타임아웃"); + } +// } return new long[]{queueInsertTime, executionTime}; } From 265d61d7f88990ba4f9ed3d67f66a8cc6c4f96c6 Mon Sep 17 00:00:00 2001 From: caniro Date: Tue, 17 Jun 2025 18:38:13 +0900 Subject: [PATCH 02/12] =?UTF-8?q?feat:=20=EC=B2=B4=EA=B2=B0=20=EC=99=84?= =?UTF-8?q?=EB=A3=8C=20=EC=95=8C=EB=A6=BC=20=EA=B0=9C=EC=84=A0(=EC=A3=BC?= =?UTF-8?q?=EB=AC=B8=20=EC=99=84=EC=A0=84=EC=B2=B4=EA=B2=B0=20=EC=8B=9C?= =?UTF-8?q?=EC=A0=90=EC=97=90=20=EC=95=8C=EB=A6=BC)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../TradeExecutedNotificationHandler.java | 31 +++++------ .../application/TradeExecutedNotifyDto.java | 43 --------------- .../coin/trade/application/TradeExecutor.java | 31 ++++++----- .../application/TradeOrderCompletedEvent.java | 9 ++++ .../TradeOrderCompletedEventImpl.java | 22 ++++++++ .../TradeOrderCompletedEventPublisher.java | 19 +++++++ .../TradeOrderCompletedNotifyDto.java | 37 +++++++++++++ .../TradeExecutedNotificationHandlerTest.java | 54 ++++++++----------- 8 files changed, 140 insertions(+), 106 deletions(-) delete mode 100644 src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotifyDto.java create mode 100644 src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEvent.java create mode 100644 src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventImpl.java create mode 100644 src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventPublisher.java create mode 100644 src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedNotifyDto.java diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandler.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandler.java index 8d94a666..7583e2e6 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandler.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandler.java @@ -1,6 +1,6 @@ package com.cleanengine.coin.trade.application; -import com.cleanengine.coin.trade.entity.Trade; +import com.cleanengine.coin.order.domain.Order; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; @@ -23,30 +23,23 @@ public TradeExecutedNotificationHandler(SimpMessagingTemplate messagingTemplate) } @TransactionalEventListener - public void notifyAfterTradeExecuted(TradeExecutedEvent tradeExecutedEvent) { - Trade trade = tradeExecutedEvent.getTrade(); - if (trade == null) { - log.error("체결 알림 실패! trade == null"); + public void notifyAfterTradeExecuted(TradeOrderCompletedEvent tradeOrderCompletedEvent) { + // TODO : 평균단가는 별도 계산해야 함 + Order order = tradeOrderCompletedEvent.getOrder(); + if (order == null) { + log.error("체결 알림 실패! order == null"); return ; } - Integer sellUserId = trade.getSellUserId(); - Integer buyUserId = trade.getBuyUserId(); - if (sellUserId == null || buyUserId == null) { - log.error("체결 알림 실패! sellUserId: {}, buyUserId: {}", sellUserId, buyUserId); + Integer userId = order.getUserId(); + if (userId == null) { + log.error("체결 알림 실패! userId: {}", userId); return ; } - if (sellUserId != SELL_ORDER_BOT_ID) { - TradeExecutedNotifyDto soldDto = TradeExecutedNotifyDto.of(trade, ASK); - messagingTemplate.convertAndSend("/topic/tradeNotification/" + sellUserId, soldDto); - } - if (buyUserId != BUY_ORDER_BOT_ID) { - TradeExecutedNotifyDto boughtDto = TradeExecutedNotifyDto.of(trade, BID); - messagingTemplate.convertAndSend("/topic/tradeNotification/" + buyUserId, boughtDto); - } - if (sellUserId != SELL_ORDER_BOT_ID || buyUserId != BUY_ORDER_BOT_ID) { - log.debug("{} 체결 이벤트 구독 : {}원에 {}개, 매수인: {}, 매도인: {}", trade.getTicker(), trade.getPrice(), trade.getSize(), buyUserId, sellUserId ); + if (userId != SELL_ORDER_BOT_ID && userId != BUY_ORDER_BOT_ID) { + TradeOrderCompletedNotifyDto notifyDto = TradeOrderCompletedNotifyDto.of(order); + messagingTemplate.convertAndSend("/topic/tradeNotification/" + userId, notifyDto); } } diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotifyDto.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotifyDto.java deleted file mode 100644 index 10585a79..00000000 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotifyDto.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.cleanengine.coin.trade.application; - -import com.cleanengine.coin.trade.entity.Trade; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import lombok.Builder; -import lombok.Getter; - -import java.time.LocalDateTime; - -@Getter -@JsonPropertyOrder({"ticker", "price", "size", "type", "tradedTime"}) -public class TradeExecutedNotifyDto { - - private String ticker; - - private Double price; - - private Double size; - - private String type; - - private LocalDateTime tradedTime; - - @Builder - private TradeExecutedNotifyDto(String ticker, Double price, Double size, String type, LocalDateTime tradedTime) { - this.ticker = ticker; - this.price = price; - this.size = size; - this.type = type; - this.tradedTime = tradedTime; - } - - public static TradeExecutedNotifyDto of(Trade trade, String type) { - return TradeExecutedNotifyDto.builder() - .ticker(trade.getTicker()) - .price(trade.getPrice()) - .size(trade.getSize()) - .type(type) - .tradedTime(trade.getTradeTime()) - .build(); - } - -} diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index b8e4ca14..8bfe03ef 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -33,6 +33,7 @@ public class TradeExecutor { private final AccountService accountService; @Getter private final TradeExecutedEventPublisher tradeExecutedEventPublisher; + private final TradeOrderCompletedEventPublisher tradeOrderCompletedEventPublisher; private final TradeService tradeService; @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED) @@ -64,8 +65,7 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair tr sellOrder.decreaseRemainingSize(tradedSize); // 주문 완전체결 처리(잔여금액 or 잔여수량이 0) - removeCompletedBuyOrder(waitingOrders, buyOrder); - removeCompletedSellOrder(waitingOrders, sellOrder); + removeCompletedOrders(waitingOrders, buyOrder, sellOrder); tradeService.updateOrder(buyOrder); tradeService.updateOrder(sellOrder); @@ -180,25 +180,32 @@ private static void writeTradingLog(BuyOrder buyOrder, SellOrder sellOrder) { sellOrder.getRemainingSize()); } - private static void removeCompletedBuyOrder(WaitingOrders waitingOrders, BuyOrder order) { - boolean isOrderCompleted = (isMarketOrder(order) && approxEquals(order.getRemainingDeposit(), 0.0)) || - (isLimitOrder(order) && approxEquals(order.getRemainingSize(), 0.0)); - - if (isOrderCompleted) { - waitingOrders.removeOrder(order); - updateCompletedOrderStatus(order); - } + private void removeCompletedOrders(WaitingOrders waitingOrders, BuyOrder buyOrder, SellOrder sellOrder) { + removeCompletedOrder(waitingOrders, buyOrder); + removeCompletedOrder(waitingOrders, sellOrder); } - private static void removeCompletedSellOrder(WaitingOrders waitingOrders, SellOrder order) { - boolean isOrderCompleted = approxEquals(order.getRemainingSize(), 0.0); + private void removeCompletedOrder(WaitingOrders waitingOrders, Order order) { + boolean isOrderCompleted = false; + + if (order instanceof BuyOrder buyOrder) { + isOrderCompleted = (isMarketOrder(buyOrder) && approxEquals(buyOrder.getRemainingDeposit(), 0.0)) || + (isLimitOrder(buyOrder) && approxEquals(buyOrder.getRemainingSize(), 0.0)); + } else if (order instanceof SellOrder sellOrder) { + isOrderCompleted = approxEquals(sellOrder.getRemainingSize(), 0.0); + } if (isOrderCompleted) { waitingOrders.removeOrder(order); updateCompletedOrderStatus(order); + publishOrderCompletionEvent(order); } } + private void publishOrderCompletionEvent(Order order) { + tradeOrderCompletedEventPublisher.publish(TradeOrderCompletedEventImpl.of(order)); + } + private static void updateCompletedOrderStatus(Order order) { order.setState(OrderStatus.DONE); } diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEvent.java b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEvent.java new file mode 100644 index 00000000..02f25821 --- /dev/null +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEvent.java @@ -0,0 +1,9 @@ +package com.cleanengine.coin.trade.application; + +import com.cleanengine.coin.order.domain.Order; + +public interface TradeOrderCompletedEvent { + + Order getOrder(); + +} diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventImpl.java b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventImpl.java new file mode 100644 index 00000000..be72ae38 --- /dev/null +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventImpl.java @@ -0,0 +1,22 @@ +package com.cleanengine.coin.trade.application; + +import com.cleanengine.coin.order.domain.Order; +import com.cleanengine.coin.trade.entity.Trade; +import lombok.Builder; +import lombok.Getter; + +@Getter +@Builder +public class TradeOrderCompletedEventImpl implements TradeOrderCompletedEvent { + + Order order; + + private TradeOrderCompletedEventImpl(Order order) { + this.order = order; + } + + public static TradeOrderCompletedEventImpl of(Order order) { + return new TradeOrderCompletedEventImpl(order); + } + +} diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventPublisher.java b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventPublisher.java new file mode 100644 index 00000000..fa8e6a93 --- /dev/null +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventPublisher.java @@ -0,0 +1,19 @@ +package com.cleanengine.coin.trade.application; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Service; + +@Service +public class TradeOrderCompletedEventPublisher { + + private final ApplicationEventPublisher publisher; + + public TradeOrderCompletedEventPublisher(ApplicationEventPublisher publisher) { + this.publisher = publisher; + } + + public void publish(TradeOrderCompletedEvent event) { + publisher.publishEvent(event); + } + +} diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedNotifyDto.java b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedNotifyDto.java new file mode 100644 index 00000000..9dc1e49c --- /dev/null +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedNotifyDto.java @@ -0,0 +1,37 @@ +package com.cleanengine.coin.trade.application; + +import com.cleanengine.coin.order.domain.BuyOrder; +import com.cleanengine.coin.order.domain.Order; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import lombok.Builder; +import lombok.Getter; + +import java.time.LocalDateTime; + +@Getter +@JsonPropertyOrder({"ticker", "size", "type"}) +public class TradeOrderCompletedNotifyDto { + + private String ticker; + + private Double size; + + private String type; + + @Builder + private TradeOrderCompletedNotifyDto(String ticker, Double price, Double size, String type, LocalDateTime tradedTime) { + this.ticker = ticker; + this.size = size; + this.type = type; + } + + public static TradeOrderCompletedNotifyDto of(Order order) { + String orderType = order instanceof BuyOrder ? "ASK" : "BID"; + return TradeOrderCompletedNotifyDto.builder() + .ticker(order.getTicker()) + .size(order.getOrderSize()) + .type(orderType) + .build(); + } + +} diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandlerTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandlerTest.java index d3793153..059721ac 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandlerTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandlerTest.java @@ -6,7 +6,8 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; -import com.cleanengine.coin.trade.entity.Trade; +import com.cleanengine.coin.order.domain.BuyOrder; +import com.cleanengine.coin.order.domain.SellOrder; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -35,52 +36,38 @@ void setUp() { @Test void shouldSendNotificationsForValidTrade() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), 3, SELL_ORDER_BOT_ID, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + SellOrder sellOrder = SellOrder.createLimitSellOrder("BTC", 3, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(sellOrder); // when handler.notifyAfterTradeExecuted(event); // then - verify(messagingTemplate, times(1)).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeExecutedNotifyDto.class)); - verify(messagingTemplate).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeExecutedNotifyDto.class)); + verify(messagingTemplate, times(1)).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeOrderCompletedNotifyDto.class)); + verify(messagingTemplate).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeOrderCompletedNotifyDto.class)); } @DisplayName("매수인은 봇인 정상 체결내역을 리스닝하면 웹소켓으로 전송한다.") @Test void shouldSendNotificationsForValidTrade2() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), BUY_ORDER_BOT_ID, 3, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + BuyOrder buyOrder = BuyOrder.createLimitBuyOrder("BTC", 4, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(buyOrder); // when handler.notifyAfterTradeExecuted(event); // then - verify(messagingTemplate, times(1)).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeExecutedNotifyDto.class)); - verify(messagingTemplate).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeExecutedNotifyDto.class)); - } - - @DisplayName("매수인과 매도인의 userId가 null이면 메시지를 전송하지 않는다.") - @Test - void shouldNotSendNotificationForNullUserIds() { - // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), null, null, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); - - // when - handler.notifyAfterTradeExecuted(event); - - // then - verifyNoInteractions(messagingTemplate); + verify(messagingTemplate, times(1)).convertAndSend(eq("/topic/tradeNotification/4"), any(TradeOrderCompletedNotifyDto.class)); + verify(messagingTemplate).convertAndSend(eq("/topic/tradeNotification/4"), any(TradeOrderCompletedNotifyDto.class)); } @DisplayName("매수인의 userId가 null이면 메시지를 전송하지 않는다.") @Test void shouldNotSendNotificationForNullBuyUserId() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), null, SELL_ORDER_BOT_ID, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + BuyOrder buyOrder = BuyOrder.createLimitBuyOrder("BTC", null, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(buyOrder); // when handler.notifyAfterTradeExecuted(event); @@ -93,8 +80,8 @@ void shouldNotSendNotificationForNullBuyUserId() { @Test void shouldNotSendNotificationForNullSellUserId() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), BUY_ORDER_BOT_ID, null, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + SellOrder sellOrder = SellOrder.createLimitSellOrder("BTC", null, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(sellOrder); // when handler.notifyAfterTradeExecuted(event); @@ -103,25 +90,28 @@ void shouldNotSendNotificationForNullSellUserId() { verifyNoInteractions(messagingTemplate); } - @DisplayName("봇끼리의 체결은 메시지를 전송하지 않는다.") + @DisplayName("봇의 체결은 메시지를 전송하지 않는다.") @Test void shouldNotSendNotificationForBotTrade() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), BUY_ORDER_BOT_ID, SELL_ORDER_BOT_ID, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + SellOrder sellOrder = SellOrder.createLimitSellOrder("BTC", SELL_ORDER_BOT_ID, 5.0, 130_000_000.0, LocalDateTime.now(), false); + BuyOrder buyOrder = BuyOrder.createLimitBuyOrder("BTC", BUY_ORDER_BOT_ID, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(sellOrder); + TradeOrderCompletedEvent event2 = TradeOrderCompletedEventImpl.of(buyOrder); // when handler.notifyAfterTradeExecuted(event); + handler.notifyAfterTradeExecuted(event2); // then verifyNoInteractions(messagingTemplate); } - @DisplayName("체결이 null이면 메시지를 전송하지 않는다.") + @DisplayName("주문이 null이면 메시지를 전송하지 않는다.") @Test void shouldNotSendNotificationForNullTrade() { // given - TradeExecutedEvent event = TradeExecutedEvent.of(null, null, null); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(null); // when handler.notifyAfterTradeExecuted(event); From 6e6259cb1c281355884b035fd8b6947c4487fe68 Mon Sep 17 00:00:00 2001 From: caniro Date: Tue, 17 Jun 2025 19:24:05 +0900 Subject: [PATCH 03/12] =?UTF-8?q?refactor:=20(=EC=84=B1=EB=8A=A5=EA=B0=9C?= =?UTF-8?q?=EC=84=A0)=20=EC=B2=B4=EA=B2=B0=20=EC=8B=9C=20Wallet=EC=9D=84?= =?UTF-8?q?=20=ED=95=9C=EB=B2=88=EC=97=90=20update=20=ED=95=98=EB=8F=84?= =?UTF-8?q?=EB=A1=9D=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../coin/trade/application/TradeExecutor.java | 42 ++++++++----------- .../user/info/application/WalletService.java | 4 ++ 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index 8bfe03ef..af762596 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -21,6 +21,7 @@ import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; +import java.util.List; import static com.cleanengine.coin.common.CommonValues.approxEquals; @@ -82,8 +83,7 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair tr } // 지갑 누적계산 - this.updateWalletAfterTrade(buyOrder, ticker, tradedSize, totalTradedPrice); - this.updateWalletAfterTrade(sellOrder, ticker, tradedSize, totalTradedPrice); + this.updateWalletAfterTrade(buyOrder, sellOrder, ticker, tradedSize, totalTradedPrice); // 체결내역 저장 Trade trade = this.insertNewTrade(ticker, buyOrder, sellOrder, tradedSize, tradedPrice); @@ -91,7 +91,11 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair tr TradeExecutedEvent tradeExecutedEvent = TradeExecutedEvent.of(trade, buyOrder.getId(), sellOrder.getId()); tradeExecutedEventPublisher.publish(tradeExecutedEvent); } + private Trade insertNewTrade(String ticker, BuyOrder buyOrder, SellOrder sellOrder, double tradeSize, Double tradePrice) { + Trade newTrade = Trade.of(ticker, LocalDateTime.now(), buyOrder.getUserId(), sellOrder.getUserId(), tradePrice, tradeSize); + return tradeService.save(newTrade); + } private static void checkZeroOrderAndThrowException(BuyOrder buyOrder, SellOrder sellOrder) { Order zeroOrder = null; if (approxEquals(buyOrder.getRemainingDeposit(), 0.0)) @@ -109,29 +113,17 @@ private void increaseAccountCash(Order order, Double amount) { accountService.save(account.increaseCash(amount)); } - private void updateWalletAfterTrade(Order order, String ticker, double tradedSize, double totalTradedPrice) { - if (order instanceof BuyOrder) { - Wallet buyerWallet = walletService.findWalletByUserIdAndTicker(order.getUserId(), ticker); - double updatedBuySize = buyerWallet.getSize() + tradedSize; - double currentBuyPrice = buyerWallet.getBuyPrice() == null ? 0.0 : buyerWallet.getBuyPrice(); - double updatedBuyPrice = ((currentBuyPrice * buyerWallet.getSize()) + totalTradedPrice) / updatedBuySize; - buyerWallet.setSize(updatedBuySize); - buyerWallet.setBuyPrice(updatedBuyPrice); - // TODO : ROI 계산 - walletService.save(buyerWallet); - } else if (order instanceof SellOrder) { - // 매도 시에는 평단가 변동 없음 - Wallet sellerWallet = walletService.findWalletByUserIdAndTicker(order.getUserId(), ticker); - walletService.save(sellerWallet); - } else { - throw new BusinessException("Unsupported order type: " + order.getClass().getName(), ErrorStatus.INTERNAL_SERVER_ERROR); - } - } - - private Trade insertNewTrade(String ticker, BuyOrder buyOrder, SellOrder sellOrder, double tradeSize, Double tradePrice) { - Trade newTrade = Trade.of(ticker, LocalDateTime.now(), buyOrder.getUserId(), sellOrder.getUserId(), tradePrice, tradeSize); - - return tradeService.save(newTrade); + private void updateWalletAfterTrade(BuyOrder buyOrder, SellOrder sellOrder, String ticker, double tradedSize, double totalTradedPrice) { + Wallet buyerWallet = walletService.findWalletByUserIdAndTicker(buyOrder.getUserId(), ticker); + double updatedBuySize = buyerWallet.getSize() + tradedSize; + double currentBuyPrice = buyerWallet.getBuyPrice() == null ? 0.0 : buyerWallet.getBuyPrice(); + double updatedBuyPrice = ((currentBuyPrice * buyerWallet.getSize()) + totalTradedPrice) / updatedBuySize; + buyerWallet.setSize(updatedBuySize); + buyerWallet.setBuyPrice(updatedBuyPrice); + // TODO : ROI 계산 + // 매도 시에는 평단가 변동 없음 + Wallet sellerWallet = walletService.findWalletByUserIdAndTicker(sellOrder.getUserId(), ticker); + walletService.saveAll(List.of(buyerWallet, sellerWallet)); } private static TradeUnitPriceAndSize getTradeUnitPriceAndSize(BuyOrder buyOrder, SellOrder sellOrder) { diff --git a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java index 10fbcef3..d9d85e8a 100644 --- a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java +++ b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java @@ -31,6 +31,10 @@ public Wallet save(Wallet wallet) { return walletRepository.save(wallet); } + public List saveAll(List wallets) { + return walletRepository.saveAll(wallets); + } + public Wallet findWalletByUserIdAndTicker(Integer userId, String ticker) { int accountId = accountRepository.findByUserId(userId).orElseThrow().getId(); return walletRepository.findByAccountIdAndTicker(accountId, ticker) From c90f92ccd755bd3855eb5ee24fde69107a8949f4 Mon Sep 17 00:00:00 2001 From: caniro Date: Tue, 17 Jun 2025 20:55:51 +0900 Subject: [PATCH 04/12] =?UTF-8?q?test:=20=EC=B2=B4=EA=B2=B0=20=EC=84=B1?= =?UTF-8?q?=EB=8A=A5=ED=85=8C=EC=8A=A4=ED=8A=B8=20mariaDB=20=EA=B8=B0?= =?UTF-8?q?=EC=A4=80=EC=9C=BC=EB=A1=9C=20=EC=88=98=ED=96=89=EB=90=98?= =?UTF-8?q?=EB=8F=84=EB=A1=9D=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../coin/trade/application/TradeExecuteLoadTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java index 51f96f2b..af76160a 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java @@ -24,7 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -@ActiveProfiles({"dev", "it", "h2-mem", "trade-load-test"}) +@ActiveProfiles({"dev", "it", "mariadb-local", "trade-load-test"}) //@Disabled @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @SpringBootTest From e253435fdd93b54da9b656191a7aaf5a02530c9e Mon Sep 17 00:00:00 2001 From: caniro Date: Wed, 18 Jun 2025 13:35:28 +0900 Subject: [PATCH 05/12] =?UTF-8?q?refactor:=20(=EC=84=B1=EB=8A=A5=EA=B0=9C?= =?UTF-8?q?=EC=84=A0)=20=EC=B2=B4=EA=B2=B0=20=EC=8B=9C=20Trade=EB=93=A4?= =?UTF-8?q?=EC=9D=84=20=EB=AA=A8=EC=95=84=EC=84=9C=20=ED=95=9C=EB=B2=88?= =?UTF-8?q?=EC=97=90=20=EC=A0=80=EC=9E=A5=ED=95=98=EB=8F=84=EB=A1=9D=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../coin/trade/application/TradeExecutor.java | 11 +- .../trade/application/TradeFlowService.java | 24 ++- .../application/TradeExecuteLoadTest.java | 162 +++++++----------- 3 files changed, 89 insertions(+), 108 deletions(-) diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index af762596..d1c64f0f 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -1,7 +1,5 @@ package com.cleanengine.coin.trade.application; -import com.cleanengine.coin.common.error.BusinessException; -import com.cleanengine.coin.common.response.ErrorStatus; import com.cleanengine.coin.order.domain.BuyOrder; import com.cleanengine.coin.order.domain.Order; import com.cleanengine.coin.order.domain.OrderStatus; @@ -38,7 +36,7 @@ public class TradeExecutor { private final TradeService tradeService; @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED) - public void executeTrade(WaitingOrders waitingOrders, TradePair tradePair, String ticker) { + public Trade executeTrade(WaitingOrders waitingOrders, TradePair tradePair, String ticker) { BuyOrder buyOrder = tradePair.getBuyOrder(); SellOrder sellOrder = tradePair.getSellOrder(); log.trace("{} - 체결 시작: 매수[{} {}원 {}개] / 매도[{} {}원 {}개]", ticker, buyOrder.getId(), buyOrder.getPrice(), buyOrder.getRemainingSize(), @@ -86,16 +84,13 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair tr this.updateWalletAfterTrade(buyOrder, sellOrder, ticker, tradedSize, totalTradedPrice); // 체결내역 저장 - Trade trade = this.insertNewTrade(ticker, buyOrder, sellOrder, tradedSize, tradedPrice); + Trade trade = Trade.of(ticker, LocalDateTime.now(), buyOrder.getUserId(), sellOrder.getUserId(), tradedPrice, tradedSize); TradeExecutedEvent tradeExecutedEvent = TradeExecutedEvent.of(trade, buyOrder.getId(), sellOrder.getId()); tradeExecutedEventPublisher.publish(tradeExecutedEvent); + return trade; } - private Trade insertNewTrade(String ticker, BuyOrder buyOrder, SellOrder sellOrder, double tradeSize, Double tradePrice) { - Trade newTrade = Trade.of(ticker, LocalDateTime.now(), buyOrder.getUserId(), sellOrder.getUserId(), tradePrice, tradeSize); - return tradeService.save(newTrade); - } private static void checkZeroOrderAndThrowException(BuyOrder buyOrder, SellOrder sellOrder) { Order zeroOrder = null; if (approxEquals(buyOrder.getRemainingDeposit(), 0.0)) diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java index cf629b06..2c8e1ceb 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java @@ -4,11 +4,16 @@ import com.cleanengine.coin.order.domain.Order; import com.cleanengine.coin.order.domain.spi.WaitingOrders; import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager; +import com.cleanengine.coin.trade.entity.Trade; +import com.cleanengine.coin.trade.repository.TradeRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -20,6 +25,7 @@ public class TradeFlowService { private final TradeMatcher tradeMatcher; private final TradeExecutor tradeExecutor; private final WaitingOrdersManager waitingOrdersManager; + private final TradeRepository tradeRepository; private CountDownLatch testLatch; // 테스트용 후크 @@ -33,12 +39,15 @@ public void execMatchAndTrade(String ticker) { // TODO : peek() 해온 Order 객체들을 lock -> 체결 도중 취소 방지 Optional> tradePair = tradeMatcher.matchOrders(waitingOrders); boolean continueProcessing = tradePair.isPresent(); + List tradesToSave = new ArrayList<>(); while (continueProcessing) { try { - tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker); - if (testLatch != null) { - testLatch.countDown(); + Trade trade = tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker); + tradesToSave.add(trade); + if (tradesToSave.size() > 1000) { + tradeRepository.saveAll(tradesToSave); + tradesToSave.clear(); } tradePair = tradeMatcher.matchOrders(waitingOrders); @@ -54,6 +63,15 @@ public void execMatchAndTrade(String ticker) { continueProcessing = false; } } + + if (!tradesToSave.isEmpty()) { + tradeRepository.saveAll(tradesToSave); + tradesToSave.clear(); + } + + if (testLatch != null) { + testLatch.countDown(); + } } } diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java index af76160a..ef7d3daf 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java @@ -55,15 +55,18 @@ class TradeExecuteLoadTest { @Test void warmUp() throws InterruptedException { System.out.println("Starting warmUp"); - runSingleTest(1000); - runSingleTest(1000); - runSingleTest(1000); - runSingleTest(1000); - runSingleTest(1000); - runSingleTest(10000); - runSingleTest(10000); - runSingleTest(10000); - runSingleTest(50000); + int warmUpCount1 = 10; + for (int i = 0; i < warmUpCount1; i++) { + runSingleTest(1000); + } +// int warmUpCount2 = 3; +// for (int i = 0; i < warmUpCount2; i++) { +// runSingleTest(10000); +// } +// int warmUpCount3 = 3; +// for (int i = 0; i < warmUpCount2; i++) { +// runSingleTest(50000); +// } System.out.println("Finished warmUp"); } @@ -74,6 +77,7 @@ void setUp() { tradeRepository.deleteAll(); sellOrderRepository.deleteAll(); buyOrderRepository.deleteAll(); + tradeFlowService.setTestLatch(null); } @DisplayName("매수, 매도 각 1000건에 대한 처리 성능을 30회 진행한다.") @@ -98,72 +102,52 @@ void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { printStatistics(queueInsertTimes, executionTimes, orderCount); } -// @DisplayName("매수, 매도 각 1000건에 대한 처리 성능을 10회 진행한다.") -// @Order(2) -// @Test -// void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { -// // given -// int orderCount = 10000; -// int repeatCount = 10; -// List executionTimes = new ArrayList<>(); -// List queueInsertTimes = new ArrayList<>(); -// -// // when -// for (int i = 0; i < repeatCount; ++i) { -// long[] times = runSingleTest(orderCount); -// queueInsertTimes.add(times[0]); -// executionTimes.add(times[1]); -// System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); -// } -// -// // 통계 출력 -// printStatistics(queueInsertTimes, executionTimes, orderCount); -// } -// -// -// @DisplayName("매수, 매도 각 10000건에 대한 처리 성능을 10회 진행한다.") -// @Order(3) -// @Test -// void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { -// // given -// int orderCount = 10000; -// int repeatCount = 10; -// List executionTimes = new ArrayList<>(); -// List queueInsertTimes = new ArrayList<>(); -// -// // when -// for (int i = 0; i < repeatCount; ++i) { -// long[] times = runSingleTest(orderCount); -// queueInsertTimes.add(times[0]); -// executionTimes.add(times[1]); -// System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); -// } -// -// // 통계 출력 -// printStatistics(queueInsertTimes, executionTimes, orderCount); -// } -// -// @DisplayName("매수, 매도 각 100000건에 대한 처리 성능을 10회 진행한다.") -// @Order(4) -// @Test -// void basicLoadTestWith100000OrdersEachSide() throws InterruptedException { -// // given -// int orderCount = 100000; -// int repeatCount = 10; -// List executionTimes = new ArrayList<>(); -// List queueInsertTimes = new ArrayList<>(); -// -// // when -// for (int i = 0; i < repeatCount; ++i) { -// long[] times = runSingleTest(orderCount); -// queueInsertTimes.add(times[0]); -// executionTimes.add(times[1]); -// System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); -// } -// -// // 통계 출력 -// printStatistics(queueInsertTimes, executionTimes, orderCount); -// } + @DisplayName("매수, 매도 각 10000건에 대한 처리 성능을 30회 진행한다.") + @Order(3) + @Test + @Disabled + void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { + // given + int orderCount = 10000; + int repeatCount = 30; + List executionTimes = new ArrayList<>(); + List queueInsertTimes = new ArrayList<>(); + + // when + for (int i = 0; i < repeatCount; ++i) { + long[] times = runSingleTest(orderCount); + queueInsertTimes.add(times[0]); + executionTimes.add(times[1]); + System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); + } + + // 통계 출력 + printStatistics(queueInsertTimes, executionTimes, orderCount); + } + + + @DisplayName("매수, 매도 각 100000건에 대한 처리 성능을 30회 진행한다.") + @Order(4) + @Test + @Disabled + void basicLoadTestWith100000OrdersEachSide() throws InterruptedException { + // given + int orderCount = 100000; + int repeatCount = 30; + List executionTimes = new ArrayList<>(); + List queueInsertTimes = new ArrayList<>(); + + // when + for (int i = 0; i < repeatCount; ++i) { + long[] times = runSingleTest(orderCount); + queueInsertTimes.add(times[0]); + executionTimes.add(times[1]); + System.out.printf("Run-%d: 큐 삽입 소요시간 = %d ms, 체결 소요시간 = %d ms%n", (i + 1), times[0], times[1]); + } + + // 통계 출력 + printStatistics(queueInsertTimes, executionTimes, orderCount); + } private long[] runSingleTest(int orderCount) throws InterruptedException { WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); @@ -174,7 +158,7 @@ private long[] runSingleTest(int orderCount) throws InterruptedException { sellOrderRepository.deleteAll(); buyOrderRepository.deleteAll(); - CountDownLatch latch = new CountDownLatch(orderCount); + CountDownLatch latch = new CountDownLatch(1); tradeFlowService.setTestLatch(latch); long testStart = System.nanoTime(); @@ -201,39 +185,23 @@ private long[] runSingleTest(int orderCount) throws InterruptedException { SellOrder dummyOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, LocalDateTime.now(), true); eventPublisher.publishEvent(new OrderInsertedToQueue(dummyOrder)); - - - PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); - - // 체결 완료 대기 -// int pollCount = 100; -// for (int i = 0; i < pollCount; i++) { -// if (buyOrderPriorityQueueStore.isEmpty() && sellOrderPriorityQueueStore.isEmpty()) { -// break; -// } -// Thread.sleep(50); -// } - boolean completed = latch.await(60, TimeUnit.SECONDS); long eventEnd = System.nanoTime(); long executionTime = (eventEnd - eventStart) / 1_000_000; - // CountDownLatch 초기화 tradeFlowService.setTestLatch(null); // 결과 출력 long tradeCount = tradeRepository.count(); -// if (tradeCount != orderCount) { - System.out.print("체결 종료 - 체결내역[: " + tradeCount + "건]"); - System.out.println("잔여 주문[매도 " + sellOrderPriorityQueueStore.size() + "건, 매수 " + buyOrderPriorityQueueStore.size() + "건]"); - if (tradeCount != orderCount || !completed) { - System.out.println("경고: 예상 체결 건수(" + orderCount + "건)와 실제(" + tradeCount + "건) 불일치 또는 타임아웃"); - } -// } + System.out.print("체결 종료 - 체결내역[: " + tradeCount + "건]"); + System.out.println("잔여 주문[매도 " + sellOrderPriorityQueueStore.size() + "건, 매수 " + buyOrderPriorityQueueStore.size() + "건]"); + if (tradeCount != orderCount || !completed) { + System.out.println("경고: 예상 체결 건수(" + orderCount + "건)와 실제(" + tradeCount + "건) 불일치 또는 타임아웃"); + } return new long[]{queueInsertTime, executionTime}; } From 9a3ba50a0e2b3d49dd93826240524fbdbf4162c2 Mon Sep 17 00:00:00 2001 From: caniro Date: Wed, 18 Jun 2025 17:30:55 +0900 Subject: [PATCH 06/12] =?UTF-8?q?refactor:=20(=EC=84=B1=EB=8A=A5=EA=B0=9C?= =?UTF-8?q?=EC=84=A0)=20userId=EB=A1=9C=20wallet=20=EC=A1=B0=ED=9A=8C=20?= =?UTF-8?q?=EC=8B=9C=20DB=20join=EC=9C=BC=EB=A1=9C=20=ED=95=9C=EB=B2=88?= =?UTF-8?q?=EC=97=90=20=EA=B0=80=EC=A0=B8=EC=98=A4=EB=8F=84=EB=A1=9D=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../coin/user/info/application/WalletService.java | 5 ++--- .../cleanengine/coin/user/info/infra/WalletRepository.java | 7 +++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java index d9d85e8a..6836a824 100644 --- a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java +++ b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java @@ -36,9 +36,8 @@ public List saveAll(List wallets) { } public Wallet findWalletByUserIdAndTicker(Integer userId, String ticker) { - int accountId = accountRepository.findByUserId(userId).orElseThrow().getId(); - return walletRepository.findByAccountIdAndTicker(accountId, ticker) - .orElseGet(() -> Wallet.of(ticker, accountId)); + return walletRepository.findByUserIdAndTicker(userId, ticker) + .orElseGet(() -> Wallet.of(ticker, accountRepository.findByUserId(userId).orElseThrow().getId())); } public void createNewWallets(Integer accountId) { diff --git a/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java b/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java index 25bf9a9b..332167fc 100644 --- a/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java +++ b/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java @@ -4,14 +4,21 @@ import jakarta.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; import java.util.List; import java.util.Optional; public interface WalletRepository extends JpaRepository { + @Lock(LockModeType.PESSIMISTIC_WRITE) Optional findByAccountIdAndTicker(Integer accountId, String ticker); @Lock(LockModeType.PESSIMISTIC_WRITE) List findByAccountId(Integer accountId); + + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("SELECT w FROM Wallet w JOIN Account a ON w.accountId = a.id WHERE a.userId = :userId AND w.ticker = :ticker") + Optional findByUserIdAndTicker(Integer userId, String ticker); + } From 62c43e30eefbe6b321ebfe9fbd0d96b447d1ef6a Mon Sep 17 00:00:00 2001 From: caniro Date: Wed, 18 Jun 2025 17:52:58 +0900 Subject: [PATCH 07/12] =?UTF-8?q?refactor:=20(=EC=84=B1=EB=8A=A5=EA=B0=9C?= =?UTF-8?q?=EC=84=A0)=20userId=EB=A1=9C=20wallet=20=EC=A1=B0=ED=9A=8C=20?= =?UTF-8?q?=EC=8B=9C=20=EC=97=AC=EB=9F=AC=20=EB=A0=88=EC=BD=94=EB=93=9C?= =?UTF-8?q?=EB=A5=BC=20=ED=95=9C=EB=B2=88=EC=97=90=20=EA=B0=80=EC=A0=B8?= =?UTF-8?q?=EC=98=A4=EB=8F=84=EB=A1=9D=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../coin/trade/application/TradeExecutor.java | 19 +++++++++++++++++-- .../trade/application/TradeFlowService.java | 1 - .../user/info/application/WalletService.java | 4 ++++ .../user/info/infra/WalletRepository.java | 4 ++++ 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index d1c64f0f..12a6643b 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -20,6 +20,8 @@ import java.time.LocalDateTime; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static com.cleanengine.coin.common.CommonValues.approxEquals; @@ -109,7 +111,21 @@ private void increaseAccountCash(Order order, Double amount) { } private void updateWalletAfterTrade(BuyOrder buyOrder, SellOrder sellOrder, String ticker, double tradedSize, double totalTradedPrice) { - Wallet buyerWallet = walletService.findWalletByUserIdAndTicker(buyOrder.getUserId(), ticker); + List userIds = List.of(buyOrder.getUserId(), sellOrder.getUserId()); + List results = walletService.findWalletsByUserIdsAndTicker(userIds, ticker); + if (results.isEmpty()) + return; + if (results.size() == 1) { + throw new RuntimeException("wallets.size() == 1"); + } + Map walletMap = results.stream() + .collect(Collectors.toMap( + result -> (Integer) result[1], + result -> (Wallet) result[0] + )); + Wallet buyerWallet = walletMap.get(buyOrder.getUserId()); + Wallet sellerWallet = walletMap.get(sellOrder.getUserId()); + double updatedBuySize = buyerWallet.getSize() + tradedSize; double currentBuyPrice = buyerWallet.getBuyPrice() == null ? 0.0 : buyerWallet.getBuyPrice(); double updatedBuyPrice = ((currentBuyPrice * buyerWallet.getSize()) + totalTradedPrice) / updatedBuySize; @@ -117,7 +133,6 @@ private void updateWalletAfterTrade(BuyOrder buyOrder, SellOrder sellOrder, Stri buyerWallet.setBuyPrice(updatedBuyPrice); // TODO : ROI 계산 // 매도 시에는 평단가 변동 없음 - Wallet sellerWallet = walletService.findWalletByUserIdAndTicker(sellOrder.getUserId(), ticker); walletService.saveAll(List.of(buyerWallet, sellerWallet)); } diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java index 2c8e1ceb..0fddc7bc 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java @@ -8,7 +8,6 @@ import com.cleanengine.coin.trade.repository.TradeRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; diff --git a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java index 6836a824..16ce46ae 100644 --- a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java +++ b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java @@ -40,6 +40,10 @@ public Wallet findWalletByUserIdAndTicker(Integer userId, String ticker) { .orElseGet(() -> Wallet.of(ticker, accountRepository.findByUserId(userId).orElseThrow().getId())); } + public List findWalletsByUserIdsAndTicker(List userIds, String ticker) { + return walletRepository.findAllByUserIdsAndTicker(userIds, ticker); + } + public void createNewWallets(Integer accountId) { assetRepository.findAll() .stream() diff --git a/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java b/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java index 332167fc..da77c883 100644 --- a/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java +++ b/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java @@ -21,4 +21,8 @@ public interface WalletRepository extends JpaRepository { @Query("SELECT w FROM Wallet w JOIN Account a ON w.accountId = a.id WHERE a.userId = :userId AND w.ticker = :ticker") Optional findByUserIdAndTicker(Integer userId, String ticker); + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("SELECT w, a.userId FROM Wallet w JOIN Account a ON w.accountId = a.id WHERE a.userId IN :userIds AND w.ticker = :ticker") + List findAllByUserIdsAndTicker(List userIds, String ticker); + } From f37dd2adec7275842e101bace021aeb3f176baa0 Mon Sep 17 00:00:00 2001 From: caniro Date: Wed, 18 Jun 2025 19:33:53 +0900 Subject: [PATCH 08/12] =?UTF-8?q?refactor:=20(=EC=84=B1=EB=8A=A5=EA=B0=9C?= =?UTF-8?q?=EC=84=A0)=20=EC=B2=B4=EA=B2=B0=20=EC=8B=9C=20=EB=A7=A4?= =?UTF-8?q?=EB=8F=84=EA=B3=84=EC=A2=8C=20=EC=98=88=EC=88=98=EA=B8=88=20?= =?UTF-8?q?=EC=A6=9D=EA=B0=80=20=EB=A1=9C=EC=A7=81=20JPQL=EB=A1=9C=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cleanengine/coin/trade/application/TradeExecutor.java | 7 +++++-- .../coin/user/info/application/AccountService.java | 5 +++++ .../coin/user/info/infra/AccountRepository.java | 6 ++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index 12a6643b..765383c5 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -106,8 +106,11 @@ else if (approxEquals(sellOrder.getRemainingSize(), 0.0)) } private void increaseAccountCash(Order order, Double amount) { - Account account = accountService.findAccountByUserId(order.getUserId()).orElseThrow(); - accountService.save(account.increaseCash(amount)); + int updatedRows = accountService.increaseAccountCash(order.getUserId(), amount); + + if (updatedRows == 0) { + throw new RuntimeException("account updatedRows == 0"); + } } private void updateWalletAfterTrade(BuyOrder buyOrder, SellOrder sellOrder, String ticker, double tradedSize, double totalTradedPrice) { diff --git a/src/main/java/com/cleanengine/coin/user/info/application/AccountService.java b/src/main/java/com/cleanengine/coin/user/info/application/AccountService.java index af743b5d..af3ddf6e 100644 --- a/src/main/java/com/cleanengine/coin/user/info/application/AccountService.java +++ b/src/main/java/com/cleanengine/coin/user/info/application/AccountService.java @@ -55,4 +55,9 @@ public void resetBot(String ticker) { walletRepository.save(wallet); walletRepository.save(wallet2); } + + public int increaseAccountCash(int userId, double amount) { + return accountRepository.increaseAccountCash(userId, amount); + } + } diff --git a/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java b/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java index 4c337cc5..f5933d2e 100644 --- a/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java +++ b/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java @@ -4,6 +4,8 @@ import jakarta.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; import java.util.Optional; @@ -12,4 +14,8 @@ public interface AccountRepository extends JpaRepository { @Lock(LockModeType.PESSIMISTIC_WRITE) Optional findByUserId(Integer userId); + @Modifying(clearAutomatically = true) + @Query("UPDATE Account a SET a.cash = a.cash + :amount WHERE a.userId = :userId") + int increaseAccountCash(int userId, double amount); + } From 3339daf747699d2802af78bb4e570c40c3fbff67 Mon Sep 17 00:00:00 2001 From: caniro Date: Wed, 18 Jun 2025 21:05:48 +0900 Subject: [PATCH 09/12] =?UTF-8?q?refactor:=20(=EC=84=B1=EB=8A=A5=EA=B0=9C?= =?UTF-8?q?=EC=84=A0)=20=EC=B2=B4=EA=B2=B0=20=EC=8B=9C=20=EB=A7=A4?= =?UTF-8?q?=EC=88=98=EC=A7=80=EA=B0=91=20=EC=9E=94=EA=B3=A0=20=EC=A6=9D?= =?UTF-8?q?=EA=B0=80=20=EB=A1=9C=EC=A7=81=20QueryDSL=EB=A1=9C=20=EB=B3=80?= =?UTF-8?q?=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 7 +++ .../coin/trade/application/TradeExecutor.java | 33 +---------- .../trade/application/TradeFlowService.java | 2 +- .../user/info/application/WalletService.java | 37 ++++++++++-- .../application/TradeExecuteLoadTest.java | 59 ++++++++++++------- 5 files changed, 79 insertions(+), 59 deletions(-) diff --git a/build.gradle b/build.gradle index 83ce59b6..cb051d7d 100644 --- a/build.gradle +++ b/build.gradle @@ -76,6 +76,13 @@ dependencies { testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testImplementation 'org.junit.platform:junit-platform-suite:1.10.0' + + // QueryDSL + implementation 'com.querydsl:querydsl-jpa:5.1.0:jakarta' + annotationProcessor 'com.querydsl:querydsl-apt:5.1.0:jakarta' + annotationProcessor 'jakarta.persistence:jakarta.persistence-api' + annotationProcessor 'jakarta.annotation:jakarta.annotation-api' + // Spring Security + OAuth2 implementation 'org.springframework.boot:spring-boot-starter-security' implementation 'org.springframework.boot:spring-boot-starter-oauth2-client' diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index 765383c5..9b86a714 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -6,8 +6,6 @@ import com.cleanengine.coin.order.domain.SellOrder; import com.cleanengine.coin.order.domain.spi.WaitingOrders; import com.cleanengine.coin.trade.entity.Trade; -import com.cleanengine.coin.user.domain.Account; -import com.cleanengine.coin.user.domain.Wallet; import com.cleanengine.coin.user.info.application.AccountService; import com.cleanengine.coin.user.info.application.WalletService; import lombok.Getter; @@ -19,9 +17,6 @@ import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; import static com.cleanengine.coin.common.CommonValues.approxEquals; @@ -83,7 +78,7 @@ public Trade executeTrade(WaitingOrders waitingOrders, TradePair t } // 지갑 누적계산 - this.updateWalletAfterTrade(buyOrder, sellOrder, ticker, tradedSize, totalTradedPrice); + walletService.updateWalletAfterTrade(buyOrder, ticker, tradedSize, totalTradedPrice); // 체결내역 저장 Trade trade = Trade.of(ticker, LocalDateTime.now(), buyOrder.getUserId(), sellOrder.getUserId(), tradedPrice, tradedSize); @@ -113,32 +108,6 @@ private void increaseAccountCash(Order order, Double amount) { } } - private void updateWalletAfterTrade(BuyOrder buyOrder, SellOrder sellOrder, String ticker, double tradedSize, double totalTradedPrice) { - List userIds = List.of(buyOrder.getUserId(), sellOrder.getUserId()); - List results = walletService.findWalletsByUserIdsAndTicker(userIds, ticker); - if (results.isEmpty()) - return; - if (results.size() == 1) { - throw new RuntimeException("wallets.size() == 1"); - } - Map walletMap = results.stream() - .collect(Collectors.toMap( - result -> (Integer) result[1], - result -> (Wallet) result[0] - )); - Wallet buyerWallet = walletMap.get(buyOrder.getUserId()); - Wallet sellerWallet = walletMap.get(sellOrder.getUserId()); - - double updatedBuySize = buyerWallet.getSize() + tradedSize; - double currentBuyPrice = buyerWallet.getBuyPrice() == null ? 0.0 : buyerWallet.getBuyPrice(); - double updatedBuyPrice = ((currentBuyPrice * buyerWallet.getSize()) + totalTradedPrice) / updatedBuySize; - buyerWallet.setSize(updatedBuySize); - buyerWallet.setBuyPrice(updatedBuyPrice); - // TODO : ROI 계산 - // 매도 시에는 평단가 변동 없음 - walletService.saveAll(List.of(buyerWallet, sellerWallet)); - } - private static TradeUnitPriceAndSize getTradeUnitPriceAndSize(BuyOrder buyOrder, SellOrder sellOrder) { double tradedPrice; double tradedSize; diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java index 0fddc7bc..6ebf2fa7 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java @@ -44,7 +44,7 @@ public void execMatchAndTrade(String ticker) { try { Trade trade = tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker); tradesToSave.add(trade); - if (tradesToSave.size() > 1000) { + if (tradesToSave.size() > 10000) { tradeRepository.saveAll(tradesToSave); tradesToSave.clear(); } diff --git a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java index 16ce46ae..16867f18 100644 --- a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java +++ b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java @@ -1,9 +1,14 @@ package com.cleanengine.coin.user.info.application; import com.cleanengine.coin.order.adapter.out.persistentce.asset.AssetRepository; +import com.cleanengine.coin.order.domain.BuyOrder; +import com.cleanengine.coin.user.domain.QAccount; +import com.cleanengine.coin.user.domain.QWallet; import com.cleanengine.coin.user.domain.Wallet; import com.cleanengine.coin.user.info.infra.AccountRepository; import com.cleanengine.coin.user.info.infra.WalletRepository; +import com.querydsl.jpa.impl.JPAQueryFactory; +import jakarta.persistence.EntityManager; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -15,11 +20,14 @@ public class WalletService { private final WalletRepository walletRepository; private final AccountRepository accountRepository; private final AssetRepository assetRepository; + private final JPAQueryFactory queryFactory; - public WalletService(WalletRepository walletRepository, AccountRepository accountRepository, AssetRepository assetRepository) { + public WalletService(WalletRepository walletRepository, AccountRepository accountRepository, + AssetRepository assetRepository, EntityManager entityManager) { this.walletRepository = walletRepository; this.accountRepository = accountRepository; this.assetRepository = assetRepository; + this.queryFactory = new JPAQueryFactory(entityManager); } @Transactional @@ -40,10 +48,6 @@ public Wallet findWalletByUserIdAndTicker(Integer userId, String ticker) { .orElseGet(() -> Wallet.of(ticker, accountRepository.findByUserId(userId).orElseThrow().getId())); } - public List findWalletsByUserIdsAndTicker(List userIds, String ticker) { - return walletRepository.findAllByUserIdsAndTicker(userIds, ticker); - } - public void createNewWallets(Integer accountId) { assetRepository.findAll() .stream() @@ -51,4 +55,27 @@ public void createNewWallets(Integer accountId) { .forEach(walletRepository::save); } + @Transactional + public void updateWalletAfterTrade(BuyOrder buyOrder, String ticker, double tradedSize, double totalTradedPrice) { + QWallet wallet = QWallet.wallet; + QAccount account = QAccount.account; + + queryFactory + .update(wallet) + .where(wallet.accountId.eq( + queryFactory + .select(account.id) + .from(account) + .where(account.userId.eq(buyOrder.getUserId())) + ).and(wallet.ticker.eq(ticker))) + .set(wallet.size, wallet.size.add(tradedSize)) + .set(wallet.buyPrice, + wallet.buyPrice.coalesce(0.0) + .multiply(wallet.size) + .add(totalTradedPrice) + .divide(wallet.size.add(tradedSize)) + ) + .execute(); + } + } diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java index ef7d3daf..ecaab4a5 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java @@ -24,8 +24,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -@ActiveProfiles({"dev", "it", "mariadb-local", "trade-load-test"}) -//@Disabled +import static org.assertj.core.api.Assertions.assertThat; + +@ActiveProfiles({"dev", "it", "mariadb-local", "trade-load-test", "actuator", "apm", "otel-local"}) +@Disabled @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @SpringBootTest class TradeExecuteLoadTest { @@ -53,20 +55,21 @@ class TradeExecuteLoadTest { @DisplayName("워밍업: Spring 컨텍스트 및 JVM 최적화") @Order(1) @Test +// @Disabled void warmUp() throws InterruptedException { System.out.println("Starting warmUp"); int warmUpCount1 = 10; for (int i = 0; i < warmUpCount1; i++) { runSingleTest(1000); } -// int warmUpCount2 = 3; -// for (int i = 0; i < warmUpCount2; i++) { -// runSingleTest(10000); -// } -// int warmUpCount3 = 3; -// for (int i = 0; i < warmUpCount2; i++) { -// runSingleTest(50000); -// } + int warmUpCount2 = 5; + for (int i = 0; i < warmUpCount2; i++) { + runSingleTest(10000); + } + int warmUpCount3 = 5; + for (int i = 0; i < warmUpCount3; i++) { + runSingleTest(100000); + } System.out.println("Finished warmUp"); } @@ -80,13 +83,14 @@ void setUp() { tradeFlowService.setTestLatch(null); } - @DisplayName("매수, 매도 각 1000건에 대한 처리 성능을 30회 진행한다.") + @DisplayName("매수, 매도 각 1000건에 대한 처리 성능을 10회 진행한다.") @Order(2) @Test +// @Disabled void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { // given int orderCount = 1000; - int repeatCount = 30; + int repeatCount = 10; List executionTimes = new ArrayList<>(); List queueInsertTimes = new ArrayList<>(); @@ -100,16 +104,23 @@ void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { // 통계 출력 printStatistics(queueInsertTimes, executionTimes, orderCount); + + WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); + PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); + PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); + + assertThat(sellOrderPriorityQueueStore.size()).isEqualTo(0); + assertThat(buyOrderPriorityQueueStore.size()).isEqualTo(0); } - @DisplayName("매수, 매도 각 10000건에 대한 처리 성능을 30회 진행한다.") + @DisplayName("매수, 매도 각 10000건에 대한 처리 성능을 10회 진행한다.") @Order(3) @Test - @Disabled +// @Disabled void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { // given int orderCount = 10000; - int repeatCount = 30; + int repeatCount = 10; List executionTimes = new ArrayList<>(); List queueInsertTimes = new ArrayList<>(); @@ -123,17 +134,23 @@ void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { // 통계 출력 printStatistics(queueInsertTimes, executionTimes, orderCount); - } + WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); + PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); + PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); + + assertThat(sellOrderPriorityQueueStore.size()).isEqualTo(0); + assertThat(buyOrderPriorityQueueStore.size()).isEqualTo(0); + } - @DisplayName("매수, 매도 각 100000건에 대한 처리 성능을 30회 진행한다.") + @DisplayName("매수, 매도 각 100000건에 대한 처리 성능을 10회 진행한다.") @Order(4) @Test - @Disabled +// @Disabled void basicLoadTestWith100000OrdersEachSide() throws InterruptedException { // given int orderCount = 100000; - int repeatCount = 30; + int repeatCount = 10; List executionTimes = new ArrayList<>(); List queueInsertTimes = new ArrayList<>(); @@ -176,7 +193,7 @@ private long[] runSingleTest(int orderCount) throws InterruptedException { // 큐 삽입 완료 대기 executor.shutdown(); - boolean queueTerminated = executor.awaitTermination(30, TimeUnit.SECONDS); + executor.awaitTermination(30, TimeUnit.SECONDS); long queueInsertEnd = System.nanoTime(); long queueInsertTime = (queueInsertEnd - testStart) / 1_000_000; @@ -188,7 +205,7 @@ private long[] runSingleTest(int orderCount) throws InterruptedException { PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); - boolean completed = latch.await(60, TimeUnit.SECONDS); + boolean completed = latch.await(2, TimeUnit.MINUTES); long eventEnd = System.nanoTime(); long executionTime = (eventEnd - eventStart) / 1_000_000; From 9f9bd4bdcf7fd21aa129024ee04d7208fc98e17b Mon Sep 17 00:00:00 2001 From: caniro Date: Mon, 23 Jun 2025 17:14:50 +0900 Subject: [PATCH 10/12] =?UTF-8?q?fix:=20account=20JPQL=20=EC=88=98?= =?UTF-8?q?=ED=96=89=20=EC=A0=84=20flush=20=EB=90=98=EB=8F=84=EB=A1=9D=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD(=EC=B2=B4=EA=B2=B0=20=EC=8B=9C=20order=20?= =?UTF-8?q?=EC=A0=80=EC=9E=A5=EB=90=98=EC=A7=80=20=EC=95=8A=EB=8D=98=20?= =?UTF-8?q?=EC=9D=B4=EC=8A=88=20=ED=95=B4=EA=B2=B0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/cleanengine/coin/user/info/infra/AccountRepository.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java b/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java index f5933d2e..05b056e3 100644 --- a/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java +++ b/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java @@ -14,7 +14,7 @@ public interface AccountRepository extends JpaRepository { @Lock(LockModeType.PESSIMISTIC_WRITE) Optional findByUserId(Integer userId); - @Modifying(clearAutomatically = true) + @Modifying(flushAutomatically = true) @Query("UPDATE Account a SET a.cash = a.cash + :amount WHERE a.userId = :userId") int increaseAccountCash(int userId, double amount); From 42267628dae187c451c8412aaedbf00ab74f87b8 Mon Sep 17 00:00:00 2001 From: caniro Date: Mon, 23 Jun 2025 17:15:25 +0900 Subject: [PATCH 11/12] =?UTF-8?q?test:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=8B=9C=20order=20DB=EC=97=90=20=EC=A0=80=EC=9E=A5=ED=95=9C?= =?UTF-8?q?=20=EC=83=81=ED=83=9C=EB=A1=9C=20=EC=8B=9C=EC=9E=91=ED=95=98?= =?UTF-8?q?=EB=8F=84=EB=A1=9D=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../application/TradeExecuteLoadTest.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java index ecaab4a5..093d2f4c 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java @@ -27,7 +27,7 @@ import static org.assertj.core.api.Assertions.assertThat; @ActiveProfiles({"dev", "it", "mariadb-local", "trade-load-test", "actuator", "apm", "otel-local"}) -@Disabled +//@Disabled @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @SpringBootTest class TradeExecuteLoadTest { @@ -62,14 +62,14 @@ void warmUp() throws InterruptedException { for (int i = 0; i < warmUpCount1; i++) { runSingleTest(1000); } - int warmUpCount2 = 5; - for (int i = 0; i < warmUpCount2; i++) { - runSingleTest(10000); - } - int warmUpCount3 = 5; - for (int i = 0; i < warmUpCount3; i++) { - runSingleTest(100000); - } +// int warmUpCount2 = 5; +// for (int i = 0; i < warmUpCount2; i++) { +// runSingleTest(10000); +// } +// int warmUpCount3 = 5; +// for (int i = 0; i < warmUpCount3; i++) { +// runSingleTest(100000); +// } System.out.println("Finished warmUp"); } @@ -116,7 +116,7 @@ void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { @DisplayName("매수, 매도 각 10000건에 대한 처리 성능을 10회 진행한다.") @Order(3) @Test -// @Disabled + @Disabled void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { // given int orderCount = 10000; @@ -146,7 +146,7 @@ void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { @DisplayName("매수, 매도 각 100000건에 대한 처리 성능을 10회 진행한다.") @Order(4) @Test -// @Disabled + @Disabled void basicLoadTestWith100000OrdersEachSide() throws InterruptedException { // given int orderCount = 100000; @@ -188,12 +188,19 @@ private long[] runSingleTest(int orderCount) throws InterruptedException { BuyOrder limitBuyOrder = BuyOrder.createLimitBuyOrder(ticker, 2, 10.0, 130_000_000.0, LocalDateTime.now(), true); waitingOrders.addOrder(limitSellOrder); waitingOrders.addOrder(limitBuyOrder); + + sellOrderRepository.save(limitSellOrder); + buyOrderRepository.save(limitBuyOrder); }); } // 큐 삽입 완료 대기 executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); + + PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); + PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); + long queueInsertEnd = System.nanoTime(); long queueInsertTime = (queueInsertEnd - testStart) / 1_000_000; @@ -202,8 +209,6 @@ private long[] runSingleTest(int orderCount) throws InterruptedException { SellOrder dummyOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, LocalDateTime.now(), true); eventPublisher.publishEvent(new OrderInsertedToQueue(dummyOrder)); - PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); - PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); boolean completed = latch.await(2, TimeUnit.MINUTES); long eventEnd = System.nanoTime(); From 62138049a55de3a0091432b02174bb5515b32361 Mon Sep 17 00:00:00 2001 From: caniro Date: Mon, 23 Jun 2025 17:48:38 +0900 Subject: [PATCH 12/12] =?UTF-8?q?fix:=20=EC=B2=B4=EA=B2=B0=20=EC=84=B1?= =?UTF-8?q?=EB=8A=A5=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EB=8F=99=EC=8B=9C?= =?UTF-8?q?=EC=84=B1=20=EC=9D=B4=EC=8A=88=20=EC=9B=90=EC=9D=B8=20=EC=A0=9C?= =?UTF-8?q?=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../application/TradeExecuteLoadTest.java | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java index 093d2f4c..74de0550 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java @@ -20,14 +20,12 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @ActiveProfiles({"dev", "it", "mariadb-local", "trade-load-test", "actuator", "apm", "otel-local"}) -//@Disabled +@Disabled @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @SpringBootTest class TradeExecuteLoadTest { @@ -181,22 +179,19 @@ private long[] runSingleTest(int orderCount) throws InterruptedException { long testStart = System.nanoTime(); // 주문 생성 및 큐 삽입 - ExecutorService executor = Executors.newFixedThreadPool(20); + final LocalDateTime baseTime = LocalDateTime.now(); + for (int i = 0; i < orderCount; i++) { - executor.submit(() -> { - SellOrder limitSellOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, LocalDateTime.now(), true); - BuyOrder limitBuyOrder = BuyOrder.createLimitBuyOrder(ticker, 2, 10.0, 130_000_000.0, LocalDateTime.now(), true); - waitingOrders.addOrder(limitSellOrder); - waitingOrders.addOrder(limitBuyOrder); - - sellOrderRepository.save(limitSellOrder); - buyOrderRepository.save(limitBuyOrder); - }); - } + final LocalDateTime orderTime = baseTime.plusSeconds(i); - // 큐 삽입 완료 대기 - executor.shutdown(); - executor.awaitTermination(30, TimeUnit.SECONDS); + SellOrder limitSellOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, orderTime, true); + BuyOrder limitBuyOrder = BuyOrder.createLimitBuyOrder(ticker, 2, 10.0, 130_000_000.0, orderTime, true); + waitingOrders.addOrder(limitSellOrder); + waitingOrders.addOrder(limitBuyOrder); + + sellOrderRepository.save(limitSellOrder); + buyOrderRepository.save(limitBuyOrder); + } PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT);