From fa160f007677aaa80b8601fdf49a020c9327626e Mon Sep 17 00:00:00 2001 From: Mark Wylde Date: Sun, 9 Feb 2025 22:50:35 +0000 Subject: [PATCH] feat: implement delete stream --- src/db.ts | 18 ------------------ src/index.ts | 23 ++++++++++++++++++++--- test/index.test.ts | 25 +++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 21 deletions(-) delete mode 100644 src/db.ts diff --git a/src/db.ts b/src/db.ts deleted file mode 100644 index 16ac655..0000000 --- a/src/db.ts +++ /dev/null @@ -1,18 +0,0 @@ -import createDoubledb from 'doubledb'; -import { join } from 'path'; -import { mkdir } from 'fs/promises'; -import { tmpdir } from 'os'; - -export async function createDb(streamName: string, dbPath?: string) { - const path = dbPath - ? join(dbPath, `doubledb-${streamName}`) - : join(tmpdir(), `doubledb-${streamName}`); - - // Ensure the directory exists - await mkdir(dbPath || tmpdir(), { recursive: true }); - - // Open existing DB or create a new one - const db = await createDoubledb(path); - - return db; -} diff --git a/src/index.ts b/src/index.ts index 58f7b25..2f0dffb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ import createDoubledb, { DoubleDb } from 'doubledb'; import { EventbaseNats, setupNats } from './nats.js'; import { JetStreamClient, JetStreamManager } from '@nats-io/jetstream'; import { v4 as uuidv4 } from 'uuid'; +import { rmdir } from 'fs/promises'; const base64encode = (str: string) => Buffer.from(str).toString('base64'); @@ -264,6 +265,20 @@ export async function createEventbase(config: EventbaseConfig) { getLastAccessed: () => lastAccessed, getActiveSubscriptions: () => activeSubscriptions, + deleteStream: async () => { + await jsm.streams.purge(config.streamName); + await jsm.streams.delete(config.streamName); + await instance.close(); + + await Promise.all([ + db.close(), + metaDb.close(), + settingsDb.close() + ]); + + await rmdir(config.dbPath || './data', { recursive: true }); + }, + close: async () => { instance.closed = true; await stream.stop(); @@ -374,9 +389,11 @@ async function replayEvents( return { waitUntilReady: () => readyPromise, stop: async () => { - await messages.close(); - await processing; - await consumer.delete(); + try { + await messages.close(); + await processing; + await consumer.delete(); + } catch (error) {} }, }; } diff --git a/test/index.test.ts b/test/index.test.ts index 06a343f..9add800 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -445,4 +445,29 @@ describe('Eventbase with Stats', async () => { assert.deepEqual(result, expectedResult); }); + + test('should delete the stream', async () => { + await eventbase1.put('user1', { name: 'Jane Doe' }); + await eventbase1.put('user2', { name: 'Jane Doe' }); + await eventbase1.put('user3', { name: 'Jane Doe' }); + + await eventbase1.deleteStream(); + + const eb = await createEventbase({ + dbPath: './test-data/' + streamName + '-node1', + nats: { + servers: ['localhost:4222'], + user: 'a', + pass: 'a', + }, + streamName, + statsStreamName, + }) + + const user1 = await eb.get('user1') + + assert.equal(user1, null) + + eb.close(); + }); });