Skip to content

Commit

Permalink
FABN-1612 cherrypick from master to release-2.2 (#305)
Browse files Browse the repository at this point in the history
* FABN-1608 remove error message from discovery (#289)

When a group has insufficient members after filtering
based on user's requirements (specific mspids, specific peers,
ledger height) an error message was posted. This will be
changed to a debug message as this is not an error, but an expected
result of the filtering.

Signed-off-by: Bret Harrison <beharrison@nc.rr.com>

* FABN-1607 Allow future startBlocks on EventService (#286)

Allow the setting of startBlocks when setting up an EventService
that have not happened and may not happen for sometime. We must
keep the stream setup timer to be able to monitor for errors
from the Peer event service, however we will check if the timer
pops if we are waiting for a defined start block. In this case
we will assume that the stream is all good and resolve the start
service for the user call.

Signed-off-by: Bret Harrison <beharrison@nc.rr.com>

* FABN-1560 add disconnected peer to channel (#288)

Allow for all service endpoints to be disconnected when added
to a channel. When a service endpoint (peer, events, orderer, discovery) is
added to a channel, it may not be active or it may go down between usages.
The service connection will be checked and reset when an outbound request is made.

Signed-off-by: Bret Harrison <beharrison@nc.rr.com>

* FABN-1601 discovery required organizations (#291)

When a discovery user knows the organizations that are needed
to endorse a proposal, they may use the 'requiredOrgs' setting
to have the DiscoveryHandler send it to peer that have been
discovered for the required organizations.

Signed-off-by: Bret Harrison <beharrison@nc.rr.com>

* FABN-1596 add noPrivateReads (#293)

Allow users to set the "noPrivateReads" when setting up a
discovery hint that will be used as the interest when the
discovery service builds an endorsement plan.

Signed-off-by: Bret Harrison <beharrison@nc.rr.com>

* FABN-1611 Blockdecoder needs kv_reads array (#297)

The RangeQueryInfo raw_reads attribute is a QueryRead message object
which has an array of KVRead objects and is not an array itself.

Signed-off-by: Bret Harrison <beharrison@nc.rr.com>
  • Loading branch information
harrisob authored Aug 19, 2020
1 parent 8f36a57 commit 475fc7d
Show file tree
Hide file tree
Showing 24 changed files with 570 additions and 130 deletions.
2 changes: 1 addition & 1 deletion fabric-common/lib/BlockDecoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -1513,7 +1513,7 @@ function decodeRangeQueryInfo(rangeQueryInfoProto) {
if (rangeQueryInfoProto.raw_reads) {
range_query_info.raw_reads = {};
range_query_info.raw_reads.kv_reads = [];
for (const kVReadProto of rangeQueryInfoProto.raw_reads) {
for (const kVReadProto of rangeQueryInfoProto.raw_reads.kv_reads) {
range_query_info.raw_reads.kv_reads.push(decodeKVRead(kVReadProto));
}
} else if (rangeQueryInfoProto.reads_merkle_hashes) {
Expand Down
8 changes: 4 additions & 4 deletions fabric-common/lib/Channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ const Channel = class {
if (!(endorser.type === 'Endorser')) {
throw Error('Missing valid endorser instance');
}
if (!(endorser.connected)) {
throw Error('Endorser must be connected');
if (!endorser.isConnectable()) {
throw Error('Endorser must be connectable');
}
const name = endorser.name;
const check = this.endorsers.get(name);
Expand Down Expand Up @@ -336,8 +336,8 @@ const Channel = class {
if (!(committer.type === 'Committer')) {
throw Error('Missing valid committer instance');
}
if (!(committer.connected)) {
throw Error('Committer must be connected');
if (!committer.isConnectable()) {
throw Error('Committer must be connectable');
}
const name = committer.name;
const check = this.committers.get(name);
Expand Down
25 changes: 17 additions & 8 deletions fabric-common/lib/Commit.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,28 @@ class Commit extends Proposal {
} else if (targets) {
logger.debug('%s - sending to the targets', method);
const committers = this.channel.getTargetCommitters(targets);
let bad_result = {};
bad_result.status = 'UNKNOWN';
let result;
for (const committer of committers) {
const result = await committer.sendBroadcast(envelope, requestTimeout);
if (result.status === 'SUCCESS') {

return result;
const isConnected = await committer.checkConnection();
if (isConnected) {
try {
result = await committer.sendBroadcast(envelope, requestTimeout);
if (result.status === 'SUCCESS') {
break;
}
} catch (error) {
logger.error('%s - Unable to commit on %s ::%s', method, committer.name, error);
result = error;
}
} else {
bad_result = result;
result = new Error(`Committer ${committer.name} is not connected`);
}
}
if (result instanceof Error) {
throw result;
}

return bad_result;
return result;
} else {
throw checkParameter('targets');
}
Expand Down
68 changes: 54 additions & 14 deletions fabric-common/lib/DiscoveryHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,23 @@ class DiscoveryHandler extends ServiceHandler {
for (const committer of committers) {
logger.debug('%s - sending to committer %s', method, committer.name);
try {
const results = await committer.sendBroadcast(signedEnvelope, timeout);
if (results) {
if (results.status === 'SUCCESS') {
logger.debug('%s - Successfully sent transaction to the committer %s', method, committer.name);
return results;
const isConnected = await committer.checkConnection();
if (isConnected) {
const results = await committer.sendBroadcast(signedEnvelope, timeout);
if (results) {
if (results.status === 'SUCCESS') {
logger.debug('%s - Successfully sent transaction to the committer %s', method, committer.name);
return results;
} else {
logger.debug('%s - Failed to send transaction successfully to the committer status:%s', method, results.status);
return_error = new Error('Failed to send transaction successfully to the committer status:' + results.status);
}
} else {
logger.debug('%s - Failed to send transaction successfully to the committer status:%s', method, results.status);
return_error = new Error('Failed to send transaction successfully to the committer status:' + results.status);
return_error = new Error('Failed to send transaction to the committer');
logger.debug('%s - Failed to send transaction to the committer %s', method, committer.name);
}
} else {
return_error = new Error('Failed to send transaction to the committer');
logger.debug('%s - Failed to send transaction to the committer %s', method, committer.name);
return_error = new Error(`Committer ${committer.name} is not connected`);
}
} catch (error) {
logger.debug('%s - Caught: %s', method, error.toString());
Expand Down Expand Up @@ -155,7 +160,20 @@ class DiscoveryHandler extends ServiceHandler {

const results = await this.discovery.getDiscoveryResults(true);

if (results && results.endorsement_plan) {
if (results && request.requiredOrgs) {
// special case when user knows which organizations to send the endorsement
// let's build our own endorsement plan so that we can use the sorting and sending code
const endorsement_plan = this._buildRequiredOrgPlan(results.peers_by_org);

// remove all org and peer
const orgs_request = {
sort: request.sort,
preferredHeightGap: request.preferredHeightGap
};

return this._endorse(endorsement_plan, orgs_request, signedProposal, timeout);
} else if (results && results.endorsement_plan) {
// normal processing of the discovery results
const working_discovery = JSON.parse(JSON.stringify(results.endorsement_plan));

return this._endorse(working_discovery, request, signedProposal, timeout);
Expand Down Expand Up @@ -259,7 +277,7 @@ class DiscoveryHandler extends ServiceHandler {
if (required > group.peers.length) {
results.success = false;
const error = new Error(`Endorsement plan group does not contain enough peers (${group.peers.length}) to satisfy policy (required:${required})`);
logger.error(error);
logger.debug(error.message);
results.endorsements.push(error);
break; // no need to look at other groups, this layout failed
}
Expand Down Expand Up @@ -302,6 +320,23 @@ class DiscoveryHandler extends ServiceHandler {
return responses;
}

_buildRequiredOrgPlan(peers_by_org) {
const method = '_buildRequiredOrgPlan';
logger.debug('%s - starting', method);
const endorsement_plan = {plan_id: 'required organizations'};
endorsement_plan.groups = {};
endorsement_plan.layouts = [{}]; // only one layout which will have all organizations

for (const mspid in peers_by_org) {
logger.debug(`${method} - found org:${mspid}`);
endorsement_plan.groups[mspid] = {}; // make a group for each organization
endorsement_plan.groups[mspid].peers = peers_by_org[mspid].peers; // now put in all peers from that organization
endorsement_plan.layouts[0][mspid] = 1; // add this org to the one layout and require one peer to endorse
}

return endorsement_plan;
}

/*
* utility method to build a promise that will return one of the required
* endorsements or an error object
Expand All @@ -326,9 +361,14 @@ class DiscoveryHandler extends ServiceHandler {
logger.debug('%s - send endorsement to %s', method, peer_info.name);
peer_info.in_use = true;
try {
endorsement = await peer.sendProposal(proposal, timeout);
// save this endorsement results in case we try this peer again
logger.debug('%s - endorsement completed to %s', method, peer_info.name);
const isConnected = await peer.checkConnection();
if (isConnected) {
endorsement = await peer.sendProposal(proposal, timeout);
// save this endorsement results in case we try this peer again
logger.debug('%s - endorsement completed to %s', method, peer_info.name);
} else {
endorsement = new Error(`Peer ${peer.name} is not connected`);
}
} catch (error) {
endorsement = error;
logger.error('%s - error on endorsement to %s error %s', method, peer_info.name, error);
Expand Down
59 changes: 38 additions & 21 deletions fabric-common/lib/DiscoveryService.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ class DiscoveryService extends ServiceAction {
}

for (const discoverer of targets) {
if (discoverer.connected || discoverer.isConnectable()) {
logger.debug('%s - target is or could be connected %s', method, discoverer.name);
if (discoverer.isConnectable()) {
logger.debug('%s - target is connectable%s', method, discoverer.name);
} else {
throw Error(`Discoverer ${discoverer.name} is not connected`);
throw Error(`Discoverer ${discoverer.name} is not connectable`);
}
}
// must be all targets are connected
Expand Down Expand Up @@ -103,14 +103,18 @@ class DiscoveryService extends ServiceAction {
* sending to the peer.
* @property {Endorsement} [endorsement] - Optional. Include the endorsement
* instance to build the discovery request based on the proposal.
* This will get the discovery interest (chaincode names and collections)
* This will get the discovery interest (chaincode names, collections and "no private reads")
* from the endorsement instance. Use the {@link Proposal#addCollectionInterest}
* to add collections to the endorsement's chaincode. Use the
* {@link Proposal#addChaincodeCollectionsInterest} to add chaincodes
* and collections that will be called by the endorsement's chaincode.
* to add collections to the endorsement's chaincode.
* Use the {@link Proposal#setNoPrivateReads} to set the proposals "no private reads"
* setting of the discovery interest.
* Use the {@link Proposal#addCollectionInterest} to add chaincodes,
* collections, and no private reads that will be used to get an endorsement plan
* from the peer's discovery service.
* @property {DiscoveryChaincode} [interest] - Optional. An
* array of {@link DiscoveryChaincodeInterest} that have chaincodes
* and collections to calculate the endorsement plans.
* array of {@link DiscoveryChaincodeInterest} that have chaincodes, collections,
* and "no private reads" to help the peer's discovery service calculate the
* endorsement plan.
* @example <caption>"single chaincode"</caption>
* [
* { name: "mychaincode"}
Expand All @@ -121,17 +125,21 @@ class DiscoveryService extends ServiceAction {
* ]
* @example <caption>"single chaincode with a collection"</caption>
* [
* { name: "mychaincode", collection_names: ["mycollection"] }
* { name: "mychaincode", collectionNames: ["mycollection"] }
* ]
* @example <caption>"single chaincode with a collection allowing no private data reads"</caption>
* [
* { name: "mychaincode", collectionNames: ["mycollection"], noPrivateReads: true }
* ]
* @example <caption>"chaincode to chaincode with a collection"</caption>
* [
* { name: "mychaincode", collection_names: ["mycollection"] },
* { name: "myotherchaincode", collection_names: ["mycollection"] }}
* { name: "mychaincode", collectionNames: ["mycollection"] },
* { name: "myotherchaincode", collectionNames: ["mycollection"] }}
* ]
* @example <caption>"chaincode to chaincode with collections"</caption>
* [
* { name: "mychaincode", collection_names: ["mycollection", "myothercollection"] },
* { name: "myotherchaincode", collection_names: ["mycollection", "myothercollection"] }}
* { name: "mychaincode", collectionNames: ["mycollection", "myothercollection"] },
* { name: "myotherchaincode", collectionNames: ["mycollection", "myothercollection"] }}
* ]
*/

Expand All @@ -144,7 +152,8 @@ class DiscoveryService extends ServiceAction {
/**
* @typedef {Object} DiscoveryChaincodeCall
* @property {string} name - The name of the chaincode
* @property {string[]} [collection_names] - The names of the related collections
* @property {string[]} [collectionNames] - The names of the related collections
* @property {boolean} [noPrivateReads] - Indicates we do not need to read from private data
*/

/**
Expand Down Expand Up @@ -195,7 +204,7 @@ class DiscoveryService extends ServiceAction {
queries.push(localQuery);
}

// add a chaincode query to get endorsement plans
// add a discovery chaincode query to get endorsement plans
if (endorsement || interest) {
const interests = [];

Expand Down Expand Up @@ -285,9 +294,12 @@ class DiscoveryService extends ServiceAction {
for (const target of this.targets) {
logger.debug(`${method} - about to discover on ${target.endpoint.url}`);
try {
response = await target.sendDiscovery(signedEnvelope, this.requestTimeout);
this.currentTarget = target;
break;
const isConnected = await target.checkConnection();
if (isConnected) {
response = await target.sendDiscovery(signedEnvelope, this.requestTimeout);
this.currentTarget = target;
break;
}
} catch (error) {
response = error;
}
Expand Down Expand Up @@ -376,6 +388,9 @@ class DiscoveryService extends ServiceAction {
const chaincodeCall = fabproto6.discovery.ChaincodeCall.create();
if (typeof chaincode.name === 'string') {
chaincodeCall.name = chaincode.name;
if (chaincode.noPrivateReads) {
chaincodeCall.no_private_reads = chaincode.noPrivateReads;
}
// support both names
if (chaincode.collection_names) {
_getCollectionNames(chaincode.collection_names, chaincodeCall);
Expand Down Expand Up @@ -738,7 +753,7 @@ class DiscoveryService extends ServiceAction {
}
}

function _getCollectionNames(names, chaincode_call) {
function _getCollectionNames(names, chaincodeCall) {
if (Array.isArray(names)) {
const collection_names = [];
names.map(name => {
Expand All @@ -748,7 +763,9 @@ function _getCollectionNames(names, chaincode_call) {
throw Error('The collection name must be a string');
}
});
chaincode_call.collection_names = collection_names;
// this collection_names must be in snake case as it will
// be used by the gRPC create message
chaincodeCall.collection_names = collection_names;
} else {
throw Error('Collection names must be an array of strings');
}
Expand Down
27 changes: 18 additions & 9 deletions fabric-common/lib/EventService.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class EventService extends ServiceAction {
// will be set during the .build call
this.blockType = FILTERED_BLOCK;
this.replay = false;
this.startSpecified = false;

this.myNumber = count++;
}
Expand All @@ -113,13 +114,13 @@ class EventService extends ServiceAction {
}

for (const eventer of targets) {
if (eventer.connected || eventer.isConnectable()) {
logger.debug('%s - target is or could be connected %s', method, eventer.name);
if (eventer.isConnectable()) {
logger.debug('%s - target is connectable %s', method, eventer.name);
} else {
throw Error(`Eventer ${eventer.name} is not connectable`);
}
}
// must be all targets are connected
// must be all targets are connectable
this.targets = targets;

return this;
Expand Down Expand Up @@ -266,6 +267,7 @@ class EventService extends ServiceAction {
number: this.startBlock
});
this.replay = true;
this.startSpecified = true;
}

// build stop proto
Expand Down Expand Up @@ -356,10 +358,6 @@ class EventService extends ServiceAction {
logger.debug('%s - target has a stream, is already listening %s', method, target.toString());
startError = Error(`Event service ${target.name} is currently listening`);
} else {
if (target.isConnectable()) {
logger.debug('%s - target needs to connect %s', method, target.toString());
await target.connect(); // target endpoint has been previously assigned, but not connected yet
}
const isConnected = await target.checkConnection();
if (!isConnected) {
startError = Error(`Event service ${target.name} is not connected`);
Expand Down Expand Up @@ -409,8 +407,19 @@ class EventService extends ServiceAction {

logger.debug('%s - create stream setup timeout', method);
const connectionSetupTimeout = setTimeout(() => {
logger.error(`EventService[${this.name}] timed out after:${requestTimeout}`);
reject(Error('Event service timed out - Unable to start listening'));
// this service may be waiting for a start block that has not happened
if (this.startSpecified) {
logger.debug(`EventService[${this.name}] timed out after:${requestTimeout}`);
logger.debug(`EventService[${this.name}] not stopping service, wait indefinitely`);
// resolve the promise as if we did get a good response from the peer, since we did
// not get an "end" or "error" back indicating that the request was invalid
// application should have a timer just in case this peer never gets this block
resolve(eventer);
} else {
logger.error(`EventService[${this.name}] timed out after:${requestTimeout}`);
reject(Error('Event service timed out - Unable to start listening'));
}

}, requestTimeout);

logger.debug('%s - create stream based on blockType', method, this.blockType);
Expand Down
2 changes: 2 additions & 0 deletions fabric-common/lib/Eventer.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class Eventer extends ServiceEndpoint {
const method = `checkConnection[${this.name}:${this.myCount}]`;
logger.debug(`${method} - start`);

super.checkConnection();

let result = false;
if (this.service) {
try {
Expand Down
Loading

0 comments on commit 475fc7d

Please sign in to comment.