• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    golang如何使用sarama访问kafka

    下面是一个访问kafka服务器的客户端代码例子,用来发送和接收消息。

    使用方式

    1、命令行参数

    $ ./kafkaclient -h
    Usage of ./kafkaclient:
     -ca string
      CA Certificate (default "ca.pem")
     -cert string
      Client Certificate (default "cert.pem")
     -command string
      consumer|producer (default "consumer")
     -host string
      Common separated kafka hosts (default "localhost:9093")
     -key string
      Client Key (default "key.pem")
     -partition int
      Kafka topic partition
     -tls
      TLS enable
     -topic string
      Kafka topic (default "test--topic")

    2、作为producer启动

    $ ./kafkaclient -command producer \
     -host kafka1:9092,kafka2:9092

    ## TLS-enabled
    $ ./kafkaclient -command producer \
     -tls -cert client.pem -key client.key -ca ca.pem \
     -host kafka1:9093,kafka2:9093
    

    producer发送消息给kafka:

    > aaa
    2018/12/15 07:11:21 Produced message: [aaa]
    > bbb
    2018/12/15 07:11:30 Produced message: [bbb]
    > quit

    3、作为consumer启动

    $ ./kafkaclient -command consumer \
     -host kafka1:9092,kafka2:9092

    ## TLS-enabled
    $ ./kafkaclient -command consumer \
     -tls -cert client.pem -key client.key -ca ca.pem \
     -host kafka1:9093,kafka2:9093
    

    consumer从kafka接收消息:

    2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
    2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

    完整源代码如下

    这个代码使用到了Shopify/sarama库,请自行下载使用。

    $ cat kafkaclient.go
    package main
    
    import (
     "flag"
     "fmt"
     "log"
     "os"
     "io/ioutil"
     "bufio"
     "strings"
    
     "crypto/tls"
     "crypto/x509"
    
     "github.com/Shopify/sarama"
    )
    
    // Command-line options. Each variable is bound to a flag in main.
    var (
     // command selects the mode to run in (-command: consumer|producer).
     command  string
     // tlsEnable turns on TLS for the broker connection (-tls).
     tlsEnable bool
     // hosts is the comma-separated broker list (-host); main splits it on ",".
     hosts  string
     // topic is the Kafka topic to produce to or consume from (-topic).
     topic  string
     // partition is the Kafka topic partition (-partition).
     partition int
     // clientcert is the client certificate file path (-cert), used when TLS is enabled.
     clientcert string
     // clientkey is the client private key file path (-key), used when TLS is enabled.
     clientkey string
     // cacert is the CA certificate file path (-ca), used when TLS is enabled.
     cacert  string
    )
    
    // main parses the command-line flags, builds a sarama client (optionally
    // over TLS) for the configured brokers, and then runs either the interactive
    // producer loop or the consumer loop depending on -command.
    func main() {
     // The flag package stores parsed values through pointers, so every
     // destination variable must be passed by address.
     flag.StringVar(&command, "command", "consumer", "consumer|producer")
     flag.BoolVar(&tlsEnable, "tls", false, "TLS enable")
     flag.StringVar(&hosts, "host", "localhost:9093", "Common separated kafka hosts")
     flag.StringVar(&topic, "topic", "test--topic", "Kafka topic")
     flag.IntVar(&partition, "partition", 0, "Kafka topic partition")
     flag.StringVar(&clientcert, "cert", "cert.pem", "Client Certificate")
     flag.StringVar(&clientkey, "key", "key.pem", "Client Key")
     flag.StringVar(&cacert, "ca", "ca.pem", "CA Certificate")
     flag.Parse()

     // Reject unknown modes up front instead of silently running as a producer.
     if command != "consumer" && command != "producer" {
      log.Fatalf("unknown command %q: expected consumer or producer", command)
     }

     config := sarama.NewConfig()
     if tlsEnable {
      //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
      tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
      if err != nil {
       log.Fatal(err)
      }

      config.Net.TLS.Enable = true
      config.Net.TLS.Config = tlsConfig
     }

     client, err := sarama.NewClient(strings.Split(hosts, ","), config)
     if err != nil {
      log.Fatalf("unable to create kafka client: %q", err)
     }
     // A consumer/producer built with the ...FromClient constructors does not
     // own the client, so it has to be closed separately. Deferred calls run in
     // LIFO order, so the consumer/producer is closed before the client.
     defer client.Close()

     if command == "consumer" {
      consumer, err := sarama.NewConsumerFromClient(client)
      if err != nil {
       log.Fatal(err)
      }
      defer consumer.Close()
      loopConsumer(consumer, topic, partition)
     } else {
      producer, err := sarama.NewAsyncProducerFromClient(client)
      if err != nil {
       log.Fatal(err)
      }
      defer producer.Close()
      loopProducer(producer, topic, partition)
     }
    }
    
    // genTLSConfig builds a TLS client configuration for mutual authentication.
    //
    // clientcertfile and clientkeyfile are the paths of the PEM-encoded client
    // certificate and private key; cacertfile is the path of a PEM file holding
    // the CA certificate(s) used to verify the server.
    //
    // It returns an error if the key pair cannot be loaded, the CA file cannot be
    // read, or the CA file contains no parsable certificate. On error the
    // returned config is nil.
    func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
     // load client cert
     clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
     if err != nil {
      return nil, fmt.Errorf("loading client key pair: %w", err)
     }

     // load ca cert pool
     cacert, err := ioutil.ReadFile(cacertfile)
     if err != nil {
      return nil, fmt.Errorf("reading CA certificate: %w", err)
     }
     cacertpool := x509.NewCertPool()
     // AppendCertsFromPEM reports whether any certificate was parsed; an empty
     // pool would make every server verification fail later with a far less
     // helpful error, so fail early here.
     if !cacertpool.AppendCertsFromPEM(cacert) {
      return nil, fmt.Errorf("no valid PEM certificate found in %q", cacertfile)
     }

     // generate tls config
     // Setting InsecureSkipVerify = true can be used on a test server if the
     // domain does not match the cert; never do that in production.
     return &tls.Config{
      RootCAs:      cacertpool,
      Certificates: []tls.Certificate{clientcert},
     }, nil
    }
    
    // loopProducer reads lines from standard input and publishes each non-empty
    // line to topic through producer until the user types "exit" or "quit" or
    // standard input is exhausted.
    //
    // The partition parameter is accepted for symmetry with loopConsumer but is
    // not used here; the target partition is left to the producer.
    func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
     // An async producer reports delivery failures on Errors(). The sarama
     // documentation requires that channel to be read or the producer will
     // deadlock, so drain it in the background. The loop ends when the producer
     // closes the channel.
     go func() {
      for err := range producer.Errors() {
       log.Printf("Failed to produce message: %v\n", err)
      }
     }()

     scanner := bufio.NewScanner(os.Stdin)
     fmt.Print("> ")
     for scanner.Scan() {
      text := scanner.Text()
      switch text {
      case "":
       // Nothing to send for an empty line; just show the prompt again.
      case "exit", "quit":
       return
      default:
       // Queue the message; the send is asynchronous, so the log line below
       // means "handed to the producer", not "acknowledged by the broker".
       producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
       log.Printf("Produced message: [%s]\n", text)
      }
      fmt.Print("> ")
     }
     // Scan returns false on both EOF and read errors; report the latter.
     if err := scanner.Err(); err != nil {
      log.Printf("reading standard input: %v\n", err)
     }
    }
    
    // loopConsumer consumes the given partition of topic, starting at the newest
    // offset, and logs the value and offset of every message it receives.
    //
    // If the partition consumer cannot be created the error is logged and the
    // function returns. Otherwise it runs until the messages channel is closed.
    func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
     partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
     if err != nil {
      log.Println(err)
      return
     }
     defer partitionConsumer.Close()

     // Ranging over the channel (rather than a bare receive in an endless loop)
     // stops cleanly once the channel is closed instead of dereferencing the
     // nil message a closed channel would yield.
     for msg := range partitionConsumer.Messages() {
      log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
     }
    }

    编译:

    $ go build kafkaclient.go

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

    您可能感兴趣的文章:
    • 在Golang中使用http.FileServer返回静态文件的操作
    • 解决golang http.FileServer 遇到的坑
    • 解决golang处理http response碰到的问题和需要注意的点
    • golang bad file descriptor问题的解决方法
    • golang复用http.request.body的方法示例
    • golang连接kafka消费进ES操作
    上一篇:golang解析域名的步骤全纪录
    下一篇:Go语言下载网络图片或文件的方法示例
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    golang如何使用sarama访问kafka golang,如何,使用,sarama,访问,