|
1 | 1 | package com.mobiera.ms.commons.stats.jms; |
2 | 2 |
|
3 | 3 |
|
| 4 | +import java.time.Instant; |
| 5 | +import java.time.format.DateTimeParseException; |
| 6 | +import java.util.ArrayList; |
4 | 7 | import java.util.HashMap; |
| 8 | +import java.util.List; |
5 | 9 | import java.util.Map; |
6 | 10 | import java.util.UUID; |
7 | 11 |
|
8 | 12 | import org.eclipse.microprofile.config.inject.ConfigProperty; |
9 | 13 | import org.jboss.logging.Logger; |
10 | 14 |
|
11 | 15 | import com.fasterxml.jackson.core.JsonProcessingException; |
| 16 | +import com.fasterxml.jackson.databind.JsonNode; |
| 17 | +import com.fasterxml.jackson.databind.ObjectMapper; |
12 | 18 | import com.mobiera.commons.util.JsonUtil; |
| 19 | +import com.mobiera.ms.commons.stats.api.StatEnum; |
13 | 20 | import com.mobiera.ms.commons.stats.api.StatEvent; |
14 | 21 | import com.mobiera.ms.commons.stats.svc.StatBuilderService; |
15 | 22 |
|
|
28 | 35 | import jakarta.jms.ObjectMessage; |
29 | 36 | import jakarta.jms.Queue; |
30 | 37 | import jakarta.jms.Session; |
| 38 | +import jakarta.jms.TextMessage; |
| 39 | +import lombok.Data; |
31 | 40 |
|
32 | 41 | @ApplicationScoped |
33 | 42 | public class StatQueueConsumer { |
@@ -237,31 +246,32 @@ Uni<Void> runConsumer(UUID uuid) { |
237 | 246 |
|
238 | 247 | } |
239 | 248 | } |
240 | | - try { |
241 | | - if (debug) |
242 | | - logger.info("statConsumer: " + queueName + " before stat "+ uuid + " " + (System.currentTimeMillis() - now)); |
243 | | - |
244 | | - getStatBuilderService().statEvent(event); |
245 | | - if (debug) |
246 | | - logger.info("statConsumer: " + queueName + " after stat, before commit "+ uuid + " " + (System.currentTimeMillis() - now)); |
247 | | - |
248 | | - context.commit(); |
249 | | - if (debug) |
250 | | - logger.info("statConsumer: " + queueName + " after commit "+ uuid + " " + (System.currentTimeMillis() - now)); |
251 | | - |
252 | | - |
253 | | - |
254 | | - } catch (Exception e) { |
255 | | - try { |
256 | | - logger.warn("statConsumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception " + JsonUtil.serialize(event, false), e); |
257 | | - } catch (JsonProcessingException e1) { |
258 | | - logger.warn("statConsumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception", e); |
259 | | - } |
260 | | - context.rollback(); |
261 | | - //if (debug) |
262 | | - logger.info("statConsumer: " + queueName + " after rollback "+ uuid + " " + (System.currentTimeMillis() - now)); |
263 | | - |
264 | | - } |
| 249 | + processStatEvent(uuid, now, event, context); |
| 250 | + } else if (message instanceof TextMessage) { |
| 251 | + ObjectMapper objectMapper = new ObjectMapper(); |
| 252 | + TextMessage txtMsg = (TextMessage) message; |
| 253 | + JsonNode rootNode = objectMapper.readTree(txtMsg.getText()); |
| 254 | + |
| 255 | + List<StatEnum> enums = new ArrayList<>(); |
| 256 | + rootNode.path("enums").forEach(enumNode -> { |
| 257 | + try { |
| 258 | + enums.add(objectMapper.treeToValue(enumNode, StatEnumImpl.class)); |
| 259 | + } catch (JsonProcessingException e) { |
| 260 | + } |
| 261 | + }); |
| 262 | + event = new StatEvent(); |
| 263 | + event.setStatClass(rootNode.path("statClass").asText("null")); |
| 264 | + event.setEntityId(rootNode.path("entityId").asText("null")); |
| 265 | + if (!rootNode.path("ts").isMissingNode()) { |
| 266 | + try { |
| 267 | + event.setTs(Instant.parse(rootNode.path("ts").asText())); |
| 268 | + } catch (DateTimeParseException e) { |
| 269 | + } |
| 270 | + } |
| 271 | + event.setIncrement(rootNode.path("increment").asInt(0)); |
| 272 | + event.setDoubleIncrement(rootNode.path("doubleIncrement").asDouble(0.0)); |
| 273 | + event.setEnums(enums); |
| 274 | + processStatEvent(uuid, now, event, context); |
265 | 275 | } else { |
266 | 276 | if (debug) logger.warn("statConsumer " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": unkown event " + event); |
267 | 277 | context.commit(); |
@@ -342,7 +352,36 @@ private StatBuilderService getStatBuilderService() { |
342 | 352 | } |
343 | 353 |
|
344 | 354 |
|
| 355 | + private void processStatEvent(UUID uuid, long now, StatEvent event, JMSContext context) { |
| 356 | + try { |
| 357 | + if (debug) |
| 358 | + logger.info("statConsumer: " + queueName + " before stat "+ uuid + " " + (System.currentTimeMillis() - now)); |
| 359 | + |
| 360 | + getStatBuilderService().statEvent(event); |
| 361 | + if (debug) |
| 362 | + logger.info("statConsumer: " + queueName + " after stat, before commit "+ uuid + " " + (System.currentTimeMillis() - now)); |
| 363 | + |
| 364 | + context.commit(); |
| 365 | + if (debug) |
| 366 | + logger.info("statConsumer: " + queueName + " after commit "+ uuid + " " + (System.currentTimeMillis() - now)); |
| 367 | + } catch (Exception e) { |
| 368 | + try { |
| 369 | + logger.warn("statConsumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception " + JsonUtil.serialize(event, false), e); |
| 370 | + } catch (JsonProcessingException e1) { |
| 371 | + logger.warn("statConsumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception", e); |
| 372 | + } |
| 373 | + context.rollback(); |
| 374 | + logger.info("statConsumer: " + queueName + " after rollback "+ uuid + " " + (System.currentTimeMillis() - now)); |
| 375 | + } |
| 376 | + } |
345 | 377 |
|
| 378 | + @Data |
| 379 | + public static class StatEnumImpl implements StatEnum { |
| 380 | + private Integer index; |
| 381 | + private String label; |
| 382 | + private String value; |
| 383 | + private String description; |
| 384 | + } |
346 | 385 |
|
347 | 386 |
|
348 | 387 |
|
|
0 commit comments