Browse Source

add custom reader + mmap

master
Wim Brand 7 years ago
parent
commit
1b7bb98b5e
  1. 72
      asyncreader.go
  2. 2
      build.sh
  3. 71
      main.go

72
asyncreader.go

@ -0,0 +1,72 @@
package main
import (
"io"
"sync"
"golang.org/x/exp/mmap"
)
const lf = byte('\n')
const null = byte(0)
type AsyncLineReader struct {
offset int64
eof bool
file *mmap.ReaderAt
mu *sync.Mutex
remainder []byte
}
func NewAsyncLineReader(name string) (*AsyncLineReader, error) {
var a = &AsyncLineReader{
offset: 0,
eof: false,
mu: &sync.Mutex{},
}
var err error
a.file, err = mmap.Open(name)
return a, err
}
func (a *AsyncLineReader) Read(b []byte) (int, error) {
if a.eof {
return 0, io.EOF
}
a.mu.Lock()
defer a.mu.Unlock()
var n, err = a.file.ReadAt(b, a.offset)
if err == io.EOF {
a.eof = true
return 0, err
}
if n == 0 {
return 0, err
}
// find the last line feed in the batch
for {
n--
if b[n] == lf {
// selected character is a line feed, we're done
break
} else if n == 0 {
break // result is at start of file
}
b[n] = null // Discard this byte so it doesn't get read multiple times
}
a.offset += int64(n)
return n, err
}
func (a *AsyncLineReader) Close() error {
return a.file.Close()
}

2
build.sh

@ -1,3 +1,3 @@
#!/bin/sh
go build -o rapidgrep -ldflags="-s -w" main.go
go build -o rapidgrep -ldflags="-s -w" asyncreader.go main.go

71
main.go

@ -1,28 +1,24 @@
package main
import (
"bufio"
"bytes"
"io"
"os"
"runtime"
"runtime/pprof"
"sync"
)
const lf = byte('\n')
func main() {
f, _ := os.Create("cprof")
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
// prof, _ := os.Create("cprof")
// pprof.StartCPUProfile(prof)
// defer pprof.StopCPUProfile()
if len(os.Args) != 2 {
print("One argument required\n")
os.Exit(1)
}
f, err := os.Open("locate.txt")
f, err := NewAsyncLineReader("locate.txt")
if err != nil {
panic(err)
}
@ -34,68 +30,42 @@ func main() {
var wg sync.WaitGroup
// Make a channel and a thread for each CPU core
var cores = runtime.NumCPU()
var workchannel = make(chan []byte)
for i := 0; i < cores; i++ {
go scannerThread(workchannel, []byte(needle), &wg)
var threads = runtime.NumCPU()
for i := 0; i < threads; i++ {
go scannerThread(f, []byte(needle), &wg)
wg.Add(1)
}
var br = bufio.NewReaderSize(f, 1<<25) // reader with 32 MiB buffer
var buf = make([]byte, 1<<20) // 1 MiB buffer for searching
var remainder []byte
var nread int
for {
nread, err = br.Read(buf)
if err != nil && err != io.EOF {
panic(err)
}
if err != io.EOF {
// Get the remainder of the last line
remainder, err = br.ReadBytes(byte('\n'))
if err != nil && err != io.EOF {
panic(err)
}
}
workchannel <- append(buf[:nread], remainder...)
if err == io.EOF {
break
}
}
close(workchannel)
wg.Wait()
}
func scannerThread(workchannel chan []byte, needle []byte, wg *sync.WaitGroup) {
var batch []byte
var ok bool
func scannerThread(r *AsyncLineReader, needle []byte, wg *sync.WaitGroup) {
var i, linelen, start, end int
var buf = make([]byte, 1<<24) // 1 MiB buffer for searching
var err error
defer wg.Done() // Sync up the goroutines when done
for {
batch, ok = <-workchannel
if !ok {
return // channel closed. we're done
linelen, err = r.Read(buf)
if err != nil {
if err == io.EOF {
return
}
panic(err)
}
linelen = len(batch)
// This loop is for every result found, when there are no more results
// it stops. When it runs for the first time the first index function is
// executed, which indexes the entire batch. In the following iterations
// the second index function is used, which begins searching where the
// last result ended so it doesn't get found twice.
for i = bytes.Index(batch, needle); i != -1; i = bytes.Index(batch[end:], needle) {
for i = bytes.Index(buf, needle); i != -1; i = bytes.Index(buf[end:], needle) {
start, end = i+end, i+end
// needle was found, but where?
for { // find the start (line feed of the line before it, or index 0)
if batch[start] == lf {
if buf[start] == lf {
start++ // the line feed is from the previous line, so skip it
break
} else if start == 0 {
@ -105,7 +75,7 @@ func scannerThread(workchannel chan []byte, needle []byte, wg *sync.WaitGroup) {
}
for { // find the end (line feed at the end of the line)
if batch[end] == lf {
if buf[end] == lf {
end++ // include the line feed in the line
// https://stackoverflow.com/questions/26857582/in-a-go-slice-why-does-slohi-end-at-element-hi-1
break
@ -118,11 +88,12 @@ func scannerThread(workchannel chan []byte, needle []byte, wg *sync.WaitGroup) {
// Print the result. Note that stdout is not necessarily
// concurrency-safe, so in a real application this would have to be
// passed through a channel.
os.Stdout.Write(batch[start:end])
os.Stdout.Write(buf[start:end])
// This is to keep track of where the end of the byte array is so we
// can avoid index out of bounds panics
linelen = linelen - end
}
end = 0
}
}
Loading…
Cancel
Save