diff --git a/node-generator/src/main/scala/com/wavesplatform/generator/Mode.scala b/node-generator/src/main/scala/com/wavesplatform/generator/Mode.scala index 703f1702c0..d9453a5d3e 100644 --- a/node-generator/src/main/scala/com/wavesplatform/generator/Mode.scala +++ b/node-generator/src/main/scala/com/wavesplatform/generator/Mode.scala @@ -1,7 +1,9 @@ package com.wavesplatform.generator -import pureconfig.generic.derivation.EnumConfigReader +import pureconfig.generic.derivation.EnumConfigReaderDerivation -enum Mode derives EnumConfigReader { +enum Mode derives ECD.EnumConfigReader { case WIDE, NARROW, DYN_WIDE, MULTISIG, ORACLE, SWARM } + +object ECD extends EnumConfigReaderDerivation(identity) \ No newline at end of file diff --git a/node-generator/src/main/scala/com/wavesplatform/generator/Preconditions.scala b/node-generator/src/main/scala/com/wavesplatform/generator/Preconditions.scala index ad35ee20b1..0083cd2ad3 100644 --- a/node-generator/src/main/scala/com/wavesplatform/generator/Preconditions.scala +++ b/node-generator/src/main/scala/com/wavesplatform/generator/Preconditions.scala @@ -1,7 +1,6 @@ package com.wavesplatform.generator import com.google.common.primitives.{Bytes, Ints} -import com.typesafe.config.Config import com.wavesplatform.account.{Address, KeyPair} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.common.utils.EitherExt2 @@ -20,7 +19,6 @@ import pureconfig.ConfigReader import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} -import scala.collection.Factory import scala.util.Try object Preconditions { diff --git a/node-generator/src/main/scala/com/wavesplatform/generator/TransactionsGeneratorApp.scala b/node-generator/src/main/scala/com/wavesplatform/generator/TransactionsGeneratorApp.scala index 0ff3132db2..e6a5384e68 100644 --- a/node-generator/src/main/scala/com/wavesplatform/generator/TransactionsGeneratorApp.scala +++ b/node-generator/src/main/scala/com/wavesplatform/generator/TransactionsGeneratorApp.scala @@ -1,14 +1,13 @@ package com.wavesplatform.generator import cats.implicits.showInterpolator -import com.typesafe.config.{Config, ConfigFactory} +import com.typesafe.config.ConfigFactory import com.wavesplatform.Application import com.wavesplatform.account.AddressScheme import com.wavesplatform.features.EstimatorProvider.* import com.wavesplatform.generator.GeneratorSettings.NodeAddress import com.wavesplatform.generator.Preconditions.{PGenSettings, UniverseHolder} import com.wavesplatform.generator.cli.ScoptImplicits -import com.wavesplatform.generator.config.FicusImplicits import com.wavesplatform.generator.utils.Universe import com.wavesplatform.network.client.NetworkSender import com.wavesplatform.transaction.Transaction @@ -18,239 +17,242 @@ import org.asynchttpclient.AsyncHttpClient import org.asynchttpclient.Dsl.asyncHttpClient import org.slf4j.LoggerFactory import pureconfig.* -import pureconfig.generic.derivation.* import scopt.OptionParser import java.io.File -import java.net.{InetSocketAddress, URI} import java.util.concurrent.Executors import scala.concurrent.* import scala.concurrent.duration.* import scala.util.{Failure, Random, Success} -object TransactionsGeneratorApp extends App with ScoptImplicits { - implicit val httpClient: AsyncHttpClient = asyncHttpClient() +object TransactionsGeneratorApp extends ScoptImplicits { - val log = LoggerFacade(LoggerFactory.getLogger("generator")) - val parser = new OptionParser[GeneratorSettings]("generator") { - head("TransactionsGenerator - Waves load testing transactions generator") - opt[File]('c', "configuration").valueName("").text("generator configuration path") - opt[FiniteDuration]('d', "delay").valueName("").text("delay between iterations").action { (v, c) => - c.copy(worker = c.worker.copy(delay = v)) - } - opt[Boolean]('r', "auto-reconnect").valueName("").text("reconnect on errors").action { (v, c) => - c.copy(worker = c.worker.copy(autoReconnect = v)) - } - help("help").text("display this help message") + def main(args: Array[String]): Unit = { + implicit val httpClient: AsyncHttpClient = asyncHttpClient() + val log = LoggerFacade(LoggerFactory.getLogger("generator")) - cmd("narrow") - .action { (_, c) => - c.copy(mode = Mode.NARROW) + val parser = new OptionParser[GeneratorSettings]("generator") { + head("TransactionsGenerator - Waves load testing transactions generator") + opt[File]('c', "configuration").valueName("").text("generator configuration path") + opt[FiniteDuration]('d', "delay").valueName("").text("delay between iterations").action { (v, c) => + c.copy(worker = c.worker.copy(delay = v)) } - .text("Run transactions between pre-defined accounts") - .children( - opt[Int]("transactions").abbr("t").optional().text("number of transactions").action { (x, c) => - c.copy(narrow = c.narrow.copy(transactions = x)) + opt[Boolean]('r', "auto-reconnect").valueName("").text("reconnect on errors").action { (v, c) => + c.copy(worker = c.worker.copy(autoReconnect = v)) + } + help("help").text("display this help message") + + cmd("narrow") + .action { (_, c) => + c.copy(mode = Mode.NARROW) } - ) + .text("Run transactions between pre-defined accounts") + .children( + opt[Int]("transactions").abbr("t").optional().text("number of transactions").action { (x, c) => + c.copy(narrow = c.narrow.copy(transactions = x)) + } + ) - cmd("wide") - .action { (_, c) => - c.copy(mode = Mode.WIDE) - } - .text("Run transactions those transfer funds to another accounts") - .children( - opt[Int]("transactions").abbr("t").optional().text("number of transactions").action { (x, c) => - c.copy(wide = c.wide.copy(transactions = x)) - }, - opt[Option[Int]]("limit-accounts").abbr("la").optional().text("limit recipients").action { (x, c) => - c.copy(wide = c.wide.copy(limitDestAccounts = x)) + cmd("wide") + .action { (_, c) => + c.copy(mode = Mode.WIDE) } - ) + .text("Run transactions those transfer funds to another accounts") + .children( + opt[Int]("transactions").abbr("t").optional().text("number of transactions").action { (x, c) => + c.copy(wide = c.wide.copy(transactions = x)) + }, + opt[Option[Int]]("limit-accounts").abbr("la").optional().text("limit recipients").action { (x, c) => + c.copy(wide = c.wide.copy(limitDestAccounts = x)) + } + ) - cmd("dyn-wide") - .action { (_, c) => - c.copy(mode = Mode.DYN_WIDE) - } - .text("Like wide, but the number of transactions is changed during the iteration") - .children( - opt[Int]("start").abbr("s").optional().text("initial amount of transactions").action { (x, c) => - c.copy(dynWide = c.dynWide.copy(start = x)) - }, - opt[Double]("grow-adder").abbr("g").optional().action { (x, c) => - c.copy(dynWide = c.dynWide.copy(growAdder = x)) - }, - opt[Int]("max").abbr("m").optional().action { (x, c) => - c.copy(dynWide = c.dynWide.copy(maxTxsPerRequest = Some(x))) - }, - opt[Option[Int]]("limit-accounts").abbr("la").optional().text("limit recipients").action { (x, c) => - c.copy(dynWide = c.dynWide.copy(limitDestAccounts = x)) + cmd("dyn-wide") + .action { (_, c) => + c.copy(mode = Mode.DYN_WIDE) } - ) + .text("Like wide, but the number of transactions is changed during the iteration") + .children( + opt[Int]("start").abbr("s").optional().text("initial amount of transactions").action { (x, c) => + c.copy(dynWide = c.dynWide.copy(start = x)) + }, + opt[Double]("grow-adder").abbr("g").optional().action { (x, c) => + c.copy(dynWide = c.dynWide.copy(growAdder = x)) + }, + opt[Int]("max").abbr("m").optional().action { (x, c) => + c.copy(dynWide = c.dynWide.copy(maxTxsPerRequest = Some(x))) + }, + opt[Option[Int]]("limit-accounts").abbr("la").optional().text("limit recipients").action { (x, c) => + c.copy(dynWide = c.dynWide.copy(limitDestAccounts = x)) + } + ) - cmd("multisig") - .action { (_, c) => - c.copy(mode = Mode.MULTISIG) - } - .text("Multisig cycle of funding, initializng and sending funds back") - .children( - opt[Int]("transactions").abbr("t").optional().text("number of transactions").action { (x, c) => - c.copy(multisig = c.multisig.copy(transactions = x)) - }, - opt[Boolean]("first-run").abbr("first").optional().text("generate set multisig script transaction").action { (x, c) => - c.copy(multisig = c.multisig.copy(firstRun = x)) + cmd("multisig") + .action { (_, c) => + c.copy(mode = Mode.MULTISIG) } - ) + .text("Multisig cycle of funding, initializng and sending funds back") + .children( + opt[Int]("transactions").abbr("t").optional().text("number of transactions").action { (x, c) => + c.copy(multisig = c.multisig.copy(transactions = x)) + }, + opt[Boolean]("first-run").abbr("first").optional().text("generate set multisig script transaction").action { (x, c) => + c.copy(multisig = c.multisig.copy(firstRun = x)) + } + ) - cmd("oracle") - .action { (_, c) => - c.copy(mode = Mode.ORACLE) - } - .text("Oracle load test") - .children( - opt[Int]("transactions").abbr("t").optional().text("number of transactions").action { (x, c) => - c.copy(oracle = c.oracle.copy(transactions = x)) - }, - opt[Boolean]("enabled").abbr("e").optional().text("DataEnty value").action { (x, c) => - c.copy(multisig = c.multisig.copy(firstRun = x)) + cmd("oracle") + .action { (_, c) => + c.copy(mode = Mode.ORACLE) } - ) + .text("Oracle load test") + .children( + opt[Int]("transactions").abbr("t").optional().text("number of transactions").action { (x, c) => + c.copy(oracle = c.oracle.copy(transactions = x)) + }, + opt[Boolean]("enabled").abbr("e").optional().text("DataEnty value").action { (x, c) => + c.copy(multisig = c.multisig.copy(firstRun = x)) + } + ) - cmd("swarm") - .action { (_, c) => - c.copy(mode = Mode.SWARM) - } - .text("SetScript load test") - .children( - opt[Int]("scripts").abbr("st").optional().text("number of SetScripts transactions").action { (x, c) => - c.copy(swarm = c.swarm.copy(scripts = x)) - }, - opt[Int]("transfers").abbr("tt").optional().text("number of Transfer transactions").action { (x, c) => - c.copy(swarm = c.swarm.copy(transfers = x)) - }, - opt[Boolean]("complexity").abbr("ct").optional().text(" script complexity").action { (x, c) => - c.copy(swarm = c.swarm.copy(complexity = x)) - }, - opt[Int]("exchange").abbr("et").optional().text("number of exchange transactions").action { (x, c) => - c.copy(swarm = c.swarm.copy(exchange = x)) + cmd("swarm") + .action { (_, c) => + c.copy(mode = Mode.SWARM) } - ) - } + .text("SetScript load test") + .children( + opt[Int]("scripts").abbr("st").optional().text("number of SetScripts transactions").action { (x, c) => + c.copy(swarm = c.swarm.copy(scripts = x)) + }, + opt[Int]("transfers").abbr("tt").optional().text("number of Transfer transactions").action { (x, c) => + c.copy(swarm = c.swarm.copy(transfers = x)) + }, + opt[Boolean]("complexity").abbr("ct").optional().text(" script complexity").action { (x, c) => + c.copy(swarm = c.swarm.copy(complexity = x)) + }, + opt[Int]("exchange").abbr("et").optional().text("number of exchange transactions").action { (x, c) => + c.copy(swarm = c.swarm.copy(exchange = x)) + } + ) + } - val configParamParser = new OptionParser[File]("configuration") { - opt[String]('c', "configuration").action { case (c, _) => new File(c) } - override def errorOnUnknownArgument: Boolean = false - override def reportWarning(msg: String): Unit = () - } + val configParamParser = new OptionParser[File]("configuration") { + opt[String]('c', "configuration").action { case (c, _) => new File(c) } - val externalConf = - configParamParser - .parse(args, new File("generator.local.conf")) - .getOrElse(throw new RuntimeException("Failed to parse configuration path from command line parameters")) + override def errorOnUnknownArgument: Boolean = false - val wavesSettings = Application.loadApplicationConfig(if (externalConf.isFile) Some(externalConf) else None) + override def reportWarning(msg: String): Unit = () + } - val defaultConfig = - ConfigSource.fromConfig(wavesSettings.config).at("waves.generator").loadOrThrow[GeneratorSettings] + val externalConf = + configParamParser + .parse(args, new File("generator.local.conf")) + .getOrElse(throw new RuntimeException("Failed to parse configuration path from command line parameters")) - parser.parse(args, defaultConfig) match { - case None => parser.failure("Failed to parse command line parameters") - case Some(finalConfig) => - log.info(show"The final configuration: \n$finalConfig") + val wavesSettings = Application.loadApplicationConfig(if (externalConf.isFile) Some(externalConf) else None) - AddressScheme.current = new AddressScheme { - override val chainId: Byte = finalConfig.addressScheme.toByte - } + val defaultConfig = + ConfigSource.fromConfig(wavesSettings.config).at("waves.generator").loadOrThrow[GeneratorSettings] - val time = new NTP("pool.ntp.org") + parser.parse(args, defaultConfig) match { + case None => parser.failure("Failed to parse command line parameters") + case Some(finalConfig) => + log.info(show"The final configuration: \n$finalConfig") - val preconditions = - ConfigSource.file("preconditions.conf").at("preconditions").loadOrThrow[Option[PGenSettings]] + AddressScheme.current = new AddressScheme { + override val chainId: Byte = finalConfig.addressScheme.toByte + } - val estimator = wavesSettings.estimator + val time = new NTP("pool.ntp.org") - val (universe, initialUniTransactions, initialTailTransactions) = preconditions - .fold((UniverseHolder(), List.empty[Transaction], List.empty[Transaction]))(Preconditions.mk(_, time, estimator)) + val preconditions = + ConfigSource.fromConfig(ConfigFactory.load("preconditions.conf")).at("preconditions").loadOrThrow[Option[PGenSettings]] - Universe.Accounts = universe.accounts - Universe.IssuedAssets = universe.issuedAssets - Universe.Leases = universe.leases + val estimator = wavesSettings.estimator - val generator: TransactionGenerator = finalConfig.mode match { - case Mode.NARROW => NarrowTransactionGenerator(finalConfig.narrow, finalConfig.privateKeyAccounts, time, estimator) - case Mode.WIDE => new WideTransactionGenerator(finalConfig.wide, finalConfig.privateKeyAccounts) - case Mode.DYN_WIDE => new DynamicWideTransactionGenerator(finalConfig.dynWide, finalConfig.privateKeyAccounts) - case Mode.MULTISIG => new MultisigTransactionGenerator(finalConfig.multisig, finalConfig.privateKeyAccounts, estimator) - case Mode.ORACLE => new OracleTransactionGenerator(finalConfig.oracle, finalConfig.privateKeyAccounts, estimator) - case Mode.SWARM => new SmartGenerator(finalConfig.swarm, finalConfig.privateKeyAccounts, estimator) - } + val (universe, initialUniTransactions, initialTailTransactions) = preconditions + .fold((UniverseHolder(), List.empty[Transaction], List.empty[Transaction]))(Preconditions.mk(_, time, estimator)) - val threadPool = Executors.newFixedThreadPool(Math.max(1, finalConfig.sendTo.size)) - implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(threadPool) + Universe.Accounts = universe.accounts + Universe.IssuedAssets = universe.issuedAssets + Universe.Leases = universe.leases - val sender = new NetworkSender(wavesSettings.networkSettings.trafficLogger, finalConfig.addressScheme, "generator", nonce = Random.nextLong()) + val generator: TransactionGenerator = finalConfig.mode match { + case Mode.NARROW => NarrowTransactionGenerator(finalConfig.narrow, finalConfig.privateKeyAccounts, time, estimator) + case Mode.WIDE => new WideTransactionGenerator(finalConfig.wide, finalConfig.privateKeyAccounts) + case Mode.DYN_WIDE => new DynamicWideTransactionGenerator(finalConfig.dynWide, finalConfig.privateKeyAccounts) + case Mode.MULTISIG => new MultisigTransactionGenerator(finalConfig.multisig, finalConfig.privateKeyAccounts, estimator) + case Mode.ORACLE => new OracleTransactionGenerator(finalConfig.oracle, finalConfig.privateKeyAccounts, estimator) + case Mode.SWARM => new SmartGenerator(finalConfig.swarm, finalConfig.privateKeyAccounts, estimator) + } - sys.addShutdownHook(sender.close()) + val threadPool = Executors.newFixedThreadPool(Math.max(1, finalConfig.sendTo.size)) + implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(threadPool) - @volatile - var canContinue = true + val sender = new NetworkSender(wavesSettings.networkSettings.trafficLogger, finalConfig.addressScheme, "generator", nonce = Random.nextLong()) - sys.addShutdownHook { - log.error("Stopping generator") - canContinue = false - } + sys.addShutdownHook(sender.close()) - if (finalConfig.worker.workingTime > Duration.Zero) { - log.info(s"Generator will be stopped after ${finalConfig.worker.workingTime}") + @volatile + var canContinue = true - Scheduler.global.scheduleOnce(finalConfig.worker.workingTime) { - log.warn(s"Stopping generator after: ${finalConfig.worker.workingTime}") + sys.addShutdownHook { + log.error("Stopping generator") canContinue = false } - } - val initialGenTransactions = generator.initial - val initialGenTailTransactions = generator.tailInitial - - log.info(s"Universe precondition transactions size: ${initialUniTransactions.size}") - log.info(s"Generator precondition transactions size: ${initialGenTransactions.size}") - log.info(s"Universe precondition tail transactions size: ${initialTailTransactions.size}") - log.info(s"Generator precondition tail transactions size: ${initialGenTailTransactions.size}") - - val workers = finalConfig.sendTo.map { case NodeAddress(node, nodeRestUrl) => - log.info(s"Creating worker: ${node.getHostString}:${node.getPort}") - // new Worker(finalConfig.worker, sender, node, generator, initialTransactions.map(RawBytes.from)) - new Worker( - finalConfig.worker, - Iterator.continually(generator.next()).flatten, - sender, - node, - nodeRestUrl, - () => canContinue, - initialUniTransactions ++ initialGenTransactions, - finalConfig.privateKeyAccounts.map(_.toAddress.toString), - initialTailTransactions ++ initialGenTailTransactions - ) - } + if (finalConfig.worker.workingTime > Duration.Zero) { + log.info(s"Generator will be stopped after ${finalConfig.worker.workingTime}") - def close(status: Int): Unit = { - sender.close() - time.close() - threadPool.shutdown() - System.exit(status) - } + Scheduler.global.scheduleOnce(finalConfig.worker.workingTime) { + log.warn(s"Stopping generator after: ${finalConfig.worker.workingTime}") + canContinue = false + } + } - Future - .sequence(workers.map(_.run())) - .onComplete { - case Success(_) => - log.info("Done") - close(0) + val initialGenTransactions = generator.initial + val initialGenTailTransactions = generator.tailInitial + + log.info(s"Universe precondition transactions size: ${initialUniTransactions.size}") + log.info(s"Generator precondition transactions size: ${initialGenTransactions.size}") + log.info(s"Universe precondition tail transactions size: ${initialTailTransactions.size}") + log.info(s"Generator precondition tail transactions size: ${initialGenTailTransactions.size}") + + val workers = finalConfig.sendTo.map { case NodeAddress(node, nodeRestUrl) => + log.info(s"Creating worker: ${node.getHostString}:${node.getPort}") + // new Worker(finalConfig.worker, sender, node, generator, initialTransactions.map(RawBytes.from)) + new Worker( + finalConfig.worker, + Iterator.continually(generator.next()).flatten, + sender, + node, + nodeRestUrl, + () => canContinue, + initialUniTransactions ++ initialGenTransactions, + finalConfig.privateKeyAccounts.map(_.toAddress.toString), + initialTailTransactions ++ initialGenTailTransactions + ) + } - case Failure(e) => - log.error("Failed", e) - close(1) + def close(status: Int): Unit = { + sender.close() + time.close() + threadPool.shutdown() + System.exit(status) } + + Future + .sequence(workers.map(_.run())) + .onComplete { + case Success(_) => + log.info("Done") + close(0) + + case Failure(e) => + log.error("Failed", e) + close(1) + } + } } } diff --git a/node-generator/src/main/scala/com/wavesplatform/generator/config/FicusImplicits.scala b/node-generator/src/main/scala/com/wavesplatform/generator/config/FicusImplicits.scala index d27625cbec..dc3e0a7fc5 100644 --- a/node-generator/src/main/scala/com/wavesplatform/generator/config/FicusImplicits.scala +++ b/node-generator/src/main/scala/com/wavesplatform/generator/config/FicusImplicits.scala @@ -9,10 +9,9 @@ import com.wavesplatform.transaction.TransactionType.TransactionType import com.wavesplatform.transaction.{TransactionParser, TransactionParsers, TransactionType} import play.api.libs.json.* import pureconfig.* -import pureconfig.error.{CannotParse, ConfigReaderFailures, ThrowableFailure} +import pureconfig.error.ThrowableFailure -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal trait FicusImplicits { @@ -53,7 +52,7 @@ trait FicusImplicits { warmUpStart <- warmUpConfig.required[Int]("start") warmUpEnd <- warmUpConfig.optionalWithDefault[Int]("end", utxLimit) warmUpStep <- warmUpConfig.required[Int]("step") - warmUpDuration <- warmUpConfig.required[Option[FiniteDuration]](s"duration") + warmUpDuration <- warmUpConfig.optionalWithDefault[Option[FiniteDuration]]("duration", None) warmUpOnce <- warmUpConfig.optionalWithDefault[Boolean]("once", true) } yield Worker.WarmUp(warmUpStart, warmUpEnd, warmUpStep, warmUpDuration, warmUpOnce) @@ -66,8 +65,10 @@ trait FicusImplicits { reconnectDelay <- obj.required[FiniteDuration]("reconnect-delay") warmUpObj <- obj.atKey("warm-up").flatMap(_.asObjectCursor) warmUp <- readWarmUp(warmUpObj, utxLimit) - initWarmUpObj <- obj.atKeyOrUndefined("init-warm-up").asObjectCursor - initWarmUp <- readWarmUp(initWarmUpObj, utxLimit).fold(_ => Right(None), v => Right(Some(v))) + initWarmUp <- obj + .atKeyOrUndefined("init-warm-up") + .asObjectCursor + .fold[ConfigReader.Result[Option[Worker.WarmUp]]](_ => Right(None), v => readWarmUp(v, utxLimit).map(v => Some(v))) initialDelay <- readInitialDelay(obj, "initial-delay", delay) tailInitialDelay <- readInitialDelay(obj, "tail-initial-delay", delay) } yield Worker.Settings(utxLimit, delay, tailInitialDelay, initialDelay, workingTime, autoReconnect, reconnectDelay, warmUp, initWarmUp)