Skip to content

Commit

Permalink
DEX-725 Correct logic after storing into queue failure (#250)
Browse files Browse the repository at this point in the history
 * Removing from active orders if case of failure of the storing of places
 * Do nothing in case of failure of the storing of cancels
  • Loading branch information
DenisSerebryansky authored Apr 29, 2020
1 parent 0b5f6f6 commit a9bc71c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,31 @@ class KafkaIssuesTestSuite extends MatcherSuiteBase {

"Matcher should free reserved balances if order wasn't placed into the queue" in {

placeAndAwaitAtDex(mkOrderDP(alice, wavesUsdPair, SELL, 10.waves, 3.0))
val order = mkOrderDP(alice, wavesUsdPair, SELL, 10.waves, 3.0)
placeAndAwaitAtDex(order)

dex1.api.currentOffset shouldBe 0
dex1.api.reservedBalance(alice) should matchTo(Map[Asset, Long](Waves -> 10.003.waves))

disconnectKafkaFromNetwork()

dex1.api.tryPlace(mkOrderDP(alice, wavesUsdPair, SELL, 30.waves, 3.0))
dex1.api.tryCancel(alice, order) shouldBe 'left
dex1.api.tryPlace(mkOrderDP(alice, wavesUsdPair, SELL, 30.waves, 3.0)) shouldBe 'left

dex1.api.reservedBalance(alice) should matchTo(Map[Asset, Long](Waves -> 10.003.waves))

val oh = dex1.api.orderHistory(alice, Some(true))
oh should have size 1
oh.head.id shouldBe order.id()

connectKafkaToNetwork()

dex1.api.tryCancel(alice, order) shouldBe 'right
dex1.api.orderHistory(alice, Some(true)) should have size 0
dex1.api.reservedBalance(alice) shouldBe empty

dex1.api.tryPlace(mkOrderDP(alice, wavesUsdPair, SELL, 30.waves, 3.0)) shouldBe 'right
dex1.api.orderHistory(alice, Some(true)) should have size 1
dex1.api.reservedBalance(alice) should matchTo(Map[Asset, Long](Waves -> 30.003.waves))
}
}
20 changes: 13 additions & 7 deletions dex/src/main/scala/com/wavesplatform/dex/AddressActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,18 @@ class AddressActor(owner: Address,
val orders = if (onlyActive) matchingActiveOrders else matchingActiveOrders ++ orderDB.getFinalizedOrders(owner, maybePair)
sender ! Reply.OrdersStatuses(orders)

case event: Event.StoreFailed =>
log.trace(s"Got $event")
pendingCommands.remove(event.orderId).foreach { item =>
item.client ! CanNotPersist(event.reason)
case storeFailed @ Event.StoreFailed(orderId, reason, queueEvent) =>
log.trace(s"Got $storeFailed")
pendingCommands.remove(orderId).foreach { item =>
item.client ! CanNotPersist(reason)
}
queueEvent match {
case QueueEvent.Placed(_) | QueueEvent.PlacedMarket(_) =>
activeOrders.remove(orderId).foreach { ao =>
openVolume = openVolume |-| ao.reservableBalance
}
case _ =>
}
openVolume = openVolume |-| activeOrders(event.orderId).reservableBalance

case event: ValidationEvent =>
log.trace(s"Got $event")
Expand Down Expand Up @@ -389,7 +395,7 @@ class AddressActor(owner: Address,
Success(Some(error.CanNotPersistEvent))
}
.onComplete {
case Success(Some(error)) => self ! Event.StoreFailed(orderId, error)
case Success(Some(error)) => self ! Event.StoreFailed(orderId, error, event)
case Success(None) => log.trace(s"$event saved")
case _ => throw new IllegalStateException("Impossibru")
}
Expand Down Expand Up @@ -477,7 +483,7 @@ object AddressActor {
case class ValidationPassed(acceptedOrder: AcceptedOrder) extends ValidationEvent {
override def orderId: Order.Id = acceptedOrder.order.id()
}
case class StoreFailed(orderId: Order.Id, reason: MatcherError) extends Event
case class StoreFailed(orderId: Order.Id, reason: MatcherError, queueEvent: QueueEvent) extends Event
}

private case class CancelExpiredOrder(orderId: ByteStr)
Expand Down

0 comments on commit a9bc71c

Please sign in to comment.