From 69f6f201f262416d881000f45cf1758310770596 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Matczuk?= Date: Wed, 2 Dec 2020 14:08:49 +0100 Subject: [PATCH] dbutil: RewriteTable, generalize table.RewriteRows to copy from source table to destination table with row transformation --- Makefile | 3 +- dbutil/doc.go | 6 ++ dbutil/rewrite.go | 37 +++++++++++++ dbutil/rewrite_test.go | 121 +++++++++++++++++++++++++++++++++++++++++ table/rewrite.go | 31 ----------- table/rewrite_test.go | 59 -------------------- 6 files changed, 166 insertions(+), 91 deletions(-) create mode 100644 dbutil/doc.go create mode 100644 dbutil/rewrite.go create mode 100644 dbutil/rewrite_test.go delete mode 100644 table/rewrite.go delete mode 100644 table/rewrite_test.go diff --git a/Makefile b/Makefile index f2a0e0f..01e4eb8 100644 --- a/Makefile +++ b/Makefile @@ -29,9 +29,10 @@ GOTEST := go test -cpu $(GOTEST_CPU) -count=1 -cover -race -tags all .PHONY: test test: @$(GOTEST) . - @$(GOTEST) ./migrate @$(GOTEST) ./qb @$(GOTEST) ./table + @$(GOTEST) ./migrate + @$(GOTEST) ./dbutil .PHONY: bench bench: diff --git a/dbutil/doc.go b/dbutil/doc.go new file mode 100644 index 0000000..e852baa --- /dev/null +++ b/dbutil/doc.go @@ -0,0 +1,6 @@ +// Copyright (C) 2017 ScyllaDB +// Use of this source code is governed by a ALv2-style +// license that can be found in the LICENSE file. + +// Package dbutil provides various utilities built on top of core gocqlx modules. +package dbutil diff --git a/dbutil/rewrite.go b/dbutil/rewrite.go new file mode 100644 index 0000000..943e9b1 --- /dev/null +++ b/dbutil/rewrite.go @@ -0,0 +1,37 @@ +// Copyright (C) 2017 ScyllaDB +// Use of this source code is governed by a ALv2-style +// license that can be found in the LICENSE file. + +package dbutil + +import ( + "github.com/scylladb/gocqlx/v2" + "github.com/scylladb/gocqlx/v2/table" +) + +// RewriteTable rewrites src table to dst table. +// Rows can be transformed using the transform function. +// Additional options can be passed to modify the insert query. +func RewriteTable(session gocqlx.Session, dst, src *table.Table, transform func(map[string]interface{}), options ...func(q *gocqlx.Queryx)) error { + insert := dst.InsertQuery(session) + defer insert.Release() + + // Apply query options + for _, o := range options { + o(insert) + } + + // Iterate over all rows and reinsert them to dst table + iter := session.Query(src.SelectAll()).Iter() + m := make(map[string]interface{}) + for iter.MapScan(m) { + if transform != nil { + transform(m) + } + if err := insert.BindMap(m).Exec(); err != nil { + return err + } + m = map[string]interface{}{} + } + return iter.Close() +} diff --git a/dbutil/rewrite_test.go b/dbutil/rewrite_test.go new file mode 100644 index 0000000..851bd6b --- /dev/null +++ b/dbutil/rewrite_test.go @@ -0,0 +1,121 @@ +// Copyright (C) 2017 ScyllaDB +// Use of this source code is governed by a ALv2-style +// license that can be found in the LICENSE file. + +// +build all integration + +package dbutil_test + +import ( + "testing" + "time" + + "github.com/scylladb/gocqlx/v2/dbutil" + . "github.com/scylladb/gocqlx/v2/gocqlxtest" + "github.com/scylladb/gocqlx/v2/qb" + "github.com/scylladb/gocqlx/v2/table" +) + +func TestRewriteTableTTL(t *testing.T) { + session := CreateSession(t) + defer session.Close() + + if err := session.ExecStmt(`CREATE TABLE gocqlx_test.rewrite_table (testtext text PRIMARY KEY)`); err != nil { + t.Fatal("create table:", err) + } + + tbl := table.New(table.Metadata{ + Name: "gocqlx_test.rewrite_table", + Columns: []string{"testtext"}, + PartKey: []string{"testtext"}, + }) + + // Insert data with 500ms TTL + q := tbl.InsertBuilder().TTL(500 * time.Millisecond).Query(session) + if err := q.Bind("a").Exec(); err != nil { + t.Fatal("insert:", err) + } + if err := q.Bind("b").Exec(); err != nil { + t.Fatal("insert:", err) + } + if err := q.Bind("c").Exec(); err != nil { + t.Fatal("insert:", err) + } + + // Rewrite data without TTL + if err := dbutil.RewriteTable(session, tbl, tbl, nil); err != nil { + t.Fatal("rewrite:", err) + } + + // Wait and check if data persisted + time.Sleep(time.Second) + + var n int + if err := qb.Select(tbl.Name()).CountAll().Query(session).Scan(&n); err != nil { + t.Fatal("scan:", err) + } + if n != 3 { + t.Fatal("expected 3 entries") + } +} + +func TestRewriteTableClone(t *testing.T) { + session := CreateSession(t) + defer session.Close() + + if err := session.ExecStmt(`CREATE TABLE gocqlx_test.rewrite_table_clone_src (testtext text PRIMARY KEY, testint int)`); err != nil { + t.Fatal("create table:", err) + } + + src := table.New(table.Metadata{ + Name: "gocqlx_test.rewrite_table_clone_src", + Columns: []string{"testtext", "testint"}, + PartKey: []string{"testtext"}, + }) + + if err := session.ExecStmt(`CREATE TABLE gocqlx_test.rewrite_table_clone_dst (testtext text PRIMARY KEY, testfloat float)`); err != nil { + t.Fatal("create table:", err) + } + + dst := table.New(table.Metadata{ + Name: "gocqlx_test.rewrite_table_clone_dst", + Columns: []string{"testtext", "testfloat"}, + PartKey: []string{"testtext"}, + }) + + // Insert data + q := src.InsertBuilder().Query(session) + if err := q.Bind("a", 1).Exec(); err != nil { + t.Fatal("insert:", err) + } + if err := q.Bind("b", 2).Exec(); err != nil { + t.Fatal("insert:", err) + } + if err := q.Bind("c", 3).Exec(); err != nil { + t.Fatal("insert:", err) + } + + transformer := func(m map[string]interface{}) { + m["testfloat"] = float32(m["testint"].(int)) + } + + // Rewrite data + if err := dbutil.RewriteTable(session, dst, src, transformer); err != nil { + t.Fatal("rewrite:", err) + } + + var n int + if err := qb.Select(dst.Name()).CountAll().Query(session).Scan(&n); err != nil { + t.Fatal("scan:", err) + } + if n != 3 { + t.Fatal("expected 3 entries") + } + var f float32 + if err := dst.GetQuery(session, "testfloat").Bind("a").Scan(&f); err != nil { + t.Fatal("scan:", err) + } + if f != 1 { + t.Fatal("expected 1") + } +} diff --git a/table/rewrite.go b/table/rewrite.go deleted file mode 100644 index 321596f..0000000 --- a/table/rewrite.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (C) 2017 ScyllaDB -// Use of this source code is governed by a ALv2-style -// license that can be found in the LICENSE file. - -package table - -import ( - "github.com/scylladb/gocqlx/v2" -) - -// RewriteRows performs a sequential rewrite of all rows in a table. -func RewriteRows(session gocqlx.Session, t *Table, options ...func(q *gocqlx.Queryx)) error { - insert := t.InsertQuery(session) - defer insert.Release() - - // Apply query options - for _, o := range options { - o(insert) - } - - // Iterate over all rows and reinsert them - iter := session.Query(t.SelectAll()).Iter() - m := make(map[string]interface{}) - for iter.MapScan(m) { - if err := insert.BindMap(m).Exec(); err != nil { - return err - } - m = make(map[string]interface{}) - } - return iter.Close() -} diff --git a/table/rewrite_test.go b/table/rewrite_test.go deleted file mode 100644 index 6aa8034..0000000 --- a/table/rewrite_test.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (C) 2017 ScyllaDB -// Use of this source code is governed by a ALv2-style -// license that can be found in the LICENSE file. - -// +build all integration - -package table_test - -import ( - "testing" - "time" - - . "github.com/scylladb/gocqlx/v2/gocqlxtest" - "github.com/scylladb/gocqlx/v2/qb" - "github.com/scylladb/gocqlx/v2/table" -) - -func TestRewriteRows(t *testing.T) { - session := CreateSession(t) - defer session.Close() - - if err := session.ExecStmt(`CREATE TABLE gocqlx_test.rewrite_table (testtext text PRIMARY KEY)`); err != nil { - t.Fatal("create table:", err) - } - - tbl := table.New(table.Metadata{ - Name: "gocqlx_test.rewrite_table", - Columns: []string{"testtext"}, - PartKey: []string{"testtext"}, - }) - - // Insert data with 500ms TTL - q := tbl.InsertBuilder().TTL(500 * time.Millisecond).Query(session) - if err := q.Bind("a").Exec(); err != nil { - t.Fatal("insert:", err) - } - if err := q.Bind("b").Exec(); err != nil { - t.Fatal("insert:", err) - } - if err := q.Bind("c").Exec(); err != nil { - t.Fatal("insert:", err) - } - - // Rewrite data without TTL - if err := table.RewriteRows(session, tbl); err != nil { - t.Fatal("rewrite:", err) - } - - // Wait and check if data persisted - time.Sleep(time.Second) - - var n int - if err := qb.Select(tbl.Name()).CountAll().Query(session).Scan(&n); err != nil { - t.Fatal("scan:", err) - } - if n != 3 { - t.Fatal("expected 3 entries") - } -}