Data Nadhi Queue

A cost-effective, scalable queue framework that combines PostgreSQL and object storage (MinIO/S3) to create an intermediate storage system that simulates traditional message queuing at a fraction of the cost of cloud-provided queuing systems.

🚀 Overview

Data Nadhi Queue provides a robust queuing solution by separating concerns:

  • PostgreSQL: Handles queue metadata, job status tracking, and ordering
  • Object Storage (MinIO/S3): Stores large message payloads efficiently
  • Low Cost: Significantly cheaper than cloud queuing services like AWS SQS, Azure Service Bus, or Google Cloud Tasks

🏗️ Architecture

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Producer      │───▶│   Queue System   │───▶│   Consumer      │
│                 │    │                  │    │                 │
│ - Publishes     │    │ ┌──────────────┐ │    │ - Fetches jobs  │
│   messages      │    │ │ PostgreSQL   │ │    │ - Processes     │
│ - Large         │    │ │ - Job status │ │    │ - Completes     │
│   payloads      │    │ │ - Metadata   │ │    │                 │
└─────────────────┘    │ │ - Ordering   │ │    └─────────────────┘
                       │ └──────────────┘ │
                       │ ┌──────────────┐ │
                       │ │ MinIO/S3     │ │
                       │ │ - Payloads   │ │
                       │ │ - Files      │ │
                       │ └──────────────┘ │
                       └──────────────────┘

✨ Features

  • Cost Efficient: Use your existing PostgreSQL and object storage infrastructure
  • Scalable: Handle large message payloads without bloating your database
  • Reliable: ACID transactions ensure message consistency
  • Flexible: Pluggable storage backend (currently supports MinIO, easily extensible to S3, GCS, etc.)
  • File Organization: Optional file path organization for better message management and namespace separation
  • Simple API: Clean, TypeScript-first interface
  • Status Tracking: Built-in job status management (pending → processing → completed)

📦 Installation

npm install data-nadhi-queue

🔧 Prerequisites

  1. PostgreSQL Database with the following table:
CREATE TABLE queue_log (
    message_id VARCHAR(255) PRIMARY KEY,
    file_key VARCHAR(255) NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_queue_status_created ON queue_log(status, created_at);
  2. MinIO Server or S3-compatible storage with a configured bucket

🚀 Quick Start

import { Pool } from 'pg';
import { Queue, MinioStorage } from 'data-nadhi-queue';

// Setup database connection
const db = new Pool({
  host: 'localhost',
  port: 5432,
  database: 'your_db',
  user: 'your_user',
  password: 'your_password',
});

// Setup storage client
const storage = new MinioStorage(
  'localhost',        // MinIO host
  9000,              // MinIO port
  'minioadmin',      // Access key
  'minioadmin',      // Secret key
  'queue-bucket'     // Bucket name
);

// Create queue instance
const queue = new Queue(db, storage);

// Publish a message
const messageData = { 
  userId: 123, 
  action: 'process_payment', 
  amount: 99.99 
};
await queue.publish('unique-message-id', Buffer.from(JSON.stringify(messageData)));

// Publish with custom file path (organizes messages in folders)
await queue.publish('payment-123', Buffer.from(JSON.stringify(messageData)), 'payments/2025');
// This creates: payments/2025/payment-123.json

// Fetch and process messages
const job = await queue.fetchNext();
if (job) {
  console.log('Processing job:', job.message_id);
  console.log('Job data:', job.data);
  
  // Process your job here...
  
  // Mark as completed
  await queue.complete(job.message_id);
}
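
A long-running consumer typically wraps fetchNext() and complete() in a polling loop. Below is a minimal sketch of one; the QueueLike interface is a structural stand-in that matches the shape of the documented API, used only so the example stays self-contained.

```typescript
// A minimal polling worker built on the documented fetchNext()/complete()
// API. QueueLike is a local stand-in matching the real Queue's shape.
interface QueueLike {
  fetchNext(): Promise<{ message_id: string; data: any } | null>;
  complete(messageId: string): Promise<void>;
}

async function drainQueue(
  queue: QueueLike,
  handler: (data: any) => Promise<void>,
): Promise<number> {
  let processed = 0;
  // Keep fetching until the queue reports no pending jobs.
  for (;;) {
    const job = await queue.fetchNext();
    if (!job) break;
    await handler(job.data);              // process the payload
    await queue.complete(job.message_id); // then mark it done
    processed++;
  }
  return processed;
}
```

In a real worker you would also pause between empty polls and catch handler errors, rather than letting one failing job stop the loop.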

📚 API Reference

Queue

Constructor

new Queue(db: Pool, storage: StorageClient)

Methods

publish(messageId: string, data: Buffer, filePath?: string): Promise<void>

Publishes a new message to the queue.

  • messageId: Unique identifier for the message
  • data: Message payload as Buffer
  • filePath: Optional path prefix for organizing messages in folders/namespaces

Examples:

// Basic usage
await queue.publish('msg-001', buffer);
// Creates: msg-001.json

// With file path organization
await queue.publish('msg-001', buffer, 'orders/2025/october');
// Creates: orders/2025/october/msg-001.json
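
The naming shown in these examples can be expressed as a small pure function. fileKeyFor is not exported by the library; it just reproduces the documented `<filePath>/<messageId>.json` convention.

```typescript
// Build the object-storage key for a message, following the documented
// naming: '<filePath>/<messageId>.json', or '<messageId>.json' when no
// path is given. Illustrative only; not part of the library's API.
function fileKeyFor(messageId: string, filePath?: string): string {
  // Trim any trailing slash so 'orders/' and 'orders' behave the same.
  const prefix = filePath ? filePath.replace(/\/+$/, '') + '/' : '';
  return `${prefix}${messageId}.json`;
}
```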

fetchNext(): Promise<Job | null>

Fetches the next pending job from the queue.

Returns a job object with:

{
  message_id: string;
  file_key: string;
  status: string;
  created_at: Date;
  updated_at: Date;
  data: any; // Parsed JSON data
}
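
The README does not specify how fetchNext() claims a job when several consumers poll at once. A common PostgreSQL pattern for this is FOR UPDATE SKIP LOCKED; the sketch below illustrates that pattern against the queue_log table. This is an assumption about one possible implementation, not the library's actual internals, and QueryFn mirrors pg's Pool.query call shape only to keep the sketch self-contained.

```typescript
// Atomically claim the oldest pending job using SKIP LOCKED, so two
// concurrent consumers never claim the same row. Illustration of the
// general Postgres pattern, not the library's internal code.
type QueryFn = (sql: string, params?: any[]) => Promise<{ rows: any[] }>;

async function claimNextJob(
  query: QueryFn,
): Promise<{ message_id: string; file_key: string } | null> {
  const res = await query(
    `UPDATE queue_log
        SET status = 'processing', updated_at = NOW()
      WHERE message_id = (
              SELECT message_id FROM queue_log
               WHERE status = 'pending'
               ORDER BY created_at
               FOR UPDATE SKIP LOCKED
               LIMIT 1)
      RETURNING message_id, file_key`,
  );
  return res.rows[0] ?? null;
}
```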

complete(messageId: string): Promise<void>

Marks a job as completed and cleans up associated storage.

MinioStorage

Constructor

new MinioStorage(host: string, port: number, accessKey: string, secretKey: string, bucket: string)

StorageClient Interface

Implement this interface to create custom storage backends:

interface StorageClient {
  upload(fileKey: string, data: Buffer): Promise<void>;
  download(fileKey: string): Promise<Buffer>;
  delete(fileKey: string): Promise<void>;
}

🔌 Custom Storage Backends

Create a custom storage backend by implementing the StorageClient interface:

import { StorageClient } from 'data-nadhi-queue';

class S3Storage implements StorageClient {
  // Implement upload, download, delete methods
}

const queue = new Queue(db, new S3Storage());
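
For unit tests, an in-memory backend is often enough. The sketch below implements the three-method interface from above (re-declared locally so the example is self-contained):

```typescript
// An in-memory StorageClient, useful for unit-testing code built on the
// queue without a running MinIO/S3 server. The interface is re-declared
// here; in an application you would import it from 'data-nadhi-queue'.
interface StorageClient {
  upload(fileKey: string, data: Buffer): Promise<void>;
  download(fileKey: string): Promise<Buffer>;
  delete(fileKey: string): Promise<void>;
}

class MemoryStorage implements StorageClient {
  private files = new Map<string, Buffer>();

  async upload(fileKey: string, data: Buffer): Promise<void> {
    this.files.set(fileKey, data);
  }

  async download(fileKey: string): Promise<Buffer> {
    const data = this.files.get(fileKey);
    if (!data) throw new Error(`No such key: ${fileKey}`);
    return data;
  }

  async delete(fileKey: string): Promise<void> {
    this.files.delete(fileKey);
  }
}
```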

🏗️ Use Cases

  • Background Job Processing: Handle heavy computations, file processing, or API calls
  • Event-Driven Architecture: Decouple microservices with reliable message passing
  • Batch Processing: Queue large datasets for processing during off-peak hours
  • Cost Optimization: Replace expensive cloud queuing services in development/staging environments
  • Hybrid Architectures: Bridge on-premise and cloud systems

📊 Performance Characteristics

  • Throughput: Depends on your PostgreSQL and storage performance
  • Latency: Low latency for metadata operations, storage latency for payload operations
  • Scalability: Horizontal scaling through database read replicas and distributed storage
  • Cost: ~90% cost reduction compared to cloud queuing services for high-volume scenarios

🔍 Monitoring

Monitor your queue performance by querying the queue_log table:

-- Check queue depth
SELECT COUNT(*) FROM queue_log WHERE status = 'pending';

-- Check processing jobs
SELECT COUNT(*) FROM queue_log WHERE status = 'processing';

-- Check job age
SELECT message_id, created_at, updated_at 
FROM queue_log 
WHERE status = 'processing' 
  AND updated_at < NOW() - INTERVAL '5 minutes';
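
If you monitor from Node rather than psql, the depth query can be wrapped in a small helper. QueryLike mirrors pg's Pool.query call shape (an assumption for illustration), which also makes the helper easy to exercise against a stub:

```typescript
// Count queue_log rows in a given status. Accepts any function with
// pg's (sql, params) => Promise<{ rows }> call shape, so it works with
// a real Pool or a test stub.
type QueryLike = (sql: string, params?: any[]) => Promise<{ rows: any[] }>;

async function queueDepth(query: QueryLike, status = 'pending'): Promise<number> {
  const res = await query(
    'SELECT COUNT(*)::int AS count FROM queue_log WHERE status = $1',
    [status],
  );
  return res.rows[0].count;
}
```

With a real pool: await queueDepth((sql, params) => db.query(sql, params)).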

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Built for the Data ARENA Space project
  • Inspired by the need for cost-effective queuing solutions
  • Thanks to all contributors and the open-source community

Made with ❤️ by the Data ARENA Space team
