diff --git a/asyncreader.go b/asyncreader.go new file mode 100644 index 0000000..d4fcff8 --- /dev/null +++ b/asyncreader.go @@ -0,0 +1,72 @@ +package main + +import ( + "io" + "sync" + + "golang.org/x/exp/mmap" +) + +const lf = byte('\n') +const null = byte(0) + +type AsyncLineReader struct { + offset int64 + eof bool + file *mmap.ReaderAt + mu *sync.Mutex + remainder []byte +} + +func NewAsyncLineReader(name string) (*AsyncLineReader, error) { + var a = &AsyncLineReader{ + offset: 0, + eof: false, + mu: &sync.Mutex{}, + } + + var err error + a.file, err = mmap.Open(name) + + return a, err +} + +func (a *AsyncLineReader) Read(b []byte) (int, error) { + if a.eof { + return 0, io.EOF + } + + a.mu.Lock() + defer a.mu.Unlock() + + var n, err = a.file.ReadAt(b, a.offset) + if err == io.EOF { + a.eof = true + return 0, err + } + if n == 0 { + return 0, err + } + + // find the last line feed in the batch + for { + n-- + + if b[n] == lf { + // selected character is a line feed, we're done + break + } else if n == 0 { + break // result is at start of file + } + + b[n] = null // Discard this byte so it doesn't get read multiple times + } + + a.offset += int64(n) + + return n, err +} + +func (a *AsyncLineReader) Close() error { + return a.file.Close() +} diff --git a/build.sh b/build.sh index e5462ba..ee3000b 100755 --- a/build.sh +++ b/build.sh @@ -1,3 +1,3 @@ #!/bin/sh -go build -o rapidgrep -ldflags="-s -w" main.go +go build -o rapidgrep -ldflags="-s -w" asyncreader.go main.go diff --git a/main.go b/main.go index 1ee0e5f..c654b18 100644 --- a/main.go +++ b/main.go @@ -1,28 +1,24 @@ package main import ( - "bufio" "bytes" "io" "os" "runtime" - "runtime/pprof" "sync" ) -const lf = byte('\n') - func main() { - f, _ := os.Create("cprof") - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() + // prof, _ := os.Create("cprof") + // pprof.StartCPUProfile(prof) + // defer pprof.StopCPUProfile() if len(os.Args) != 2 { print("One argument required\n") os.Exit(1) } - f, err := os.Open("locate.txt") + f, err := NewAsyncLineReader("locate.txt") if err != nil { panic(err) } @@ -34,68 +30,42 @@ func main() { var wg sync.WaitGroup // Make a channel and a thread for each CPU core - var cores = runtime.NumCPU() - var workchannel = make(chan []byte) - for i := 0; i < cores; i++ { - go scannerThread(workchannel, []byte(needle), &wg) + var threads = runtime.NumCPU() + for i := 0; i < threads; i++ { + go scannerThread(f, []byte(needle), &wg) wg.Add(1) } - var br = bufio.NewReaderSize(f, 1<<25) // reader with 32 MiB buffer - var buf = make([]byte, 1<<20) // 1 MiB buffer for searching - var remainder []byte - var nread int - - for { - nread, err = br.Read(buf) - if err != nil && err != io.EOF { - panic(err) - } - if err != io.EOF { - // Get the remainder of the last line - remainder, err = br.ReadBytes(byte('\n')) - if err != nil && err != io.EOF { - panic(err) - } - } - - workchannel <- append(buf[:nread], remainder...) - - if err == io.EOF { - break - } - } - - close(workchannel) wg.Wait() } -func scannerThread(workchannel chan []byte, needle []byte, wg *sync.WaitGroup) { - var batch []byte - var ok bool +func scannerThread(r *AsyncLineReader, needle []byte, wg *sync.WaitGroup) { var i, linelen, start, end int + var buf = make([]byte, 1<<24) // 1 MiB buffer for searching + var err error defer wg.Done() // Sync up the goroutines when done for { - batch, ok = <-workchannel - if !ok { - return // channel closed. we're done + linelen, err = r.Read(buf) + if err != nil { + if err == io.EOF { + return + } + panic(err) } - linelen = len(batch) - // This loop is for every result found, when there are no more results // it stops. When it runs for the first time the first index function is // executed, which indexes the entire batch. In the following iterations // the second index function is used, which begins searching where the // last result ended so it doesn't get found twice. - for i = bytes.Index(batch, needle); i != -1; i = bytes.Index(batch[end:], needle) { + for i = bytes.Index(buf, needle); i != -1; i = bytes.Index(buf[end:], needle) { start, end = i+end, i+end // needle was found, but where? for { // find the start (line feed of the line before it, or index 0) - if batch[start] == lf { + if buf[start] == lf { start++ // the line feed is from the previous line, so skip it break } else if start == 0 { @@ -105,7 +75,7 @@ func scannerThread(workchannel chan []byte, needle []byte, wg *sync.WaitGroup) { } for { // find the end (line feed at the end of the line) - if batch[end] == lf { + if buf[end] == lf { end++ // include the line feed in the line // https://stackoverflow.com/questions/26857582/in-a-go-slice-why-does-slohi-end-at-element-hi-1 break @@ -118,11 +88,12 @@ func scannerThread(workchannel chan []byte, needle []byte, wg *sync.WaitGroup) { // Print the result. Note that stdout is not necessarily // concurrency-safe, so in a real application this would have to be // passed through a channel. - os.Stdout.Write(batch[start:end]) + os.Stdout.Write(buf[start:end]) // This is to keep track of where the end of the byte array is so we // can avoid index out of bounds panics linelen = linelen - end } + end = 0 } }