diff --git a/src/main/java/com/vaultionizer/vaultserver/controllers/RTStreamController.java b/src/main/java/com/vaultionizer/vaultserver/controllers/RTStreamController.java new file mode 100644 index 0000000..ccc8da3 --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/controllers/RTStreamController.java @@ -0,0 +1,134 @@ +package com.vaultionizer.vaultserver.controllers; + +import com.vaultionizer.vaultserver.helpers.Config; +import com.vaultionizer.vaultserver.model.dto.AuthWrapperDto; +import com.vaultionizer.vaultserver.model.dto.GenericAuthDto; +import com.vaultionizer.vaultserver.model.dto.PushRTDataDto; +import com.vaultionizer.vaultserver.service.*; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +@Api(value = "/api/stream/", description = "Controller that handles realtime stream requests.") +@RestController +public class RTStreamController { + private final SessionService sessionService; + private final UserAccessService userAccessService; + private final RTStreamService rtStreamService; + private final RTStreamDataService rtStreamDataService; + + + @Autowired + public RTStreamController(SessionService sessionService, UserAccessService userAccessService, + RTStreamService rtStreamService, RTStreamDataService rtStreamDataService) { + this.sessionService = sessionService; + this.userAccessService = userAccessService; + this.rtStreamService = rtStreamService; + this.rtStreamDataService = rtStreamDataService; + } + + @RequestMapping(value = "/api/stream/register/{spaceID}", method = RequestMethod.POST) + @ApiOperation(value = "Register a new stream for a given spaceID.", + response = Long.class) + @ApiResponses(value = { + @ApiResponse(code = 201, message = "The stream was created successfully."), + @ApiResponse(code = 400, message = "The spaceID is malformed."), + @ApiResponse(code = 401, message = "The user authorization failed."), + @ApiResponse(code = 403, message = "The user has no access to the space specified."), + @ApiResponse(code = 412, message = "Realtime is disabled on this server.") + }) + public @ResponseBody + ResponseEntity + registerStream(@RequestBody AuthWrapperDto req, @PathVariable("spaceID") Long spaceID){ + if (Config.REALTIME_DISABLED) return new ResponseEntity<>(null, HttpStatus.PRECONDITION_FAILED); + if (spaceID == null) return new ResponseEntity<>(null, HttpStatus.BAD_REQUEST); + GenericAuthDto auth = req.getAuth(); + if (!sessionService.getSession(auth.getUserID(), auth.getSessionKey())){ + return new ResponseEntity<>(null, HttpStatus.UNAUTHORIZED); + } + if (userAccessService.userHasAccess(auth.getUserID(), spaceID)){ + return new ResponseEntity<>(null, HttpStatus.FORBIDDEN); + } + Long streamID = rtStreamService.createStream(spaceID); + return new ResponseEntity<>(streamID, HttpStatus.CREATED); + } + + @RequestMapping(value = "/api/stream/push/{streamID}", method = RequestMethod.POST) + @ApiOperation(value = "Push data to a specific stream.", + response = Long.class) + @ApiResponses(value = { + @ApiResponse(code = 200, message = "The data was pushed successfully."), + @ApiResponse(code = 401, message = "The user authorization failed."), + @ApiResponse(code = 403, message = "The user has no access to the stream specified."), + @ApiResponse(code = 412, message = "Realtime is disabled on this server."), + @ApiResponse(code = 425, message = "The minimum delay was not met.") + }) + public @ResponseBody + ResponseEntity + pushData(@RequestBody PushRTDataDto req, @PathVariable("streamID") Long streamID){ + var res = checkValidRequest(req.getAuth(), streamID); + if (res != null) return res; + + boolean success = rtStreamDataService.pushData(streamID, req.getData()); + if (!success) return new ResponseEntity<>(null, HttpStatus.TOO_EARLY); + return new ResponseEntity<>(null, HttpStatus.OK); + } + + @RequestMapping(value = "/api/stream/delete/{streamID}", method = RequestMethod.DELETE) + @ApiOperation(value = "Deletes the stream specified.", + response = Long.class) + @ApiResponses(value = { + @ApiResponse(code = 200, message = "The user was signed in successfully. The response is a session key."), + @ApiResponse(code = 401, message = "The user authorization failed."), + @ApiResponse(code = 403, message = "The user has no access to the stream."), + @ApiResponse(code = 412, message = "Realtime is disabled on this server.") + }) + public @ResponseBody + ResponseEntity + deleteStream(@RequestBody AuthWrapperDto req, @PathVariable("streamID") Long streamID){ + var res = checkValidRequest(req.getAuth(), streamID); + if (res != null) return res; + rtStreamService.deleteStream(streamID); + + return new ResponseEntity<>(null, HttpStatus.OK); + } + + + + @RequestMapping(value = "/api/stream/fetch/{streamID}/{limit}", method = RequestMethod.POST) + @ApiOperation(value = "Fetches a specific amount of entries of data from a stream.", + response = Long.class) + @ApiResponses(value = { + @ApiResponse(code = 200, message = "The values were fetched successfully."), + @ApiResponse(code = 401, message = "The user authorization failed."), + @ApiResponse(code = 403, message = "The user has no access to the stream."), + @ApiResponse(code = 412, message = "Realtime is disabled on this server.") + }) + public @ResponseBody + ResponseEntity + fetchData(@RequestBody AuthWrapperDto req, @PathVariable("streamID") Long streamID, @PathVariable("limit") Long limit){ + var res = checkValidRequest(req.getAuth(), streamID); + if (res != null) return res; + + // TODO + return new ResponseEntity<>(null, HttpStatus.OK); + } + + private ResponseEntity checkValidRequest(GenericAuthDto auth, Long streamID){ + if (Config.REALTIME_DISABLED) return new ResponseEntity<>(null, HttpStatus.PRECONDITION_FAILED); + if (!sessionService.getSession(auth.getUserID(), auth.getSessionKey())){ + return new ResponseEntity<>(null, HttpStatus.UNAUTHORIZED); + } + + Long spaceID = rtStreamService.getSpaceID(streamID); + if (spaceID == null || !userAccessService.userHasAccess(auth.getUserID(),spaceID)){ + return new ResponseEntity<>(null, HttpStatus.FORBIDDEN); + } + return null; + } +} diff --git a/src/main/java/com/vaultionizer/vaultserver/helpers/Config.java b/src/main/java/com/vaultionizer/vaultserver/helpers/Config.java index c75e8fb..c1129c5 100644 --- a/src/main/java/com/vaultionizer/vaultserver/helpers/Config.java +++ b/src/main/java/com/vaultionizer/vaultserver/helpers/Config.java @@ -10,7 +10,7 @@ public class Config { public static final GetVersionResponseDto VERSION = new GetVersionResponseDto( - "Vaultionizer v0.1", + "Vaultionizer v1.0", "No maintainer", false ); @@ -34,4 +34,9 @@ public class Config { public static final int PENDING_UPLOAD_JOB_DELAY = 86400 * 1000; public static final String RANDOM_ALGO = "SHA1PRNG"; + + + // Realtime stream config + public static final boolean REALTIME_DISABLED = false; // disable realtime (default: true to disable) + public static final Long MIN_PUSH_DELAY = 5L; // minimal delay between two pushes in ms } diff --git a/src/main/java/com/vaultionizer/vaultserver/model/db/RTStreamDataModel.java b/src/main/java/com/vaultionizer/vaultserver/model/db/RTStreamDataModel.java new file mode 100644 index 0000000..9790586 --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/model/db/RTStreamDataModel.java @@ -0,0 +1,46 @@ +package com.vaultionizer.vaultserver.model.db; + +import org.hibernate.annotations.CreationTimestamp; + +import javax.persistence.*; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import java.time.Instant; + +@Entity +public class RTStreamDataModel { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long streamDataID; + + @NotNull(message = "streamID cannot be null!") + @Min(value = 0, message = "streamID cannot be below zero...") + private Long streamID; + + @Column(columnDefinition = "TEXT") + @NotNull(message = "Data cannot be null!") + private String data; + + @CreationTimestamp + private Instant timestamp; + + public RTStreamDataModel() { + } + + public RTStreamDataModel(Long streamID, String data) { + this.streamID = streamID; + this.data = data; + } + + public Long getStreamDataID() { + return streamDataID; + } + + public Long getStreamID() { + return streamID; + } + + public String getData() { + return data; + } +} diff --git a/src/main/java/com/vaultionizer/vaultserver/model/db/RTStreamModel.java b/src/main/java/com/vaultionizer/vaultserver/model/db/RTStreamModel.java new file mode 100644 index 0000000..bef4c96 --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/model/db/RTStreamModel.java @@ -0,0 +1,34 @@ +package com.vaultionizer.vaultserver.model.db; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +@Entity +public class RTStreamModel { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long streamID; + + @NotNull(message = "SpaceID cannot be null!") + @Min(value = 0, message = "SpaceID cannot be below zero...") + private Long spaceID; + + public RTStreamModel() { + } + + public RTStreamModel(Long spaceID) { + this.spaceID = spaceID; + } + + public Long getStreamID() { + return streamID; + } + + public Long getSpaceID() { + return spaceID; + } +} diff --git a/src/main/java/com/vaultionizer/vaultserver/model/dto/FetchDataResponseDto.java b/src/main/java/com/vaultionizer/vaultserver/model/dto/FetchDataResponseDto.java new file mode 100644 index 0000000..5923043 --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/model/dto/FetchDataResponseDto.java @@ -0,0 +1,21 @@ +package com.vaultionizer.vaultserver.model.dto; + +import java.time.Instant; + +public class FetchDataResponseDto { + private Instant timestamp; + private String data; + + public FetchDataResponseDto(Instant timestamp, String data) { + this.timestamp = timestamp; + this.data = data; + } + + public Instant getTimestamp() { + return timestamp; + } + + public String getData() { + return data; + } +} diff --git a/src/main/java/com/vaultionizer/vaultserver/model/dto/PushRTDataDto.java b/src/main/java/com/vaultionizer/vaultserver/model/dto/PushRTDataDto.java new file mode 100644 index 0000000..8b8696d --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/model/dto/PushRTDataDto.java @@ -0,0 +1,14 @@ +package com.vaultionizer.vaultserver.model.dto; + +public class PushRTDataDto { + private GenericAuthDto auth; + private String data; + + public GenericAuthDto getAuth() { + return auth; + } + + public String getData() { + return data; + } +} diff --git a/src/main/java/com/vaultionizer/vaultserver/resource/RTStreamDataRepository.java b/src/main/java/com/vaultionizer/vaultserver/resource/RTStreamDataRepository.java new file mode 100644 index 0000000..58d731b --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/resource/RTStreamDataRepository.java @@ -0,0 +1,30 @@ +package com.vaultionizer.vaultserver.resource; + +import com.vaultionizer.vaultserver.model.db.RTStreamDataModel; +import com.vaultionizer.vaultserver.model.dto.FetchDataResponseDto; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; + +public interface RTStreamDataRepository extends JpaRepository { + + @Transactional + @Modifying + @Query("DELETE FROM RTStreamDataModel it " + + "WHERE it.streamID IN " + + "(SELECT stream FROM RTStreamModel stream WHERE stream.spaceID = ?1)" + + "") + void deleteAllStreamDataWithSpace(Long spaceID); + + @Transactional + @Modifying + @Query("DELETE FROM RTStreamDataModel it WHERE it.streamID = ?1") + void deleteAllStreamDataWithStream(Long streamID); + + @Query("SELECT new com.vaultionizer.vaultserver.model.dto.FetchDataResponseDto(it.timestamp, it.data) " + + "FROM RTStreamDataModel it WHERE it.streamID = ?1 ORDER BY it.timestamp") + ArrayList fetchData(Long streamID, Long limit); +} diff --git a/src/main/java/com/vaultionizer/vaultserver/resource/RTStreamRepository.java b/src/main/java/com/vaultionizer/vaultserver/resource/RTStreamRepository.java new file mode 100644 index 0000000..e8d28bb --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/resource/RTStreamRepository.java @@ -0,0 +1,31 @@ +package com.vaultionizer.vaultserver.resource; + +import com.vaultionizer.vaultserver.model.db.RTStreamModel; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Set; + +public interface RTStreamRepository extends JpaRepository { + @Transactional + @Modifying + @Query("DELETE FROM RTStreamModel it WHERE it.spaceID = ?1") + void deleteAllStreamsForSpace(Long spaceID); + + + @Transactional + @Modifying + @Query("DELETE FROM RTStreamModel it WHERE it.streamID = ?1") + void deleteStream(Long streamID); + + + @Query("SELECT it.streamID FROM RTStreamModel it WHERE it.spaceID = ?1") + Set getAllStreams(Long spaceID); + + @Query("SELECT it.spaceID FROM RTStreamModel it WHERE it.streamID = ?1") + Set getSpaceID(Long streamID); + + +} diff --git a/src/main/java/com/vaultionizer/vaultserver/service/RTStreamDataService.java b/src/main/java/com/vaultionizer/vaultserver/service/RTStreamDataService.java new file mode 100644 index 0000000..77042d7 --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/service/RTStreamDataService.java @@ -0,0 +1,51 @@ +package com.vaultionizer.vaultserver.service; + +import com.vaultionizer.vaultserver.helpers.Config; +import com.vaultionizer.vaultserver.model.db.RTStreamDataModel; +import com.vaultionizer.vaultserver.resource.RTStreamDataRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.time.Instant; +import java.util.HashMap; + +@Service +public class RTStreamDataService { + private final RTStreamDataRepository rtStreamDataRepository; + private final HashMap pushMap; // needed to keep track of minimum delays between pushes + + @Autowired + public RTStreamDataService(RTStreamDataRepository rtStreamDataRepository) { + this.rtStreamDataRepository = rtStreamDataRepository; + pushMap = new HashMap<>(); + } + + public boolean pushData(Long streamID, String content){ + if (!checkPushable(streamID)) return false; + RTStreamDataModel streamEntry = new RTStreamDataModel(streamID, content); + rtStreamDataRepository.save(streamEntry); + return true; + } + + private synchronized boolean checkPushable(Long streamID){ + Instant lastPush = pushMap.get(streamID); + Instant now = Instant.now(); + if (lastPush == null){ + pushMap.put(streamID, now); + return true; + } + if (lastPush.isBefore(now.minusMillis(Config.MIN_PUSH_DELAY))){ + pushMap.put(streamID, now); + return true; + } + return false; + } + + public void deleteStreamData(Long streamID){ + rtStreamDataRepository.deleteAllStreamDataWithStream(streamID); + } + + public void fetchData(Long streamID, Long limit){ + + } +} diff --git a/src/main/java/com/vaultionizer/vaultserver/service/RTStreamService.java b/src/main/java/com/vaultionizer/vaultserver/service/RTStreamService.java new file mode 100644 index 0000000..1c3799e --- /dev/null +++ b/src/main/java/com/vaultionizer/vaultserver/service/RTStreamService.java @@ -0,0 +1,41 @@ +package com.vaultionizer.vaultserver.service; + +import com.vaultionizer.vaultserver.model.db.RTStreamModel; +import com.vaultionizer.vaultserver.resource.RTStreamRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class RTStreamService { + private final RTStreamRepository rtStreamRepository; + private final RTStreamDataService rtStreamDataService; + + + @Autowired + public RTStreamService(RTStreamRepository rtStreamRepository, RTStreamDataService rtStreamDataService) { + this.rtStreamRepository = rtStreamRepository; + this.rtStreamDataService = rtStreamDataService; + } + + + public Long createStream(Long spaceID){ + RTStreamModel model = new RTStreamModel(spaceID); + rtStreamRepository.save(model); + return model.getStreamID(); + } + + public void deleteStreams(Long spaceID){ + // TODO + } + + public void deleteStream(Long streamID){ + rtStreamDataService.deleteStreamData(streamID); + rtStreamRepository.deleteStream(streamID); + } + + public Long getSpaceID(Long streamID){ + var ids = rtStreamRepository.getSpaceID(streamID); + if (ids.size() != 1) return null; + return ids.stream().findFirst().get(); + } +}