Skip to content

Commit

Permalink
feat: progress on blob/add handler
Browse files Browse the repository at this point in the history
  • Loading branch information
alanshaw committed Nov 1, 2024
1 parent ffcd9c7 commit 9b16598
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 66 deletions.
2 changes: 2 additions & 0 deletions packages/upload-api/src/blob/accept.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ export const poll = async (context, receipt) => {
return { ok: {} }
}

// TODO: LOOKUP IN ALLOCATIONS STORAGE AS WE DON"T HAVE THIS INVOCATION

// Otherwise we are going to lookup allocation corresponding to this http/put
// in order to issue blob/accept.
const [, allocation] = /** @type {API.UCANAwait} */ (put.nb.url)['ucan/await']
Expand Down
151 changes: 88 additions & 63 deletions packages/upload-api/src/blob/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Receipt } from '@ucanto/core'
import { ed25519 } from '@ucanto/principal'
import * as Blob from '@storacha/capabilities/blob'
import * as SpaceBlob from '@storacha/capabilities/space/blob'
import * as W3sBlob from '@storacha/capabilities/web3.storage/blob'
import * as HTTP from '@storacha/capabilities/http'
import * as API from '../types.js'

Expand Down Expand Up @@ -32,70 +31,55 @@ const conclude = (receipt, issuer, audience = issuer) =>
* @returns {API.ServiceMethod<API.SpaceBlobAdd, API.BlobAddSuccess, API.BlobAddFailure>}
*/
export function blobAddProvider(context) {
const { routingService } = context

return Server.provideAdvanced({
capability: SpaceBlob.add,
handler: async ({ capability, invocation }) => {
const { with: space, nb } = capability
const { blob } = nb

const digest = Digest.decode(blob.digest)

const candidateRes = await routingService.selectBlobAllocationCandidate(digest, blob.size)
if (candidateRes.error) {
return candidateRes
}

const candidate = candidateRes.ok
const cap = Blob.allocate.create({
with: candidate,
nb: {
blob: {
digest: blob.digest,
size: blob.size,
},
space,
cause: invocation.link()
}
const allocation = await allocate({
context,
blob,
space,
cause: invocation.link(),
})

const confRes = await routingService.configureInvocation(candidate, cap)
if (confRes.error) {
return confRes
}

const allocReceipt = await confRes.ok.invocation.execute(confRes.ok.connection)
if (allocReceipt.out.error) {
return allocReceipt.out
if (allocation.error) {
return allocation
}

const delivery = await put({
blob,
allocation: { receipt: allocReceipt },
allocation: allocation.ok,
})
if (delivery.error) {
return delivery
}

const acceptance = await accept({
context,
provider: allocation.ok.provider,
blob,
space,
delivery,
delivery: delivery.ok,
})
if (acceptance.error) {
return acceptance
}

// Create a result describing the this invocation workflow
let result = Server.ok({
/** @type {API.BlobAddSuccess['site']} */
site: {
'ucan/await': ['.out.ok.site', acceptance.task.link()],
'ucan/await': ['.out.ok.site', acceptance.ok.task.link()],
},
})
.fork(allocation.task)
.fork(delivery.task)
.fork(acceptance.task)
.fork(allocation.ok.task)
.fork(delivery.ok.task)
.fork(acceptance.ok.task)

// As a temporary solution we fork all add effects that add inline
// receipts so they can be delivered to the client.
const fx = [...allocation.fx, ...delivery.fx, ...acceptance.fx]
const fx = [...allocation.ok.fx, ...delivery.ok.fx, ...acceptance.ok.fx]
for (const task of fx) {
result = result.fork(task)
}
Expand All @@ -116,21 +100,57 @@ export function blobAddProvider(context) {
* @param {API.Link} allocate.cause
*/
async function allocate({ context, blob, space, cause }) {
// 1. Create web3.storage/blob/allocate invocation and task
const allocate = W3sBlob.allocate.invoke({
issuer: context.id,
audience: context.id,
with: context.id.did(),
// First we check if space has storage provider associated. If it does not
// we return `InsufficientStorage` error as storage capacity is considered
// to be 0.
const provisioned = await context.provisionsStorage.hasStorageProvider(space)
if (provisioned.error) {
return provisioned
}

if (!provisioned.ok) {
return {
/** @type {API.AllocationError} */
error: {
name: 'InsufficientStorage',
message: `${space} has no storage provider`,
},
}
}

// 1. Create blob/allocate invocation and task
const { router } = context
const digest = Digest.decode(blob.digest)

const candidate = await router.selectBlobAllocationCandidate(digest, blob.size)
if (candidate.error) {
return candidate
}

const cap = Blob.allocate.create({
with: candidate.ok.did(),
nb: {
blob,
cause: cause,
blob: {
digest: blob.digest,
size: blob.size,
},
space,
},
expiration: Infinity,
cause
}
})
const task = await allocate.delegate()

const receipt = await allocate.execute(context.getServiceConnection())
const configure = await router.configureInvocation(candidate.ok, cap, { expiration: Infinity })
if (configure.error) {
return configure
}

const task = await configure.ok.invocation.delegate()
const receipt = await configure.ok.invocation.execute(configure.ok.connection)
if (receipt.out.error) {
return receipt.out
}

// TODO: ADD TO ALLOCATIONS STORAGE

// 4. Create `blob/allocate` receipt as conclude invocation to inline as effect
const concludeAllocate = createConcludeInvocation(
Expand All @@ -139,11 +159,12 @@ async function allocate({ context, blob, space, cause }) {
receipt
)

return {
return Server.ok({
provider: candidate.ok,
task,
receipt,
fx: [await concludeAllocate.delegate()],
}
})
}

/**
Expand Down Expand Up @@ -220,11 +241,11 @@ async function put({ blob, allocation }) {
})
}

return {
return Server.ok({
task,
receipt,
fx: receipt ? [await conclude(receipt, blobProvider)] : [],
}
})
}

/**
Expand All @@ -233,26 +254,30 @@ async function put({ blob, allocation }) {
*
* @param {object} input
* @param {API.BlobServiceContext} input.context
* @param {API.Principal} input.provider
* @param {API.BlobModel} input.blob
* @param {API.DIDKey} input.space
* @param {object} input.delivery
* @param {API.Invocation<API.HTTPPut>} input.delivery.task
* @param {API.Receipt|null} input.delivery.receipt
*/
async function accept({ context, blob, space, delivery }) {
// 1. Create web3.storage/blob/accept invocation and task
const accept = W3sBlob.accept.invoke({
issuer: context.id,
audience: context.id,
with: context.id.did(),
async function accept({ context, provider, blob, space, delivery }) {
// 1. Create blob/accept invocation and task
const cap = Blob.accept.create({
with: provider.did(),
nb: {
blob,
space,
_put: { 'ucan/await': ['.out.ok', delivery.task.link()] },
},
expiration: Infinity,
})
const task = await accept.delegate()

const configure = await context.router.configureInvocation(provider, cap, { expiration: Infinity })
if (configure.error) {
return configure
}

const task = await configure.ok.invocation.delegate()

let receipt = null

Expand All @@ -272,12 +297,12 @@ async function accept({ context, blob, space, delivery }) {
}
// If put has already succeeded, we can execute `blob/accept` right away.
else if (delivery.receipt?.out.ok) {
receipt = await accept.execute(context.getServiceConnection())
receipt = await configure.ok.invocation.execute(configure.ok.connection)
}

return {
return Server.ok({
task,
receipt,
fx: receipt ? [await conclude(receipt, context.id)] : [],
}
})
}
2 changes: 1 addition & 1 deletion packages/upload-api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ export type BlobServiceContext = SpaceServiceContext & {
blobsStorage: BlobsStorage
agentStore: AgentStore
getServiceConnection: () => ConnectionView<Service>
routingService: RoutingService
router: RoutingService
}

export type W3ServiceContext = SpaceServiceContext & {
Expand Down
5 changes: 3 additions & 2 deletions packages/upload-api/src/types/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
UCANOptions,
IssuedInvocationView,
ConnectionView,
Principal,
} from '@ucanto/interface'
import {
Multihash,
Expand Down Expand Up @@ -135,11 +136,11 @@ export interface RoutingService {
* storage nodes.
*/
selectBlobAllocationCandidate(digest: MultihashDigest, size: number):
Promise<Result<DID, CandidateUnavailable|Failure>>
Promise<Result<Principal, CandidateUnavailable|Failure>>
/**
* Returns information required to make an invocation to the requested storage
* node.
*/
configureInvocation<C extends BlobAllocate|BlobAccept>(storageNode: DID, capability: C):
configureInvocation<C extends BlobAllocate|BlobAccept>(storageNode: Principal, capability: C, options?: Omit<UCANOptions, 'audience'>):
Promise<Result<Configuration<C>, ProofUnavailable|Failure>>
}

0 comments on commit 9b16598

Please sign in to comment.