• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    golang如何实现mapreduce单进程版本详解

    前言

      MapReduce作为hadoop的编程框架,是工程师最常接触的部分,也是除去了网络环境和集群配置之外对整个Job执行效率影响很大的部分,所以很有必要深入了解整个过程。元旦放假的第一天,在家没事干,用golang实现了一下mapreduce的单进程版本,代码见github地址。功能是对大文件统计最高频的10个单词,因为功能比较简单,所以设计没有解耦合。

      本文先对mapreduce大体概念进行介绍,然后结合代码介绍一下,如果接下来几天有空,我会实现一下分布式高可用的mapreduce版本。下面话不多说了,来一起看看详细的介绍吧。

    1. Mapreduce大体架构

      上图是论文中mapreduce的大体架构。总的来说Mapreduce的思想就是分治思想:对数据进行分片,然后用mapper进行处理,以key-value形式输出中间文件;然后用reducer对mapper输出的中间文件进行合并:将key一致的合到一块,并输出结果文件;如果有需要,采用Combiner进行最后的合并。

      归纳来说主要分为5部分:用户程序、Master、Mapper、Reducer、Combiner(上图未给出)。

      总的来说,架构不复杂。组件间通信用啥都可以,比如RPC、HTTP或者私有协议等。

    2. 实现代码介绍

      该版本代码实现了单机单进程版本,Mapper、Reducer和Combiner的实现用协程goroutine实现,通信采用channel。代码写的比较随意,没有解耦合。

      为了方便起见,Combiner对最高频的10个单词进行堆排序处理,按规范来说应该放在用户程序处理。

      文件目录如下,其中bin文件夹下的big_input_file.txt为输入文件,可以调用generate下的main文件生成,caller文件为入口的用户程序,master目录下分别存放master、mapper、reducer、combiner代码:

    .
    ├── README.md
    ├── bin
    │ └── file-store
    │  └── big_input_file.txt
    └── src
     ├── caller
     │ └── main.go
     ├── generate
     │ └── main.go
     └── master
      ├── combiner.go
      ├── mapper.go
      ├── master.go
      └── reducer.go
    
    6 directories, 8 files 

    2.1 caller

      用户程序,读入文件并按固定行数进行划分;然后调用master.Handle进行处理。

    package main
    import ( 
     "os"
     "path"
     "path/filepath"
     "bufio"
     "strconv"
     "master"
     "github.com/vinllen/go-logger/logger"
    )
    const ( 
     // LIMIT caps how many lines are written into each split piece file.
     LIMIT int = 10000 // the limit line of every file
    )
    func main() { 
     curDir, err := filepath.Abs(filepath.Dir(os.Args[0]))
     if err != nil {
      logger.Error("Read path error: ", err.Error())
      return
     }
     fileDir := path.Join(curDir, "file-store")
     _ = os.Mkdir(fileDir, os.ModePerm)
     // 1. read file
     filename := "big_input_file.txt"
     inputFile, err := os.Open(path.Join(fileDir, filename))
     if err != nil {
      logger.Error("Read inputFile error: ", err.Error())
      return
     }
     defer inputFile.Close()
     // 2. split inputFile into several pieces that every piece hold 100,000 lines
     filePieceArr := []string{}
     scanner := bufio.NewScanner(inputFile)
     piece := 1
    Outter: 
     for {
      outputFilename := "input_piece_" + strconv.Itoa(piece)
      outputFilePos := path.Join(fileDir, outputFilename)
      filePieceArr = append(filePieceArr, outputFilePos)
      outputFile, err := os.Create(outputFilePos)
      if err != nil {
       logger.Error("Split inputFile error: ", err.Error())
       continue
      }
      defer outputFile.Close()
      for cnt := 0; cnt  LIMIT; cnt++ {
       if !scanner.Scan() {
        break Outter
       }
       _, err := outputFile.WriteString(scanner.Text() + "\n")
       if err != nil {
        logger.Error("Split inputFile writting error: ", err.Error())
        return
       }
      }
      piece++
     }
     // 3. pass to master
     res := master.Handle(filePieceArr, fileDir)
     logger.Warn(res)
    }

    2.2 master

      Master程序,依次生成Combiner、Reducer、Mapper,处理消息中转,输出最后结果。

    package master
    import (
     "github.com/vinllen/go-logger/logger"
    )
    // Shared unbuffered channels that wire the pipeline stages together.
    // They are (re)initialized by Handle on every call; package-level so the
    // mapper/reducer/combiner goroutines can reach them without parameters.
    var ( 
     MapChanIn chan MapInput // channel produced by master while consumed by mapper
     MapChanOut chan string // channel produced by mapper while consumed by master
     ReduceChanIn chan string // channel produced by master while consumed by reducer
     ReduceChanOut chan string // channel produced by reducer while consumed by master
     CombineChanIn chan string // channel produced by master while consumed by combiner
     CombineChanOut chan []Item // channel produced by combiner while consumed by master
    )
    func Handle(inputArr []string, fileDir string) []Item { 
     logger.Info("handle called")
     const(
      mapperNumber int = 5
      reducerNumber int = 2
     )
     MapChanIn = make(chan MapInput)
     MapChanOut = make(chan string)
     ReduceChanIn = make(chan string)
     ReduceChanOut = make(chan string)
     CombineChanIn = make(chan string)
     CombineChanOut = make(chan []Item)
     reduceJobNum := len(inputArr)
     combineJobNum := reducerNumber
     // start combiner
     go combiner()
     // start reducer
     for i := 1; i = reducerNumber; i++ {
      go reducer(i, fileDir)
     }
     // start mapper
     for i := 1; i = mapperNumber; i++ {
      go mapper(i, fileDir)
     }
     go func() {
      for i, v := range(inputArr) {
       MapChanIn - MapInput{
        Filename: v,
        Nr: i + 1,
       } // pass job to mapper
      }
      close(MapChanIn) // close map input channel when no more job
     }()
     var res []Item
    outter: 
     for {
      select {
       case v := - MapChanOut:
        go func() {
         ReduceChanIn - v
         reduceJobNum--
         if reduceJobNum = 0 {
          close(ReduceChanIn)
         }
        }()
       case v := - ReduceChanOut:
        go func() {
         CombineChanIn - v
         combineJobNum--
         if combineJobNum = 0 {
          close(CombineChanIn)
         }
        }()
       case v := - CombineChanOut:
        res = v
        break outter
      }
     }
     close(MapChanOut)
     close(ReduceChanOut)
     close(CombineChanOut)
     return res
    }

    2.3 mapper

      Mapper程序,读入并按key-value格式生成中间文件,告知Master。

    package master
    import ( 
     "fmt"
     "path"
     "os"
     "bufio"
     "strconv"
    
     "github.com/vinllen/go-logger/logger"
    )
    // MapInput is one map job handed from the master to a mapper goroutine.
    type MapInput struct { 
     Filename string // path of the input piece file to be word-counted
     Nr int // 1-based job number; used to name the mapper's output file
    }
    // mapper consumes map jobs from MapChanIn until the channel is closed.
    // For each job it counts word frequencies in the input piece, writes them
    // to an intermediate file ("<word> <count>" per line) and reports that
    // file's path on MapChanOut. nr identifies this mapper goroutine.
    func mapper(nr int, fileDir string) { 
        for {
            val, ok := <-MapChanIn // val: one map job (filename + job number)
            if !ok {               // channel closed: no more jobs
                break
            }
            inputFilename := val.Filename
            jobNr := val.Nr // job number, not the mapper id; names the output file
            file, err := os.Open(inputFilename)
            if err != nil {
                errMsg := fmt.Sprintf("Read file(%s) error in mapper(%d)", inputFilename, jobNr)
                logger.Error(errMsg)
                // Still report something so the master's job accounting stays
                // correct; the reducer will log an open error for "".
                MapChanOut <- ""
                continue
            }
            // Count the frequency of every whitespace-separated word.
            mp := make(map[string]int)
            scanner := bufio.NewScanner(file)
            scanner.Split(bufio.ScanWords)
            for scanner.Scan() {
                mp[scanner.Text()]++
            }
            file.Close() // the original leaked this handle
            outputFilename := path.Join(fileDir, "mapper-output-"+strconv.Itoa(jobNr))
            outputFileHandler, err := os.Create(outputFilename)
            if err != nil {
                errMsg := fmt.Sprintf("Write file(%s) error in mapper(%d)", outputFilename, jobNr)
                logger.Error(errMsg)
            } else {
                for k, v := range mp {
                    outputFileHandler.WriteString(fmt.Sprintf("%s %d\n", k, v))
                }
                outputFileHandler.Close()
            }
            // Reported even on create failure, matching the original flow; the
            // reducer logs and skips paths it cannot open.
            MapChanOut <- outputFilename
        }
    }

    2.4 reducer

      Reducer程序,读入Master传递过来的中间文件并归并。

    package master
    import ( 
     "fmt"
     "bufio"
     "os"
     "strconv"
     "path"
     "strings"
     "github.com/vinllen/go-logger/logger"
    )
    // reducer drains ReduceChanIn, merging the "<word> <count>" lines of every
    // mapper output file it receives into a single frequency table. Once the
    // channel is closed it writes the table to "reduce-output-<nr>" and
    // reports that path on ReduceChanOut. nr is the 1-based reducer id.
    func reducer(nr int, fileDir string) { 
        mp := make(map[string]int) // merged word frequencies for this reducer
        // read each assigned mapper output file and accumulate the counts
        for {
            val, ok := <-ReduceChanIn // val: path of a mapper output file
            if !ok {                  // channel closed: all map outputs dispatched
                break
            }
            logger.Debug("reducer called: ", nr)
            file, err := os.Open(val)
            if err != nil {
                errMsg := fmt.Sprintf("Read file(%s) error in reducer", val)
                logger.Error(errMsg)
                continue
            }
            scanner := bufio.NewScanner(file)
            for scanner.Scan() {
                str := scanner.Text()
                arr := strings.Split(str, " ")
                if len(arr) != 2 { // every line must be "<word> <count>"
                    errMsg := fmt.Sprintf("Read file(%s) error that len of line(%s) != 2(%d) in reducer", val, str, len(arr))
                    logger.Warn(errMsg)
                    continue
                }
                v, err := strconv.Atoi(arr[1])
                if err != nil {
                    errMsg := fmt.Sprintf("Read file(%s) error that line(%s) parse error in reducer", val, str)
                    logger.Warn(errMsg)
                    continue
                }
                mp[arr[0]] += v
            }
            if err := scanner.Err(); err != nil {
                logger.Error("reducer: reading standard input:", err)
            }
            file.Close()
        }
        // write the merged table, one "<word> <count>" line per word
        outputFilename := path.Join(fileDir, "reduce-output-"+strconv.Itoa(nr))
        outputFileHandler, err := os.Create(outputFilename)
        if err != nil {
            errMsg := fmt.Sprintf("Write file(%s) error in reducer(%d)", outputFilename, nr)
            logger.Error(errMsg)
        } else {
            for k, v := range mp {
                outputFileHandler.WriteString(fmt.Sprintf("%s %d\n", k, v))
            }
            outputFileHandler.Close()
        }
        // report the path even on create failure so the master's accounting
        // stays correct; the combiner logs and skips unreadable paths
        ReduceChanOut <- outputFilename
    }

    2.5 combiner

      Combiner程序,读入Master传递过来的Reducer结果文件并归并成一个,然后堆排序输出最高频的10个词语。

    package master
    import ( 
     "fmt"
     "strings"
     "bufio"
     "os"
     "container/heap"
     "strconv"
    
     "github.com/vinllen/go-logger/logger"
    )
    // Item pairs a word with its aggregated frequency count.
    type Item struct { 
     key string
     val int
    }

    // PriorityQueue is a max-heap of *Item ordered by val (highest count
    // first); it implements heap.Interface for use with container/heap.
    type PriorityQueue []*Item

    // Len reports how many items the queue currently holds.
    func (pq PriorityQueue) Len() int { 
     return len(pq)
    }

    // Less places larger counts earlier, so heap.Pop yields the most
    // frequent word first.
    func (pq PriorityQueue) Less(i, j int) bool { 
     return pq[j].val < pq[i].val
    }

    // Swap exchanges the items at positions i and j.
    func (pq PriorityQueue) Swap(i, j int) { 
     pq[j], pq[i] = pq[i], pq[j]
    }

    // Push appends x (which must be a *Item) to the tail; container/heap
    // restores heap order afterwards.
    func (pq *PriorityQueue) Push(x interface{}) { 
     *pq = append(*pq, x.(*Item))
    }

    // Pop removes and returns the tail element; container/heap has already
    // moved the heap root there before calling this.
    func (pq *PriorityQueue) Pop() interface{} { 
     last := len(*pq) - 1
     item := (*pq)[last]
     *pq = (*pq)[:last]
     return item
    }
    func combiner() { 
     mp := make(map[string]int) // store the frequence of words
     // read file and do combine
     for {
      val, ok := - CombineChanIn
      if !ok {
       break
      }
      logger.Debug("combiner called")
      file, err := os.Open(val)
      if err != nil {
       errMsg := fmt.Sprintf("Read file(%s) error in combiner", val)
       logger.Error(errMsg)
       continue
      }
      scanner := bufio.NewScanner(file)
      for scanner.Scan() {
       str := scanner.Text()
       arr := strings.Split(str, " ")
       if len(arr) != 2 {
        errMsg := fmt.Sprintf("Read file(%s) error that len of line != 2(%s) in combiner", val, str)
        logger.Warn(errMsg)
        continue
       }
       v, err := strconv.Atoi(arr[1])
       if err != nil {
        errMsg := fmt.Sprintf("Read file(%s) error that line(%s) parse error in combiner", val, str)
        logger.Warn(errMsg)
        continue
       }
       mp[arr[0]] += v
      }
      file.Close()
     }
     // heap sort
     // pq := make(PriorityQueue, len(mp))
     pq := make(PriorityQueue, 0)
     heap.Init(pq)
     for k, v := range mp {
      node := Item {
       key: k,
       val: v,
      }
      // logger.Debug(k, v)
      heap.Push(pq, node)
     }
     res := []Item{}
     for i := 0; i  10  pq.Len() > 0; i++ {
      node := heap.Pop(pq).(*Item)
      res = append(res, *node)
     }
     CombineChanOut - res
    }

    3. 总结

      不足以及未实现之处:

      接下来要是有空,我会实现分布式高可用的代码,模块间采用RPC通讯。

    好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

    您可能感兴趣的文章:
    • golang 输出重定向:fmt Log,子进程Log,第三方库logrus的详解
    • Golang信号处理及如何实现进程的优雅退出详解
    • golang守护进程用法示例
    • golang 后台进程的启动和停止操作
    上一篇:golang log4go的日志输出优化详解
    下一篇:利用rpm打包上线部署golang代码的方法教程
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    golang如何实现mapreduce单进程版本详解 golang,如何,实现,mapreduce,