使用Go语言,25秒读取16GB文件

 当今世界的任何计算机系统每天都会生成大量的日志或数据。随着系统的发展,将调试数据存储到数据库中是不可行的,因为它们是不可变的,并且只能用于分析和解决故障。所以大部分公司倾向于将日志存储在文件中,而这些文件通常位于本地磁盘中。

我们将使用Go语言,从一个大小为16GB的.txt或.log文件中提取日志。

让我们开始编码……

首先,我们打开文件。对于任何文件的IO,我们都将使用标准的Go os.File。 

 
 
 
  1. f, err := os.Open(fileName)  
  2.  if err != nil {  
  3.    fmt.Println("cannot able to read the file", err)  
  4.    return  
  5.  }  
  6. // UPDATE: close after checking error  
  7. defer file.Close()  //Do not forget to close the file 

打开文件后,我们有以下两个选项可以选择:

逐行读取文件,这有助于减少内存紧张,但需要更多的时间。一次将整个文件读入内存并处理该文件,这将消耗更多内存,但会显著减少时间。

由于文件太大,即16 GB,因此无法将整个文件加载到内存中。但是第一种选择对我们来说也是不可行的,因为我们希望在几秒钟内处理文件。

但你猜怎么着,还有第三种选择。瞧……相比于将整个文件加载到内存中,在Go语言中,我们还可以使用bufio.NewReader()将文件分块加载。 

 
 
 
  1. r := bufio.NewReader(f)  
  2. for {  
  3. buf := make([]byte,4*1024) //the chunk size  
  4. n, err := r.Read(buf) //loading chunk into buffer  
  5.    bufbuf = buf[:n]  
  6. if n == 0 {     
  7.      if err != nil {  
  8.        fmt.Println(err)  
  9.        break  
  10.      }  
  11.      if err == io.EOF {  
  12.        break  
  13.      }  
  14.      return err  
  15.   }  

一旦我们将文件分块,我们就可以分叉一个线程,即Go routine,同时处理多个文件区块。上述代码将修改为: 

 
 
 
  1. //sync pools to reuse the memory and decrease the preassure on Garbage Collector  
  2. linesPool := sync.Pool{New: func() interface{} {  
  3.         lines := make([]byte, 500*1024)  
  4.         return lines  
  5. }}  
  6. stringPool := sync.Pool{New: func() interface{} {  
  7.           lines := ""  
  8.           return lines  
  9. }}  
  10. slicePool := sync.Pool{New: func() interface{} {  
  11.            lines := make([]string, 100)  
  12.            return lines  
  13. }}  
  14. r := bufio.NewReader(f)  
  15. var wg sync.WaitGroup //wait group to keep track off all threads  
  16. for {   
  17.      buf := linesPool.Get().([]byte)  
  18.      n, err := r.Read(buf)  
  19.      bufbuf = buf[:n]  
  20. if n == 0 {  
  21.         if err != nil {  
  22.             fmt.Println(err)  
  23.             break  
  24.         }  
  25.         if err == io.EOF {  
  26.             break  
  27.         }  
  28.         return err  
  29.      }  
  30. nextUntillNewline, err := r.ReadBytes('\n')//read entire line    
  31.       if err != io.EOF {  
  32.          buf = append(buf, nextUntillNewline...)  
  33.      }    
  34.       wg.Add(1)  
  35.      go func() {      
  36.          //process each chunk concurrently 
  37.          //start -> log start time, end -> log end time   
  38.         ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end) 
  39. wg.Done()     
  40.       }()  
  41. }  
  42. wg.Wait()  

上面的代码,引入了两个优化点:

sync.Pool是一个强大的对象池,可以重用对象来减轻垃圾收集器的压力。我们将重用各个分片的内存,以减少内存消耗,大大加快我们的工作。Go Routines帮助我们同时处理缓冲区块,这大大提高了处理速度。

现在让我们实现ProcessChunk函数,它将处理以下格式的日志行。 

 
 
 
  1. 2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n 

我们将根据命令行提供的时间戳提取日志。 

 
 
 
  1. func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {  
  2. //another wait group to process every chunk further                            
  3.        var wg2 sync.WaitGroup  
  4. logs := stringPool.Get().(string)  
  5. logs = string(chunk) 
  6.  linesPool.Put(chunk) //put back the chunk in pool  
  7. //split the string by "\n", so that we have slice of logs  
  8.       logsSlice := strings.Split(logs, "\n")  
  9. stringPool.Put(logs) //put back the string pool  
  10. chunkSize := 100 //process the bunch of 100 logs in thread  
  11. n := len(logsSlice)  
  12. noOfThread := n / chunkSize  
  13. if n%chunkSize != 0 { //check for overflow  
  14.           noOfThread++  
  15.       }  
  16. length := len(logsSlice)  
  17. //traverse the chunk  
  18.      for i := 0; i < length; i += chunkSize {         
  19.           wg2.Add(1) 
  20. //process each chunk in saperate chunk  
  21.          go func(s int, e int) {  
  22.             for i:= s; i
  23.                text := logsSlice[i]  
  24. if len(text) == 0 {  
  25.                   continue  
  26.                }           
  27.              logParts := strings.SplitN(text, ",", 2)  
  28.             logCreationTimeString := logParts[0]  
  29.             logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)  
  30. if err != nil {  
  31.                  fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)  
  32.                  return  
  33.             }  
  34. // check if log's timestamp is inbetween our desired period  
  35.           if logCreationTime.After(start) && logCreationTime.Before(end) {          
  36.              fmt.Println(text)  
  37.            }  
  38.         }  
  39.         textSlice = nil  
  40.         wg2.Done()     
  41.       }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))  
  42.    //passing the indexes for processing  
  43. }    
  44.    wg2.Wait() //wait for a chunk to finish  
  45.    logsSlice = nil  

对上面的代码进行基准测试。以16 GB的日志文件为例,提取日志所需的时间约为25秒。

完整的代码示例如下: 

 
 
 
  1. func main() {  
  2.  s := time.Now()  
  3.  args := os.Args[1:]  
  4.  if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"  
  5.   fmt.Println("Please give proper command line arguments")  
  6.   return  
  7.  }  
  8.  startTimeArg := args[1]  
  9.  finishTimeArg := args[3]  
  10.  fileName := args[5]  
  11.  file, err := os.Open(fileName) 
  12.  if err != nil {  
  13.   fmt.Println("cannot able to read the file", err)  
  14.   return  
  15.  } 
  16.  defer file.Close() //close after checking err 
  17.  queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)  
  18.  if err != nil {  
  19.   fmt.Println("Could not able to parse the start time", startTimeArg)  
  20.   return  
  21.  } 
  22.  queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)  
  23.  if err != nil {  
  24.   fmt.Println("Could not able to parse the finish time", finishTimeArg)  
  25.   return  
  26.  }  
  27.  filestat, err := file.Stat()  
  28.  if err != nil {  
  29.   fmt.Println("Could not able to get the file stat")  
  30.   return  
  31.  }  
  32.  fileSize := filestat.Size()  
  33.  offset := fileSize - 1  
  34.  lastLineSize := 0 
  35.  for {  
  36.   b := make([]byte, 1)  
  37.   n, err := file.ReadAt(b, offset)  
  38.   if err != nil {  
  39.    fmt.Println("Error reading file ", err)  
  40.    break  
  41.   }  
  42.   char := string(b[0])  
  43.   if char == "\n" {  
  44.    break  
  45.   }  
  46.   offset--  
  47.   lastLineSize += n  
  48.  }  
  49.  lastLine := make([]byte, lastLineSize)  
  50.  _, err = file.ReadAt(lastLine, offset+1)  
  51.  if err != nil {  
  52.   fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)  
  53.   return 
  54.  }  
  55.  logSlice := strings.SplitN(string(lastLine), ",", 2)  
  56.  logCreationTimeString := logSlice[0]  
  57.  lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)  
  58.  if err != nil {  
  59.   fmt.Println("can not able to parse time : ", err)  
  60.  }  
  61.  if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {  
  62.   Process(file, queryStartTime, queryFinishTime)  
  63.  }  
  64.  fmt.Println("\nTime taken - ", time.Since(s))  
  65. }   
  66. func Process(f *os.File, start time.Time, end time.Time) error {  
  67.  linesPool := sync.Pool{New: func() interface{} {  
  68.   lines := make([]byte, 250*1024)  
  69.   return lines  
  70.  }}  
  71.  stringPool := sync.Pool{New: func() interface{} {  
  72.   lines := ""  
  73.   return lines  
  74.  }}  
  75.  r := bufio.NewReader(f)  
  76.  var wg sync.WaitGroup  
  77.  for {  
  78.   buf := linesPool.Get().([]byte)  
  79.   n, err := r.Read(buf)  
  80.   bufbuf = buf[:n]  
  81.   if n == 0 {  
  82.    if err != nil {  
  83.     fmt.Println(err)  
  84.     break  
  85.    }  
  86.    if err == io.EOF {  
  87.     break  
  88.    }  
  89.    return err  
  90.   }   
  91.   nextUntillNewline, err := r.ReadBytes('\n')  
  92.   if err != io.EOF {  
  93.    buf = append(buf, nextUntillNewline...)  
  94.   }  
  95.   wg.Add(1)  
  96.   go func() {  
  97.    ProcessChunk(buf, &linesPool, &stringPool, start, end)  
  98.    wg.Done()  
  99.   }()   
  100.  }  
  101.  wg.Wait()  
  102.  return nil  
  103. }   
  104. func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {  
  105.  var wg2 sync.WaitGroup  
  106.  logs := stringPool.Get().(string)  
  107.  logs = string(chunk)   
  108.  linesPool.Put(chunk)  
  109.  logsSlice := strings.Split(logs, "\n")  
  110.  stringPool.Put(logs)  
  111.  chunkSize := 300  
  112.  n := len(logsSlice)  
  113.  noOfThread := n / chunkSize 
  114.  if n%chunkSize != 0 {  
  115.   noOfThread++  
  116.  }  
  117.  for i := 0; i < (noOfThread); i++ {  
  118.   wg2.Add(1)  
  119.   go func(s int, e int) {  
  120.    defer wg2.Done() //to avaoid deadlocks  
  121.    for i := s; i < e; i++ {  
  122.     text := logsSlice[i]  
  123.     if len(text) == 0 {  
  124.      continue  
  125.     }  
  126.     logSlice := strings.SplitN(text, ",", 2)  
  127.     logCreationTimeString := logSlice[0]  
  128.     logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)  
  129.     if err != nil {  
  130.      fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)  
  131.      return  
  132.     }   
  133.     if logCreationTime.After(start) && logCreationTime.Before(end)  
  134.      //fmt.Println(text)  
  135.     }  
  136.    }   
  137.   }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))  
  138.  }  
  139.  wg2.Wait() 
  140.  logsSlice = nil  
  141. }  

网页题目:使用Go语言,25秒读取16GB文件
文章分享:http://www.shufengxianlan.com/qtweb/news20/476470.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联