-
Kafka Golang 예제Language/Golang 2023. 2. 12. 14:38
Example For Sarama
call_topic 이라는 이름을 가진 Topic 에 Partition 0 번에 Event를 전달하는 예시
// ProducerTest package main import ( "fmt" "time" "github.com/Shopify/sarama" ) //Reference : https://pkg.go.dev/github.com/shopify/sarama func SyncWriter(brokerList []string) sarama.SyncProducer { // For the data collector, we are looking for strong consistency semantics. // Because we don't change the flush settings, sarama will try to produce messages // as fast as possible to keep latency low. config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message config.Producer.Return.Successes = true // On the broker side, you may want to change the following settings to get // stronger consistency guarantees: // - For your broker, set `unclean.leader.election.enable` to false // - For the topic, you could increase `min.insync.replicas`. producer, err := sarama.NewSyncProducer(brokerList, config) if err != nil { fmt.Println("Failed to start Sarama producer:", err) } return producer } func AsyncWriter(brokerList []string) sarama.AsyncProducer { // For the access log, we are looking for AP semantics, with high throughput. // By creating batches of compressed messages, we reduce network I/O at a cost of more latency. config := sarama.NewConfig() //tlsConfig := createTlsConfiguration() //if tlsConfig != nil { // config.Net.TLS.Enable = true // config.Net.TLS.Config = tlsConfig //} config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack config.Producer.Compression = sarama.CompressionSnappy // Compress messages config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms producer, err := sarama.NewAsyncProducer(brokerList, config) if err != nil { fmt.Println("Failed to start Sarama producer:", err) } // We will just log to STDOUT if we're not able to produce messages. // Note: messages will only be returned here after all retry attempts are exhausted. go func() { for err := range producer.Errors() { fmt.Println("Failed to write access log entry:", err) } }() return producer } func AsyncProducer() { var brokerList []string brokerList = append(brokerList, "localhost:9092") x := AsyncWriter(brokerList) x.Input() <- &sarama.ProducerMessage{ Topic: "call_topic", Key: sarama.StringEncoder("CallId"), Value: sarama.StringEncoder("AgentId"), } time.Sleep(1000 * time.Millisecond) } func SyncPrducer() { var brokerList []string brokerList = append(brokerList, "localhost:9092") x := SyncWriter(brokerList) //returned partition, offset, err x.SendMessage(&sarama.ProducerMessage{ Topic: "call_topic", Key: sarama.StringEncoder("CallId"), Value: sarama.StringEncoder("AgentId"), Partition: int32(0), //Offset //Metadata //Timestamp //Headers }) } func main() { SyncPrducer() AsyncProducer() }
call_topic 의 Parition 0 번에서 Event를 취득
Consumer Group 을 이용한 동작에 대해서는 주석 참조
// ConsumerTest package main import ( "fmt" "github.com/Shopify/sarama" ) /* //Consumer Group Handler type exampleConsumerGroupHandler struct{} func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil } func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil } func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error { for msg := range claim.Messages() { fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset) sess.MarkMessage(msg, "") } return nil } */ //Reference : https://pkg.go.dev/github.com/shopify/sarama func main() { var nTntIdx int32 = 0 // Partition Index Set config := sarama.NewConfig() //config.Version = version //default 256 config.ChannelBufferSize = 1000000 config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange client, err := sarama.NewClient([]string{"localhost:9092"}, config) if err != nil { panic(err) } // if you want completed message -> 0 lastoffset, err := client.GetOffset("call_topic", nTntIdx, sarama.OffsetNewest) if err != nil { panic(err) } // if consumer group isn't exist , create it //group, err := sarama.NewConsumerGroupFromClient("consumer-group-name", client) consumer, err := sarama.NewConsumerFromClient(client) if err != nil { panic(err) } defer func() { if err := consumer.Close(); err != nil { fmt.Println(err) } }() //Read Specific Partition From Topic partitionConsumer, err := consumer.ConsumePartition("call_topic", nTntIdx, lastoffset) //ctx := context.Background() //handler := exampleConsumerGroupHandler{} //err := group.Consume(ctx, []string{"call_topic"}, handler) if err != nil { panic(err) } // if ConsumerGroup Skip Underlines defer func() { if err := partitionConsumer.Close(); err != nil { fmt.Println(err) } }() // Trap SIGINT to trigger a shutdown. consumed := 0 for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Topic %s Consumed message offset %d , Partition %d\n", msg.Topic, msg.Offset, msg.Partition) consumed++ fmt.Printf("Consumed: %d\n", consumed) fmt.Println(string(msg.Key)) fmt.Println(string(msg.Value)) fmt.Println("") } } }
Example For Kafka-go-client
Consumer Group 을 이용한 Producer , Consumer
Rebalance 에서 대해서는 go.application.rebalance.enable 변경하여 테스트 진행
// Example channel-based Apache Kafka producer package main /** * Copyright 2016 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import ( "fmt" "os" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) != 3 { fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n", os.Args[0]) os.Exit(1) } broker := os.Args[1] // ex ) localhost:9092 topic := os.Args[2] // ex ) call_topic p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { fmt.Printf("Failed to create producer: %s\n", err) os.Exit(1) } fmt.Printf("Created Producer %v\n", p) doneChan := make(chan bool) go func() { defer close(doneChan) for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: m := ev if m.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error) } else { fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) } return default: fmt.Printf("Ignored event: %s\n", ev) } } }() value := "Hello Go!" p.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)} // wait for delivery report goroutine to finish _ = <-doneChan p.Close() }
// Example channel-based high-level Apache Kafka consumer package main /** * Copyright 2016 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import ( "fmt" "os" "os/signal" "syscall" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() { if len(os.Args) < 4 { fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n", os.Args[0]) os.Exit(1) } broker := os.Args[1] // ex ) localhost:9092 group := os.Args[2] // ex ) CALL topics := os.Args[3:] // ex ) call_topic sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": group, "session.timeout.ms": 6000, "go.events.channel.enable": true, "go.application.rebalance.enable": false, // Enable generation of PartitionEOF when the // end of a partition is reached. "enable.partition.eof": true, "auto.offset.reset": "earliest"}) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) os.Exit(1) } fmt.Printf("Created Consumer %v\n", c) err = c.SubscribeTopics(topics, nil) run := true for run == true { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false case ev := <-c.Events(): switch e := ev.(type) { case kafka.AssignedPartitions: fmt.Fprintf(os.Stderr, "%% %v\n", e) c.Assign(e.Partitions) case kafka.RevokedPartitions: fmt.Fprintf(os.Stderr, "%% %v\n", e) c.Unassign() case *kafka.Message: fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) case kafka.PartitionEOF: fmt.Printf("%% Reached %v\n", e) case kafka.Error: // Errors should generally be considered as informational, the client will try to automatically recover fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) } } } fmt.Printf("Closing consumer\n") c.Close() }
# Get Consumer Group - Consumer List ( if -members remove, Main Connection Only ) ./bin/kafka-consumer-groups.sh --describe --group CALL --members --bootstrap-server localhost:9092 # how to Get Consumer Group Information from Go Code???
// Spectific Topic Partition Count meta, err := c.GetMetadata(&s, true, 5000) if err == nil { for key, topicmeta := range meta.Topics { fmt.Println("Meta Topic Key : " + key) if "call_topic" == key { fmt.Println(len(topicmeta.Partitions)) for _, partitionMeta := range topicmeta.Partitions { fmt.Println(partitionMeta) } } else { fmt.Println("Meta Topic No Match Skip Info") } } } else { fmt.Println(err) }
'Language > Golang' 카테고리의 다른 글
Golang에서 Json 사용예시 (0) 2023.02.07 CGO ( Golang With C++ ) 케이스별 설명 및 예시 (0) 2023.02.07 문법 및 사용법 예시 (0) 2023.02.07 자료형 및 키워드 (0) 2023.02.07 Golang 설치 및 IDE (0) 2023.02.07