Skip to content

Commit

Permalink
Simplify CreateSnapshotSchema.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Feb 2, 2024
1 parent c8a0074 commit 469cbc4
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,19 @@ import com.evolutiongaming.scassandra.TableName

object CreateSnapshotSchema {

/** Creates Cassandra schema for storage of a snapshot.
*
* The class does not perform a schema migration if any of the tables are
* already present in a database, and relies on a caller to use a returned
* value to perfom the necessary migrations afterwards.
*
* @return
* Fully qualified table names, and `true` if all of the tables were
* created from scratch, or `false` if one or more of them were already
* present in a keyspace.
*/
def apply[F[_] : Concurrent : CassandraCluster : CassandraSession : CassandraSync : LogOf](
config: SnapshotSchemaConfig
config: SnapshotSchemaConfig
): F[(SnapshotSchema, MigrateSchema.Fresh)] = {

for {
Expand All @@ -30,35 +41,30 @@ object CreateSnapshotSchema {
def createTables1 = {
val keyspace = config.keyspace.name

def tableName(table: CreateTables.Table) = TableName(keyspace = keyspace, table = table.name)

def table(name: String, query: TableName => Nel[String]) = {
val tableName = TableName(keyspace = keyspace, table = name)
CreateTables.Table(name = name, queries = query(tableName))
}
val schema = SnapshotSchema(
snapshot = TableName(keyspace = keyspace, table = config.snapshotTable),
setting = TableName(keyspace = keyspace, table = config.settingTable)
)

val snapshot = table(config.snapshotTable, a => Nel.of(SnapshotStatements.createTable(a)))
val snapshotStatement = SnapshotStatements.createTable(schema.snapshot)
val settingStatement = SettingStatements.createTable(schema.setting)

val setting = table(config.settingTable, a => Nel.of(SettingStatements.createTable(a)))
val snapshot = CreateTables.Table(config.snapshotTable, snapshotStatement)
val setting = CreateTables.Table(config.settingTable, settingStatement)

val schema = SnapshotSchema(
snapshot = tableName(snapshot),
setting = tableName(setting))

if (config.autoCreate) {
for {
result <- createTables(keyspace, Nel.of(snapshot, setting))
} yield {
(schema, result)
val createSchema =
if (config.autoCreate) {
createTables(keyspace, Nel.of(snapshot, setting))
} else {
false.pure[F]
}
} else {
(schema, false).pure[F]
}

createSchema.map((schema, _))
}

for {
_ <- createKeyspace(config.keyspace)
result <- createTables1
} yield result
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader

final case class SnapshotSchemaConfig(
keyspace: KeyspaceConfig = KeyspaceConfig.default,
keyspace: KeyspaceConfig = KeyspaceConfig.default.copy(name = "snapshot"),
snapshotTable: String = "snapshot_buffer",
settingTable: String = "setting",
locksTable: String = "locks",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,80 +1,124 @@
package com.evolutiongaming.kafka.journal.snapshot.cassandra

import cats.Id
import cats.data.{NonEmptyList => Nel}
import cats.data.{NonEmptyList => Nel, State}
import cats.syntax.all._
import com.evolutiongaming.kafka.journal.eventual.cassandra.{CreateKeyspace, CreateTables, KeyspaceConfig}
import com.evolutiongaming.kafka.journal.snapshot.cassandra.{CreateSnapshotSchema, SnapshotSchemaConfig}
import com.evolutiongaming.scassandra.TableName
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

class CreateSnapshotSchemaSpec extends AnyFunSuite with Matchers { self =>
class CreateSchemaSpec extends AnyFunSuite {

type F[A] = State[Database, A]

test("create keyspace and tables") {
val config = SnapshotSchemaConfig.default
val createSchema = CreateSnapshotSchema[StateT](config, createKeyspace, createTables)
val initial = State.empty.copy(createTables = true)
val (state, (schema, fresh)) = createSchema.run(initial)
state shouldEqual initial.copy(actions = List(Action.CreateTables, Action.CreateKeyspace))
fresh shouldEqual true
schema shouldEqual self.schema
val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables)
val (database, (schema, fresh)) = createSchema.run(Database.empty).value
assert(database.keyspaces == List("snapshot"))
assert(
database.tables.sorted == List(
"snapshot.setting",
"snapshot.snapshot_buffer"
)
)
assert(fresh)
assert(schema == this.schema)
}

test("not create keyspace and tables") {
val config = SnapshotSchemaConfig.default.copy(autoCreate = false)
val createSchema = CreateSnapshotSchema[StateT](config, createKeyspace, createTables)
val initial = State.empty.copy(createTables = true)
val (state, (schema, fresh)) = createSchema.run(initial)
state shouldEqual initial.copy(actions = List(Action.CreateKeyspace))
fresh shouldEqual false
schema shouldEqual self.schema
val config = SnapshotSchemaConfig.default.copy(
autoCreate = false,
keyspace = SnapshotSchemaConfig.default.keyspace.copy(autoCreate = false)
)
val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables)
val (database, (schema, fresh)) = createSchema.run(Database.empty).value
assert(database.keyspaces == Nil)
assert(database.tables == Nil)
assert(!fresh)
assert(schema == this.schema)
}

test("create part of the tables") {
val config = SnapshotSchemaConfig.default.copy(
keyspace = SnapshotSchemaConfig.default.keyspace.copy(autoCreate = false)
)
val initialState = Database.empty.copy(
keyspaces = List("snapshot"),
tables = List("snapshot.setting")
)
val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables)
val (database, (schema, fresh)) = createSchema.run(initialState).value
assert(database.keyspaces == List("snapshot"))
assert(
database.tables.sorted == List(
"snapshot.setting",
"snapshot.snapshot_buffer"
)
)
assert(!fresh)
assert(schema == this.schema)
}

private val schema = SnapshotSchema(
snapshot = TableName(keyspace = "journal", table = "snapshot_buffer"),
setting = TableName(keyspace = "journal", table = "setting"))
snapshot = TableName(keyspace = "snapshot", table = "snapshot_buffer"),
setting = TableName(keyspace = "snapshot", table = "setting")
)

val createTables: CreateTables[StateT] = new CreateTables[StateT] {
val createTables: CreateTables[F] = new CreateTables[F] {
def apply(keyspace: String, tables: Nel[CreateTables.Table]) = {
StateT { state =>
val state1 = state.add(Action.CreateTables)
(state1, state.createTables)
val results = tables.traverse { table =>
assert(
table.queries.head.contains(
s"CREATE TABLE IF NOT EXISTS $keyspace.${table.name}"
)
)
Database.createTable(keyspace, table.name)
}
results.map(_.forall(_ == true))
}
}

val createKeyspace: CreateKeyspace[StateT] = new CreateKeyspace[StateT] {
def apply(config: KeyspaceConfig) = {
StateT { state =>
val state1 = state.add(Action.CreateKeyspace)
(state1, ())
}
}
val createKeyspace: CreateKeyspace[F] = new CreateKeyspace[F] {
def apply(config: KeyspaceConfig) =
if (config.autoCreate) Database.createKeyspace(config.name)
else ().pure[F]
}

case class Database(keyspaces: List[String], tables: List[String]) {

case class State(createTables: Boolean, actions: List[Action]) {
def existsKeyspace(keyspace: String): Boolean =
keyspaces.contains(keyspace)

def add(action: Action): State = copy(actions = action :: actions)
}
def createKeyspace(keyspace: String): Database =
this.copy(keyspaces = keyspace :: keyspaces)

object State {
val empty: State = State(createTables = false, actions = Nil)
}
def existsTable(keyspace: String, name: String): Boolean =
tables.contains(s"$keyspace.$name")

def createTable(keyspace: String, name: String): Database =
this.copy(tables = s"$keyspace.$name" :: tables)

type StateT[A] = cats.data.StateT[Id, State, A]

object StateT {
def apply[A](f: State => (State, A)): StateT[A] = cats.data.StateT[Id, State, A](f)
}

object Database {

val empty: Database = Database(keyspaces = Nil, tables = Nil)

sealed trait Action extends Product
def createKeyspace(keyspace: String): F[Unit] =
State.modify(_.createKeyspace(keyspace))

def createTable(keyspace: String, name: String): F[Boolean] =
State { database =>
if (!database.existsKeyspace(keyspace)) {
fail(s"Keyspace '$keyspace' does not exist")
}
if (database.existsTable(keyspace, name)) {
(database, false)
} else {
(database.createTable(keyspace, name), true)
}
}

object Action {
case object CreateTables extends Action
case object CreateKeyspace extends Action
}

}

0 comments on commit 469cbc4

Please sign in to comment.