dbutil: RewriteTable, generalize table.RewriteRows to copy from source table to destination table with row transformation
This commit is contained in:
committed by
Michal Jan Matczuk
parent
13ef8ceaf1
commit
69f6f201f2
3
Makefile
3
Makefile
@@ -29,9 +29,10 @@ GOTEST := go test -cpu $(GOTEST_CPU) -count=1 -cover -race -tags all
|
|||||||
.PHONY: test
|
.PHONY: test
|
||||||
test:
|
test:
|
||||||
@$(GOTEST) .
|
@$(GOTEST) .
|
||||||
@$(GOTEST) ./migrate
|
|
||||||
@$(GOTEST) ./qb
|
@$(GOTEST) ./qb
|
||||||
@$(GOTEST) ./table
|
@$(GOTEST) ./table
|
||||||
|
@$(GOTEST) ./migrate
|
||||||
|
@$(GOTEST) ./dbutil
|
||||||
|
|
||||||
.PHONY: bench
|
.PHONY: bench
|
||||||
bench:
|
bench:
|
||||||
|
|||||||
6
dbutil/doc.go
Normal file
6
dbutil/doc.go
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
// Copyright (C) 2017 ScyllaDB
|
||||||
|
// Use of this source code is governed by a ALv2-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// Package dbutil provides various utilities built on top of core gocqlx modules.
|
||||||
|
package dbutil
|
||||||
37
dbutil/rewrite.go
Normal file
37
dbutil/rewrite.go
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
// Copyright (C) 2017 ScyllaDB
|
||||||
|
// Use of this source code is governed by a ALv2-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package dbutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/scylladb/gocqlx/v2"
|
||||||
|
"github.com/scylladb/gocqlx/v2/table"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RewriteTable rewrites src table to dst table.
|
||||||
|
// Rows can be transformed using the transform function.
|
||||||
|
// Additional options can be passed to modify the insert query.
|
||||||
|
func RewriteTable(session gocqlx.Session, dst, src *table.Table, transform func(map[string]interface{}), options ...func(q *gocqlx.Queryx)) error {
|
||||||
|
insert := dst.InsertQuery(session)
|
||||||
|
defer insert.Release()
|
||||||
|
|
||||||
|
// Apply query options
|
||||||
|
for _, o := range options {
|
||||||
|
o(insert)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate over all rows and reinsert them to dst table
|
||||||
|
iter := session.Query(src.SelectAll()).Iter()
|
||||||
|
m := make(map[string]interface{})
|
||||||
|
for iter.MapScan(m) {
|
||||||
|
if transform != nil {
|
||||||
|
transform(m)
|
||||||
|
}
|
||||||
|
if err := insert.BindMap(m).Exec(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m = map[string]interface{}{}
|
||||||
|
}
|
||||||
|
return iter.Close()
|
||||||
|
}
|
||||||
121
dbutil/rewrite_test.go
Normal file
121
dbutil/rewrite_test.go
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
// Copyright (C) 2017 ScyllaDB
|
||||||
|
// Use of this source code is governed by a ALv2-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// +build all integration
|
||||||
|
|
||||||
|
package dbutil_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/scylladb/gocqlx/v2/dbutil"
|
||||||
|
. "github.com/scylladb/gocqlx/v2/gocqlxtest"
|
||||||
|
"github.com/scylladb/gocqlx/v2/qb"
|
||||||
|
"github.com/scylladb/gocqlx/v2/table"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRewriteTableTTL(t *testing.T) {
|
||||||
|
session := CreateSession(t)
|
||||||
|
defer session.Close()
|
||||||
|
|
||||||
|
if err := session.ExecStmt(`CREATE TABLE gocqlx_test.rewrite_table (testtext text PRIMARY KEY)`); err != nil {
|
||||||
|
t.Fatal("create table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tbl := table.New(table.Metadata{
|
||||||
|
Name: "gocqlx_test.rewrite_table",
|
||||||
|
Columns: []string{"testtext"},
|
||||||
|
PartKey: []string{"testtext"},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Insert data with 500ms TTL
|
||||||
|
q := tbl.InsertBuilder().TTL(500 * time.Millisecond).Query(session)
|
||||||
|
if err := q.Bind("a").Exec(); err != nil {
|
||||||
|
t.Fatal("insert:", err)
|
||||||
|
}
|
||||||
|
if err := q.Bind("b").Exec(); err != nil {
|
||||||
|
t.Fatal("insert:", err)
|
||||||
|
}
|
||||||
|
if err := q.Bind("c").Exec(); err != nil {
|
||||||
|
t.Fatal("insert:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rewrite data without TTL
|
||||||
|
if err := dbutil.RewriteTable(session, tbl, tbl, nil); err != nil {
|
||||||
|
t.Fatal("rewrite:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait and check if data persisted
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
var n int
|
||||||
|
if err := qb.Select(tbl.Name()).CountAll().Query(session).Scan(&n); err != nil {
|
||||||
|
t.Fatal("scan:", err)
|
||||||
|
}
|
||||||
|
if n != 3 {
|
||||||
|
t.Fatal("expected 3 entries")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRewriteTableClone(t *testing.T) {
|
||||||
|
session := CreateSession(t)
|
||||||
|
defer session.Close()
|
||||||
|
|
||||||
|
if err := session.ExecStmt(`CREATE TABLE gocqlx_test.rewrite_table_clone_src (testtext text PRIMARY KEY, testint int)`); err != nil {
|
||||||
|
t.Fatal("create table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
src := table.New(table.Metadata{
|
||||||
|
Name: "gocqlx_test.rewrite_table_clone_src",
|
||||||
|
Columns: []string{"testtext", "testint"},
|
||||||
|
PartKey: []string{"testtext"},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := session.ExecStmt(`CREATE TABLE gocqlx_test.rewrite_table_clone_dst (testtext text PRIMARY KEY, testfloat float)`); err != nil {
|
||||||
|
t.Fatal("create table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dst := table.New(table.Metadata{
|
||||||
|
Name: "gocqlx_test.rewrite_table_clone_dst",
|
||||||
|
Columns: []string{"testtext", "testfloat"},
|
||||||
|
PartKey: []string{"testtext"},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Insert data
|
||||||
|
q := src.InsertBuilder().Query(session)
|
||||||
|
if err := q.Bind("a", 1).Exec(); err != nil {
|
||||||
|
t.Fatal("insert:", err)
|
||||||
|
}
|
||||||
|
if err := q.Bind("b", 2).Exec(); err != nil {
|
||||||
|
t.Fatal("insert:", err)
|
||||||
|
}
|
||||||
|
if err := q.Bind("c", 3).Exec(); err != nil {
|
||||||
|
t.Fatal("insert:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
transformer := func(m map[string]interface{}) {
|
||||||
|
m["testfloat"] = float32(m["testint"].(int))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rewrite data
|
||||||
|
if err := dbutil.RewriteTable(session, dst, src, transformer); err != nil {
|
||||||
|
t.Fatal("rewrite:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var n int
|
||||||
|
if err := qb.Select(dst.Name()).CountAll().Query(session).Scan(&n); err != nil {
|
||||||
|
t.Fatal("scan:", err)
|
||||||
|
}
|
||||||
|
if n != 3 {
|
||||||
|
t.Fatal("expected 3 entries")
|
||||||
|
}
|
||||||
|
var f float32
|
||||||
|
if err := dst.GetQuery(session, "testfloat").Bind("a").Scan(&f); err != nil {
|
||||||
|
t.Fatal("scan:", err)
|
||||||
|
}
|
||||||
|
if f != 1 {
|
||||||
|
t.Fatal("expected 1")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,31 +0,0 @@
|
|||||||
// Copyright (C) 2017 ScyllaDB
|
|
||||||
// Use of this source code is governed by a ALv2-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
package table
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/scylladb/gocqlx/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
// RewriteRows performs a sequential rewrite of all rows in a table.
|
|
||||||
func RewriteRows(session gocqlx.Session, t *Table, options ...func(q *gocqlx.Queryx)) error {
|
|
||||||
insert := t.InsertQuery(session)
|
|
||||||
defer insert.Release()
|
|
||||||
|
|
||||||
// Apply query options
|
|
||||||
for _, o := range options {
|
|
||||||
o(insert)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate over all rows and reinsert them
|
|
||||||
iter := session.Query(t.SelectAll()).Iter()
|
|
||||||
m := make(map[string]interface{})
|
|
||||||
for iter.MapScan(m) {
|
|
||||||
if err := insert.BindMap(m).Exec(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
m = make(map[string]interface{})
|
|
||||||
}
|
|
||||||
return iter.Close()
|
|
||||||
}
|
|
||||||
@@ -1,59 +0,0 @@
|
|||||||
// Copyright (C) 2017 ScyllaDB
|
|
||||||
// Use of this source code is governed by a ALv2-style
|
|
||||||
// license that can be found in the LICENSE file.
|
|
||||||
|
|
||||||
// +build all integration
|
|
||||||
|
|
||||||
package table_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
. "github.com/scylladb/gocqlx/v2/gocqlxtest"
|
|
||||||
"github.com/scylladb/gocqlx/v2/qb"
|
|
||||||
"github.com/scylladb/gocqlx/v2/table"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestRewriteRows(t *testing.T) {
|
|
||||||
session := CreateSession(t)
|
|
||||||
defer session.Close()
|
|
||||||
|
|
||||||
if err := session.ExecStmt(`CREATE TABLE gocqlx_test.rewrite_table (testtext text PRIMARY KEY)`); err != nil {
|
|
||||||
t.Fatal("create table:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tbl := table.New(table.Metadata{
|
|
||||||
Name: "gocqlx_test.rewrite_table",
|
|
||||||
Columns: []string{"testtext"},
|
|
||||||
PartKey: []string{"testtext"},
|
|
||||||
})
|
|
||||||
|
|
||||||
// Insert data with 500ms TTL
|
|
||||||
q := tbl.InsertBuilder().TTL(500 * time.Millisecond).Query(session)
|
|
||||||
if err := q.Bind("a").Exec(); err != nil {
|
|
||||||
t.Fatal("insert:", err)
|
|
||||||
}
|
|
||||||
if err := q.Bind("b").Exec(); err != nil {
|
|
||||||
t.Fatal("insert:", err)
|
|
||||||
}
|
|
||||||
if err := q.Bind("c").Exec(); err != nil {
|
|
||||||
t.Fatal("insert:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Rewrite data without TTL
|
|
||||||
if err := table.RewriteRows(session, tbl); err != nil {
|
|
||||||
t.Fatal("rewrite:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait and check if data persisted
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
|
|
||||||
var n int
|
|
||||||
if err := qb.Select(tbl.Name()).CountAll().Query(session).Scan(&n); err != nil {
|
|
||||||
t.Fatal("scan:", err)
|
|
||||||
}
|
|
||||||
if n != 3 {
|
|
||||||
t.Fatal("expected 3 entries")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user