From 03ae4d7d08549b15c0caea60f84b28f45d3006e8 Mon Sep 17 00:00:00 2001 From: astrobounce Date: Sun, 12 Oct 2025 12:42:08 +0530 Subject: [PATCH 1/4] refactor(client) introduce constructor pattern and connection abstraction --- client.go | 40 ++++++++++++++++++++++++++++++++++++++++ main.go | 11 ++++++++++- 2 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 client.go diff --git a/client.go b/client.go new file mode 100644 index 0000000..39013cd --- /dev/null +++ b/client.go @@ -0,0 +1,40 @@ +package main + +import ( + "fmt" + "net" + "time" +) + +// Create Client struct +type Client struct { + conn net.Conn + brokerAddress string + connectionTimeout time.Duration +} + +// NewClient creates a new client return client and error +func NewClient(brokerAddress string) *Client { + return &Client{ + brokerAddress: brokerAddress, + connectionTimeout: 10 * time.Second, + } +} + +func (c *Client) Connect() error { + + fmt.Printf("Connecting to broker at %s\n", brokerAddress) + + //Create Dialup Connection to Broker + conn, err := net.DialTimeout("tcp", brokerAddress, 10*time.Second) + if err != nil { + return err + } + c.conn = conn + fmt.Printf("Connected to broker at %s\n", brokerAddress) + return nil +} + +func (c *Client) Close() net.Conn { + return c.conn +} diff --git a/main.go b/main.go index 3713fb4..64f4461 100644 --- a/main.go +++ b/main.go @@ -23,7 +23,16 @@ const ( ) func main() { - //1. Open tcp connection to broker + //1. Create Kafka Client + + client := NewClient(brokerAddress) + + // Establish Connection to broker + if err := client.Connect(); err != nil { + fmt.Printf("Failed to connect to broker: %s\n", err) + } + defer client.Close() + fmt.Printf("Connecting to broker at %s\n", brokerAddress) conn, err := net.DialTimeout("tcp", brokerAddress, 10*time.Second) if err != nil { From f87215d4421f0290b1af58ed4e0555faa2867292 Mon Sep 17 00:00:00 2001 From: astrobounce Date: Sun, 12 Oct 2025 12:46:48 +0530 Subject: [PATCH 2/4] Remove redundant code --- main.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/main.go b/main.go index 64f4461..2b58113 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "io" "log" "net" - "time" ) const ( @@ -33,15 +32,6 @@ func main() { } defer client.Close() - fmt.Printf("Connecting to broker at %s\n", brokerAddress) - conn, err := net.DialTimeout("tcp", brokerAddress, 10*time.Second) - if err != nil { - fmt.Printf("Failed to connect to broker: %v\n", err) - return - } - defer conn.Close() - fmt.Printf("Connected to broker at %s\n", brokerAddress) - //2. Construct create topic request createTopicsRequestBytes, err := buildCreateTopicsRequest() if err != nil { From f804ade59d32dd28a1dea3b0013ed213df69b4f1 Mon Sep 17 00:00:00 2001 From: astrobounce Date: Sun, 12 Oct 2025 12:51:48 +0530 Subject: [PATCH 3/4] Fixed connection issue --- client.go | 6 +++--- main.go | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index 39013cd..64ccd9a 100644 --- a/client.go +++ b/client.go @@ -21,18 +21,18 @@ func NewClient(brokerAddress string) *Client { } } -func (c *Client) Connect() error { +func (c *Client) Connect() (net.Conn, error) { fmt.Printf("Connecting to broker at %s\n", brokerAddress) //Create Dialup Connection to Broker conn, err := net.DialTimeout("tcp", brokerAddress, 10*time.Second) if err != nil { - return err + return nil, err } c.conn = conn fmt.Printf("Connected to broker at %s\n", brokerAddress) - return nil + return c.conn, nil } func (c *Client) Close() net.Conn { diff --git a/main.go b/main.go index 2b58113..8360320 100644 --- a/main.go +++ b/main.go @@ -27,7 +27,8 @@ func main() { client := NewClient(brokerAddress) // Establish Connection to broker - if err := client.Connect(); err != nil { + conn, err := client.Connect() + if err != nil { fmt.Printf("Failed to connect to broker: %s\n", err) } defer client.Close() From c639147b80622651aa76eb255222f1437418248d Mon Sep 17 00:00:00 2001 From: astrobounce Date: Sun, 12 Oct 2025 13:39:41 +0530 Subject: [PATCH 4/4] refactor(config) move configs into separate file --- client.go | 20 ++++++++++---------- config.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 50 +++++++++++++++++++++----------------------------- 3 files changed, 80 insertions(+), 39 deletions(-) create mode 100644 config.go diff --git a/client.go b/client.go index 64ccd9a..277daff 100644 --- a/client.go +++ b/client.go @@ -3,35 +3,35 @@ package main import ( "fmt" "net" - "time" ) // Create Client struct type Client struct { - conn net.Conn - brokerAddress string - connectionTimeout time.Duration + conn net.Conn + config *ClientConfig } // NewClient creates a new client return client and error -func NewClient(brokerAddress string) *Client { +func NewClient(config *ClientConfig) *Client { + if config == nil { + config = DefaultConfig() + } return &Client{ - brokerAddress: brokerAddress, - connectionTimeout: 10 * time.Second, + config: config, } } func (c *Client) Connect() (net.Conn, error) { - fmt.Printf("Connecting to broker at %s\n", brokerAddress) + fmt.Printf("Connecting to broker at %s\n", c.config.BrokerAddress) //Create Dialup Connection to Broker - conn, err := net.DialTimeout("tcp", brokerAddress, 10*time.Second) + conn, err := net.DialTimeout("tcp", c.config.BrokerAddress, c.config.ConnectionTimeout) if err != nil { return nil, err } c.conn = conn - fmt.Printf("Connected to broker at %s\n", brokerAddress) + fmt.Printf("Connected to broker at %s\n", c.config.BrokerAddress) return c.conn, nil } diff --git a/config.go b/config.go new file mode 100644 index 0000000..5eef569 --- /dev/null +++ b/config.go @@ -0,0 +1,49 @@ +package main + +import "time" + +type ClientConfig struct { + BrokerAddress string + ClientID string + ConnectionTimeout time.Duration +} + +type TopicConfig struct { + Name string + NumPartitions int32 + ReplicationFactor int32 + TimeoutMs int32 + ValidateOnly bool +} + +type ProtocolConfig struct { + APIKeyCreateTopics int16 + APIVersion int16 + CorrelationID int32 +} + +func DefaultConfig() *ClientConfig { + return &ClientConfig{ + BrokerAddress: "localhost:9092", + ClientID: "kafka-client", + ConnectionTimeout: 10 * time.Second, + } +} + +func DefaultTopicConfig(topicName string, numPartition int32) *TopicConfig { + return &TopicConfig{ + Name: topicName, + NumPartitions: numPartition, + ReplicationFactor: -1, + TimeoutMs: 10000, + ValidateOnly: false, + } +} + +func DefaultProtocolConfig() *ProtocolConfig { + return &ProtocolConfig{ + APIKeyCreateTopics: 19, + APIVersion: 4, + CorrelationID: 123, + } +} diff --git a/main.go b/main.go index 8360320..44b0e52 100644 --- a/main.go +++ b/main.go @@ -9,22 +9,14 @@ import ( "net" ) -const ( - brokerAddress = "localhost:9092" - apiKeyCreateTopics = int16(19) - apiVersion = int16(4) - clientID = "kafka-topic-creator" - - correlationID int32 = 123 - numPartitions = 5 - replicationFactor = -1 - topicName = "user-events" -) - func main() { + // Iniatialize configurations + clientConfig := DefaultConfig() + topicConfig := DefaultTopicConfig("user-events", 3) + protocolConfig := DefaultProtocolConfig() //1. Create Kafka Client - client := NewClient(brokerAddress) + client := NewClient(clientConfig) // Establish Connection to broker conn, err := client.Connect() @@ -34,7 +26,7 @@ func main() { defer client.Close() //2. Construct create topic request - createTopicsRequestBytes, err := buildCreateTopicsRequest() + createTopicsRequestBytes, err := buildCreateTopicsRequest(clientConfig, topicConfig, protocolConfig) if err != nil { fmt.Printf("Failed to build CreateTopicsRequest: %v\n", err) return @@ -59,28 +51,28 @@ func main() { log.Printf("Response bytes: %x", responseBytes) // 5. Parse the response - err = parseCreateTopicResponse(responseBytes) + err = parseCreateTopicResponse(responseBytes, topicConfig.Name, protocolConfig.CorrelationID) if err != nil { log.Fatalf("Topic creation failed: %v", err) } - log.Printf("Successfully created topic '%s'!", topicName) + log.Printf("Successfully created topic '%s'!", topicConfig.Name) } // Kafka Create Topic Rquest Builder -func buildCreateTopicsRequest() ([]byte, error) { +func buildCreateTopicsRequest(clientCfg *ClientConfig, topicCfg *TopicConfig, protocolCfg *ProtocolConfig) ([]byte, error) { buffer := new(bytes.Buffer) //Constructing Kafka CreateTopicsRequest Header - binary.Write(buffer, binary.BigEndian, apiKeyCreateTopics) - binary.Write(buffer, binary.BigEndian, apiVersion) - binary.Write(buffer, binary.BigEndian, correlationID) - writeString(buffer, clientID) + binary.Write(buffer, binary.BigEndian, protocolCfg.APIKeyCreateTopics) + binary.Write(buffer, binary.BigEndian, protocolCfg.APIVersion) + binary.Write(buffer, binary.BigEndian, protocolCfg.CorrelationID) + writeString(buffer, clientCfg.ClientID) binary.Write(buffer, binary.BigEndian, int32(1)) - writeString(buffer, topicName) - binary.Write(buffer, binary.BigEndian, int32(numPartitions)) - binary.Write(buffer, binary.BigEndian, int16(replicationFactor)) + writeString(buffer, topicCfg.Name) + binary.Write(buffer, binary.BigEndian, int32(topicCfg.NumPartitions)) + binary.Write(buffer, binary.BigEndian, int16(topicCfg.ReplicationFactor)) //Assignments array (empty for auto-assignment) binary.Write(buffer, binary.BigEndian, int32(0)) @@ -102,14 +94,14 @@ func buildCreateTopicsRequest() ([]byte, error) { } // parseCreateTopicResponse reads the binary response and checks for errors. -func parseCreateTopicResponse(data []byte) error { +func parseCreateTopicResponse(data []byte, expectedTopicName string, expectedCorrelationID int32) error { reader := bytes.NewReader(data) // Response Header: Correlation ID (4 bytes) var receivedCorrID int32 binary.Read(reader, binary.BigEndian, &receivedCorrID) - if receivedCorrID != correlationID { - return fmt.Errorf("correlation ID mismatch: expected %d, got %d", correlationID, receivedCorrID) + if receivedCorrID != expectedCorrelationID { + return fmt.Errorf("correlation ID mismatch: expected %d, got %d", expectedCorrelationID, receivedCorrID) } // Throttle Time (4 bytes) @@ -128,8 +120,8 @@ func parseCreateTopicResponse(data []byte) error { if err != nil { return fmt.Errorf("could not read topic name from response: %w", err) } - if respTopicName != topicName { - return fmt.Errorf("topic name mismatch: expected '%s', got '%s'", topicName, respTopicName) + if respTopicName != expectedTopicName { + return fmt.Errorf("topic name mismatch: expected '%s', got '%s'", expectedTopicName, respTopicName) } // The crucial error code