Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"fmt"
"net"
)

// Create Client struct
type Client struct {
conn net.Conn
config *ClientConfig
}

// NewClient creates a new client return client and error
func NewClient(config *ClientConfig) *Client {
if config == nil {
config = DefaultConfig()
}
return &Client{
config: config,
}
}

func (c *Client) Connect() (net.Conn, error) {

fmt.Printf("Connecting to broker at %s\n", c.config.BrokerAddress)

//Create Dialup Connection to Broker
conn, err := net.DialTimeout("tcp", c.config.BrokerAddress, c.config.ConnectionTimeout)
if err != nil {
return nil, err
}
c.conn = conn
fmt.Printf("Connected to broker at %s\n", c.config.BrokerAddress)
return c.conn, nil
}

func (c *Client) Close() net.Conn {
return c.conn
}
49 changes: 49 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import "time"

type ClientConfig struct {
BrokerAddress string
ClientID string
ConnectionTimeout time.Duration
}

type TopicConfig struct {
Name string
NumPartitions int32
ReplicationFactor int32
TimeoutMs int32
ValidateOnly bool
}

type ProtocolConfig struct {
APIKeyCreateTopics int16
APIVersion int16
CorrelationID int32
}

func DefaultConfig() *ClientConfig {
return &ClientConfig{
BrokerAddress: "localhost:9092",
ClientID: "kafka-client",
ConnectionTimeout: 10 * time.Second,
}
}

func DefaultTopicConfig(topicName string, numPartition int32) *TopicConfig {
return &TopicConfig{
Name: topicName,
NumPartitions: numPartition,
ReplicationFactor: -1,
TimeoutMs: 10000,
ValidateOnly: false,
}
}

func DefaultProtocolConfig() *ProtocolConfig {
return &ProtocolConfig{
APIKeyCreateTopics: 19,
APIVersion: 4,
CorrelationID: 123,
}
}
62 changes: 27 additions & 35 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,26 @@ import (
"io"
"log"
"net"
"time"
)

const (
brokerAddress = "localhost:9092"
apiKeyCreateTopics = int16(19)
apiVersion = int16(4)
clientID = "kafka-topic-creator"
func main() {
// Iniatialize configurations
clientConfig := DefaultConfig()
topicConfig := DefaultTopicConfig("user-events", 3)
protocolConfig := DefaultProtocolConfig()
//1. Create Kafka Client

correlationID int32 = 123
numPartitions = 5
replicationFactor = -1
topicName = "user-events"
)
client := NewClient(clientConfig)

func main() {
//1. Open tcp connection to broker
fmt.Printf("Connecting to broker at %s\n", brokerAddress)
conn, err := net.DialTimeout("tcp", brokerAddress, 10*time.Second)
// Establish Connection to broker
conn, err := client.Connect()
if err != nil {
fmt.Printf("Failed to connect to broker: %v\n", err)
return
fmt.Printf("Failed to connect to broker: %s\n", err)
}
defer conn.Close()
fmt.Printf("Connected to broker at %s\n", brokerAddress)
defer client.Close()

//2. Construct create topic request
createTopicsRequestBytes, err := buildCreateTopicsRequest()
createTopicsRequestBytes, err := buildCreateTopicsRequest(clientConfig, topicConfig, protocolConfig)
if err != nil {
fmt.Printf("Failed to build CreateTopicsRequest: %v\n", err)
return
Expand All @@ -59,28 +51,28 @@ func main() {
log.Printf("Response bytes: %x", responseBytes)

// 5. Parse the response
err = parseCreateTopicResponse(responseBytes)
err = parseCreateTopicResponse(responseBytes, topicConfig.Name, protocolConfig.CorrelationID)
if err != nil {
log.Fatalf("Topic creation failed: %v", err)
}

log.Printf("Successfully created topic '%s'!", topicName)
log.Printf("Successfully created topic '%s'!", topicConfig.Name)
}

// Kafka Create Topic Rquest Builder
func buildCreateTopicsRequest() ([]byte, error) {
func buildCreateTopicsRequest(clientCfg *ClientConfig, topicCfg *TopicConfig, protocolCfg *ProtocolConfig) ([]byte, error) {
buffer := new(bytes.Buffer)

//Constructing Kafka CreateTopicsRequest Header
binary.Write(buffer, binary.BigEndian, apiKeyCreateTopics)
binary.Write(buffer, binary.BigEndian, apiVersion)
binary.Write(buffer, binary.BigEndian, correlationID)
writeString(buffer, clientID)
binary.Write(buffer, binary.BigEndian, protocolCfg.APIKeyCreateTopics)
binary.Write(buffer, binary.BigEndian, protocolCfg.APIVersion)
binary.Write(buffer, binary.BigEndian, protocolCfg.CorrelationID)
writeString(buffer, clientCfg.ClientID)

binary.Write(buffer, binary.BigEndian, int32(1))
writeString(buffer, topicName)
binary.Write(buffer, binary.BigEndian, int32(numPartitions))
binary.Write(buffer, binary.BigEndian, int16(replicationFactor))
writeString(buffer, topicCfg.Name)
binary.Write(buffer, binary.BigEndian, int32(topicCfg.NumPartitions))
binary.Write(buffer, binary.BigEndian, int16(topicCfg.ReplicationFactor))
//Assignments array (empty for auto-assignment)
binary.Write(buffer, binary.BigEndian, int32(0))

Expand All @@ -102,14 +94,14 @@ func buildCreateTopicsRequest() ([]byte, error) {
}

// parseCreateTopicResponse reads the binary response and checks for errors.
func parseCreateTopicResponse(data []byte) error {
func parseCreateTopicResponse(data []byte, expectedTopicName string, expectedCorrelationID int32) error {
reader := bytes.NewReader(data)

// Response Header: Correlation ID (4 bytes)
var receivedCorrID int32
binary.Read(reader, binary.BigEndian, &receivedCorrID)
if receivedCorrID != correlationID {
return fmt.Errorf("correlation ID mismatch: expected %d, got %d", correlationID, receivedCorrID)
if receivedCorrID != expectedCorrelationID {
return fmt.Errorf("correlation ID mismatch: expected %d, got %d", expectedCorrelationID, receivedCorrID)
}

// Throttle Time (4 bytes)
Expand All @@ -128,8 +120,8 @@ func parseCreateTopicResponse(data []byte) error {
if err != nil {
return fmt.Errorf("could not read topic name from response: %w", err)
}
if respTopicName != topicName {
return fmt.Errorf("topic name mismatch: expected '%s', got '%s'", topicName, respTopicName)
if respTopicName != expectedTopicName {
return fmt.Errorf("topic name mismatch: expected '%s', got '%s'", expectedTopicName, respTopicName)
}

// The crucial error code
Expand Down