Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 42 additions & 29 deletions src/main/java/com/brogrammer/streamspace/content/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
Expand Down Expand Up @@ -140,7 +141,12 @@ private List<Path> filterPaths(List<Path> paths, String... extensions) {
return paths.parallelStream()
.filter(path -> {
String pathString = path.toString().toLowerCase();
return extensionSet.stream().anyMatch(pathString::endsWith);
for (String ext : extensionSet) {
if (pathString.endsWith(ext)) {
return true;
}
}
return false;
})
.toList();
}
Expand All @@ -165,7 +171,7 @@ private List<Video> createVideoEntities(List<Path> paths) throws IOException {
log.error("Error creating video entity for {}", entry, e);
return null;
}
}).filter(video -> video != null).toList();
}).filter(Objects::nonNull).toList();
}

private CompletableFuture<Void> saveVideosAsync(List<Video> videos) {
Expand All @@ -191,40 +197,47 @@ private CompletableFuture<Void> saveVideosAsync(List<Video> videos) {

private CompletableFuture<Void> saveMusicAsync(List<Song> songs) {
return CompletableFuture.runAsync(() -> {
// Get all content IDs in one query to avoid N+1 problem
Set<String> existingContentIds = new HashSet<>();
musicRepository.findAllContentIds().forEach(existingContentIds::add);

List<Song> nonExistingSongs = songs.stream()
.filter(song -> !musicRepository.existsByContentId(song.getContentId()))
.filter(song -> !existingContentIds.contains(song.getContentId()))
.toList();
musicRepository.saveAll(nonExistingSongs);

// Process in batches of 500
final int batchSize = 500;
for (int i = 0; i < nonExistingSongs.size(); i += batchSize) {
List<Song> batch = nonExistingSongs.subList(
i, Math.min(i + batchSize, nonExistingSongs.size())
);
musicRepository.saveAll(batch);
}
}).thenRun(() -> log.info("Finished Indexing Music"));
}

private List<Song> createMusicEntities(List<Path> paths) throws IOException {
Song song = null;
List<Song> songs = new ArrayList<>();
Path relativePath;
String encodedFileName;

for (Path entry : paths) {
log.debug(entry.toString());
encodedFileName = decodePathSegment.apply(entry.getFileName().toString());

// Relativize the entry path against the user home directory
relativePath = Paths.get(ContentDirectoryServices.userHomePath).relativize(entry);

song = new Song()
.setName(encodedFileName)
.setContentLength(Files.size(entry))
.setSummary(entry.getFileName().toString())
.setContentId(File.separator + decodePathSegment.apply(relativePath.toString()))
//.setContentId(decodePathSegment.apply(relativePath.toString()))
.setContentMimeType(decodeContentType.apply(entry))
.setSongId(encodedFileName)
.setSource(SOURCE.LOCAL);

songs.add(song);
}
Path userHomePath = Paths.get(ContentDirectoryServices.userHomePath);

return paths.parallelStream().map(entry -> {
try {
log.debug(entry.toString());
String encodedFileName = decodePathSegment.apply(entry.getFileName().toString());
Path relativePath = userHomePath.relativize(entry);

return songs;
return new Song()
.setName(encodedFileName)
.setContentLength(Files.size(entry))
.setSummary(entry.getFileName().toString())
.setContentId(File.separator + decodePathSegment.apply(relativePath.toString()))
.setContentMimeType(decodeContentType.apply(entry))
.setSongId(encodedFileName)
.setSource(SOURCE.LOCAL);
} catch (IOException e) {
log.error("Error creating song entity for {}", entry, e);
return null;
}
}).filter(Objects::nonNull).toList();
}

private Video createVideoEntity(TorrentFile file, String torrentName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,60 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class RetryService<T> {

//@Value("${retryAttempts}")
private int retryAttempts = 4;
//@Value("${timeToWait}")
private final long timeToWait = TimeUnit.SECONDS.toSeconds(1000);
private static final int DEFAULT_MAX_ATTEMPTS = 4;
private static final long DEFAULT_WAIT_MILLIS = 1000L;

public T retry(RetryExecutor<T> retryExecutor) {
return retry(retryExecutor, DEFAULT_MAX_ATTEMPTS, DEFAULT_WAIT_MILLIS);
}

public T retry(RetryExecutor<T> retryExecutor, int maxAttempts, long waitMillis) {
int remainingAttempts = maxAttempts;

while (shouldRetry()) {
while (remainingAttempts > 0) {
try {
//log.info("Retrying...");
T result = retryExecutor.run();
if (result!=null) {
if (result != null) {
return result;
}
//return; // if successful, exit method
// Result is null, decrement attempts and retry
remainingAttempts--;
if (remainingAttempts > 0 && !waitBeforeNextRetry(waitMillis)) {
return null; // Interrupted, exit early
}
} catch (Exception e) {
retryAttempts--;
if (shouldRetry()) {
//log.error(e.getMessage(), e);
remainingAttempts--;
if (remainingAttempts > 0) {
log.error(e.getMessage());
waitBeforeNextRetry();
if (!waitBeforeNextRetry(waitMillis)) {
return null; // Interrupted, exit early
}
} else {
//throw e; // if all retries failed, throw the exception
log.error(e.getMessage(), e);
}
}
}

return null; // if all retries failed
}

private boolean shouldRetry() {
return retryAttempts > 0;
return null;
}

private void waitBeforeNextRetry() {
/**
* Waits before the next retry attempt.
* @return true if wait completed successfully, false if interrupted
*/
private boolean waitBeforeNextRetry(long waitMillis) {
try {
log.info("Waiting before next retry...");
Thread.sleep(timeToWait);
} catch (Exception e) {
log.error("Exception while waiting for next retry {}", e.getMessage(), e);
Thread.sleep(waitMillis);
return true;
} catch (InterruptedException e) {
log.error("Retry interrupted: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,26 @@ public class TorrentDownloadManager {
final DownloadProgressHandler downloadProgressHandler;

public void startDownload(DownloadTask downloadTask) {
TorrentClient torrentClient = clients.get(downloadTask.getTorrentHash());
if (!downloads.existsById(downloadTask.getTorrentHash())) {
try {
if (torrentClient != null) {
torrentClient.resume();
} else {
torrentClient = new TorrentClient(
downloadTaskToOptions(downloadTask),
indexer,
downloadProgressHandler,
this); // Passing current instance
clients.put(downloadTask.getTorrentHash(), torrentClient);
torrentClient.resume();
}
downloads.save(downloadTask);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
} else {
if (torrentClient != null) {
torrentClient.resume();
} else {
String torrentHash = downloadTask.getTorrentHash();
TorrentClient torrentClient = clients.get(torrentHash);
boolean isNewDownload = !downloads.existsById(torrentHash);

try {
if (torrentClient == null) {
torrentClient = new TorrentClient(
downloadTaskToOptions(downloadTask),
indexer,
downloadProgressHandler,
this); // Passing current instance
clients.put(downloadTask.getTorrentHash(), torrentClient);
torrentClient.resume();
this);
clients.put(torrentHash, torrentClient);
}
torrentClient.resume();

if (isNewDownload) {
downloads.save(downloadTask);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

Expand Down
11 changes: 3 additions & 8 deletions src/main/java/com/brogrammer/streamspace/yt/YoutubeCrawler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class YoutubeCrawler {

private static final Pattern POLYMER_INITIAL_DATA_REGEX = Pattern.compile("(window\\[\"ytInitialData\"]|var ytInitialData)\\s*=\\s*(.*);");
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public YouTubeResponseDTO getYoutubeTrailersByTitle(String searchQuery) {

Expand Down Expand Up @@ -70,20 +71,14 @@ private Content crawlSearchResults(String searchQuery) {
Document document = Jsoup.connect("https://www.youtube.com/results?search_query=" + searchQuery + " trailer")
.get();

// document.getElementsByTag("a").forEach(System.out::println); // This will get all links in the document
// Match the JSON from the HTML. It should be within a script tag
// String matcher0 = matcher.group(0);
// String matcher1 = matcher.group(1);
// String matcher2 = matcher.group(2);
Matcher matcher = POLYMER_INITIAL_DATA_REGEX.matcher(document.html());
if (!matcher.find()) {
log.warn("Failed to match ytInitialData JSON object");
}

ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(matcher.group(2));
JsonNode jsonNode = OBJECT_MAPPER.readTree(matcher.group(2));
JsonNode contents = jsonNode.get("contents");
return Objects.requireNonNull(objectMapper.treeToValue(contents, Content.class));
return Objects.requireNonNull(OBJECT_MAPPER.treeToValue(contents, Content.class));
});
}

Expand Down