Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.vaultionizer.vaultserver.controllers;

import com.vaultionizer.vaultserver.helpers.Config;
import com.vaultionizer.vaultserver.model.dto.AuthWrapperDto;
import com.vaultionizer.vaultserver.model.dto.GenericAuthDto;
import com.vaultionizer.vaultserver.model.dto.PushRTDataDto;
import com.vaultionizer.vaultserver.service.*;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@Api(value = "/api/stream/", description = "Controller that handles realtime stream requests.")
@RestController
public class RTStreamController {
private final SessionService sessionService;
private final UserAccessService userAccessService;
private final RTStreamService rtStreamService;
private final RTStreamDataService rtStreamDataService;


@Autowired
public RTStreamController(SessionService sessionService, UserAccessService userAccessService,
RTStreamService rtStreamService, RTStreamDataService rtStreamDataService) {
this.sessionService = sessionService;
this.userAccessService = userAccessService;
this.rtStreamService = rtStreamService;
this.rtStreamDataService = rtStreamDataService;
}

@RequestMapping(value = "/api/stream/register/{spaceID}", method = RequestMethod.POST)
@ApiOperation(value = "Register a new stream for a given spaceID.",
response = Long.class)
@ApiResponses(value = {
@ApiResponse(code = 201, message = "The stream was created successfully."),
@ApiResponse(code = 400, message = "The spaceID is malformed."),
@ApiResponse(code = 401, message = "The user authorization failed."),
@ApiResponse(code = 403, message = "The user has no access to the space specified."),
@ApiResponse(code = 412, message = "Realtime is disabled on this server.")
})
public @ResponseBody
ResponseEntity<?>
registerStream(@RequestBody AuthWrapperDto req, @PathVariable("spaceID") Long spaceID){
if (Config.REALTIME_DISABLED) return new ResponseEntity<>(null, HttpStatus.PRECONDITION_FAILED);
if (spaceID == null) return new ResponseEntity<>(null, HttpStatus.BAD_REQUEST);
GenericAuthDto auth = req.getAuth();
if (!sessionService.getSession(auth.getUserID(), auth.getSessionKey())){
return new ResponseEntity<>(null, HttpStatus.UNAUTHORIZED);
}
if (userAccessService.userHasAccess(auth.getUserID(), spaceID)){
return new ResponseEntity<>(null, HttpStatus.FORBIDDEN);
}
Long streamID = rtStreamService.createStream(spaceID);
return new ResponseEntity<>(streamID, HttpStatus.CREATED);
}

@RequestMapping(value = "/api/stream/push/{streamID}", method = RequestMethod.POST)
@ApiOperation(value = "Push data to a specific stream.",
response = Long.class)
@ApiResponses(value = {
@ApiResponse(code = 200, message = "The data was pushed successfully."),
@ApiResponse(code = 401, message = "The user authorization failed."),
@ApiResponse(code = 403, message = "The user has no access to the stream specified."),
@ApiResponse(code = 412, message = "Realtime is disabled on this server."),
@ApiResponse(code = 425, message = "The minimum delay was not met.")
})
public @ResponseBody
ResponseEntity<?>
pushData(@RequestBody PushRTDataDto req, @PathVariable("streamID") Long streamID){
var res = checkValidRequest(req.getAuth(), streamID);
if (res != null) return res;

boolean success = rtStreamDataService.pushData(streamID, req.getData());
if (!success) return new ResponseEntity<>(null, HttpStatus.TOO_EARLY);
return new ResponseEntity<>(null, HttpStatus.OK);
}

@RequestMapping(value = "/api/stream/delete/{streamID}", method = RequestMethod.DELETE)
@ApiOperation(value = "Deletes the stream specified.",
response = Long.class)
@ApiResponses(value = {
@ApiResponse(code = 200, message = "The user was signed in successfully. The response is a session key."),
@ApiResponse(code = 401, message = "The user authorization failed."),
@ApiResponse(code = 403, message = "The user has no access to the stream."),
@ApiResponse(code = 412, message = "Realtime is disabled on this server.")
})
public @ResponseBody
ResponseEntity<?>
deleteStream(@RequestBody AuthWrapperDto req, @PathVariable("streamID") Long streamID){
var res = checkValidRequest(req.getAuth(), streamID);
if (res != null) return res;
rtStreamService.deleteStream(streamID);

return new ResponseEntity<>(null, HttpStatus.OK);
}



@RequestMapping(value = "/api/stream/fetch/{streamID}/{limit}", method = RequestMethod.POST)
@ApiOperation(value = "Fetches a specific amount of entries of data from a stream.",
response = Long.class)
@ApiResponses(value = {
@ApiResponse(code = 200, message = "The values were fetched successfully."),
@ApiResponse(code = 401, message = "The user authorization failed."),
@ApiResponse(code = 403, message = "The user has no access to the stream."),
@ApiResponse(code = 412, message = "Realtime is disabled on this server.")
})
public @ResponseBody
ResponseEntity<?>
fetchData(@RequestBody AuthWrapperDto req, @PathVariable("streamID") Long streamID, @PathVariable("limit") Long limit){
var res = checkValidRequest(req.getAuth(), streamID);
if (res != null) return res;

// TODO
return new ResponseEntity<>(null, HttpStatus.OK);
}

private ResponseEntity<?> checkValidRequest(GenericAuthDto auth, Long streamID){
if (Config.REALTIME_DISABLED) return new ResponseEntity<>(null, HttpStatus.PRECONDITION_FAILED);
if (!sessionService.getSession(auth.getUserID(), auth.getSessionKey())){
return new ResponseEntity<>(null, HttpStatus.UNAUTHORIZED);
}

Long spaceID = rtStreamService.getSpaceID(streamID);
if (spaceID == null || !userAccessService.userHasAccess(auth.getUserID(),spaceID)){
return new ResponseEntity<>(null, HttpStatus.FORBIDDEN);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class Config {


public static final GetVersionResponseDto VERSION = new GetVersionResponseDto(
"Vaultionizer v0.1",
"Vaultionizer v1.0",
"No maintainer",
false
);
Expand All @@ -34,4 +34,9 @@ public class Config {
public static final int PENDING_UPLOAD_JOB_DELAY = 86400 * 1000;

public static final String RANDOM_ALGO = "SHA1PRNG";


// Realtime stream config
public static final boolean REALTIME_DISABLED = false; // disable realtime (default: true to disable)
public static final Long MIN_PUSH_DELAY = 5L; // minimal delay between two pushes in ms
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.vaultionizer.vaultserver.model.db;

import org.hibernate.annotations.CreationTimestamp;

import javax.persistence.*;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.time.Instant;

@Entity
public class RTStreamDataModel {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long streamDataID;

@NotNull(message = "streamID cannot be null!")
@Min(value = 0, message = "streamID cannot be below zero...")
private Long streamID;

@Column(columnDefinition = "TEXT")
@NotNull(message = "Data cannot be null!")
private String data;

@CreationTimestamp
private Instant timestamp;

public RTStreamDataModel() {
}

public RTStreamDataModel(Long streamID, String data) {
this.streamID = streamID;
this.data = data;
}

public Long getStreamDataID() {
return streamDataID;
}

public Long getStreamID() {
return streamID;
}

public String getData() {
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.vaultionizer.vaultserver.model.db;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@Entity
public class RTStreamModel {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long streamID;

@NotNull(message = "SpaceID cannot be null!")
@Min(value = 0, message = "SpaceID cannot be below zero...")
private Long spaceID;

public RTStreamModel() {
}

public RTStreamModel(Long spaceID) {
this.spaceID = spaceID;
}

public Long getStreamID() {
return streamID;
}

public Long getSpaceID() {
return spaceID;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.vaultionizer.vaultserver.model.dto;

import java.time.Instant;

public class FetchDataResponseDto {
private Instant timestamp;
private String data;

public FetchDataResponseDto(Instant timestamp, String data) {
this.timestamp = timestamp;
this.data = data;
}

public Instant getTimestamp() {
return timestamp;
}

public String getData() {
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.vaultionizer.vaultserver.model.dto;

public class PushRTDataDto {
private GenericAuthDto auth;
private String data;

public GenericAuthDto getAuth() {
return auth;
}

public String getData() {
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.vaultionizer.vaultserver.resource;

import com.vaultionizer.vaultserver.model.db.RTStreamDataModel;
import com.vaultionizer.vaultserver.model.dto.FetchDataResponseDto;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;

public interface RTStreamDataRepository extends JpaRepository<RTStreamDataModel, Long> {

@Transactional
@Modifying
@Query("DELETE FROM RTStreamDataModel it " +
"WHERE it.streamID IN " +
"(SELECT stream FROM RTStreamModel stream WHERE stream.spaceID = ?1)" +
"")
void deleteAllStreamDataWithSpace(Long spaceID);

@Transactional
@Modifying
@Query("DELETE FROM RTStreamDataModel it WHERE it.streamID = ?1")
void deleteAllStreamDataWithStream(Long streamID);

@Query("SELECT new com.vaultionizer.vaultserver.model.dto.FetchDataResponseDto(it.timestamp, it.data) " +
"FROM RTStreamDataModel it WHERE it.streamID = ?1 ORDER BY it.timestamp")
ArrayList<FetchDataResponseDto> fetchData(Long streamID, Long limit);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.vaultionizer.vaultserver.resource;

import com.vaultionizer.vaultserver.model.db.RTStreamModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.transaction.annotation.Transactional;

import java.util.Set;

public interface RTStreamRepository extends JpaRepository<RTStreamModel, Long> {
@Transactional
@Modifying
@Query("DELETE FROM RTStreamModel it WHERE it.spaceID = ?1")
void deleteAllStreamsForSpace(Long spaceID);


@Transactional
@Modifying
@Query("DELETE FROM RTStreamModel it WHERE it.streamID = ?1")
void deleteStream(Long streamID);


@Query("SELECT it.streamID FROM RTStreamModel it WHERE it.spaceID = ?1")
Set<Long> getAllStreams(Long spaceID);

@Query("SELECT it.spaceID FROM RTStreamModel it WHERE it.streamID = ?1")
Set<Long> getSpaceID(Long streamID);


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.vaultionizer.vaultserver.service;

import com.vaultionizer.vaultserver.helpers.Config;
import com.vaultionizer.vaultserver.model.db.RTStreamDataModel;
import com.vaultionizer.vaultserver.resource.RTStreamDataRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.HashMap;

@Service
public class RTStreamDataService {
private final RTStreamDataRepository rtStreamDataRepository;
private final HashMap<Long, Instant> pushMap; // needed to keep track of minimum delays between pushes

@Autowired
public RTStreamDataService(RTStreamDataRepository rtStreamDataRepository) {
this.rtStreamDataRepository = rtStreamDataRepository;
pushMap = new HashMap<>();
}

public boolean pushData(Long streamID, String content){
if (!checkPushable(streamID)) return false;
RTStreamDataModel streamEntry = new RTStreamDataModel(streamID, content);
rtStreamDataRepository.save(streamEntry);
return true;
}

private synchronized boolean checkPushable(Long streamID){
Instant lastPush = pushMap.get(streamID);
Instant now = Instant.now();
if (lastPush == null){
pushMap.put(streamID, now);
return true;
}
if (lastPush.isBefore(now.minusMillis(Config.MIN_PUSH_DELAY))){
pushMap.put(streamID, now);
return true;
}
return false;
}

public void deleteStreamData(Long streamID){
rtStreamDataRepository.deleteAllStreamDataWithStream(streamID);
}

public void fetchData(Long streamID, Long limit){

}
}
Loading