Fast multithreaded string searching in large text files.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

128 lines
2.9 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package main
  2. import (
  3. "bufio"
  4. "bytes"
  5. "io"
  6. "os"
  7. "runtime"
  8. "runtime/pprof"
  9. "sync"
  10. )
  11. const lf = byte('\n')
  12. func main() {
  13. f, _ := os.Create("cprof")
  14. pprof.StartCPUProfile(f)
  15. defer pprof.StopCPUProfile()
  16. if len(os.Args) != 2 {
  17. print("One argument required\n")
  18. os.Exit(1)
  19. }
  20. f, err := os.Open("locate.txt")
  21. if err != nil {
  22. panic(err)
  23. }
  24. defer f.Close()
  25. var needle = os.Args[1]
  26. // To make sure all threads have ended when the program finishes
  27. var wg sync.WaitGroup
  28. // Make a channel and a thread for each CPU core
  29. var cores = runtime.NumCPU()
  30. var workchannel = make(chan []byte)
  31. for i := 0; i < cores; i++ {
  32. go scannerThread(workchannel, []byte(needle), &wg)
  33. wg.Add(1)
  34. }
  35. var br = bufio.NewReaderSize(f, 1<<25) // reader with 32 MiB buffer
  36. var buf = make([]byte, 1<<20) // 1 MiB buffer for searching
  37. var remainder []byte
  38. var nread int
  39. for {
  40. nread, err = br.Read(buf)
  41. if err != nil && err != io.EOF {
  42. panic(err)
  43. }
  44. if err != io.EOF {
  45. // Get the remainder of the last line
  46. remainder, err = br.ReadBytes(byte('\n'))
  47. if err != nil && err != io.EOF {
  48. panic(err)
  49. }
  50. }
  51. workchannel <- append(buf[:nread], remainder...)
  52. if err == io.EOF {
  53. break
  54. }
  55. }
  56. close(workchannel)
  57. wg.Wait()
  58. }
  // scannerThread receives batches of text from workchannel until the channel is
  // closed and writes every line that contains needle to stdout, including the
  // line's trailing line feed when it has one. A line is written once even if
  // needle occurs in it several times. An empty needle matches every line.
  //
  // wg.Done is called when the function returns so the caller can wait for all
  // scanner goroutines to finish.
  func scannerThread(workchannel chan []byte, needle []byte, wg *sync.WaitGroup) {
  	defer wg.Done() // Sync up the goroutines when done

  	for batch := range workchannel {
  		// pos is the offset at which the next search starts. It is reset for
  		// every batch and always sits just past the last printed line, so a
  		// line is never reported twice.
  		for pos := 0; pos < len(batch); {
  			i := bytes.Index(batch[pos:], needle)
  			if i == -1 {
  				break // no more results in this batch
  			}
  			hit := pos + i

  			// The line starts after the previous line feed, or at index 0
  			// when there is none (LastIndexByte returns -1).
  			start := bytes.LastIndexByte(batch[:hit], '\n') + 1

  			// The line ends after the next line feed, or at the end of the
  			// batch when the last line has no line feed.
  			end := len(batch)
  			if j := bytes.IndexByte(batch[hit:], '\n'); j != -1 {
  				end = hit + j + 1
  			}

  			// Print the result. Several goroutines write to stdout here, so
  			// the order of lines across batches is not deterministic. The
  			// write error is deliberately ignored: the worker has to keep
  			// draining the channel, or the sender would block forever.
  			os.Stdout.Write(batch[start:end])
  			pos = end
  		}
  	}
  }