diff --git a/client.go b/client.go new file mode 100644 index 0000000..277daff --- /dev/null +++ b/client.go @@ -0,0 +1,40 @@ +package main + +import ( + "fmt" + "net" +) + +// Create Client struct +type Client struct { + conn net.Conn + config *ClientConfig +} + +// NewClient creates a new client return client and error +func NewClient(config *ClientConfig) *Client { + if config == nil { + config = DefaultConfig() + } + return &Client{ + config: config, + } +} + +func (c *Client) Connect() (net.Conn, error) { + + fmt.Printf("Connecting to broker at %s\n", c.config.BrokerAddress) + + //Create Dialup Connection to Broker + conn, err := net.DialTimeout("tcp", c.config.BrokerAddress, c.config.ConnectionTimeout) + if err != nil { + return nil, err + } + c.conn = conn + fmt.Printf("Connected to broker at %s\n", c.config.BrokerAddress) + return c.conn, nil +} + +func (c *Client) Close() net.Conn { + return c.conn +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..5eef569 --- /dev/null +++ b/config.go @@ -0,0 +1,49 @@ +package main + +import "time" + +type ClientConfig struct { + BrokerAddress string + ClientID string + ConnectionTimeout time.Duration +} + +type TopicConfig struct { + Name string + NumPartitions int32 + ReplicationFactor int32 + TimeoutMs int32 + ValidateOnly bool +} + +type ProtocolConfig struct { + APIKeyCreateTopics int16 + APIVersion int16 + CorrelationID int32 +} + +func DefaultConfig() *ClientConfig { + return &ClientConfig{ + BrokerAddress: "localhost:9092", + ClientID: "kafka-client", + ConnectionTimeout: 10 * time.Second, + } +} + +func DefaultTopicConfig(topicName string, numPartition int32) *TopicConfig { + return &TopicConfig{ + Name: topicName, + NumPartitions: numPartition, + ReplicationFactor: -1, + TimeoutMs: 10000, + ValidateOnly: false, + } +} + +func DefaultProtocolConfig() *ProtocolConfig { + return &ProtocolConfig{ + APIKeyCreateTopics: 19, + APIVersion: 4, + CorrelationID: 123, + } +} diff --git a/main.go b/main.go index 3713fb4..44b0e52 100644 --- a/main.go +++ b/main.go @@ -7,34 +7,26 @@ import ( "io" "log" "net" - "time" ) -const ( - brokerAddress = "localhost:9092" - apiKeyCreateTopics = int16(19) - apiVersion = int16(4) - clientID = "kafka-topic-creator" +func main() { + // Iniatialize configurations + clientConfig := DefaultConfig() + topicConfig := DefaultTopicConfig("user-events", 3) + protocolConfig := DefaultProtocolConfig() + //1. Create Kafka Client - correlationID int32 = 123 - numPartitions = 5 - replicationFactor = -1 - topicName = "user-events" -) + client := NewClient(clientConfig) -func main() { - //1. Open tcp connection to broker - fmt.Printf("Connecting to broker at %s\n", brokerAddress) - conn, err := net.DialTimeout("tcp", brokerAddress, 10*time.Second) + // Establish Connection to broker + conn, err := client.Connect() if err != nil { - fmt.Printf("Failed to connect to broker: %v\n", err) - return + fmt.Printf("Failed to connect to broker: %s\n", err) } - defer conn.Close() - fmt.Printf("Connected to broker at %s\n", brokerAddress) + defer client.Close() //2. Construct create topic request - createTopicsRequestBytes, err := buildCreateTopicsRequest() + createTopicsRequestBytes, err := buildCreateTopicsRequest(clientConfig, topicConfig, protocolConfig) if err != nil { fmt.Printf("Failed to build CreateTopicsRequest: %v\n", err) return @@ -59,28 +51,28 @@ func main() { log.Printf("Response bytes: %x", responseBytes) // 5. Parse the response - err = parseCreateTopicResponse(responseBytes) + err = parseCreateTopicResponse(responseBytes, topicConfig.Name, protocolConfig.CorrelationID) if err != nil { log.Fatalf("Topic creation failed: %v", err) } - log.Printf("Successfully created topic '%s'!", topicName) + log.Printf("Successfully created topic '%s'!", topicConfig.Name) } // Kafka Create Topic Rquest Builder -func buildCreateTopicsRequest() ([]byte, error) { +func buildCreateTopicsRequest(clientCfg *ClientConfig, topicCfg *TopicConfig, protocolCfg *ProtocolConfig) ([]byte, error) { buffer := new(bytes.Buffer) //Constructing Kafka CreateTopicsRequest Header - binary.Write(buffer, binary.BigEndian, apiKeyCreateTopics) - binary.Write(buffer, binary.BigEndian, apiVersion) - binary.Write(buffer, binary.BigEndian, correlationID) - writeString(buffer, clientID) + binary.Write(buffer, binary.BigEndian, protocolCfg.APIKeyCreateTopics) + binary.Write(buffer, binary.BigEndian, protocolCfg.APIVersion) + binary.Write(buffer, binary.BigEndian, protocolCfg.CorrelationID) + writeString(buffer, clientCfg.ClientID) binary.Write(buffer, binary.BigEndian, int32(1)) - writeString(buffer, topicName) - binary.Write(buffer, binary.BigEndian, int32(numPartitions)) - binary.Write(buffer, binary.BigEndian, int16(replicationFactor)) + writeString(buffer, topicCfg.Name) + binary.Write(buffer, binary.BigEndian, int32(topicCfg.NumPartitions)) + binary.Write(buffer, binary.BigEndian, int16(topicCfg.ReplicationFactor)) //Assignments array (empty for auto-assignment) binary.Write(buffer, binary.BigEndian, int32(0)) @@ -102,14 +94,14 @@ func buildCreateTopicsRequest() ([]byte, error) { } // parseCreateTopicResponse reads the binary response and checks for errors. -func parseCreateTopicResponse(data []byte) error { +func parseCreateTopicResponse(data []byte, expectedTopicName string, expectedCorrelationID int32) error { reader := bytes.NewReader(data) // Response Header: Correlation ID (4 bytes) var receivedCorrID int32 binary.Read(reader, binary.BigEndian, &receivedCorrID) - if receivedCorrID != correlationID { - return fmt.Errorf("correlation ID mismatch: expected %d, got %d", correlationID, receivedCorrID) + if receivedCorrID != expectedCorrelationID { + return fmt.Errorf("correlation ID mismatch: expected %d, got %d", expectedCorrelationID, receivedCorrID) } // Throttle Time (4 bytes) @@ -128,8 +120,8 @@ func parseCreateTopicResponse(data []byte) error { if err != nil { return fmt.Errorf("could not read topic name from response: %w", err) } - if respTopicName != topicName { - return fmt.Errorf("topic name mismatch: expected '%s', got '%s'", topicName, respTopicName) + if respTopicName != expectedTopicName { + return fmt.Errorf("topic name mismatch: expected '%s', got '%s'", expectedTopicName, respTopicName) } // The crucial error code