35
35
import java .util .concurrent .Semaphore ;
36
36
import java .util .concurrent .TimeoutException ;
37
37
import java .util .function .BiConsumer ;
38
- import java .util .logging .Level ;
39
38
import java .util .logging .Logger ;
40
39
import java .util .stream .Stream ;
41
40
@@ -130,10 +129,10 @@ public abstract class AbstractLocalNodeImpl<N extends AbstractLocalNodeImpl<N,C,
130
129
private final ExecutorService executors ;
131
130
132
131
/**
133
- * Cached error messages of requests that failed their {@link AbstractLocalNodeImpl#checkRequest (TransactionRequest)}.
132
+ * Cached error messages of requests that failed their {@link #checkTransaction (TransactionRequest)}.
134
133
* This is useful to avoid polling for the outcome of recent requests whose
135
- * {@link #checkRequest (TransactionRequest)} failed, hence never
136
- * got the chance to pass to {@link #deliverTransaction(TransactionRequest)} .
134
+ * {@link #checkTransaction (TransactionRequest)} failed, hence never
135
+ * got the chance to pass to delivering .
137
136
*/
138
137
private final LRUCache <TransactionReference , String > recentlyRejectedTransactionsMessages = new LRUCache <>(100 , 1000 );
139
138
@@ -181,8 +180,6 @@ protected AbstractLocalNodeImpl(C config, boolean init) throws NodeException {
181
180
initWorkingDirectory ();
182
181
183
182
this .executors = Executors .newCachedThreadPool ();
184
-
185
- addShutdownHook ();
186
183
}
187
184
188
185
@ Override
@@ -231,7 +228,7 @@ public final TransactionReference getTakamakaCode() throws NodeException, Interr
231
228
return getClassTag (manifest ).getJar ();
232
229
}
233
230
catch (UnknownReferenceException e ) {
234
- throw new NodeException ("The manifest of the node cannot be found in the node itself" , e );
231
+ throw new NodeException ("The manifest of the node cannot be found in the node itself: is the node initialized? " , e );
235
232
}
236
233
}
237
234
}
@@ -332,7 +329,7 @@ public final ClassTag getClassTag(StorageReference reference) throws UnknownRefe
332
329
.findFirst ()
333
330
.orElseThrow (() -> new NodeException ("Object " + reference + " has no class tag in store" ));
334
331
else
335
- throw new NodeException ("The creation of object " + reference + " does not contain updates" );
332
+ throw new NodeException ("The transaction that created object " + reference + " does not contain updates" );
336
333
}
337
334
catch (StoreException e ) {
338
335
throw new NodeException (e );
@@ -350,7 +347,9 @@ public final Stream<Update> getState(StorageReference reference) throws UnknownR
350
347
try {
351
348
Stream <TransactionReference > history = store .getHistory (Objects .requireNonNull (reference ));
352
349
var updates = new HashSet <Update >();
353
- CheckRunnable .check (StoreException .class , () -> history .forEachOrdered (UncheckConsumer .uncheck (StoreException .class , transaction -> addUpdatesCommitted (store , reference , transaction , updates ))));
350
+ CheckRunnable .check (NodeException .class , () -> history .forEachOrdered (UncheckConsumer .uncheck (NodeException .class ,
351
+ transaction -> collectUpdates (store , reference , transaction , updates ))));
352
+
354
353
return updates .stream ();
355
354
}
356
355
catch (StoreException e ) {
@@ -374,7 +373,7 @@ public final TransactionReference addJarStoreInitialTransaction(JarStoreInitialT
374
373
@ Override
375
374
public final void addInitializationTransaction (InitializationTransactionRequest request ) throws TransactionRejectedException , TimeoutException , InterruptedException , NodeException {
376
375
try (var scope = mkScope ()) {
377
- getPolledResponse (post (request )); // result unused
376
+ getPolledResponse (post (request ));
378
377
}
379
378
}
380
379
@@ -492,6 +491,7 @@ public final MethodFuture postStaticMethodCallTransaction(StaticMethodCallTransa
492
491
*
493
492
* @return the entered store of the head
494
493
* @throws NodeException if the operation could not be completed correctly
494
+ * @throws InterruptedException if the current thread is interrupted while waiting for the result
495
495
*/
496
496
protected abstract S enterHead () throws NodeException , InterruptedException ;
497
497
@@ -503,20 +503,44 @@ public final MethodFuture postStaticMethodCallTransaction(StaticMethodCallTransa
503
503
*/
504
504
protected void exit (S store ) throws NodeException {}
505
505
506
+ /**
507
+ * Yields the executors that can be used to start tasks with this node.
508
+ *
509
+ * @return the executors
510
+ */
506
511
protected final ExecutorService getExecutors () {
507
512
return executors ;
508
513
}
509
514
515
+ /**
516
+ * Yields the hasher of transactions to use with this node.
517
+ *
518
+ * @return the hasher of transactions
519
+ */
510
520
protected final Hasher <TransactionRequest <?>> getHasher () {
511
521
return hasher ;
512
522
}
513
523
524
+ /**
525
+ * Takes note that this node has rejected a given transaction request.
526
+ *
527
+ * @param request the rejected transaction request
528
+ * @param e the exception that explains why it has been rejected
529
+ */
514
530
protected final void signalRejected (TransactionRequest <?> request , TransactionRejectedException e ) {
515
531
var reference = TransactionReferences .of (hasher .hash (request ));
516
532
recentlyRejectedTransactionsMessages .put (reference , e .getMessage ());
517
533
signalCompleted (reference );
518
534
}
519
535
536
+ /**
537
+ * Checks that the given transaction request is valid.
538
+ *
539
+ * @param request the request
540
+ * @throws TransactionRejectedException if the request is not valid
541
+ * @throws NodeException if this node is not able to perform the operation
542
+ * @throws InterruptedException if the current thread is interrupted while performing the operation
543
+ */
520
544
protected final void checkTransaction (TransactionRequest <?> request ) throws TransactionRejectedException , NodeException , InterruptedException {
521
545
S store = enterHead ();
522
546
@@ -539,28 +563,39 @@ protected final void checkTransaction(TransactionRequest<?> request) throws Tran
539
563
* are added to the main chain of a blockchain, for each of the transactions in such blocks.
540
564
*
541
565
* @param reference the transaction to publish
542
- * @param store the store where {@code transaction} and its potential events can be found
566
+ * @param store the store where the transaction and its potential events can be found
567
+ * @throws NodeException if this node is not able to perform the operation
568
+ * @throws UnknownReferenceException if {@code reference} cannot be found in {@code store}
543
569
*/
544
- protected final void publish (TransactionReference reference , S store ) throws NodeException {
570
+ protected final void publish (TransactionReference reference , S store ) throws NodeException , UnknownReferenceException {
545
571
signalCompleted (reference );
546
572
547
573
try {
548
- if (store .getResponse (reference ) instanceof TransactionResponseWithEvents trwe )
549
- CheckRunnable .check (NodeException .class , () -> trwe .getEvents ().forEachOrdered (UncheckConsumer .uncheck (NodeException .class , event -> notifyEvent (event , store ))));
574
+ if (store .getResponse (reference ) instanceof TransactionResponseWithEvents trwe ) {
575
+ var events = trwe .getEvents ().toArray (StorageReference []::new );
576
+ for (var event : events )
577
+ notifyEvent (event , store );
578
+ }
550
579
}
551
- catch (StoreException | UnknownReferenceException e ) {
580
+ catch (StoreException e ) {
552
581
throw new NodeException (e );
553
582
}
554
583
}
555
584
585
+ /**
586
+ * Closes all the resources of this node.
587
+ *
588
+ * @throws NodeException if this node is not able to perform the operation
589
+ */
556
590
protected void closeResources () throws NodeException {
557
591
executors .shutdownNow ();
558
592
}
559
593
560
594
/**
561
- * Factory method for creating an empty store for this node, with empty cache.
595
+ * Creates an empty store for this node, with empty cache.
562
596
*
563
- * @return the store empty
597
+ * @return the empty store
598
+ * @throws NodeException if this node is not able to perform the operation
564
599
*/
565
600
protected abstract S mkEmptyStore () throws NodeException ;
566
601
@@ -569,12 +604,20 @@ protected void closeResources() throws NodeException {
569
604
* for instance by adding the request to some mempool or queue of requests to be executed.
570
605
*
571
606
* @param request the request
607
+ * @throws NodeException if this node is not able to perform the operation
608
+ * @throws InterruptedException if the current thread is interrupted while performing the operation
609
+ * @throws TimeoutException if the operation could not be completed in time
572
610
*/
573
611
protected abstract void postRequest (TransactionRequest <?> request ) throws NodeException , InterruptedException , TimeoutException ;
574
612
613
+ /**
614
+ * Cleans the directory where the node's data live.
615
+ *
616
+ * @throws NodeException if this node is not able to perform the operation
617
+ */
575
618
private void initWorkingDirectory () throws NodeException {
576
619
try {
577
- deleteRecursively (config .getDir ()); // cleans the directory where the node's data live
620
+ deleteRecursively (config .getDir ());
578
621
Files .createDirectories (config .getDir ());
579
622
}
580
623
catch (IOException e ) {
@@ -600,6 +643,8 @@ private void notifyEvent(StorageReference event, S store) throws NodeException {
600
643
creator = store .getCreator (event );
601
644
}
602
645
catch (StoreException | UnknownReferenceException | FieldNotFoundException e ) {
646
+ // this private method is only called on events in responses in store:
647
+ // if they cannot be processed, there is a problem in the database
603
648
throw new NodeException (e );
604
649
}
605
650
@@ -609,26 +654,28 @@ private void notifyEvent(StorageReference event, S store) throws NodeException {
609
654
610
655
/**
611
656
* Posts the given request. It does some preliminary preparation then calls
612
- * {@link #postRequest(TransactionRequest)}, that will implement the node-specific logic of this post .
657
+ * {@link #postRequest(TransactionRequest)}, that will implement the node-specific logic of posting .
613
658
*
614
659
* @param request the request
615
660
* @return the reference of the request
616
661
* @throws TransactionRejectedException if the request was already present in the store
617
662
*/
618
663
private TransactionReference post (TransactionRequest <?> request ) throws TransactionRejectedException , NodeException , InterruptedException , TimeoutException {
619
664
var reference = TransactionReferences .of (hasher .hash (request ));
665
+ String simpleNameOfRequest = request .getClass ().getSimpleName ();
666
+
620
667
if (request instanceof MethodCallTransactionRequest mctr )
621
- LOGGER .info (reference + ": posting (" + request . getClass (). getSimpleName () + " -> " + trim (mctr .getStaticTarget ().getName ()) + ')' );
668
+ LOGGER .info (reference + ": posting (" + simpleNameOfRequest + " -> " + trim (mctr .getStaticTarget ().getName ()) + ')' );
622
669
else if (request instanceof ConstructorCallTransactionRequest cctr )
623
- LOGGER .info (reference + ": posting (" + request . getClass (). getSimpleName () + " -> " + trim (cctr .getStaticTarget ().getDefiningClass ().getName ()) + ')' );
670
+ LOGGER .info (reference + ": posting (" + simpleNameOfRequest + " -> " + trim (cctr .getStaticTarget ().getDefiningClass ().getName ()) + ')' );
624
671
else
625
- LOGGER .info (reference + ": posting (" + request . getClass (). getSimpleName () + ')' );
672
+ LOGGER .info (reference + ": posting (" + simpleNameOfRequest + ')' );
626
673
627
674
S store = enterHead ();
628
675
629
676
try {
630
677
store .getResponse (reference );
631
- // if the response is found, then no exception is thrown above and the request was repeated
678
+ // if the response is found, then no exception is thrown above, which means that the request was repeated
632
679
throw new TransactionRejectedException ("Repeated request " + reference , store .getConfig ());
633
680
}
634
681
catch (StoreException e ) {
@@ -652,33 +699,38 @@ private static String trim(String s) {
652
699
}
653
700
654
701
/**
655
- * Adds, to the given set, the updates of the fields of the object at the given reference,
702
+ * Collects, into the given set, the updates of the fields of an object
656
703
* occurred during the execution of a given transaction.
657
704
*
658
- * @param object the reference of the object
659
- * @param referenceInHistory the reference to the transaction
705
+ * @param store the store of this node
706
+ * @param object the reference to the object
707
+ * @param reference the reference to the transaction
660
708
* @param updates the set where they must be added
661
- * @throws StoreException
709
+ * @throws NodeException if this node is misbehaving
662
710
*/
663
- private void addUpdatesCommitted (S store , StorageReference object , TransactionReference referenceInHistory , Set <Update > updates ) throws StoreException {
711
+ private void collectUpdates (S store , StorageReference object , TransactionReference reference , Set <Update > updates ) throws NodeException {
664
712
try {
665
- if (store .getResponse (referenceInHistory ) instanceof TransactionResponseWithUpdates trwu )
713
+ if (store .getResponse (reference ) instanceof TransactionResponseWithUpdates trwu )
666
714
trwu .getUpdates ()
667
715
.filter (update -> update .getObject ().equals (object ) && updates .stream ().noneMatch (update ::sameProperty ))
668
716
.forEach (updates ::add );
669
717
else
670
- throw new StoreException ("Reference " + referenceInHistory + " is part of the histories but did not generate updates" );
718
+ throw new NodeException ("Reference " + reference + " is part of the histories but did not generate updates" );
671
719
}
672
720
catch (UnknownReferenceException e ) {
673
- throw new StoreException ("Reference " + referenceInHistory + " is part of the histories but is not in the store" );
721
+ throw new NodeException ("Reference " + reference + " is part of the histories but is not in the store" );
722
+ }
723
+ catch (StoreException e ) {
724
+ throw new NodeException (e );
674
725
}
675
726
}
676
727
677
728
/**
678
- * Creates a semaphore for those who will wait for the result of the given request.
729
+ * Creates a semaphore for those who will wait for the result of a request.
679
730
*
680
- * @param reference the reference of the transaction for the request
681
- * @throws TransactionRejectedException
731
+ * @param reference the reference of the transaction of the request
732
+ * @throws TransactionRejectedException if there is already a semaphore for {@code reference}, which
733
+ * means that the request is repeated
682
734
*/
683
735
private void createSemaphore (S store , TransactionReference reference ) throws TransactionRejectedException {
684
736
if (semaphores .putIfAbsent (reference , new Semaphore (0 )) != null )
@@ -698,18 +750,4 @@ private static void deleteRecursively(Path dir) throws IOException {
698
750
.map (Path ::toFile )
699
751
.forEach (File ::delete );
700
752
}
701
-
702
- /**
703
- * Adds a shutdown hook that shuts down the blockchain orderly if the JVM terminates.
704
- */
705
- private void addShutdownHook () { // TODO: do we really need it? It seems that it is never called
706
- Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
707
- try {
708
- close ();
709
- }
710
- catch (NodeException e ) {
711
- LOGGER .log (Level .SEVERE , "The shutdown hook of the node failed" , e );
712
- }
713
- }));
714
- }
715
753
}
0 commit comments