iterx: paging iteration working

We used to rely upon NumRows to determine if there are
new pages available. This is not correct since the server
is allowed to return empty pages with has_more_data flag
set and the needed data to do this is not exposed by
the gocql driver.

We simply remove these checks and let the driver decide
when to stop reading.

Co-authored-by: Henrik Johansson <henrik@scylladb.com>
Co-authored-by: Piotr Sarna <sarna@scylladb.com>
This commit is contained in:
Henrik Johansson
2019-05-31 09:15:02 +02:00
committed by Michal Matczuk
parent eddedae353
commit 8b6f083cdc
2 changed files with 45 additions and 16 deletions

View File

@@ -80,11 +80,6 @@ func (iter *Iterx) scanAny(dest interface{}, structOnly bool) bool {
return false return false
} }
// no results or query error
if iter.Iter.NumRows() == 0 {
return false
}
base := reflectx.Deref(value.Type()) base := reflectx.Deref(value.Type())
scannable := isScannable(base) scannable := isScannable(base)
@@ -132,11 +127,6 @@ func (iter *Iterx) scanAll(dest interface{}, structOnly bool) bool {
return false return false
} }
// no results or query error
if iter.Iter.NumRows() == 0 {
return false
}
slice, err := baseType(value.Type(), reflect.Slice) slice, err := baseType(value.Type(), reflect.Slice)
if err != nil { if err != nil {
iter.err = err iter.err = err
@@ -180,7 +170,7 @@ func (iter *Iterx) scanAll(dest interface{}, structOnly bool) bool {
// allocate memory for the page data // allocate memory for the page data
if !alloc { if !alloc {
v = reflect.MakeSlice(slice, 0, iter.Iter.NumRows()) v = reflect.MakeSlice(slice, 0, iter.NumRows())
alloc = true alloc = true
} }
@@ -212,11 +202,6 @@ func (iter *Iterx) StructScan(dest interface{}) bool {
return false return false
} }
// no results or query error
if iter.Iter.NumRows() == 0 {
return false
}
if !iter.started { if !iter.started {
columns := columnNames(iter.Iter.Columns()) columns := columnNames(iter.Iter.Columns())
m := iter.Mapper m := iter.Mapper

View File

@@ -16,6 +16,7 @@ import (
"github.com/gocql/gocql" "github.com/gocql/gocql"
"github.com/scylladb/gocqlx" "github.com/scylladb/gocqlx"
. "github.com/scylladb/gocqlx/gocqlxtest" . "github.com/scylladb/gocqlx/gocqlxtest"
"github.com/scylladb/gocqlx/qb"
"gopkg.in/inf.v0" "gopkg.in/inf.v0"
) )
@@ -327,3 +328,46 @@ func TestNotFound(t *testing.T) {
} }
}) })
} }
func TestPaging(t *testing.T) {
session := CreateSession(t)
defer session.Close()
if err := ExecStmt(session, `CREATE TABLE gocqlx_test.paging_table (id int PRIMARY KEY, val int)`); err != nil {
t.Fatal("create table:", err)
}
if err := ExecStmt(session, `CREATE INDEX id_val_index ON gocqlx_test.paging_table (val)`); err != nil {
t.Fatal("create index:", err)
}
stmt, names := qb.Insert("gocqlx_test.paging_table").Columns("id", "val").ToCql()
q := gocqlx.Query(session.Query(stmt), names)
for i := 0; i < 5000; i++ {
if err := q.Bind(i, i).Exec(); err != nil {
t.Fatal(err)
}
}
type Paging struct {
ID int
Val int
}
t.Run("iter", func(t *testing.T) {
stmt, names := qb.Select("gocqlx_test.paging_table").
Where(qb.Lt("val")).
AllowFiltering().
Columns("id", "val").ToCql()
it := gocqlx.Query(session.Query(stmt, 100).PageSize(10), names).Iter()
defer it.Close()
var cnt int
for {
p := &Paging{}
if !it.StructScan(p) {
break
}
cnt++
}
if cnt != 100 {
t.Fatal("expected 100", "got", cnt)
}
})
}