From 2343b8a770d3333aabf8d0ec2dd88a910916c3c7 Mon Sep 17 00:00:00 2001 From: RRosio Date: Thu, 30 Jan 2025 12:30:05 -0800 Subject: [PATCH 1/5] Add sync handler base, frontend server call functions and base for backend sync handler tests --- jupyter_fsspec/handlers.py | 73 ++++++++++++++++++++++++++++++++ jupyter_fsspec/tests/test_api.py | 67 +++++++++++++++++++++++++++++ src/handler/fileOperations.ts | 46 ++++++++++++++++++++ 3 files changed, 186 insertions(+) diff --git a/jupyter_fsspec/handlers.py b/jupyter_fsspec/handlers.py index 729d5d6..faa8881 100644 --- a/jupyter_fsspec/handlers.py +++ b/jupyter_fsspec/handlers.py @@ -251,6 +251,77 @@ def post(self): self.finish() +# ==================================================================================== +# Handle Syncing Local and Remote filesystems +# ==================================================================================== +class FilesystemSyncHandler(BaseFileSystemHandler): + def initialize(self, fs_manager): + self.fs_manager = fs_manager + + async def get(self): + # remote to local (fetching latest changes) + request_data = json.loads(self.request.body.decode("utf-8")) + local_destination_path = request_data.get("local_path") + remote_source_path = request_data.get("remote_path") + dest_fs_key = request_data.get("destination_key") + dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key) + dest_path = dest_fs_info["path"] + + response = {"content": None} + + fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path) + remote_fs_instance = fs["instance"] # noqa: F841 + + try: + # rsync + # (remote_source_path, local_destination_path) + # await remote_fs_instance.... + self.set_status(200) + response["status"] = "success" + response["description"] = ( + f"Synced {local_destination_path} to {remote_source_path}." 
+ ) + except Exception as e: + print(f"Error with sync handler: {e}") + self.set_status(500) + response["status"] = "failed" + response["description"] = str(e) + self.write(response) + self.finish() + + async def post(self): + # local to remote (pushing latest changes) + key = self.get_argument("key") # noqa: F841 + request_data = json.loads(self.request.body.decode("utf-8")) + local_source_path = request_data.get("local_path") + remote_destination_path = request_data.get("remote_path") + dest_fs_key = request_data.get("destination_key") + dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key) + dest_path = dest_fs_info["path"] + + response = {"content": None} + + fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path) + remote_fs_instance = fs["instance"] # noqa: F841 + + try: + # rsync + # (local_source_path, remote_destination_path) + # await remote_fs_instance.... + self.set_status(200) + response["status"] = "success" + response["description"] = ( + f"Synced {remote_destination_path} to {local_source_path}." 
+ ) + except Exception as e: + logger.debug(f"Error with sync handler: {e}") + self.set_status(500) + response["status"] = "failed" + response["description"] = str(e) + self.write(response) + self.finish() + + # ==================================================================================== # CRUD for FileSystem # ==================================================================================== @@ -556,6 +627,7 @@ def setup_handlers(web_app): route_fs_file_transfer = url_path_join( base_url, "jupyter_fsspec", "files", "transfer" ) + route_fs_sync = url_path_join(base_url, "jupyter_fsspec", "sync") handlers = [ (route_fsspec_config, FsspecConfigHandler, dict(fs_manager=fs_manager)), @@ -563,6 +635,7 @@ def setup_handlers(web_app): (route_rename_files, RenameFileHandler, dict(fs_manager=fs_manager)), (route_file_actions, FileActionHandler, dict(fs_manager=fs_manager)), (route_fs_file_transfer, FileTransferHandler, dict(fs_manager=fs_manager)), + (route_fs_sync, FilesystemSyncHandler, dict(fs_manager=fs_manager)), ] web_app.add_handlers(host_pattern, handlers) diff --git a/jupyter_fsspec/tests/test_api.py b/jupyter_fsspec/tests/test_api.py index a218438..9c2f598 100644 --- a/jupyter_fsspec/tests/test_api.py +++ b/jupyter_fsspec/tests/test_api.py @@ -539,3 +539,70 @@ async def test_upload_download(fs_manager_instance, jp_fetch): # downloaded_dirpath = local_root_path + '/some' # new_local_items = local_fs.ls(downloaded_dirpath) # assert downloaded_dirpath in new_local_items + + +async def xtest_sync_push(fs_manager_instance, jp_fetch): + # WIP + fs_manager = fs_manager_instance + remote_fs_info = fs_manager.get_filesystem_by_protocol("s3") + remote_key = remote_fs_info["key"] + remote_fs = remote_fs_info["info"]["instance"] + remote_root_path = remote_fs_info["info"]["path"] + assert remote_fs is not None + + local_fs_info = fs_manager.get_filesystem_by_protocol("local") + local_key = local_fs_info["key"] # noqa: F841 + local_fs = 
local_fs_info["info"]["instance"] + local_root_path = local_fs_info["info"]["path"] + assert local_fs is not None + + push_sync_payload = { + "local_path": local_root_path, + "remote_path": remote_root_path, + "destination_key": remote_key, + } + + # calling sync on the remote + sync_local_to_remote_response = await jp_fetch( + "jupyter_fsspec", + "sync", + method="POST", + params={"key": remote_key}, + body=json.dumps(push_sync_payload), + ) + + assert sync_local_to_remote_response.code == 200 + + +async def test_sync_pull(fs_manager_instance, jp_fetch): + fs_manager = fs_manager_instance + remote_fs_info = fs_manager.get_filesystem_by_protocol("s3") + remote_key = remote_fs_info["key"] + remote_fs = remote_fs_info["info"]["instance"] + remote_root_path = remote_fs_info["info"]["path"] + assert remote_fs is not None + + local_fs_info = fs_manager.get_filesystem_by_protocol("local") + local_key = local_fs_info["key"] # noqa: F841 + local_fs = local_fs_info["info"]["instance"] + local_root_path = local_fs_info["info"]["path"] + assert local_fs is not None + + pull_sync_payload = { + "local_path": local_root_path, + "remote_path": remote_root_path, + "destination_key": remote_key, + } + + # calling sync on the remote + sync_remote_to_local_response = await jp_fetch( + "jupyter_fsspec", + "sync", + method="GET", + params={"key": remote_key}, + body=json.dumps(pull_sync_payload), + allow_nonstandard_methods=True, + ) + + assert sync_remote_to_local_response.code == 200 + # assert body["status"] == "success" diff --git a/src/handler/fileOperations.ts b/src/handler/fileOperations.ts index b847fb4..46b89a3 100644 --- a/src/handler/fileOperations.ts +++ b/src/handler/fileOperations.ts @@ -366,6 +366,52 @@ export class FsspecModel { } } + async sync_push( + key: string, + remote_path: string, + local_path: string + ): Promise { + try { + const reqBody = JSON.stringify({ + key: key, + remote_path, + local_path + }); + await requestAPI('sync', { + method: 'POST', + body: 
reqBody,
+        headers: {
+          'Content-Type': 'application/json'
+        }
+      });
+    } catch (error) {
+      console.error('Failed to sync local to remote: ', error);
+    }
+  }
+
+  async sync_pull(
+    key: string,
+    remote_path: string,
+    local_path: string
+  ): Promise {
+    try {
+      const reqBody = JSON.stringify({
+        key: key,
+        remote_path,
+        local_path
+      });
+      await requestAPI('sync', {
+        method: 'GET',
+        body: reqBody,
+        headers: {
+          'Content-Type': 'application/json'
+        }
+      });
+    } catch (error) {
+      console.error('Failed to sync remote to local: ', error);
+    }
+  }
+
   // async update(
   //   key: any = 'local%7CSourceDisk%7C.',
   //   item_path = '',
From 9cf9921295ffaea022b94f8beffead707b6b39bb Mon Sep 17 00:00:00 2001
From: RRosio
Date: Tue, 11 Feb 2025 23:20:39 -0800
Subject: [PATCH 2/5] remove base handler, move validation to file manager class

---
 jupyter_fsspec/file_manager.py | 17 +++++++++
 jupyter_fsspec/handlers.py     | 67 +++++++++++-----------------------
 2 files changed, 38 insertions(+), 46 deletions(-)

diff --git a/jupyter_fsspec/file_manager.py b/jupyter_fsspec/file_manager.py
index 9822def..876be2c 100644
--- a/jupyter_fsspec/file_manager.py
+++ b/jupyter_fsspec/file_manager.py
@@ -203,6 +203,23 @@ def check_reload_config(self):
     def get_all_filesystems(self):
         self._initialize_filesystems()
 
+    def validate_fs(self, request_type, key, item_path):
+        """Return ``(fs_info, item_path)`` for ``key``; raise ValueError if invalid.
+
+        An empty ``item_path`` falls back to the filesystem's "path" for "get" only.
+        """
+        if not key:
+            raise ValueError("Missing required parameter `key`")
+        fs = self.get_filesystem(key)
+        if fs is None:
+            raise ValueError(f"No filesystem found for key: {key}")
+        if not item_path:
+            if request_type != "get":
+                raise ValueError("Missing required parameter `item_path`")
+            # `fs` is this manager's own entry; there is no `self.fs_manager` here.
+            item_path = fs["path"]
+        return fs, item_path
+
     def get_filesystem(self, key):
         return self.filesystems.get(key)
 
diff --git a/jupyter_fsspec/handlers.py b/jupyter_fsspec/handlers.py
index faa8881..a21cd00 100644
--- a/jupyter_fsspec/handlers.py
+++ b/jupyter_fsspec/handlers.py
@@ -11,38
+11,6 @@ logger = logging.getLogger(__name__) -class BaseFileSystemHandler(APIHandler): - def initialize(self, fs_manager): - self.fs_manager = fs_manager - - def validate_fs(self, request_type, key, item_path): - """Retrieve the filesystem instance and path of the item - - :raises [ValueError]: [Missing required key parameter] - :raises [ValueError]: [Missing required parameter item_path] - :raises [ValueError]: [No filesystem identified for provided key] - - :return: filesystem instance and item_path - :rtype: fsspec filesystem instance and string - """ - - if not key: - raise ValueError("Missing required parameter `key`") - - fs = self.fs_manager.get_filesystem(key) - - if not item_path: - if str(type) != "range" and request_type == "get": - item_path = self.fs_manager.filesystems[key]["path"] - else: - raise ValueError("Missing required parameter `item_path`") - - if fs is None: - raise ValueError(f"No filesystem found for key: {key}") - - return fs, item_path - - class FsspecConfigHandler(APIHandler): """ @@ -101,7 +69,7 @@ def get(self): # ==================================================================================== # Handle Move and Copy Requests # ==================================================================================== -class FileActionHandler(BaseFileSystemHandler): +class FileActionHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager @@ -124,7 +92,7 @@ async def post(self): destination = request_data.get("content") response = {"content": None} - fs, item_path = self.validate_fs("post", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path) fs_instance = fs["instance"] try: @@ -153,7 +121,7 @@ async def post(self): # ==================================================================================== # Handle Move and Copy Requests Across filesystems # ==================================================================================== -class 
FileTransferHandler(BaseFileSystemHandler): +class FileTransferHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager @@ -180,7 +148,7 @@ async def post(self): response = {"content": None} - fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path) + fs, dest_path = self.fs_manager.validate_fs("post", dest_fs_key, dest_path) fs_instance = fs["instance"] print(f"fs_instance: {fs_instance}") @@ -219,7 +187,7 @@ async def post(self): # ==================================================================================== # Handle Rename requests (?seperate or not?) # ==================================================================================== -class RenameFileHandler(BaseFileSystemHandler): +class RenameFileHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager @@ -231,7 +199,7 @@ def post(self): content = request_data.get("content") response = {"content": None} - fs, item_path = self.validate_fs("post", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path) fs_instance = fs["instance"] # expect item path to end with `/` for directories # expect content to be the FULL new path @@ -254,7 +222,7 @@ def post(self): # ==================================================================================== # Handle Syncing Local and Remote filesystems # ==================================================================================== -class FilesystemSyncHandler(BaseFileSystemHandler): +class FilesystemSyncHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager @@ -269,7 +237,7 @@ async def get(self): response = {"content": None} - fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path) + fs, dest_path = self.fs_manager.validate_fs("post", dest_fs_key, dest_path) remote_fs_instance = fs["instance"] # noqa: F841 try: @@ -308,6 +276,13 @@ async def post(self): # rsync # (local_source_path, remote_destination_path) # await 
remote_fs_instance.... + if remote_fs_instance.async_impl: + # async + pass + else: + # Non-async + pass + self.set_status(200) response["status"] = "success" response["description"] = ( @@ -325,7 +300,7 @@ async def post(self): # ==================================================================================== # CRUD for FileSystem # ==================================================================================== -class FileSystemHandler(BaseFileSystemHandler): +class FileSystemHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager @@ -354,7 +329,7 @@ async def get(self): req_item_path = self.get_argument("item_path") type = self.get_argument("type", default="default") - fs, item_path = self.validate_fs("get", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("get", key, req_item_path) fs_instance = fs["instance"] response = {"content": None} @@ -440,7 +415,7 @@ async def post(self): req_item_path = request_data.get("item_path") content = request_data.get("content") - fs, item_path = self.validate_fs("post", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path) fs_instance = fs["instance"] response = {"content": None} @@ -495,7 +470,7 @@ async def put(self): req_item_path = request_data.get("item_path") content = request_data.get("content") - fs, item_path = self.validate_fs("put", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("put", key, req_item_path) fs_instance = fs["instance"] response = {"content": None} @@ -537,7 +512,7 @@ async def patch(self): offset = request_data.get("offset") content = request_data.get("content") - fs, item_path = self.validate_fs("patch", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("patch", key, req_item_path) fs_instance = fs["instance"] # TODO: offset @@ -592,7 +567,7 @@ async def delete(self): request_data = json.loads(self.request.body.decode("utf-8")) req_item_path = request_data.get("item_path") - 
fs, item_path = self.validate_fs("delete", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("delete", key, req_item_path) fs_instance = fs["instance"] response = {"content": None} From 0bb5cceb1df3836f2ecfb53b67b9d39383b49eae Mon Sep 17 00:00:00 2001 From: RRosio Date: Thu, 13 Feb 2025 23:35:34 -0800 Subject: [PATCH 3/5] consolidate sync into upload/download handler, add pydantic schema for requests, add to tests up/downloading from root filesystems --- conftest.py | 19 ++-- jupyter_fsspec/file_manager.py | 6 - jupyter_fsspec/handlers.py | 181 ++++++++++--------------------- jupyter_fsspec/schemas.py | 43 ++++++++ jupyter_fsspec/tests/test_api.py | 165 +++++++++++++++------------- src/handler/fileOperations.ts | 5 + 6 files changed, 203 insertions(+), 216 deletions(-) create mode 100644 jupyter_fsspec/schemas.py diff --git a/conftest.py b/conftest.py index d13ecee..49181c2 100644 --- a/conftest.py +++ b/conftest.py @@ -35,11 +35,9 @@ def setup_tmp_local(tmp_path: Path): yield [local_root, local_empty_root] -@pytest.fixture(scope="function", autouse=True) +@pytest.fixture(scope="function") def setup_config_file_fs(tmp_path: Path, setup_tmp_local): tmp_local = setup_tmp_local[0] - items_tmp_local = list(tmp_local.iterdir()) - print(f"items_tmp_local: {items_tmp_local}") empty_tmp_local = setup_tmp_local[1] config_dir = tmp_path / "config" config_dir.mkdir(exist_ok=True) @@ -121,7 +119,7 @@ def get_boto3_client(): ) -@pytest.fixture(scope="module") +@pytest.fixture(scope="function") def s3_base(): server = ThreadedMotoServer(ip_address="127.0.0.1", port=PORT) server.start() @@ -136,21 +134,24 @@ def s3_base(): server.stop() -@pytest.fixture(scope="module") +@pytest.fixture(scope="function") def s3_client(s3_base): client = get_boto3_client() - client.create_bucket(Bucket="my-test-bucket", ACL="public-read") + + bucket_name = "my-test-bucket" + client.create_bucket(Bucket=bucket_name, ACL="public-read") client.put_object( - Body=b"Hello, World1!", 
Bucket="my-test-bucket", Key="bucket-filename1.txt" + Body=b"Hello, World1!", Bucket=bucket_name, Key="bucket-filename1.txt" ) client.put_object( - Body=b"Hello, World2!", Bucket="my-test-bucket", Key="some/bucket-filename2.txt" + Body=b"Hello, World2!", Bucket=bucket_name, Key="some/bucket-filename2.txt" ) client.put_object( - Body=b"Hello, World3!", Bucket="my-test-bucket", Key="some/bucket-filename3.txt" + Body=b"Hello, World3!", Bucket=bucket_name, Key="some/bucket-filename3.txt" ) yield client + client.close() @pytest.fixture diff --git a/jupyter_fsspec/file_manager.py b/jupyter_fsspec/file_manager.py index 876be2c..9048118 100644 --- a/jupyter_fsspec/file_manager.py +++ b/jupyter_fsspec/file_manager.py @@ -223,12 +223,6 @@ def validate_fs(self, request_type, key, item_path): def get_filesystem(self, key): return self.filesystems.get(key) - def get_filesystem_by_protocol(self, fs_protocol): - for encoded_key, fs_info in self.filesystems.items(): - if fs_info.get("protocol") == fs_protocol: - return {"key": encoded_key, "info": fs_info} - return None - def get_filesystem_protocol(self, key): filesystem_rep = self.filesystems.get(key) print(f"filesystem_rep: {filesystem_rep}") diff --git a/jupyter_fsspec/handlers.py b/jupyter_fsspec/handlers.py index a21cd00..379fc8d 100644 --- a/jupyter_fsspec/handlers.py +++ b/jupyter_fsspec/handlers.py @@ -1,6 +1,7 @@ from .file_manager import FileSystemManager from jupyter_server.base.handlers import APIHandler from jupyter_server.utils import url_path_join +from .schemas import GetRequest, PostRequest, DeleteRequest, TransferRequest, Direction from .utils import parse_range import tornado import json @@ -85,11 +86,13 @@ async def post(self): :return: dict with a status, description and (optionally) error :rtype: dict """ - key = self.get_argument("key") request_data = json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") - action = request_data.get("action") - destination = 
request_data.get("content") + post_request = PostRequest(**request_data) + key = post_request.key + req_item_path = post_request.item_path + action = post_request.action + destination = post_request.content + response = {"content": None} fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path) @@ -130,46 +133,50 @@ async def post(self): """Upload/Download the resource at the input path to destination path. :param [key]: [Query arg string used to retrieve the appropriate filesystem instance] - :param [item_path]: [Query arg string path to file or directory to be retrieved] - :param [action]: [Request body string move or copy] - :param [content]: [Request body property file or directory path] + :param [local_path]: [Request body string path to file/directory to be retrieved] + :param [remote_path]: [Request body string path to file/directory to be modified] + :param [action]: [Request body string upload or download] :return: dict with a status, description and (optionally) error :rtype: dict """ request_data = json.loads(self.request.body.decode("utf-8")) - action = request_data.get("action") - local_path = request_data.get("local_path") - remote_path = request_data.get("remote_path") - dest_fs_key = request_data.get("destination_key") + transfer_request = TransferRequest(**request_data) + key = transfer_request.key + local_path = transfer_request.local_path + remote_path = transfer_request.remote_path + dest_fs_key = transfer_request.destination_key dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key) dest_path = dest_fs_info["canonical_path"] - # if destination is subfolder, need to parse canonical_path for protocol? 
+ fs_info = self.fs_manager.get_filesystem(key) + path = fs_info["canonical_path"] response = {"content": None} - fs, dest_path = self.fs_manager.validate_fs("post", dest_fs_key, dest_path) - fs_instance = fs["instance"] - print(f"fs_instance: {fs_instance}") - try: - if action == "upload": - # upload remote.put(local_path, remote_path) + if transfer_request.action == Direction.UPLOAD: logger.debug("Upload file") - protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key) - if protocol not in remote_path: - remote_path = protocol + remote_path - # TODO: handle creating directories? current: flat item upload - # remote_path = remote_path (root) + 'nested/' - await fs_instance._put(local_path, remote_path, recursive=True) + fs, dest_path = self.fs_manager.validate_fs( + "post", dest_fs_key, dest_path + ) + fs_instance = fs["instance"] + + if fs_instance.async_impl: + await fs_instance._put(local_path, remote_path, recursive=True) + else: + fs_instance.put(local_path, remote_path, recursive=True) + response["description"] = f"Uploaded {local_path} to {remote_path}." else: - # download remote.get(remote_path, local_path) logger.debug("Download file") - protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key) - if protocol not in remote_path: - remote_path = protocol + remote_path - await fs_instance._get(remote_path, local_path, recursive=True) + fs, dest_path = self.fs_manager.validate_fs("post", key, path) + fs_instance = fs["instance"] + + if fs_instance.async_impl: + await fs_instance._get(remote_path, local_path, recursive=True) + else: + fs_instance.get(remote_path, local_path, recursive=True) + response["description"] = f"Downloaded {remote_path} to {local_path}." 
response["status"] = "success" @@ -192,11 +199,11 @@ def initialize(self, fs_manager): self.fs_manager = fs_manager def post(self): - key = self.get_argument("key") - request_data = json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") - content = request_data.get("content") + post_request = PostRequest(**request_data) + key = post_request.key + req_item_path = post_request.item_path + content = post_request.content response = {"content": None} fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path) @@ -219,84 +226,6 @@ def post(self): self.finish() -# ==================================================================================== -# Handle Syncing Local and Remote filesystems -# ==================================================================================== -class FilesystemSyncHandler(APIHandler): - def initialize(self, fs_manager): - self.fs_manager = fs_manager - - async def get(self): - # remote to local (fetching latest changes) - request_data = json.loads(self.request.body.decode("utf-8")) - local_destination_path = request_data.get("local_path") - remote_source_path = request_data.get("remote_path") - dest_fs_key = request_data.get("destination_key") - dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key) - dest_path = dest_fs_info["path"] - - response = {"content": None} - - fs, dest_path = self.fs_manager.validate_fs("post", dest_fs_key, dest_path) - remote_fs_instance = fs["instance"] # noqa: F841 - - try: - # rsync - # (remote_source_path, local_destination_path) - # await remote_fs_instance.... - self.set_status(200) - response["status"] = "success" - response["description"] = ( - f"Synced {local_destination_path} to {remote_source_path}." 
- ) - except Exception as e: - print(f"Error with sync handler: {e}") - self.set_status(500) - response["status"] = "failed" - response["description"] = str(e) - self.write(response) - self.finish() - - async def post(self): - # local to remote (pushing latest changes) - key = self.get_argument("key") # noqa: F841 - request_data = json.loads(self.request.body.decode("utf-8")) - local_source_path = request_data.get("local_path") - remote_destination_path = request_data.get("remote_path") - dest_fs_key = request_data.get("destination_key") - dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key) - dest_path = dest_fs_info["path"] - - response = {"content": None} - - fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path) - remote_fs_instance = fs["instance"] # noqa: F841 - - try: - # rsync - # (local_source_path, remote_destination_path) - # await remote_fs_instance.... - if remote_fs_instance.async_impl: - # async - pass - else: - # Non-async - pass - - self.set_status(200) - response["status"] = "success" - response["description"] = ( - f"Synced {remote_destination_path} to {local_source_path}." 
- ) - except Exception as e: - logger.debug(f"Error with sync handler: {e}") - self.set_status(500) - response["status"] = "failed" - response["description"] = str(e) - self.write(response) - self.finish() - - # ==================================================================================== # CRUD for FileSystem # ==================================================================================== @@ -325,9 +254,10 @@ async def get(self): # item_path: /some_directory/file.txt # GET /jupyter_fsspec/files?key=my-key&item_path=/some_directory/file.txt&type=range # content header specifying the byte range - key = self.get_argument("key") - req_item_path = self.get_argument("item_path") - type = self.get_argument("type", default="default") + request_data = {k: self.get_argument(k) for k in self.request.arguments} + get_request = GetRequest(**request_data) + key = get_request.key + req_item_path = get_request.item_path fs, item_path = self.fs_manager.validate_fs("get", key, req_item_path) @@ -340,7 +270,7 @@ async def get(self): else: isdir = fs_instance.isdir(item_path) - if type == "range": + if get_request.type == "range": range_header = self.request.headers.get("Range") start, end = parse_range(range_header) if fs_instance.async_impl: @@ -410,10 +340,11 @@ async def post(self): :return: dict with a status, description and (optionally) error :rtype: dict """ - key = self.get_argument("key") request_data = json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") - content = request_data.get("content") + post_request = PostRequest(**request_data) + key = post_request.key + req_item_path = post_request.item_path + content = post_request.content fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path) fs_instance = fs["instance"] @@ -465,10 +396,11 @@ async def put(self): :return: dict with a status, description and (optionally) error :rtype: dict """ - key = self.get_argument("key") request_data = 
json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") - content = request_data.get("content") + post_request = PostRequest(**request_data) + key = post_request.key + req_item_path = post_request.item_path + content = post_request.content fs, item_path = self.fs_manager.validate_fs("put", key, req_item_path) fs_instance = fs["instance"] @@ -563,9 +495,10 @@ async def delete(self): :return: dict with a status, description and (optionally) error :rtype: dict """ - key = self.get_argument("key") request_data = json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") + delete_request = DeleteRequest(**request_data) + key = delete_request.key + req_item_path = delete_request.item_path fs, item_path = self.fs_manager.validate_fs("delete", key, req_item_path) fs_instance = fs["instance"] @@ -602,7 +535,6 @@ def setup_handlers(web_app): route_fs_file_transfer = url_path_join( base_url, "jupyter_fsspec", "files", "transfer" ) - route_fs_sync = url_path_join(base_url, "jupyter_fsspec", "sync") handlers = [ (route_fsspec_config, FsspecConfigHandler, dict(fs_manager=fs_manager)), @@ -610,7 +542,6 @@ def setup_handlers(web_app): (route_rename_files, RenameFileHandler, dict(fs_manager=fs_manager)), (route_file_actions, FileActionHandler, dict(fs_manager=fs_manager)), (route_fs_file_transfer, FileTransferHandler, dict(fs_manager=fs_manager)), - (route_fs_sync, FilesystemSyncHandler, dict(fs_manager=fs_manager)), ] web_app.add_handlers(host_pattern, handlers) diff --git a/jupyter_fsspec/schemas.py b/jupyter_fsspec/schemas.py new file mode 100644 index 0000000..a212b04 --- /dev/null +++ b/jupyter_fsspec/schemas.py @@ -0,0 +1,43 @@ +from pydantic import BaseModel +from typing import Optional +from enum import Enum + + +class RequestType(str, Enum): + default = "default" + range = "range" + + +class RequestAction(str, Enum): + move = "move" + + +class BaseRequest(BaseModel): + key: str + item_path: str + + 
+class GetRequest(BaseRequest): + type: Optional[RequestType] = RequestType.default + + +class PostRequest(BaseRequest): + content: Optional[str] = None + action: Optional[RequestAction] = None + + +class DeleteRequest(BaseRequest): + pass + + +class Direction(str, Enum): + UPLOAD = "upload" + DOWNLOAD = "download" + + +class TransferRequest(BaseModel): + key: str + destination_key: str + local_path: str + remote_path: str + action: Direction diff --git a/jupyter_fsspec/tests/test_api.py b/jupyter_fsspec/tests/test_api.py index 9c2f598..cf006e6 100644 --- a/jupyter_fsspec/tests/test_api.py +++ b/jupyter_fsspec/tests/test_api.py @@ -15,7 +15,7 @@ async def test_get_config(jp_fetch): async def test_get_files_memory(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") + mem_fs_info = fs_manager.get_filesystem("TestMem Source") mem_key = mem_fs_info["key"] mem_fs = mem_fs_info["info"]["instance"] mem_item_path = mem_fs_info["info"]["path"] @@ -73,7 +73,7 @@ async def test_get_files_memory(fs_manager_instance, jp_fetch): async def test_post_files(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") + mem_fs_info = fs_manager.get_filesystem("TestMem Source") mem_key = mem_fs_info["key"] mem_fs = mem_fs_info["info"]["instance"] assert mem_fs is not None @@ -82,7 +82,11 @@ async def test_post_files(fs_manager_instance, jp_fetch): filepath = "/my_mem_dir/test_dir/file2.txt" # File does not already exist assert not mem_fs.exists(filepath) - file_payload = {"item_path": filepath, "content": "This is test file2 content"} + file_payload = { + "key": mem_key, + "item_path": filepath, + "content": "This is test file2 content", + } file_response = await jp_fetch( "jupyter_fsspec", "files", @@ -102,7 +106,7 @@ async def test_post_files(fs_manager_instance, jp_fetch): newdirpath = "/my_mem_dir/test_dir/subdir/" # Directory does not 
already exist assert not mem_fs.exists(newdirpath) - dir_payload = {"item_path": newdirpath} + dir_payload = {"key": mem_key, "item_path": newdirpath} dir_response = await jp_fetch( "jupyter_fsspec", "files", @@ -120,7 +124,7 @@ async def test_post_files(fs_manager_instance, jp_fetch): async def test_delete_files(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") + mem_fs_info = fs_manager.get_filesystem("TestMem Source") mem_key = mem_fs_info["key"] mem_fs = mem_fs_info["info"]["instance"] assert mem_fs is not None @@ -129,7 +133,7 @@ async def test_delete_files(fs_manager_instance, jp_fetch): filepath = "/my_mem_dir/test_dir/file1.txt" assert mem_fs.exists(filepath) - file_payload = {"item_path": filepath} + file_payload = {"key": mem_key, "item_path": filepath} response = await jp_fetch( "jupyter_fsspec", "files", @@ -150,7 +154,7 @@ async def test_delete_files(fs_manager_instance, jp_fetch): dirpath = "/my_mem_dir/test_dir" assert mem_fs.exists(dirpath) - dir_payload = {"item_path": dirpath} + dir_payload = {"key": mem_key, "item_path": dirpath} dir_response = await jp_fetch( "jupyter_fsspec", "files", @@ -171,14 +175,18 @@ async def test_delete_files(fs_manager_instance, jp_fetch): async def test_put_files(fs_manager_instance, jp_fetch): # PUT replace entire resource fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") + mem_fs_info = fs_manager.get_filesystem("TestMem Source") mem_key = mem_fs_info["key"] mem_fs = mem_fs_info["info"]["instance"] assert mem_fs is not None # replace entire file content filepath = "/my_mem_dir/test_dir/file1.txt" - file_payload = {"item_path": filepath, "content": "Replaced content"} + file_payload = { + "key": mem_key, + "item_path": filepath, + "content": "Replaced content", + } file_response = await jp_fetch( "jupyter_fsspec", "files", @@ -195,7 +203,7 @@ async def test_put_files(fs_manager_instance, 
jp_fetch): # replacing directory returns error dirpath = "/my_mem_dir/test_dir" - dir_payload = {"item_path": dirpath, "content": "new_test_dir"} + dir_payload = {"key": mem_key, "item_path": dirpath, "content": "new_test_dir"} with pytest.raises(HTTPClientError) as exc_info: await jp_fetch( "jupyter_fsspec", @@ -209,7 +217,7 @@ async def test_put_files(fs_manager_instance, jp_fetch): async def test_rename_files(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") + mem_fs_info = fs_manager.get_filesystem("TestMem Source") mem_key = mem_fs_info["key"] mem_fs = mem_fs_info["info"]["instance"] assert mem_fs is not None @@ -217,6 +225,7 @@ async def test_rename_files(fs_manager_instance, jp_fetch): # rename file filepath = "/my_mem_dir/test_dir/file1.txt" file_payload = { + "key": mem_key, "item_path": filepath, "content": "/my_mem_dir/test_dir/new_file.txt", } @@ -242,7 +251,11 @@ async def test_rename_files(fs_manager_instance, jp_fetch): # rename directory dirpath = "/my_mem_dir/second_dir" - dir_payload = {"item_path": dirpath, "content": "/my_mem_dir/new_dir"} + dir_payload = { + "key": mem_key, + "item_path": dirpath, + "content": "/my_mem_dir/new_dir", + } dir_response = await jp_fetch( "jupyter_fsspec", "files", @@ -394,16 +407,15 @@ async def xtest_action_same_fs_files(fs_manager_instance, jp_fetch): ) -# TODO: Test count files; Upload/download no more than expected async def test_upload_download(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - remote_fs_info = fs_manager.get_filesystem_by_protocol("s3") + remote_fs_info = fs_manager.get_filesystem("TestSourceAWS") remote_key = remote_fs_info["key"] remote_fs = remote_fs_info["info"]["instance"] remote_root_path = remote_fs_info["info"]["path"] assert remote_fs is not None - local_fs_info = fs_manager.get_filesystem_by_protocol("local") + local_fs_info = fs_manager.get_filesystem("TestDir") local_key = 
local_fs_info["key"] local_fs = local_fs_info["info"]["instance"] local_root_path = local_fs_info["info"]["path"] @@ -413,6 +425,7 @@ async def test_upload_download(fs_manager_instance, jp_fetch): local_upload_filepath = f"{local_root_path}/file_loc.txt" assert local_fs.exists(local_upload_filepath) upload_file_payload = { + "key": local_key, "local_path": local_upload_filepath, "remote_path": remote_root_path, "destination_key": remote_key, @@ -433,23 +446,29 @@ async def test_upload_download(fs_manager_instance, jp_fetch): assert upfile_body["status"] == "success" assert ( upfile_body["description"] - == f"Uploaded {local_upload_filepath} to s3://{remote_root_path}." + == f"Uploaded {local_upload_filepath} to {remote_root_path}." ) uploaded_filepath = remote_root_path + "/file_loc.txt" remote_file_items = await remote_fs._ls(remote_root_path) assert uploaded_filepath in remote_file_items + assert len(remote_file_items) == 3 + + all_remote = await remote_fs._find(remote_root_path) + assert len(all_remote) == 4 # upload dir [local to remote] upload_dirpath = local_root_path + "/nested/" assert local_fs.exists(upload_dirpath) upload_dir_payload = { + "key": remote_key, "local_path": upload_dirpath, "remote_path": remote_root_path, "destination_key": remote_key, "action": "upload", } + upload_dir_res = await jp_fetch( "jupyter_fsspec", "files", @@ -463,20 +482,20 @@ async def test_upload_download(fs_manager_instance, jp_fetch): updir_body = json.loads(updir_body_json) assert updir_body["status"] == "success" assert ( - updir_body["description"] - == f"Uploaded {upload_dirpath} to s3://{remote_root_path}." + updir_body["description"] == f"Uploaded {upload_dirpath} to {remote_root_path}." 
) remote_file_items = await remote_fs._ls(remote_root_path) # TODO: remote_root_path + "/nested" - assert "my-test-bucket/.keep" in remote_file_items - assert "my-test-bucket/.empty" in remote_file_items + assert f"{remote_root_path}/.keep" in remote_file_items + assert f"{remote_root_path}/.empty" in remote_file_items # download file [other to remote] #remote_root_path that we want to download. - download_filepath = "my-test-bucket/bucket-filename1.txt" + download_filepath = f"{remote_root_path}/bucket-filename1.txt" file_present = await remote_fs._exists(download_filepath) assert file_present download_file_payload = { + "key": remote_key, "remote_path": download_filepath, "local_path": local_root_path, "destination_key": remote_key, @@ -496,7 +515,7 @@ async def test_upload_download(fs_manager_instance, jp_fetch): assert download_file_body["status"] == "success" assert ( download_file_body["description"] - == f"Downloaded s3://{download_filepath} to {local_root_path}." + == f"Downloaded {download_filepath} to {local_root_path}." ) downloaded_filepath = local_root_path + "/bucket-filename1.txt" @@ -506,6 +525,7 @@ async def test_upload_download(fs_manager_instance, jp_fetch): # download dir [other to local] download_dirpath = f"{remote_root_path}/some/" download_dir_payload = { + "key": remote_key, "remote_path": download_dirpath, "local_path": local_root_path, "destination_key": remote_key, @@ -525,7 +545,7 @@ async def test_upload_download(fs_manager_instance, jp_fetch): assert download_dir_body["status"] == "success" assert ( download_dir_body["description"] - == f"Downloaded s3://{download_dirpath} to {local_root_path}." + == f"Downloaded {download_dirpath} to {local_root_path}." 
) files_in_local = local_fs.ls(local_root_path) @@ -540,69 +560,62 @@ async def test_upload_download(fs_manager_instance, jp_fetch): # new_local_items = local_fs.ls(downloaded_dirpath) # assert downloaded_dirpath in new_local_items - -async def xtest_sync_push(fs_manager_instance, jp_fetch): - # WIP - fs_manager = fs_manager_instance - remote_fs_info = fs_manager.get_filesystem_by_protocol("s3") - remote_key = remote_fs_info["key"] - remote_fs = remote_fs_info["info"]["instance"] - remote_root_path = remote_fs_info["info"]["path"] - assert remote_fs is not None - - local_fs_info = fs_manager.get_filesystem_by_protocol("local") - local_key = local_fs_info["key"] # noqa: F841 - local_fs = local_fs_info["info"]["instance"] - local_root_path = local_fs_info["info"]["path"] - assert local_fs is not None - - push_sync_payload = { - "local_path": local_root_path, + # download entire remote to local + download_sync_payload = { + "key": remote_key, + "destination_key": local_key, "remote_path": remote_root_path, - "destination_key": remote_key, + "local_path": local_root_path, + "action": "download", } - - # calling sync on the remote - sync_local_to_remote_response = await jp_fetch( + download_sync_res = await jp_fetch( "jupyter_fsspec", - "sync", + "files", + "transfer", method="POST", params={"key": remote_key}, - body=json.dumps(push_sync_payload), + body=json.dumps(download_sync_payload), ) + download_sync_body_json = download_sync_res.body.decode("utf-8") + download_sync_body = json.loads(download_sync_body_json) + assert download_sync_body["status"] == "success" + assert ( + download_sync_body["description"] + == f"Downloaded {remote_root_path} to {local_root_path}." 
+ ) + new_local_files = local_fs.find(local_root_path) + assert ( + len(new_local_files) == 10 + ) # pulls in individual items in remote root dir into local - assert sync_local_to_remote_response.code == 200 - - -async def test_sync_pull(fs_manager_instance, jp_fetch): - fs_manager = fs_manager_instance - remote_fs_info = fs_manager.get_filesystem_by_protocol("s3") - remote_key = remote_fs_info["key"] - remote_fs = remote_fs_info["info"]["instance"] - remote_root_path = remote_fs_info["info"]["path"] - assert remote_fs is not None - - local_fs_info = fs_manager.get_filesystem_by_protocol("local") - local_key = local_fs_info["key"] # noqa: F841 - local_fs = local_fs_info["info"]["instance"] - local_root_path = local_fs_info["info"]["path"] - assert local_fs is not None + current_remote_files = await remote_fs._find(remote_root_path) + print(f"current_remote_files: {current_remote_files}") - pull_sync_payload = { - "local_path": local_root_path, - "remote_path": remote_root_path, + # try uploading entire local to remote + upload_sync_payload = { + "key": local_key, "destination_key": remote_key, + "remote_path": remote_root_path, + "local_path": local_root_path, + "action": "upload", } - - # calling sync on the remote - sync_remote_to_local_response = await jp_fetch( + upload_sync_res = await jp_fetch( "jupyter_fsspec", - "sync", - method="GET", - params={"key": remote_key}, - body=json.dumps(pull_sync_payload), - allow_nonstandard_methods=True, + "files", + "transfer", + method="POST", + params={"key": local_key}, + body=json.dumps(upload_sync_payload), ) + upload_sync_body_json = upload_sync_res.body.decode("utf-8") + upload_sync_body = json.loads(upload_sync_body_json) + assert upload_sync_body["status"] == "success" + assert ( + upload_sync_body["description"] + == f"Uploaded {local_root_path} to {remote_root_path}." 
+ ) + new_remote_files = await remote_fs._find(remote_root_path) + assert len(new_remote_files) == 16 # aggregate- dumps local root dir into remote + - assert sync_remote_to_local_response.code == 200 - # assert body["status"] == "success" +# TODO: Fix Event loop closed error (unclosed session); Dirty state between tests with s3 diff --git a/src/handler/fileOperations.ts b/src/handler/fileOperations.ts index 46b89a3..2b4b7bb 100644 --- a/src/handler/fileOperations.ts +++ b/src/handler/fileOperations.ts @@ -222,8 +222,13 @@ export class FsspecModel { key, item_path }); + const reqBody = JSON.stringify({ + key: key, + item_path + }); const response = await requestAPI(`files?${query.toString()}`, { method: 'DELETE', + body: reqBody, headers: { 'Content-Type': 'application/json' } From 1d0ad1c052a2e309c97ee46561ffcf49a8c3fda4 Mon Sep 17 00:00:00 2001 From: RRosio Date: Thu, 13 Feb 2025 23:41:24 -0800 Subject: [PATCH 4/5] add pydantic dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 9714e92..3f0ca7f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ classifiers = [ ] dependencies = [ "fsspec", + "pydantic>=2", "jupyter_server>=2.4.0,<3" ] dynamic = ["version", "description", "urls", "keywords"] From 76d8a781a886bf5e23dfb16ffa944ad59d90b2af Mon Sep 17 00:00:00 2001 From: RRosio Date: Thu, 13 Feb 2025 23:56:39 -0800 Subject: [PATCH 5/5] updated fixture function and tests using get_filesystem --- conftest.py | 7 +++-- jupyter_fsspec/tests/test_api.py | 48 ++++++++++++++++---------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/conftest.py b/conftest.py index 49181c2..a137120 100644 --- a/conftest.py +++ b/conftest.py @@ -76,9 +76,10 @@ def setup_config_file_fs(tmp_path: Path, setup_tmp_local): @pytest.fixture(scope="function") def fs_manager_instance(setup_config_file_fs, s3_client): fs_manager = setup_config_file_fs - fs_info = 
fs_manager.get_filesystem_by_protocol("memory") - mem_fs = fs_info["info"]["instance"] - mem_root_path = fs_info["info"]["path"] + fs_info = fs_manager.get_filesystem("TestMem Source") + print(f"fs_info: {fs_info}") + mem_fs = fs_info["instance"] + mem_root_path = fs_info["path"] if mem_fs: if mem_fs.exists(f"{mem_root_path}/test_dir"): diff --git a/jupyter_fsspec/tests/test_api.py b/jupyter_fsspec/tests/test_api.py index cf006e6..a22c9b6 100644 --- a/jupyter_fsspec/tests/test_api.py +++ b/jupyter_fsspec/tests/test_api.py @@ -15,10 +15,10 @@ async def test_get_config(jp_fetch): async def test_get_files_memory(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem("TestMem Source") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] - mem_item_path = mem_fs_info["info"]["path"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] + mem_item_path = mem_fs_info["path"] assert mem_fs is not None # Read directory @@ -73,9 +73,9 @@ async def test_get_files_memory(fs_manager_instance, jp_fetch): async def test_post_files(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem("TestMem Source") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] assert mem_fs is not None # Post new file with content @@ -124,9 +124,9 @@ async def test_post_files(fs_manager_instance, jp_fetch): async def test_delete_files(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem("TestMem Source") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] assert mem_fs is not None # Delete file @@ 
-175,9 +175,9 @@ async def test_delete_files(fs_manager_instance, jp_fetch): async def test_put_files(fs_manager_instance, jp_fetch): # PUT replace entire resource fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem("TestMem Source") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] assert mem_fs is not None # replace entire file content @@ -217,9 +217,9 @@ async def test_put_files(fs_manager_instance, jp_fetch): async def test_rename_files(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - mem_fs_info = fs_manager.get_filesystem("TestMem Source") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] assert mem_fs is not None # rename file @@ -409,16 +409,16 @@ async def xtest_action_same_fs_files(fs_manager_instance, jp_fetch): async def test_upload_download(fs_manager_instance, jp_fetch): fs_manager = fs_manager_instance - remote_fs_info = fs_manager.get_filesystem("TestSourceAWS") - remote_key = remote_fs_info["key"] - remote_fs = remote_fs_info["info"]["instance"] - remote_root_path = remote_fs_info["info"]["path"] + remote_key = "TestSourceAWS" + remote_fs_info = fs_manager.get_filesystem(remote_key) + remote_fs = remote_fs_info["instance"] + remote_root_path = remote_fs_info["path"] assert remote_fs is not None - local_fs_info = fs_manager.get_filesystem("TestDir") - local_key = local_fs_info["key"] - local_fs = local_fs_info["info"]["instance"] - local_root_path = local_fs_info["info"]["path"] + local_key = "TestDir" + local_fs_info = fs_manager.get_filesystem(local_key) + local_fs = local_fs_info["instance"] + local_root_path = local_fs_info["path"] assert local_fs is not None # upload file [local to remote]