ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Golang 예제
    Language/Golang 2023. 2. 12. 14:38

    Example For Sarama

    call_topic 이라는 이름을 가진 Topic 의 Partition 0 번에 Event 를 전달하는 예시

    // ProducerTest
    package main
    
    import (
        "fmt"
        "time"
    
        "github.com/Shopify/sarama"
    )
    
    // SyncWriter creates a synchronous sarama producer connected to brokerList.
    //
    // The producer waits for all in-sync replicas to acknowledge a message,
    // retries a failed send up to 10 times, and uses the manual partitioner so
    // that the Partition field of a ProducerMessage selects the target partition.
    // Flush settings are left at their defaults so messages are produced as fast
    // as possible.
    //
    // If the producer cannot be created, the error is printed and nil is
    // returned; callers must check the result for nil before using it.
    //
    // On the broker side, stronger consistency can be obtained by setting
    // `unclean.leader.election.enable` to false and by increasing
    // `min.insync.replicas` on the topic.
    //
    // Reference: https://pkg.go.dev/github.com/shopify/sarama
    func SyncWriter(brokerList []string) sarama.SyncProducer {
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
        config.Producer.Retry.Max = 10                   // retry up to 10 times per message
        config.Producer.Return.Successes = true          // required by the SyncProducer
        // The default (hash) partitioner picks the partition from the message key
        // and ignores ProducerMessage.Partition. The manual partitioner honors the
        // partition requested by the caller.
        config.Producer.Partitioner = sarama.NewManualPartitioner

        producer, err := sarama.NewSyncProducer(brokerList, config)
        if err != nil {
            fmt.Println("Failed to start Sarama producer:", err)
            return nil
        }

        return producer
    }
    
    // AsyncWriter creates an asynchronous sarama producer connected to brokerList.
    //
    // The configuration favors throughput over latency: only the partition
    // leader has to acknowledge a message, messages are Snappy-compressed, and
    // batches are flushed every 500ms.
    //
    // A background goroutine prints every delivery error to STDOUT. Errors are
    // only reported there after all retry attempts are exhausted. The goroutine
    // ends when the producer's Errors channel is closed.
    //
    // If the producer cannot be created, the error is printed and nil is
    // returned; callers must check the result for nil before using it.
    func AsyncWriter(brokerList []string) sarama.AsyncProducer {
        config := sarama.NewConfig()
        //tlsConfig := createTlsConfiguration()
        //if tlsConfig != nil {
        //    config.Net.TLS.Enable = true
        //    config.Net.TLS.Config = tlsConfig
        //}
        config.Producer.RequiredAcks = sarama.WaitForLocal       // only wait for the leader to ack
        config.Producer.Compression = sarama.CompressionSnappy   // compress messages
        config.Producer.Flush.Frequency = 500 * time.Millisecond // flush batches every 500ms

        producer, err := sarama.NewAsyncProducer(brokerList, config)
        if err != nil {
            fmt.Println("Failed to start Sarama producer:", err)
            // producer is nil here; starting the error-draining goroutine below
            // would dereference it and crash the process.
            return nil
        }

        // Drain the Errors channel so that failed messages are reported.
        go func() {
            for err := range producer.Errors() {
                fmt.Println("Failed to write access log entry:", err)
            }
        }()

        return producer
    }
    
    // AsyncProducer sends one sample message (key "CallId", value "AgentId") to
    // "call_topic" on localhost:9092 through an asynchronous producer.
    //
    // The producer is closed before returning. Close waits for buffered messages
    // to be flushed, so no fixed sleep is needed to give the batch time to go out.
    // If the producer could not be created, the function returns without sending.
    func AsyncProducer() {
        brokerList := []string{"localhost:9092"}

        producer := AsyncWriter(brokerList)
        if producer == nil {
            // AsyncWriter has already reported why the producer is unavailable.
            return
        }
        defer func() {
            if err := producer.Close(); err != nil {
                fmt.Println("Failed to close Sarama producer:", err)
            }
        }()

        producer.Input() <- &sarama.ProducerMessage{
            Topic: "call_topic",
            Key:   sarama.StringEncoder("CallId"),
            Value: sarama.StringEncoder("AgentId"),
        }
    }
    
    // SyncPrducer sends one sample message (key "CallId", value "AgentId") to
    // "call_topic" on localhost:9092 through a synchronous producer and prints
    // the partition and offset reported by the broker, or the send error.
    //
    // The producer is closed before returning. If the producer could not be
    // created, the function returns without sending.
    //
    // NOTE(review): the name is a typo of "SyncProducer"; it is kept because
    // callers reference it by this name.
    func SyncPrducer() {
        brokerList := []string{"localhost:9092"}

        producer := SyncWriter(brokerList)
        if producer == nil {
            // SyncWriter has already reported why the producer is unavailable.
            return
        }
        defer func() {
            if err := producer.Close(); err != nil {
                fmt.Println("Failed to close Sarama producer:", err)
            }
        }()

        // SendMessage returns the partition and offset of the stored message.
        partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
            Topic: "call_topic",
            Key:   sarama.StringEncoder("CallId"),
            Value: sarama.StringEncoder("AgentId"),
            // Honored only when the producer is configured with the manual
            // partitioner (sarama.NewManualPartitioner); other partitioners
            // choose the partition themselves.
            Partition: int32(0),
            // Other optional fields: Offset, Metadata, Timestamp, Headers
        })
        if err != nil {
            fmt.Println("Failed to send message:", err)
            return
        }

        fmt.Printf("Message stored in partition %d at offset %d\n", partition, offset)
    }
    
    // main runs the synchronous producer example first and the asynchronous
    // producer example second.
    func main() {
        SyncPrducer()   // one message through the SyncProducer
        AsyncProducer() // one message through the AsyncProducer
    }

    call_topic 의 Partition 0 번에서 Event 를 취득

    Consumer Group 을 이용한 동작에 대해서는 주석 참조

    // ConsumerTest
    package main
    
    import (
        "fmt"
        "os"
        "os/signal"

        "github.com/Shopify/sarama"
    )
    
    /*
    //Consumer Group Handler
    
    type exampleConsumerGroupHandler struct{}
    
    func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
    func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
    func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
            sess.MarkMessage(msg, "")
        }
        return nil
    }
    */
    
    // main reads messages from partition 0 of "call_topic" on localhost:9092,
    // starting at the newest offset, and prints each message until the process
    // is interrupted or the message channel is closed.
    //
    // The partition consumer, the consumer and the underlying client are closed
    // (in that order) when main returns.
    //
    // Reference: https://pkg.go.dev/github.com/shopify/sarama
    func main() {
        const (
            topic           = "call_topic"
            partition int32 = 0 // index of the partition to read
        )

        config := sarama.NewConfig()
        //config.Version = version

        // Channel buffer size used by sarama (library default is 256).
        config.ChannelBufferSize = 1000000
        // Only relevant to the consumer-group variant shown in the comments.
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange

        client, err := sarama.NewClient([]string{"localhost:9092"}, config)
        if err != nil {
            panic(err)
        }
        // A consumer created with NewConsumerFromClient does not own the client,
        // so the client has to be closed separately.
        defer func() {
            if err := client.Close(); err != nil {
                fmt.Println(err)
            }
        }()

        // Start after the last message currently in the partition. To replay the
        // messages that are already stored, start from sarama.OffsetOldest instead.
        startOffset, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }

        // Consumer-group variant:
        //group, err := sarama.NewConsumerGroupFromClient("consumer-group-name", client)
        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
            panic(err)
        }
        defer func() {
            if err := consumer.Close(); err != nil {
                fmt.Println(err)
            }
        }()

        // Read one specific partition of the topic.
        partitionConsumer, err := consumer.ConsumePartition(topic, partition, startOffset)
        //ctx := context.Background()
        //handler := exampleConsumerGroupHandler{}
        //err := group.Consume(ctx, []string{"call_topic"}, handler)
        if err != nil {
            panic(err)
        }
        // Not needed when the consumer-group variant is used.
        defer func() {
            if err := partitionConsumer.Close(); err != nil {
                fmt.Println(err)
            }
        }()

        // Trap SIGINT to trigger a shutdown; returning from main runs the
        // deferred Close calls above.
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)

        consumed := 0

        for {
            select {
            case msg, ok := <-partitionConsumer.Messages():
                if !ok {
                    // The channel was closed; no more messages will arrive.
                    return
                }
                fmt.Printf("Topic %s Consumed message offset %d , Partition %d\n", msg.Topic, msg.Offset, msg.Partition)
                consumed++
                fmt.Printf("Consumed: %d\n", consumed)
                fmt.Println(string(msg.Key))
                fmt.Println(string(msg.Value))
                fmt.Println("")
            case sig := <-signals:
                fmt.Printf("Caught signal %v: terminating\n", sig)
                return
            }
        }
    }

    Example For Kafka-go-client

    Consumer Group 을 이용한 Producer , Consumer

    Rebalance 에 대해서는 go.application.rebalance.enable 값을 변경하여 테스트 진행

    // Example channel-based Apache Kafka producer
    package main
    
    /**
     * Copyright 2016 Confluent Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    import (
        "fmt"
        "os"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    // main produces a single "Hello Go!" message to the topic given on the
    // command line through the producer's channel API, waits for the delivery
    // report of that message, and then closes the producer.
    //
    // Usage: <program> <broker> <topic>
    func main() {
        if len(os.Args) != 3 {
            fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n", os.Args[0])
            os.Exit(1)
        }

        // e.g. broker = localhost:9092, topic = call_topic
        broker, topic := os.Args[1], os.Args[2]

        producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
        if err != nil {
            fmt.Printf("Failed to create producer: %s\n", err)
            os.Exit(1)
        }
        fmt.Printf("Created Producer %v\n", producer)

        done := make(chan bool)

        // Watch the event stream until the first delivery report arrives; any
        // other event is only logged.
        go func() {
            defer close(done)

            for event := range producer.Events() {
                report, isMessage := event.(*kafka.Message)
                if !isMessage {
                    fmt.Printf("Ignored event: %s\n", event)
                    continue
                }

                if report.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", report.TopicPartition.Error)
                } else {
                    fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
                        *report.TopicPartition.Topic, report.TopicPartition.Partition, report.TopicPartition.Offset)
                }
                return
            }
        }()

        // kafka.PartitionAny leaves the partition choice to the client.
        outgoing := &kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte("Hello Go!"),
        }
        producer.ProduceChannel() <- outgoing

        // Block until the delivery-report goroutine has finished.
        <-done

        producer.Close()
    }
    // Example channel-based high-level Apache Kafka consumer
    package main
    
    /**
     * Copyright 2016 Confluent Inc.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    // main runs a channel-based consumer that joins the given consumer group,
    // subscribes to the given topics and prints every event it receives until
    // SIGINT or SIGTERM arrives, then closes the consumer.
    //
    // Usage: <program> <broker> <group> <topics..>
    func main() {
        if len(os.Args) < 4 {
            fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
                os.Args[0])
            os.Exit(1)
        }

        broker := os.Args[1]  // e.g. localhost:9092
        group := os.Args[2]   // e.g. CALL
        topics := os.Args[3:] // e.g. call_topic

        sigchan := make(chan os.Signal, 1)
        signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":        broker,
            "group.id":                 group,
            "session.timeout.ms":       6000,
            "go.events.channel.enable": true,
            // Set to true to receive AssignedPartitions/RevokedPartitions
            // events and handle rebalancing in the application (see the
            // corresponding cases in the loop below).
            "go.application.rebalance.enable": false,
            // Enable generation of PartitionEOF when the
            // end of a partition is reached.
            "enable.partition.eof": true,
            "auto.offset.reset":    "earliest"})
        if err != nil {
            fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
            os.Exit(1)
        }

        fmt.Printf("Created Consumer %v\n", c)

        // Without a successful subscription the loop below would wait for
        // messages that can never arrive, so treat a failure as fatal.
        if err := c.SubscribeTopics(topics, nil); err != nil {
            fmt.Fprintf(os.Stderr, "Failed to subscribe to topics %v: %s\n", topics, err)
            c.Close() // best effort; the process exits right after
            os.Exit(1)
        }

        run := true

        for run {
            select {
            case sig := <-sigchan:
                fmt.Printf("Caught signal %v: terminating\n", sig)
                run = false

            case ev := <-c.Events():
                switch e := ev.(type) {
                case kafka.AssignedPartitions:
                    fmt.Fprintf(os.Stderr, "%% %v\n", e)
                    if err := c.Assign(e.Partitions); err != nil {
                        fmt.Fprintf(os.Stderr, "%% Error: %v\n", err)
                    }
                case kafka.RevokedPartitions:
                    fmt.Fprintf(os.Stderr, "%% %v\n", e)
                    if err := c.Unassign(); err != nil {
                        fmt.Fprintf(os.Stderr, "%% Error: %v\n", err)
                    }
                case *kafka.Message:
                    fmt.Printf("%% Message on %s:\n%s\n",
                        e.TopicPartition, string(e.Value))
                case kafka.PartitionEOF:
                    fmt.Printf("%% Reached %v\n", e)
                case kafka.Error:
                    // Errors should generally be considered as informational, the client will try to automatically recover
                    fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                }
            }
        }

        fmt.Printf("Closing consumer\n")
        if err := c.Close(); err != nil {
            fmt.Fprintf(os.Stderr, "%% Error: %v\n", err)
        }
    }
    # Get Consumer Group - Consumer List ( if --members is removed, Main Connection Only )
    ./bin/kafka-consumer-groups.sh --describe --group CALL --members --bootstrap-server localhost:9092
    
    # How to get Consumer Group information from Go code?
    // Partition count of a specific topic.
    // NOTE(review): c and s are defined outside this fragment; presumably c is
    // the Kafka client handle and s the topic name to query — confirm. The last
    // argument is presumably the request timeout in milliseconds.
    meta, err := c.GetMetadata(&s, true, 5000)
    if err != nil {
        fmt.Println(err)
    } else {
        for name, info := range meta.Topics {
            fmt.Println("Meta Topic Key : " + name)

            if name != "call_topic" {
                fmt.Println("Meta Topic No Match Skip Info")
                continue
            }

            // Number of partitions, followed by the metadata of each one.
            fmt.Println(len(info.Partitions))
            for _, partition := range info.Partitions {
                fmt.Println(partition)
            }
        }
    }

    'Language > Golang' 카테고리의 다른 글

    Golang에서 Json 사용예시  (0) 2023.02.07
    CGO ( Golang With C++ ) 케이스별 설명 및 예시  (0) 2023.02.07
    문법 및 사용법 예시  (0) 2023.02.07
    자료형 및 키워드  (0) 2023.02.07
    Golang 설치 및 IDE  (0) 2023.02.07
Designed by Tistory.