• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    logrus hook输出日志到本地磁盘的操作

    logrus是go的一个日志框架,它最让人激动的应该是hook机制,可以在初始化时为logrus添加hook,logrus可以实现各种扩展功能,可以将日志输出到elasticsearch和activemq等中间件去,甚至可以输出到你的email和叮叮中去,不要问为为什么可以发现可以输入到叮叮中去,都是泪,手动笑哭!

    言归正传,这里就简单的通过hook机制将文件输出到本地磁盘。

    首先

    go get github.com/sirupsen/logrus

    然后

    logrus和go lib里面一样有6个等级,可以直接调用

    logrus.Debug("Useful debugging information.")
    logrus.Info("Something noteworthy happened!")
    logrus.Warn("You should probably take a look at this.")
    logrus.Error("Something failed but I'm not quitting.")
    logrus.Fatal("Bye.")  //log之后会调用os.Exit(1)
    logrus.Panic("I'm bailing.")  //log之后会panic()

    项目例子结构

    main.go

    package main
    
    import (
     "fmt"
     "github.com/sirupsen/logrus"
     "logT/logS"
    )
    func main() {
      //创建一个hook,将日志存储路径输入进去
     hook := logS.NewHook("d:/log/golog.log")
     //加载hook之前打印日志
     logrus.WithField("file", "d:/log/golog.log").Info("New logrus hook err.")
     logrus.AddHook(hook)
     //加载hook之后打印日志
     logrus.WithFields(logrus.Fields{
     "animal": "walrus",
     }).Info("A walrus appears")
    }
    

    hook.go

    不要看下面三个go文件代码很长,其实大多数都是固定代码,也就NewHook函数自己扩展定义就好

    package logS

    import (
     "fmt"
     "github.com/sirupsen/logrus"
     "os"
     "strings"
    )
    
    // Hook 写文件的Logrus Hook
    type Hook struct {
     W LoggerInterface
    }
    
    func NewHook(file string) (f *Hook) {
     w := NewFileWriter()
     config := fmt.Sprintf(`{"filename":"%s","maxdays":7}`, file)
     err := w.Init(config)
     if err != nil {
     return nil
     }
    
     return Hook{w}
    }
    
    // Fire 实现Hook的Fire接口
    func (hook *Hook) Fire(entry *logrus.Entry) (err error) {
     message, err := getMessage(entry)
     if err != nil {
     fmt.Fprintf(os.Stderr, "Unable to read entry, %v", err)
     return err
     }
     switch entry.Level {
     case logrus.PanicLevel:
     fallthrough
     case logrus.FatalLevel:
     fallthrough
     case logrus.ErrorLevel:
     return hook.W.WriteMsg(fmt.Sprintf("[ERROR] %s", message), LevelError)
     case logrus.WarnLevel:
     return hook.W.WriteMsg(fmt.Sprintf("[WARN] %s", message), LevelWarn)
     case logrus.InfoLevel:
     return hook.W.WriteMsg(fmt.Sprintf("[INFO] %s", message), LevelInfo)
     case logrus.DebugLevel:
     return hook.W.WriteMsg(fmt.Sprintf("[DEBUG] %s", message), LevelDebug)
     default:
     return nil
     }
    }
    
    // Levels 实现Hook的Levels接口
    func (hook *Hook) Levels() []logrus.Level {
     return []logrus.Level{
     logrus.PanicLevel,
     logrus.FatalLevel,
     logrus.ErrorLevel,
     logrus.WarnLevel,
     logrus.InfoLevel,
     logrus.DebugLevel,
     }
    }
    
    func getMessage(entry *logrus.Entry) (message string, err error) {
     message = message + fmt.Sprintf("%s ", entry.Message)
     file, lineNumber := GetCallerIgnoringLogMulti(2)
     if file != "" {
     sep := fmt.Sprintf("%s/src/", os.Getenv("GOPATH"))
     fileName := strings.Split(file, sep)
     if len(fileName) >= 2 {
      file = fileName[1]
     }
     }
     message = fmt.Sprintf("%s:%d ", file, lineNumber) + message
    
     for k, v := range entry.Data {
     message = message + fmt.Sprintf("%v:%v ", k, v)
     }
     return
    }
    

    caller.go

    package logS
    
    import (
     "runtime"
     "strings"
    )
    
    func GetCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
     // bump by 1 to ignore the getCaller (this) stackframe
     callDepth++
    outer:
     for {
     var ok bool
     _, file, line, ok = runtime.Caller(callDepth)
     if !ok {
      file = "???"
      line = 0
      break
     }
    
     for _, s := range suffixesToIgnore {
      if strings.HasSuffix(file, s) {
      callDepth++
      continue outer
      }
     }
     break
     }
     return
    }
    
    // GetCallerIgnoringLogMulti TODO
    func GetCallerIgnoringLogMulti(callDepth int) (string, int) {
     // the +1 is to ignore this (getCallerIgnoringLogMulti) frame
     return GetCaller(callDepth+1, "logrus/hooks.go", "logrus/entry.go", "logrus/logger.go", "logrus/exported.go", "asm_amd64.s")
    }
    

    file.go

    package logS
    
    import (
     "encoding/json"
     "errors"
     "fmt"
     "io/ioutil"
     "log"
     "os"
     "path/filepath"
     "strings"
     "sync"
     "time"
    )
    
    // RFC5424 log message levels.
    const (
     LevelError = iota
     LevelWarn
     LevelInfo
     LevelDebug
    )
    
    // LoggerInterface Logger接口
    type LoggerInterface interface {
     Init(config string) error
     WriteMsg(msg string, level int) error
     Destroy()
     Flush()
    }
    
    // LogWriter implements LoggerInterface.
    // It writes messages by lines limit, file size limit, or time frequency.
    type LogWriter struct {
     *log.Logger
     mw *MuxWriter
     // The opened file
     Filename string `json:"filename"`
    
     Maxlines     int `json:"maxlines"`
     maxlinesCurlines int
    
     // Rotate at size
     Maxsize    int `json:"maxsize"`
     maxsizeCursize int
    
     // Rotate daily
     Daily     bool `json:"daily"`
     Maxdays    int64 `json:"maxdays"`
     dailyOpendate int
    
     Rotate bool `json:"rotate"`
    
     startLock sync.Mutex // Only one log can write to the file
    
     Level int `json:"level"`
    }
    
    // MuxWriter an *os.File writer with locker.
    type MuxWriter struct {
     sync.Mutex
     fd *os.File
    }
    
    // write to os.File.
    func (l *MuxWriter) Write(b []byte) (int, error) {
     l.Lock()
     defer l.Unlock()
     return l.fd.Write(b)
    }
    
    // SetFd set os.File in writer.
    func (l *MuxWriter) SetFd(fd *os.File) {
     if l.fd != nil {
     _ = l.fd.Close()
     }
     l.fd = fd
    }
    
    // NewFileWriter create a FileLogWriter returning as LoggerInterface.
    func NewFileWriter() LoggerInterface {
     w := LogWriter{
     Filename: "",
     Maxlines: 1000000,
     Maxsize: 1  28, //256 MB
     Daily:  true,
     Maxdays: 7,
     Rotate:  true,
     Level:  LevelDebug,
     }
     // use MuxWriter instead direct use os.File for lock write when rotate
     w.mw = new(MuxWriter)
     // set MuxWriter as Logger's io.Writer
     w.Logger = log.New(w.mw, "", log.Ldate|log.Ltime)
     return w
    }
    
    // Init file logger with json config.
    // jsonconfig like:
    // {
    // "filename":"logs/sample.log",
    // "maxlines":10000,
    // "maxsize":130,
    // "daily":true,
    // "maxdays":15,
    // "rotate":true
    // }
    func (w *LogWriter) Init(jsonconfig string) error {
     err := json.Unmarshal([]byte(jsonconfig), w)
     if err != nil {
     return err
     }
     if len(w.Filename) == 0 {
     return errors.New("jsonconfig must have filename")
     }
     err = w.startLogger()
     return err
    }
    
    // start file logger. create log file and set to locker-inside file writer.
    func (w *LogWriter) startLogger() error {
     fd, err := w.createLogFile()
     if err != nil {
     return err
     }
     w.mw.SetFd(fd)
     err = w.initFd()
     if err != nil {
     return err
     }
     return nil
    }
    
    func (w *LogWriter) docheck(size int) {
     w.startLock.Lock()
     defer w.startLock.Unlock()
     if w.Rotate  ((w.Maxlines > 0  w.maxlinesCurlines >= w.Maxlines) ||
     (w.Maxsize > 0  w.maxsizeCursize >= w.Maxsize) ||
     (w.Daily  time.Now().Day() != w.dailyOpendate)) {
     if err := w.DoRotate(); err != nil {
      fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
      return
     }
     }
     w.maxlinesCurlines++
     w.maxsizeCursize += size
    }
    
    // WriteMsg write logger message into file.
    func (w *LogWriter) WriteMsg(msg string, level int) error {
     if level > w.Level {
     return nil
     }
     n := 24 + len(msg) // 24 stand for the length "2013/06/23 21:00:22 [T] "
     w.docheck(n)
     w.Logger.Print(msg)
     return nil
    }
    
    func (w *LogWriter) createLogFile() (*os.File, error) {
     // Open the log file
     fd, err := os.OpenFile(w.Filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0660)
     return fd, err
    }
    
    func (w *LogWriter) initFd() error {
     fd := w.mw.fd
     finfo, err := fd.Stat()
     if err != nil {
     return fmt.Errorf("get stat err: %s", err)
     }
     w.maxsizeCursize = int(finfo.Size())
     w.dailyOpendate = time.Now().Day()
     if finfo.Size() > 0 {
     content, err := ioutil.ReadFile(w.Filename)
     if err != nil {
      return err
     }
     w.maxlinesCurlines = len(strings.Split(string(content), "\n"))
     } else {
     w.maxlinesCurlines = 0
     }
     return nil
    }
    
    // DoRotate means it need to write file in new file.
    // new file name like xx.log.2013-01-01.2
    func (w *LogWriter) DoRotate() error {
     _, err := os.Lstat(w.Filename)
     if err == nil { // file exists
     // Find the next available number
     num := 1
     fname := ""
     for ; err == nil  num = 999; num++ {
      fname = w.Filename + fmt.Sprintf(".%s.%03d", time.Now().Format("2006-01-02"), num)
      _, err = os.Lstat(fname)
     }
     // return error if the last file checked still existed
     if err == nil {
      return fmt.Errorf("Rotate: Cannot find free log number to rename %s", w.Filename)
     }
    
     // block Logger's io.Writer
     w.mw.Lock()
     defer w.mw.Unlock()
    
     fd := w.mw.fd
     _ = fd.Close()
    
     // close fd before rename
     // Rename the file to its newfound home
     err = os.Rename(w.Filename, fname)
     if err != nil {
      return fmt.Errorf("Rotate: %s", err)
     }
    
     // re-start logger
     err = w.startLogger()
     if err != nil {
      return fmt.Errorf("Rotate StartLogger: %s", err)
     }
    
     go w.deleteOldLog()
     }
    
     return nil
    }
    
    func (w *LogWriter) deleteOldLog() {
     dir := filepath.Dir(w.Filename)
     _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) {
     defer func() {
      if r := recover(); r != nil {
      returnErr = fmt.Errorf("Unable to delete old log '%s', error: %+v", path, r)
      fmt.Println(returnErr)
      }
     }()
    
     if !info.IsDir()  info.ModTime().Unix()  (time.Now().Unix()-60*60*24*w.Maxdays) {
      if strings.HasPrefix(filepath.Base(path), filepath.Base(w.Filename)) {
      _ = os.Remove(path)
      }
     }
     return
     })
    }
    
    // Destroy destroy file logger, close file writer.
    func (w *LogWriter) Destroy() {
     _ = w.mw.fd.Close()
    }
    
    // Flush file logger.
    // there are no buffering messages in file logger in memory.
    // flush file means sync file from disk.
    func (w *LogWriter) Flush() {
     _ = w.mw.fd.Sync()
    }
    

    补充知识:golang logrus自定义hook:日志切片hook、邮件警报hook、kafkahook

    logrus Hook 分析

    logrus hook 接口定义很简单。如下

    package logrus
    
    // A hook to be fired when logging on the logging levels returned from
    // `Levels()` on your implementation of the interface. Note that this is not
    // fired in a goroutine or a channel with workers, you should handle such
    // functionality yourself if your call is non-blocking and you don't wish for
    // the logging calls for levels returned from `Levels()` to block.
    type Hook interface {
     Levels() []Level
     Fire(*Entry) error
    }
    
    // Internal type for storing the hooks on a logger instance.
    type LevelHooks map[Level][]Hook
    
    // Add a hook to an instance of logger. This is called with
    // `log.Hooks.Add(new(MyHook))` where `MyHook` implements the `Hook` interface.
    func (hooks LevelHooks) Add(hook Hook) {
     for _, level := range hook.Levels() {
     hooks[level] = append(hooks[level], hook)
     }
    }
    
    // Fire all the hooks for the passed level. Used by `entry.log` to fire
    // appropriate hooks for a log entry.
    func (hooks LevelHooks) Fire(level Level, entry *Entry) error {
     for _, hook := range hooks[level] {
     if err := hook.Fire(entry); err != nil {
      return err
     }
     }
     return nil
    }
    

    只需实现 该结构的接口。

    type Hook interface {
     Levels() []Level
     Fire(*Entry) error
    }

    就会被logrus框架遍历调用已注册的 hook 的 Fire 方法

    获取日志实例

    // log_hook.go
    package logger
    
    import (
     "fmt"
     "github.com/sirupsen/logrus"
     "library/util/constant"
     "os"
    )
    
    //自实现 logrus hook
    func getLogger(module string) *logrus.Logger {
     //实例化
     logger := logrus.New()
     //设置输出
     logger.Out = os.Stdout
     //设置日志级别
     logger.SetLevel(logrus.DebugLevel)
     //设置日志格式
     //自定writer就行, hook 交给 lfshook
     logger.AddHook(newLogrusHook(constant.GetLogPath(), module))
     
     logger.SetFormatter(logrus.JSONFormatter{
     TimestampFormat:"2006-01-02 15:04:05",
     })
     return logger
    }
    
    //确保每次调用使用的文件都是唯一的。
    func GetNewFieldLoggerContext(module,appField string) *logrus.Entry {
     logger:= getLogger(module)
     return logger.WithFields(logrus.Fields{
     "app": appField,
     })
    }
    
    //订阅 警告日志
    func SubscribeLog(entry *logrus.Entry, subMap SubscribeMap) {
     logger := entry.Logger
     logger.AddHook(newSubScribeHook(subMap))
     fmt.Println("日志订阅成功")
    }
    

    constant.GetLogPath() 可以替换为自己的日志文件输出目录地址,比如我的mac上则是:/usr/local/log ,直接替换即可。

    日志切片hook

    代码

    // writer.go
    package logger
    
    import (
     "fmt"
     "github.com/pkg/errors"
     "io"
     "library/util"
     "os"
     "path/filepath"
     "sync"
     "time"
    )
    
    type LogWriter struct {
     logDir       string //日志根目录地址。
     module       string //模块 名
      curFileName   string //当前被指定的filename
     curBaseFileName   string //在使用中的file
     turnCateDuration  time.Duration
     mutex      sync.RWMutex
     outFh      *os.File
    }
    
    func (w *LogWriter) Write(p []byte) (n int, err error) {
     w.mutex.Lock()
     defer w.mutex.Unlock()
     if out, err:= w.getWriter(); err!=nil {
     return 0, errors.New("failed to fetch target io.Writer")
     }else{
     return out.Write(p)
     }
    }
    
    func (w *LogWriter) getFileName() string {
     base := time.Now().Truncate(w.turnCateDuration)
     return fmt.Sprintf("%s/%s/%s_%s", w.logDir, base.Format("2006-01-02"), w.module, base.Format("15"))
    }
    
    func (w *LogWriter) getWriter()(io.Writer, error) {
     fileName := w.curBaseFileName
     //判断是否有新的文件名
     //会出现新的文件名
     baseFileName := w.getFileName()
     if baseFileName != fileName {
     fileName = baseFileName
     }
    
     dirname := filepath.Dir(fileName)
     if err := os.MkdirAll(dirname, 0755); err != nil {
     return nil, errors.Wrapf(err, "failed to create directory %s", dirname)
     }
    
     fileHandler, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
     if err != nil {
     return nil, errors.Errorf("failed to open file %s", err)
     }
     w.outFh.Close()
     w.outFh = fileHandler
     w.curBaseFileName = fileName
     w.curFileName = fileName
    
     return fileHandler, nil
    }
    
    func New(logPath, module string, duration time.Duration) *LogWriter {
     return LogWriter{
     logDir: logPath,
     module: module,
     turnCateDuration:duration,
     curFileName: "",
     curBaseFileName: "",
     }
    }
    
    // hook.go
    package logger
    
    import (
     "github.com/rifflock/lfshook"
     "github.com/sirupsen/logrus"
     "time"
    )
    func newLogrusHook(logPath, moduel string) logrus.Hook {
     logrus.SetLevel(logrus.WarnLevel)
    
     writer := New(logPath, moduel, time.Hour * 2)
    
     lfsHook := lfshook.NewHook(lfshook.WriterMap{
     logrus.DebugLevel: writer,
     logrus.InfoLevel: writer,
     logrus.WarnLevel: writer,
     logrus.ErrorLevel: writer,
     logrus.FatalLevel: writer,
     logrus.PanicLevel: writer,
     }, logrus.TextFormatter{DisableColors: true})
    
     // writer 生成新的log文件类型 writer 在通过new hook函数 消费 fire 函数
     // writer 是实现了writer 接口的库,在日志调用write是做预处理
     return lfsHook
    }
    

    测试代码

    func TestGetLogger(t *testing.T) {
     lg := GetNewFieldLoggerContext("test","d")
     lg.Logger.Info("????")
    }

    解析

    logger实例持有了 自定义的 io.writer 结构体,在消费Fire函数时,会调用Write方法,此时通过Truncate时间切片函数逻辑判断需要写入的文件。或创建新的文件。

    注: 文章提供的代码是按天切分文件夹的,文件夹内模块日志再按2小时切分。可自行替换成按模块切分。

    邮件警报hook

    代码

    // subscribeHook.go
    package logger
    
    import (
     "fmt"
     "github.com/sirupsen/logrus"
     "library/email"
     "strings"
    )
    
    type SubscribeMap map[logrus.Level][]*email.Receiver
    type SubscribeHook struct {
     subMap SubscribeMap
    }
    //此处可以自实现hook 目前使用三方hook
    func(h *SubscribeHook)Levels() []logrus.Level{
     return logrus.AllLevels
    }
    
    func(h *SubscribeHook)Fire(entry *logrus.Entry) error{
     for level, receivers := range h.subMap {
     //命中 准备消费
     if level == entry.Level {
      if len(receivers) > 0 {
      email.SendEmail(receivers, fmt.Sprintf("%s:[系统日志警报]", entry.Level.String()),
       fmt.Sprintf("错误内容: %s",entry.Message))
      }
     }
     }
     return nil
    }
    func NewSubscribeMap(level logrus.Level, receiverStr string) SubscribeMap{
     subMap := SubscribeMap{}
     addressList := strings.Split(receiverStr,";")
     var receivers []*email.Receiver
     for _, address := range addressList {
     receivers = append(receivers, email.Receiver{Email: address})
     }
     subMap[level] = receivers
     return subMap
    }
    func newSubScribeHook(subMap SubscribeMap) *SubscribeHook {
     return SubscribeHook{subMap}
    
    // email.go
    package email
    
    import (
     "fmt"
     "gopkg.in/gomail.v2"
     "regexp"
     "strconv"
    )
    
    type Sender struct {
     User   string
     Password string
     Host   string
     Port   int
     MailTo  []string
     Subject  string
     Content  string
    }
    
    type Receiver struct {
     Email  string
    }
    
    func (r *Receiver) Check() bool {
     pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` //匹配电子邮箱
     reg := regexp.MustCompile(pattern)
     return reg.MatchString(r.Email)
    }
    
    func (s *Sender) clean (){
    
    }
    
    //检查 邮箱正确性
    func (s *Sender)NewReceiver(email string) *Receiver {
     rec := Receiver{Email:email}
     if rec.Check() {
     m.MailTo = []string{email}
     return rec
     }else{
     fmt.Printf("email check fail 【%s】\n", email)
     return nil
     }
    }
    func (s *Sender)NewReceivers(receivers []*Receiver) {
     for _, rec := range receivers {
     if rec.Check() {
      m.MailTo = append(m.MailTo, rec.Email)
     }else{
      fmt.Printf("email check fail 【%s】\n", rec.Email)
     }
     }
    }
    // 163邮箱 password 为开启smtp后给的秘钥
    var m = Sender{User:"6666666@163.com", Password:"666666666", Host: "smtp.163.com", Port: 465}
    
    func SendEmail(receivers []*Receiver,subject, content string){
     m.NewReceivers(receivers)
     m.Subject = subject
     m.Content = content
    
     e := gomail.NewMessage()
     e.SetHeader("From", e.FormatAddress(m.User, "hengsheng"))
     e.SetHeader("To", m.MailTo...)  //发送给多个用户
     e.SetHeader("Subject", m.Subject) //设置邮件主题
     e.SetBody("text/html", m.Content)  //设置邮件正文
     d := gomail.NewDialer(m.Host, m.Port, m.User, m.Password)
     err := d.DialAndSend(e)
     if err != nil {
     fmt.Printf("error 邮件发送错误! %s \n", err.Error())
     }
    }
    

    使用

    同理在writer时 如果是错误日志则发送邮件。

    o.logger = logger.GetNewFieldLoggerContext("test", "666")
    if subscribeSocket {
     logger.SubscribeLog(o.Logger, logger.NewSubscribeMap(logrus.ErrorLevel, "a@163.com;b@163.com"))
     }
     // o 为实际结构体实例

    kafkahook

    // kafka hook
    package logger
    
    import (
     "github.com/sirupsen/logrus"
     "library/kafka"
     "library/util/constant"
    )
    
    type KafKaHook struct {
     kafkaProducer  *kafka.KafkaProducer
    }
    
    
    func(h *KafKaHook)Levels() []logrus.Level{
     return logrus.AllLevels
    }
    
    func(h *KafKaHook)Fire(entry *logrus.Entry) error{
     h.kafkaProducer.SendMsgSync(entry.Message)
     return nil
    }
    
    func newKafkaHook() *KafKaHook{
     producer := kafka.NewKafkaProducer(constant.KafkaLogElkTopic,true)
     return KafKaHook{kafkaProducer: producer}
    }
    

    使用时logger.AddHook(newKafkaHook()) 即可

    kafka模块

    生产者

    // kafkaProducer.go
    package kafka
    
    import (
     "errors"
     "fmt"
     "github.com/Shopify/sarama"
     "library/util/constant"
     "log"
     "time"
    )
    
    func GetKafkaAddress()[]string{
     return "127.0.0.1:9092"
    }
    
    //同步消息模式
    func SyncProducer(topic, message string) error {
     config := sarama.NewConfig()
     config.Producer.Return.Successes = true
     config.Producer.Timeout = 5 * time.Second
     p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
     if err != nil {
     return errors.New(fmt.Sprintf("sarama.NewSyncProducer err, message=%s \n", err))
     }
     defer p.Close()
     msg := sarama.ProducerMessage{
     Topic: topic,
     Value: sarama.ByteEncoder(message),
     }
     part, offset, err := p.SendMessage(msg)
     if err != nil {
     return errors.New(fmt.Sprintf("send sdsds err=%s \n", err))
     } else {
     fmt.Printf("发送成功,partition=%d, offset=%d \n", part, offset)
     return nil
     }
    }
    
    //async 异步生产者
    type KafkaProducer struct {
     topic    string
     asyncProducer  *sarama.AsyncProducer
     syncProducer  *sarama.SyncProducer
     sync    bool
    }
    
    func NewKafkaProducer(topic string, sync bool) *KafkaProducer {
     k := KafkaProducer{
     topic:   topic,
     sync:   sync,
     }
     if sync {
     k.initSync()
     }else{
     k.initAsync()
     }
     return k
    }
    
    func (k *KafkaProducer) initAsync() bool {
     if k.sync {
     fmt.Printf("sync producer cant call async func !\n")
     return false
     }
     config := sarama.NewConfig()
     //等待服务器所有副本都保存成功后的响应
     config.Producer.RequiredAcks = sarama.WaitForAll
     //随机向partition发送消息
     config.Producer.Partitioner = sarama.NewRandomPartitioner
     //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
     config.Producer.Return.Successes = true
     config.Producer.Return.Errors = true
     //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
     //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
     config.Version = sarama.V0_10_0_1
    
     producer, e := sarama.NewAsyncProducer(GetKafkaAddress(), config)
     if e != nil {
     fmt.Println(e)
     return false
     }
     k.asyncProducer = producer
     defer producer.AsyncClose()
     pd := *k.asyncProducer
     go func() {
     for{
      select {
      case -pd.Successes():
      //fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
      case fail := -pd.Errors():
      fmt.Printf("err: %s \n", fail.Err.Error())
      }
     }
     }()
    
     return true
    }
    
    func (k *KafkaProducer) initSync() bool {
     if !k.sync {
     fmt.Println("async producer cant call sync func !")
     return false
     }
    
     config := sarama.NewConfig()
     config.Producer.Return.Successes = true
     config.Producer.Timeout = 5 * time.Second
     p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
     k.syncProducer = p
     if err != nil {
     log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
     return false
     }
     return true
    }
    
    func (k *KafkaProducer) SendMsgAsync(sendStr string) {
    
     msg := sarama.ProducerMessage{
     Topic: k.topic,
     }
    
     //将字符串转化为字节数组
     msg.Value = sarama.ByteEncoder(sendStr)
     //fmt.Println(value)
    
     //使用通道发送
     pd := *k.asyncProducer
     pd.Input() - msg
    }
    
    func (k *KafkaProducer) SendMsgSync(sendStr string) bool {
     msg := sarama.ProducerMessage{
     Topic: k.topic,
     Value: sarama.ByteEncoder(sendStr),
     }
     pd := *k.syncProducer
     part, offset, err := pd.SendMessage(msg)
     if err != nil {
     fmt.Printf("发送失败 send message(%s) err=%s \n", sendStr, err)
     return false
     } else {
     fmt.Printf("发送成功 partition=%d, offset=%d \n", part, offset)
     return true
     }
    }
    

    调用 SendMsgSync 或 SendMsgAsync 生产消息,注意初始化时的参数要保证一致!

    消费者组

    // kafkaConsumerGroup.go
    
    package kafka
    
    import (
     "context"
     "fmt"
     "github.com/Shopify/sarama"
     "log"
     "sync"
    )
    
    func NewKafkaConsumerGroup(topics []string, group string, businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup {
     k := KafkaConsumerGroup{
     brokers:  GetKafkaAddress(),
     topics:  topics,
     group:       group,
     channelBufferSize: 2,
     ready:       make(chan bool),
     version:  "1.1.1",
     handler:  businessCall,
     }
     k.Init()
     return k
    }
    
    // 消费者组(consumer group): 相同的group.id的消费者将视为同一个消费者组,
    // 每个消费者都需要设置一个组id, 每条消息只能被 consumer group 中的一个
    // Consumer 消费,但可以被多个 consumer group 消费
    type KafkaConsumerGroup struct {
     //代理(broker): 一台kafka服务器称之为一个broker
     brokers   []string
     //主题(topic): 消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中
     topics    []string
     version   string
     ready       chan bool
     group       string
     channelBufferSize  int
     //业务调用
     handler     func(message *sarama.ConsumerMessage) bool
    }
    
    func (k *KafkaConsumerGroup)Init() func() {
    
     version,err := sarama.ParseKafkaVersion(k.version)
     if err!=nil{
     fmt.Printf("Error parsing Kafka version: %v", err)
     }
     cfg := sarama.NewConfig()
     cfg.Version = version
     // 分区分配策略
     cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
     // 未找到组消费位移的时候从哪边开始消费
     cfg.Consumer.Offsets.Initial = -2
     // channel长度
     cfg.ChannelBufferSize = k.channelBufferSize
     ctx, cancel := context.WithCancel(context.Background())
     client, err := sarama.NewConsumerGroup(k.brokers, k.group, cfg)
     if err != nil {
      fmt.Printf("Error creating consumer group client: %v", err)
     }
    
     wg := sync.WaitGroup{}
     wg.Add(1)
     go func() {
      defer func() {
      wg.Done()
      //util.HandlePanic("client.Consume panic", log.StandardLogger())
      }()
      for {
      if err := client.Consume(ctx, k.topics, k); err != nil {
       log.Printf("Error from consumer: %v", err)
      }
      // check if context was cancelled, signaling that the consumer should stop
      if ctx.Err() != nil {
       log.Println(ctx.Err())
       return
      }
      k.ready = make(chan bool)
      }
     }()
    
     -k.ready
     fmt.Printf("Sarama consumer up and running!... \n")
     // 保证在系统退出时,通道里面的消息被消费
     return func() {
      cancel()
      wg.Wait()
      if err = client.Close(); err != nil {
      fmt.Printf("Error closing client: %v \n", err)
      }
     }
    
    }
    
    // Setup is run at the beginning of a new session, before ConsumeClaim
    func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
     // Mark the consumer as ready
     close(k.ready)
     return nil
    }
    
    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
     return nil
    }
    
    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    
     // NOTE:
     // Do not move the code below to a goroutine.
     // The `ConsumeClaim` itself is called within a goroutine, see:
     // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
     // 具体消费消息
     for message := range claim.Messages() {
     //msg := string(message.Value)
     //k.logger.Infof("卡夫卡: %s", msg)
    
     if ok:= k.handler(message); ok {
      // 更新位移
      session.MarkMessage(message, "")
     }
     //run.Run(msg)
     }
     return nil
    }
    

    测试代码

    func TestKafkaConsumerGroup_Init(t *testing.T) {
     //pd := NewKafkaProducer("test-fail",true)
     //pd.InitSync()
     k := NewKafkaConsumerGroup([]string{constant.KafkaALiSdkTopic}, "group-2", func(message *sarama.ConsumerMessage) bool {
     fmt.Println(string(message.Value))
     //如果失败的处理逻辑
     //if ok := pd.SendMsgSync("666666"); ok {
     // return true
     //}
     return false
    
     })
     consumerDone := k.Init()
    
     sigterm := make(chan os.Signal, 1)
     signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
     select {
     case -sigterm:
     fmt.Println("terminating: via signal")
     }
     consumerDone()
    }
    

    这里有一些补偿逻辑在里面。

    以上就是logrus相关hook。

    好了,这篇logrus hook输出日志到本地磁盘的操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

    您可能感兴趣的文章:
    • go日志系统logrus显示文件和行号的操作
    • logrus日志自定义格式操作
    • Golang logrus 日志包及日志切割的实现
    • golang日志框架之logrus的使用
    上一篇:go日志系统logrus显示文件和行号的操作
    下一篇:Golang单元测试与覆盖率的实例讲解
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    logrus hook输出日志到本地磁盘的操作 logrus,hook,输出,日志,到,