Skip to content

Commit 836f80f

Browse files
authored
Use temporary store for segment index for compound annotations (#8422)
1 parent 174acdf commit 836f80f

File tree

5 files changed

+119
-39
lines changed

5 files changed

+119
-39
lines changed

CHANGELOG.unreleased.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
1414

1515
### Changed
1616
- When using a zarr link to a wk-served data layer as another layer’s source, the user’s token is used to access the data. [#8322](https://github.com/scalableminds/webknossos/pull/8322/)
17+
- Compound annotations (created when viewing all annotations of a task) no longer permanently store data in the FossilDB. [#8422](https://github.com/scalableminds/webknossos/pull/8422)
1718

1819
### Fixed
19-
- Fixed a bug that would lock a non existing mapping to an empty segmentation layer under certain conditions. [#8401](https://github.com/scalableminds/webknossos/pull/8401)
20+
- Fixed a bug that would lock a non-existing mapping to an empty segmentation layer under certain conditions. [#8401](https://github.com/scalableminds/webknossos/pull/8401)
2021
- Fixed the alignment of the button that allows restricting floodfill operations to a bounding box. [#8388](https://github.com/scalableminds/webknossos/pull/8388)
2122
- Fixed rare bug where saving got stuck. [#8409](https://github.com/scalableminds/webknossos/pull/8409)
2223

webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/TemporaryTracingService.scala

+12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.scalableminds.util.tools.Fox.bool2Fox
55
import com.scalableminds.webknossos.datastore.Annotation.AnnotationProto
66
import com.scalableminds.webknossos.datastore.SkeletonTracing.SkeletonTracing
77
import com.scalableminds.webknossos.datastore.VolumeTracing.VolumeTracing
8+
import com.scalableminds.webknossos.datastore.geometry.ListOfVec3IntProto
89
import com.scalableminds.webknossos.tracingstore.TracingStoreRedisStore
910
import scalapb.GeneratedMessageCompanion
1011

@@ -19,6 +20,7 @@ class TemporaryTracingService @Inject()(
1920
volumeStore: TemporaryTracingStore[VolumeTracing],
2021
volumeDataStore: TemporaryTracingStore[Array[Byte]],
2122
annotationStore: TemporaryTracingStore[AnnotationProto],
23+
segmentIndexStore: TemporaryTracingStore[ListOfVec3IntProto],
2224
temporaryTracingIdStore: TracingStoreRedisStore)(implicit ec: ExecutionContext) {
2325

2426
implicit def skeletonTracingCompanion: GeneratedMessageCompanion[SkeletonTracing] = SkeletonTracing
@@ -51,6 +53,9 @@ class TemporaryTracingService @Inject()(
5153
def getAllVolumeBucketsWithPrefix(bucketPrefix: String): collection.Map[String, Array[Byte]] =
5254
volumeDataStore.getAllConditionalWithKey(key => key.startsWith(bucketPrefix))
5355

56+
def getVolumeSegmentIndexBufferForKey(segmentIndexKey: String): Option[ListOfVec3IntProto] =
57+
segmentIndexStore.get(segmentIndexKey)
58+
5459
def saveSkeleton(tracingId: String, skeletonTracing: SkeletonTracing): Fox[Unit] = {
5560
skeletonStore.insert(tracingId, skeletonTracing, Some(temporaryStoreTimeout))
5661
registerTracingId(tracingId)
@@ -74,6 +79,13 @@ class TemporaryTracingService @Inject()(
7479
Fox.successful(())
7580
}
7681

82+
def saveVolumeSegmentIndexBuffer(tracingId: String,
83+
segmentIndexBuffer: Map[String, ListOfVec3IntProto]): Fox[Unit] = {
84+
segmentIndexStore.insertAll(segmentIndexBuffer.toSeq: _*)
85+
registerTracingId(tracingId)
86+
Fox.successful(())
87+
}
88+
7789
def isTemporaryAnnotation(annotationId: String): Fox[Boolean] =
7890
temporaryTracingIdStore.contains(temporaryAnnotationIdKey(annotationId))
7991

webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/volume/VolumeSegmentIndexBuffer.scala

+67-21
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import com.scalableminds.webknossos.datastore.helpers.ProtoGeometryImplicits
88
import com.scalableminds.webknossos.tracingstore.TSRemoteDatastoreClient
99
import com.scalableminds.webknossos.datastore.models.AdditionalCoordinate
1010
import com.scalableminds.webknossos.datastore.models.datasource.AdditionalAxis
11-
import com.scalableminds.webknossos.tracingstore.tracings.{FossilDBClient, KeyValueStoreImplicits, RemoteFallbackLayer}
11+
import com.scalableminds.webknossos.tracingstore.tracings.{
12+
FossilDBClient,
13+
KeyValueStoreImplicits,
14+
RemoteFallbackLayer,
15+
TemporaryTracingService
16+
}
1217
import com.typesafe.scalalogging.LazyLogging
1318

1419
import scala.collection.mutable
@@ -34,7 +39,9 @@ class VolumeSegmentIndexBuffer(tracingId: String,
3439
remoteDatastoreClient: TSRemoteDatastoreClient,
3540
fallbackLayer: Option[RemoteFallbackLayer],
3641
additionalAxes: Option[Seq[AdditionalAxis]],
37-
tc: TokenContext)
42+
temporaryTracingService: TemporaryTracingService,
43+
tc: TokenContext,
44+
toTemporaryStore: Boolean = false)
3845
extends KeyValueStoreImplicits
3946
with SegmentIndexKeyHelper
4047
with ProtoGeometryImplicits
@@ -68,11 +75,15 @@ class VolumeSegmentIndexBuffer(tracingId: String,
6875
}
6976

7077
def flush()(implicit ec: ExecutionContext): Fox[Unit] =
71-
for {
72-
_ <- Fox.serialCombined(segmentIndexBuffer.keys.toList) { key =>
73-
volumeSegmentIndexClient.put(key, version, segmentIndexBuffer(key))
74-
}
75-
} yield ()
78+
if (toTemporaryStore) {
79+
temporaryTracingService.saveVolumeSegmentIndexBuffer(tracingId, segmentIndexBuffer.toMap)
80+
} else {
81+
for {
82+
_ <- Fox.serialCombined(segmentIndexBuffer.keys.toList) { key =>
83+
volumeSegmentIndexClient.put(key, version, segmentIndexBuffer(key))
84+
}
85+
} yield ()
86+
}
7687

7788
private def getFallback(segmentId: Long,
7889
mag: Vec3Int,
@@ -144,28 +155,27 @@ class VolumeSegmentIndexBuffer(tracingId: String,
144155
(hits, misses)
145156
}
146157

147-
// Get a map from segment to bucket position (e.g. an index) from all sources (buffer, fossilDB, file)
148-
def getSegmentToBucketIndexMap(segmentIds: List[Long],
149-
mag: Vec3Int,
150-
mappingName: Option[String],
151-
editableMappingTracingId: Option[String],
152-
additionalCoordinates: Option[Seq[AdditionalCoordinate]])(
158+
private def getSegmentToBucketIndexMapFromPermanentStorage(segmentIds: List[Long],
159+
mag: Vec3Int,
160+
mappingName: Option[String],
161+
editableMappingTracingId: Option[String],
162+
additionalCoordinates: Option[Seq[AdditionalCoordinate]])(
153163
implicit ec: ExecutionContext): Fox[List[(Long, Seq[Vec3Int])]] =
154164
for {
155-
_ <- Fox.successful(())
156-
157-
(bufferHits, bufferMisses) = getSegmentsFromBufferNoteMisses(segmentIds, mag, additionalCoordinates)
158165
(mutableIndexHits, mutableIndexMisses) <- getSegmentsFromFossilDBNoteMisses(tracingId,
159166
mag,
160167
additionalCoordinates,
161168
additionalAxes,
162-
bufferMisses)
163-
missesSoFar = bufferMisses ++ mutableIndexMisses
169+
segmentIds)
164170
fileBucketPositions <- fallbackLayer match {
165171
case Some(layer) =>
166172
for {
167-
fileBucketPositionsOpt <- Fox.runIf(missesSoFar.nonEmpty)(remoteDatastoreClient
168-
.querySegmentIndexForMultipleSegments(layer, missesSoFar, mag, mappingName, editableMappingTracingId)(tc))
173+
fileBucketPositionsOpt <- Fox.runIf(mutableIndexMisses.nonEmpty)(
174+
remoteDatastoreClient.querySegmentIndexForMultipleSegments(layer,
175+
mutableIndexMisses,
176+
mag,
177+
mappingName,
178+
editableMappingTracingId)(tc))
169179
fileBucketPositions = fileBucketPositionsOpt.getOrElse(Seq())
170180
_ = fileBucketPositions.map {
171181
case (segmentId, positions) =>
@@ -177,7 +187,43 @@ class VolumeSegmentIndexBuffer(tracingId: String,
177187
} yield fileBucketPositions
178188
case _ => Fox.successful(List[(Long, Seq[Vec3Int])]())
179189
}
180-
allHits = mutableIndexHits ++ fileBucketPositions ++ bufferHits
190+
} yield mutableIndexHits ++ fileBucketPositions
191+
192+
private def getSegmentToBucketIndexMapFromTemporaryStorage(segmentIds: List[Long],
193+
mag: Vec3Int,
194+
additionalCoordinates: Option[Seq[AdditionalCoordinate]])(
195+
implicit ec: ExecutionContext): Fox[List[(Long, Seq[Vec3Int])]] =
196+
Fox.successful(
197+
segmentIds
198+
.map(segmentId => {
199+
val key = segmentIndexKey(tracingId, segmentId, mag, additionalCoordinates, additionalAxes)
200+
temporaryTracingService.getVolumeSegmentIndexBufferForKey(key) match {
201+
case Some(positions) => Some(segmentId, positions.values.map(vec3IntFromProto))
202+
case None => None
203+
}
204+
})
205+
.collect { case Some(x) => x })
206+
207+
// Get a map from segment to bucket position (e.g. an index) from all sources (buffer, temporary storage, fossilDB, file)
208+
def getSegmentToBucketIndexMap(segmentIds: List[Long],
209+
mag: Vec3Int,
210+
mappingName: Option[String],
211+
editableMappingTracingId: Option[String],
212+
additionalCoordinates: Option[Seq[AdditionalCoordinate]])(
213+
implicit ec: ExecutionContext): Fox[List[(Long, Seq[Vec3Int])]] =
214+
for {
215+
_ <- Fox.successful(())
216+
(bufferHits, bufferMisses) = getSegmentsFromBufferNoteMisses(segmentIds, mag, additionalCoordinates)
217+
remainingHits <- if (toTemporaryStore) {
218+
getSegmentToBucketIndexMapFromTemporaryStorage(bufferMisses, mag, additionalCoordinates)
219+
} else {
220+
getSegmentToBucketIndexMapFromPermanentStorage(bufferMisses,
221+
mag,
222+
mappingName,
223+
editableMappingTracingId,
224+
additionalCoordinates)
225+
}
226+
allHits = remainingHits ++ bufferHits
181227
allHitsFilled = segmentIds.map { segmentId =>
182228
allHits.find(_._1 == segmentId) match {
183229
case Some((_, positions)) => (segmentId, positions)

webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/volume/VolumeSegmentIndexService.scala

+11-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import com.scalableminds.webknossos.tracingstore.tracings.{
1717
FossilDBClient,
1818
KeyValueStoreImplicits,
1919
RemoteFallbackLayer,
20+
TemporaryTracingService,
2021
TracingDataStore
2122
}
2223
import com.typesafe.scalalogging.LazyLogging
@@ -39,7 +40,8 @@ object VolumeSegmentIndexService {
3940
// key: tracing id, segment id, mag – value: list of buckets
4041
// used for calculating segment statistics
4142
class VolumeSegmentIndexService @Inject()(val tracingDataStore: TracingDataStore,
42-
remoteDatastoreClient: TSRemoteDatastoreClient)
43+
remoteDatastoreClient: TSRemoteDatastoreClient,
44+
temporaryTracingService: TemporaryTracingService)
4345
extends KeyValueStoreImplicits
4446
with ProtoGeometryImplicits
4547
with VolumeBucketCompression
@@ -224,7 +226,14 @@ class VolumeSegmentIndexService @Inject()(val tracingDataStore: TracingDataStore
224226
getSegmentToBucketIndexFromFile(fallbackLayer, segmentId, mag, mappingName, editableMappingTracingId) // additional coordinates not supported, see #7556
225227
case _ => Fox.successful(Seq.empty)
226228
}
227-
combined = fromMutableIndex.values.map(vec3IntFromProto) ++ fromFileIndex
229+
isTemporaryTracing <- temporaryTracingService.isTemporaryTracing(tracingId)
230+
fromTemporaryIndex <- Fox.runIf(isTemporaryTracing)(
231+
temporaryTracingService.getVolumeSegmentIndexBufferForKey(
232+
segmentIndexKey(tracingId, segmentId, mag, additionalCoordinates, additionalAxes)))
233+
combined = fromMutableIndex.values.map(vec3IntFromProto) ++ fromFileIndex ++ fromTemporaryIndex
234+
.getOrElse(ListOfVec3IntProto())
235+
.values
236+
.map(vec3IntFromProto)
228237
} yield ListOfVec3IntProto(combined.map(vec3IntToProto))
229238

230239
private def getSegmentToBucketIndexFromFossilDB(

webknossos-tracingstore/app/com/scalableminds/webknossos/tracingstore/tracings/volume/VolumeTracingService.scala

+27-15
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ class VolumeTracingService @Inject()(
121121
remoteDatastoreClient,
122122
fallbackLayerOpt,
123123
AdditionalAxis.fromProtosAsOpt(tracing.additionalAxes),
124+
temporaryTracingService,
124125
tc
125126
)
126127
_ <- Fox.serialCombined(updateActions) {
@@ -261,13 +262,16 @@ class VolumeTracingService @Inject()(
261262

262263
for {
263264
fallbackLayer <- getFallbackLayer(tracingId, tracingBeforeRevert)
264-
segmentIndexBuffer = new VolumeSegmentIndexBuffer(tracingId,
265-
volumeSegmentIndexClient,
266-
newVersion,
267-
remoteDatastoreClient,
268-
fallbackLayer,
269-
dataLayer.additionalAxes,
270-
tc)
265+
segmentIndexBuffer = new VolumeSegmentIndexBuffer(
266+
tracingId,
267+
volumeSegmentIndexClient,
268+
newVersion,
269+
remoteDatastoreClient,
270+
fallbackLayer,
271+
dataLayer.additionalAxes,
272+
temporaryTracingService,
273+
tc
274+
)
271275
mappingName <- getMappingNameUnlessEditable(sourceTracing)
272276
_ <- Fox.serialCombined(bucketStreamBeforeRevert) {
273277
case (bucketPosition, dataBeforeRevert, version) =>
@@ -360,6 +364,7 @@ class VolumeTracingService @Inject()(
360364
remoteDatastoreClient,
361365
fallbackLayer,
362366
AdditionalAxis.fromProtosAsOpt(tracing.additionalAxes),
367+
temporaryTracingService,
363368
tc
364369
)
365370
_ <- mergedVolume.withMergedBuckets { (bucketPosition, bytes) =>
@@ -401,6 +406,7 @@ class VolumeTracingService @Inject()(
401406
remoteDatastoreClient,
402407
fallbackLayer,
403408
AdditionalAxis.fromProtosAsOpt(tracing.additionalAxes),
409+
temporaryTracingService,
404410
tc
405411
)
406412
_ <- withBucketsFromZip(initialData) { (bucketPosition, bytes) =>
@@ -551,6 +557,7 @@ class VolumeTracingService @Inject()(
551557
remoteDatastoreClient,
552558
fallbackLayer,
553559
AdditionalAxis.fromProtosAsOpt(sourceTracing.additionalAxes),
560+
temporaryTracingService,
554561
tc
555562
)
556563
mappingName <- getMappingNameUnlessEditable(sourceTracing)
@@ -789,7 +796,9 @@ class VolumeTracingService @Inject()(
789796
remoteDatastoreClient,
790797
fallbackLayer,
791798
mergedAdditionalAxes,
792-
tc)
799+
temporaryTracingService,
800+
tc,
801+
toTemporaryStore)
793802
_ <- mergedVolume.withMergedBuckets { (bucketPosition, bucketBytes) =>
794803
for {
795804
_ <- saveBucket(newId,
@@ -847,13 +856,16 @@ class VolumeTracingService @Inject()(
847856
fallbackLayer <- getFallbackLayer(tracingId, tracing)
848857
mappingName <- getMappingNameUnlessEditable(tracing)
849858
segmentIndexBuffer <- Fox.successful(
850-
new VolumeSegmentIndexBuffer(tracingId,
851-
volumeSegmentIndexClient,
852-
tracing.version + 1,
853-
remoteDatastoreClient,
854-
fallbackLayer,
855-
dataLayer.additionalAxes,
856-
tc))
859+
new VolumeSegmentIndexBuffer(
860+
tracingId,
861+
volumeSegmentIndexClient,
862+
tracing.version + 1,
863+
remoteDatastoreClient,
864+
fallbackLayer,
865+
dataLayer.additionalAxes,
866+
temporaryTracingService,
867+
tc
868+
))
857869
_ <- mergedVolume.withMergedBuckets { (bucketPosition, bucketBytes) =>
858870
for {
859871
_ <- saveBucket(volumeLayer, bucketPosition, bucketBytes, tracing.version + 1)

0 commit comments

Comments
 (0)