[WIP] Add sync handler base #73
@@ -251,6 +251,77 @@ def post(self):
        self.finish()


# ====================================================================================
# Handle Syncing Local and Remote filesystems
# ====================================================================================
class FilesystemSyncHandler(BaseFileSystemHandler):
    def initialize(self, fs_manager):
        self.fs_manager = fs_manager

    async def get(self):
        # remote to local (fetching latest changes)

Review comment: If it's to be explicitly "remote to local", then fs.get() may be simpler, unless you really need features from rsync (e.g., checking file sizes, deleting files that disappeared on the remote).
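For the remote-to-local case the comment describes, a minimal sketch of the `fs.get()` approach could look like this (assuming `remote_fs_instance` is an async fsspec filesystem such as s3fs; a synchronous filesystem would call `get()` instead of `_get()`):

```python
# Sketch only: copy everything under the remote source into the local destination.
# fsspec's get()/_get() copies remote -> local; recursive=True walks directories.
await remote_fs_instance._get(
    remote_source_path,
    local_destination_path,
    recursive=True,
)
```

This covers a plain "fetch the latest" copy; size comparison and deleting files removed on the remote are the rsync-style features the comment refers to.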
        request_data = json.loads(self.request.body.decode("utf-8"))
        local_destination_path = request_data.get("local_path")

Review comment: Were we ever thinking of using pydantic? Unwrapping all the keys and validating seems a little tedious.
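A hypothetical pydantic model for this request body might look like the following (field names mirror the keys the handler already reads; the model itself is not part of this PR, and the parsing call uses the pydantic v2 API):

```python
from pydantic import BaseModel

class SyncRequest(BaseModel):
    local_path: str
    remote_path: str
    destination_key: str

# Parsing and validation then collapse to a single call inside the handler:
# request = SyncRequest.model_validate_json(self.request.body)
```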
        remote_source_path = request_data.get("remote_path")
        dest_fs_key = request_data.get("destination_key")
        dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key)
        dest_path = dest_fs_info["path"]

        response = {"content": None}

        fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path)
        remote_fs_instance = fs["instance"]  # noqa: F841

        try:
            # rsync
            # (remote_source_path, local_destination_path)

Review comment: So you don't trigger this yet?

Review comment: We can probably figure out here if this is going to be async; OR, we can use the wrapper to make everything async (run on a thread) to make sure we don't block.
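One way to keep the handler non-blocking with a synchronous filesystem is to push the copy onto a worker thread, for example (a sketch, not part of the PR; `asyncio.to_thread` requires Python 3.9+):

```python
import asyncio

# Run the blocking fsspec call on a worker thread so the Tornado event loop
# stays responsive while the transfer is in progress.
await asyncio.to_thread(
    remote_fs_instance.get,
    remote_source_path,
    local_destination_path,
    recursive=True,
)
```

If the configured filesystem is already an fsspec AsyncFileSystem, its native coroutine (`_get`) can simply be awaited instead.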
            # await remote_fs_instance....
            self.set_status(200)
            response["status"] = "success"
            response["description"] = (
                f"Synced {local_destination_path} to {remote_source_path}."
            )
        except Exception as e:
            print(f"Error with sync handler: {e}")
            self.set_status(500)
            response["status"] = "failed"
            response["description"] = str(e)
        self.write(response)
        self.finish()

    async def post(self):
        # local to remote (pushing latest changes)
        key = self.get_argument("key")  # noqa: F841
        request_data = json.loads(self.request.body.decode("utf-8"))
        local_source_path = request_data.get("local_path")
        remote_destination_path = request_data.get("remote_path")
        dest_fs_key = request_data.get("destination_key")
        dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key)
        dest_path = dest_fs_info["path"]

        response = {"content": None}

        fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path)
        remote_fs_instance = fs["instance"]  # noqa: F841

        try:
            # rsync
            # (local_source_path, remote_destination_path)
            # await remote_fs_instance....
            self.set_status(200)
            response["status"] = "success"
            response["description"] = (
                f"Synced {remote_destination_path} to {local_source_path}."
            )
        except Exception as e:
            logger.debug(f"Error with sync handler: {e}")
            self.set_status(500)
            response["status"] = "failed"
            response["description"] = str(e)
        self.write(response)
        self.finish()


# ====================================================================================
# CRUD for FileSystem
# ====================================================================================
@@ -556,13 +627,15 @@ def setup_handlers(web_app):
    route_fs_file_transfer = url_path_join(
        base_url, "jupyter_fsspec", "files", "transfer"
    )
    route_fs_sync = url_path_join(base_url, "jupyter_fsspec", "sync")

Review comment: I don't really see why we need a separate URL here if we don't generally for other operations.
    handlers = [
        (route_fsspec_config, FsspecConfigHandler, dict(fs_manager=fs_manager)),
        (route_files, FileSystemHandler, dict(fs_manager=fs_manager)),
        (route_rename_files, RenameFileHandler, dict(fs_manager=fs_manager)),
        (route_file_actions, FileActionHandler, dict(fs_manager=fs_manager)),
        (route_fs_file_transfer, FileTransferHandler, dict(fs_manager=fs_manager)),
        (route_fs_sync, FilesystemSyncHandler, dict(fs_manager=fs_manager)),
    ]

    web_app.add_handlers(host_pattern, handlers)
@@ -539,3 +539,70 @@ async def test_upload_download(fs_manager_instance, jp_fetch):
    # downloaded_dirpath = local_root_path + '/some'
    # new_local_items = local_fs.ls(downloaded_dirpath)
    # assert downloaded_dirpath in new_local_items


async def xtest_sync_push(fs_manager_instance, jp_fetch):
    # WIP
    fs_manager = fs_manager_instance
    remote_fs_info = fs_manager.get_filesystem_by_protocol("s3")
    remote_key = remote_fs_info["key"]
    remote_fs = remote_fs_info["info"]["instance"]

Review comment: It's weird to me that the call to get_filesystem succeeds without an exception but returns nothing.
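A stricter variant of the lookup could fail loudly instead, e.g. (a sketch; the internal `filesystems` mapping and return shape are assumptions based on how the test uses the result):

```python
def get_filesystem_by_protocol(self, protocol):
    # Return the first configured filesystem matching the protocol,
    # or raise instead of silently returning None.
    for key, info in self.filesystems.items():
        if info.get("protocol") == protocol:
            return {"key": key, "info": info}
    raise KeyError(f"No filesystem configured for protocol {protocol!r}")
```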
    remote_root_path = remote_fs_info["info"]["path"]
    assert remote_fs is not None

    local_fs_info = fs_manager.get_filesystem_by_protocol("local")
    local_key = local_fs_info["key"]  # noqa: F841
    local_fs = local_fs_info["info"]["instance"]
    local_root_path = local_fs_info["info"]["path"]
    assert local_fs is not None

    push_sync_payload = {
        "local_path": local_root_path,
        "remote_path": remote_root_path,
        "destination_key": remote_key,
    }

    # calling sync on the remote
    sync_local_to_remote_response = await jp_fetch(
        "jupyter_fsspec",
        "sync",
        method="POST",
        params={"key": remote_key},
        body=json.dumps(push_sync_payload),
    )

    assert sync_local_to_remote_response.code == 200


async def test_sync_pull(fs_manager_instance, jp_fetch):
    fs_manager = fs_manager_instance
    remote_fs_info = fs_manager.get_filesystem_by_protocol("s3")
    remote_key = remote_fs_info["key"]
    remote_fs = remote_fs_info["info"]["instance"]
    remote_root_path = remote_fs_info["info"]["path"]
    assert remote_fs is not None

    local_fs_info = fs_manager.get_filesystem_by_protocol("local")
    local_key = local_fs_info["key"]  # noqa: F841
    local_fs = local_fs_info["info"]["instance"]
    local_root_path = local_fs_info["info"]["path"]
    assert local_fs is not None

    pull_sync_payload = {
        "local_path": local_root_path,
        "remote_path": remote_root_path,
        "destination_key": remote_key,
    }

    # calling sync on the remote
    sync_remote_to_local_response = await jp_fetch(
        "jupyter_fsspec",
        "sync",
        method="GET",
        params={"key": remote_key},
        body=json.dumps(pull_sync_payload),
        allow_nonstandard_methods=True,
    )

    assert sync_remote_to_local_response.code == 200
    # assert body["status"] == "success"
@@ -366,6 +366,52 @@ export class FsspecModel {
    }
  }

  async sync_push(

Review comment: We may wish to eventually provide a feedback mechanism (see e.g. https://filesystem-spec.readthedocs.io/en/latest/features.html#callbacks).
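On the server side, fsspec's transfer calls already accept such a callback object; a sketch of wiring one in (variable names as used elsewhere in this PR, the particular callback is illustrative only):

```python
from fsspec.callbacks import TqdmCallback

# The callback receives progress events during the copy; these could later be
# forwarded to the frontend rather than printed in the server process.
await remote_fs_instance._get(
    remote_source_path,
    local_destination_path,
    recursive=True,
    callback=TqdmCallback(),
)
```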
    key: string,
    remote_path: string,
    local_path: string
  ): Promise<any> {
    try {
      const reqBody = JSON.stringify({
        key: key,
        remote_path,
        local_path
      });
      await requestAPI<any>('sync', {
        method: 'POST',
        body: reqBody,
        headers: {
          'Content-Type': 'application/json'
        }
      });
    } catch (error) {
      console.error('Failed to sync local to remote: ', error);
    }
  }

  async sync_pull(
    key: string,
    remote_path: string,
    local_path: string
  ): Promise<any> {
    try {
      const reqBody = JSON.stringify({
        key: key,
        remote_path,
        local_path
      });
      await requestAPI<any>('sync', {
        method: 'GET',
        body: reqBody,
        headers: {
          'Content-Type': 'application/json'
        }
      });
    } catch (error) {
      console.error('Failed to sync remote to local: ', error);
    }
  }

  // async update(
  //   key: any = 'local%7CSourceDisk%7C.',
  //   item_path = '',
Review comment: I would suggest that the FilesystemManager should have the core functionality, and that the handler only takes care of passing on the appropriate parameters and setting the output status.
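A rough sketch of that split (the method name, signature, and the manager's internals here are assumptions, not part of this PR):

```python
class FilesystemManager:
    # ... existing config and filesystem bookkeeping ...

    async def sync(self, fs_key, remote_path, local_path, direction="pull"):
        """Copy between a configured remote filesystem and the local disk."""
        fs = self.get_filesystem(fs_key)["instance"]
        if direction == "pull":
            # remote -> local
            await fs._get(remote_path, local_path, recursive=True)
        else:
            # local -> remote
            await fs._put(local_path, remote_path, recursive=True)
```

The handler's get()/post() would then reduce to parsing the request, awaiting `fs_manager.sync(...)`, and setting the response status.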