Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add sync handler base #73

Merged
merged 6 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions jupyter_fsspec/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,77 @@ def post(self):
self.finish()


# ====================================================================================
# Handle Syncing Local and Remote filesystems
# ====================================================================================
class FilesystemSyncHandler(BaseFileSystemHandler):
    """Synchronize files between the local and a remote filesystem.

    GET pulls the latest changes from the remote filesystem down to the
    local one; POST pushes local changes up to the remote filesystem.
    Both endpoints expect a JSON body with ``local_path``, ``remote_path``
    and ``destination_key`` keys.

    NOTE(review): GET implies no change of state — consider exposing both
    directions as POST with an operation parameter, and moving the core
    sync logic into the filesystem manager so this handler only forwards
    parameters and sets the response status.
    """

    def initialize(self, fs_manager):
        # Manager that resolves configured filesystems by key.
        self.fs_manager = fs_manager

    def _parse_sync_request(self):
        """Decode the JSON request body and resolve the destination filesystem.

        Returns:
            tuple: ``(local_path, remote_path, remote_fs_instance, dest_path)``

        Raises:
            Exception: propagated from JSON decoding, filesystem lookup,
                or ``validate_fs`` — callers handle it uniformly.
        """
        request_data = json.loads(self.request.body.decode("utf-8"))
        local_path = request_data.get("local_path")
        remote_path = request_data.get("remote_path")
        dest_fs_key = request_data.get("destination_key")
        dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key)
        dest_path = dest_fs_info["path"]

        fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path)
        return local_path, remote_path, fs["instance"], dest_path

    async def get(self):
        """Sync remote -> local (fetch the latest remote changes)."""
        response = {"content": None}
        try:
            (
                local_destination_path,
                remote_source_path,
                remote_fs_instance,  # noqa: F841 — transfer not wired up yet
                _dest_path,
            ) = self._parse_sync_request()
            # TODO: trigger the actual transfer here, e.g.
            #   await remote_fs_instance._get(remote_source_path,
            #                                 local_destination_path)
            self.set_status(200)
            response["status"] = "success"
            response["description"] = (
                f"Synced {local_destination_path} to {remote_source_path}."
            )
        except Exception as e:
            # Use the module logger (not print) so failures are reported
            # consistently with the other handlers in this file.
            logger.debug(f"Error with sync handler: {e}")
            self.set_status(500)
            response["status"] = "failed"
            response["description"] = str(e)
        self.write(response)
        self.finish()

    async def post(self):
        """Sync local -> remote (push the latest local changes)."""
        # `key` arrives as a query argument; missing -> tornado raises a 400.
        key = self.get_argument("key")  # noqa: F841
        response = {"content": None}
        try:
            (
                local_source_path,
                remote_destination_path,
                remote_fs_instance,  # noqa: F841 — transfer not wired up yet
                _dest_path,
            ) = self._parse_sync_request()
            # TODO: trigger the actual transfer here, e.g.
            #   await remote_fs_instance._put(local_source_path,
            #                                 remote_destination_path)
            self.set_status(200)
            response["status"] = "success"
            response["description"] = (
                f"Synced {remote_destination_path} to {local_source_path}."
            )
        except Exception as e:
            logger.debug(f"Error with sync handler: {e}")
            self.set_status(500)
            response["status"] = "failed"
            response["description"] = str(e)
        self.write(response)
        self.finish()


# ====================================================================================
# CRUD for FileSystem
# ====================================================================================
Expand Down Expand Up @@ -556,13 +627,15 @@ def setup_handlers(web_app):
route_fs_file_transfer = url_path_join(
base_url, "jupyter_fsspec", "files", "transfer"
)
route_fs_sync = url_path_join(base_url, "jupyter_fsspec", "sync")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see why we need a separate URL here, as we don't generally for other operations.


handlers = [
(route_fsspec_config, FsspecConfigHandler, dict(fs_manager=fs_manager)),
(route_files, FileSystemHandler, dict(fs_manager=fs_manager)),
(route_rename_files, RenameFileHandler, dict(fs_manager=fs_manager)),
(route_file_actions, FileActionHandler, dict(fs_manager=fs_manager)),
(route_fs_file_transfer, FileTransferHandler, dict(fs_manager=fs_manager)),
(route_fs_sync, FilesystemSyncHandler, dict(fs_manager=fs_manager)),
]

web_app.add_handlers(host_pattern, handlers)
67 changes: 67 additions & 0 deletions jupyter_fsspec/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,3 +539,70 @@ async def test_upload_download(fs_manager_instance, jp_fetch):
# downloaded_dirpath = local_root_path + '/some'
# new_local_items = local_fs.ls(downloaded_dirpath)
# assert downloaded_dirpath in new_local_items


async def xtest_sync_push(fs_manager_instance, jp_fetch):
    """Push (local -> remote) sync via POST on the sync endpoint.

    WIP — disabled (``x`` prefix); rename to ``test_sync_push`` once the
    server-side sync transfer is implemented.
    """
    fs_manager = fs_manager_instance
    remote_fs_info = fs_manager.get_filesystem_by_protocol("s3")
    remote_key = remote_fs_info["key"]
    remote_fs = remote_fs_info["info"]["instance"]
    remote_root_path = remote_fs_info["info"]["path"]
    assert remote_fs is not None

    local_fs_info = fs_manager.get_filesystem_by_protocol("local")
    local_key = local_fs_info["key"]  # noqa: F841
    local_fs = local_fs_info["info"]["instance"]
    local_root_path = local_fs_info["info"]["path"]
    assert local_fs is not None

    push_sync_payload = {
        "local_path": local_root_path,
        "remote_path": remote_root_path,
        "destination_key": remote_key,
    }

    # Calling sync on the remote; the handler reads `key` from the query
    # string, the paths from the JSON body.
    sync_local_to_remote_response = await jp_fetch(
        "jupyter_fsspec",
        "sync",
        method="POST",
        params={"key": remote_key},
        body=json.dumps(push_sync_payload),
    )

    assert sync_local_to_remote_response.code == 200


async def test_sync_pull(fs_manager_instance, jp_fetch):
    """Pull (remote -> local) sync via GET on the sync endpoint."""
    manager = fs_manager_instance

    s3_info = manager.get_filesystem_by_protocol("s3")
    remote_key = s3_info["key"]
    assert s3_info["info"]["instance"] is not None
    remote_root = s3_info["info"]["path"]

    local_info = manager.get_filesystem_by_protocol("local")
    local_key = local_info["key"]  # noqa: F841
    assert local_info["info"]["instance"] is not None
    local_root = local_info["info"]["path"]

    pull_sync_payload = {
        "local_path": local_root,
        "remote_path": remote_root,
        "destination_key": remote_key,
    }

    # Calling sync on the remote; GET carries a JSON body here, hence the
    # nonstandard-methods flag.
    sync_remote_to_local_response = await jp_fetch(
        "jupyter_fsspec",
        "sync",
        method="GET",
        params={"key": remote_key},
        body=json.dumps(pull_sync_payload),
        allow_nonstandard_methods=True,
    )

    assert sync_remote_to_local_response.code == 200
    # assert body["status"] == "success"
46 changes: 46 additions & 0 deletions src/handler/fileOperations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,52 @@ export class FsspecModel {
}
}

async sync_push(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may wish to eventually provide a feedback mechanism (see e.g., https://filesystem-spec.readthedocs.io/en/latest/features.html#callbacks ).

key: string,
remote_path: string,
local_path: string
): Promise<any> {
try {
const reqBody = JSON.stringify({
key: key,
remote_path,
local_path
});
await requestAPI<any>('sync', {
method: 'POST',
body: reqBody,
headers: {
'Content-Type': 'application/json'
}
});
} catch (error) {
console.error('Failed to sync local to remote: ', error);
}
}

// Pull the latest remote changes down to the local filesystem
// (server: GET /sync with a JSON body of paths and the destination key).
// NOTE(review): GET requests with a body are nonstandard; browser fetch
// implementations may drop or reject the body — confirm, or switch this
// to POST with an operation parameter.
// NOTE(review): errors are logged and swallowed, so the returned promise
// resolves to undefined on both success and failure — callers cannot
// distinguish the two.
async sync_pull(
key: string,
remote_path: string,
local_path: string
): Promise<any> {
try {
const reqBody = JSON.stringify({
key: key,
remote_path,
local_path
});
await requestAPI<any>('sync', {
method: 'GET',
body: reqBody,
headers: {
'Content-Type': 'application/json'
}
});
} catch (error) {
console.error('Failed to sync remote to local: ', error);
}
}

// async update(
// key: any = 'local%7CSourceDisk%7C.',
// item_path = '',
Expand Down
Loading