diff --git a/conftest.py b/conftest.py index 1d405b1..9e24217 100644 --- a/conftest.py +++ b/conftest.py @@ -118,8 +118,6 @@ def empty_config(tmp_path: Path): @pytest.fixture(scope="function") def setup_config_file_fs(tmp_path: Path, setup_tmp_local): tmp_local = setup_tmp_local[0] - items_tmp_local = list(tmp_local.iterdir()) - print(f"items_tmp_local: {items_tmp_local}") empty_tmp_local = setup_tmp_local[1] config_dir = tmp_path / "config" config_dir.mkdir(exist_ok=True) @@ -158,9 +156,10 @@ def setup_config_file_fs(tmp_path: Path, setup_tmp_local): @pytest.fixture(scope="function") async def fs_manager_instance(setup_config_file_fs, s3_client): fs_manager = setup_config_file_fs - fs_info = fs_manager.get_filesystem_by_protocol("memory") - mem_fs = fs_info["info"]["instance"] - mem_root_path = fs_info["info"]["path"] + fs_info = fs_manager.get_filesystem("TestMem Source") + print(f"fs_info: {fs_info}") + mem_fs = fs_info["instance"] + mem_root_path = fs_info["path"] if mem_fs: if await mem_fs._exists(f"{mem_root_path}/test_dir"): @@ -192,7 +191,7 @@ def get_boto3_client(): ) -@pytest.fixture(scope="module") +@pytest.fixture(scope="function") def s3_base(): server = ThreadedMotoServer(ip_address="127.0.0.1", port=PORT) server.start() @@ -207,21 +206,24 @@ def s3_base(): server.stop() -@pytest.fixture(scope="module") +@pytest.fixture(scope="function") def s3_client(s3_base): client = get_boto3_client() - client.create_bucket(Bucket="my-test-bucket", ACL="public-read") + + bucket_name = "my-test-bucket" + client.create_bucket(Bucket=bucket_name, ACL="public-read") client.put_object( - Body=b"Hello, World1!", Bucket="my-test-bucket", Key="bucket-filename1.txt" + Body=b"Hello, World1!", Bucket=bucket_name, Key="bucket-filename1.txt" ) client.put_object( - Body=b"Hello, World2!", Bucket="my-test-bucket", Key="some/bucket-filename2.txt" + Body=b"Hello, World2!", Bucket=bucket_name, Key="some/bucket-filename2.txt" ) client.put_object( - Body=b"Hello, World3!", 
Bucket="my-test-bucket", Key="some/bucket-filename3.txt" + Body=b"Hello, World3!", Bucket=bucket_name, Key="some/bucket-filename3.txt" ) yield client + client.close() @pytest.fixture diff --git a/jupyter_fsspec/file_manager.py b/jupyter_fsspec/file_manager.py index 204cc82..3f059e9 100644 --- a/jupyter_fsspec/file_manager.py +++ b/jupyter_fsspec/file_manager.py @@ -184,16 +184,26 @@ def check_reload_config(self): return new_config_content + def validate_fs(self, request_type, key, item_path): + if not key: + raise ValueError("Missing required parameter `key`") + + fs = self.get_filesystem(key) + + if not item_path: + if str(type) != "range" and request_type == "get": + item_path = self.filesystems[key]["path"] + else: + raise ValueError("Missing required parameter `item_path`") + + if fs is None: + raise ValueError(f"No filesystem found for key: {key}") + + return fs, item_path + def get_filesystem(self, key): return self.filesystems.get(key) - # TODO: Update to pull full dict with all filesystems - def get_filesystem_by_protocol(self, fs_protocol): - for encoded_key, fs_info in self.filesystems.items(): - if fs_info.get("protocol") == fs_protocol: - return {"key": encoded_key, "info": fs_info} - return None - def get_filesystem_protocol(self, key): filesystem_rep = self.filesystems.get(key) return filesystem_rep["protocol"] + "://" diff --git a/jupyter_fsspec/handlers.py b/jupyter_fsspec/handlers.py index c12ed25..357fc6b 100644 --- a/jupyter_fsspec/handlers.py +++ b/jupyter_fsspec/handlers.py @@ -1,6 +1,7 @@ from .file_manager import FileSystemManager from jupyter_server.base.handlers import APIHandler from jupyter_server.utils import url_path_join +from .schemas import GetRequest, PostRequest, DeleteRequest, TransferRequest, Direction from .utils import parse_range from .exceptions import ConfigFileException from contextlib import contextmanager @@ -14,38 +15,6 @@ logger = logging.getLogger(__name__) -class BaseFileSystemHandler(APIHandler): - def 
initialize(self, fs_manager): - self.fs_manager = fs_manager - - def validate_fs(self, request_type, key, item_path): - """Retrieve the filesystem instance and path of the item - - :raises [ValueError]: [Missing required key parameter] - :raises [ValueError]: [Missing required parameter item_path] - :raises [ValueError]: [No filesystem identified for provided key] - - :return: filesystem instance and item_path - :rtype: fsspec filesystem instance and string - """ - - if not key: - raise ValueError("Missing required parameter `key`") - - fs = self.fs_manager.get_filesystem(key) - - if not item_path: - if str(type) != "range" and request_type == "get": - item_path = self.fs_manager.filesystems[key]["path"] - else: - raise ValueError("Missing required parameter `item_path`") - - if fs is None: - raise ValueError(f"No filesystem found for key: {key}") - - return fs, item_path - - @contextmanager def handle_exception( handler, @@ -116,7 +85,7 @@ def get(self): # ==================================================================================== # Handle Move and Copy Requests # ==================================================================================== -class FileActionHandler(BaseFileSystemHandler): +class FileActionHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager @@ -132,14 +101,16 @@ async def post(self): :return: dict with a status, description and (optionally) error :rtype: dict """ - key = self.get_argument("key") request_data = json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") - action = request_data.get("action") - destination = request_data.get("content") + post_request = PostRequest(**request_data) + key = post_request.key + req_item_path = post_request.item_path + action = post_request.action + destination = post_request.content + response = {"content": None} - fs, item_path = self.validate_fs("post", key, req_item_path) + fs, item_path = 
self.fs_manager.validate_fs("post", key, req_item_path) fs_instance = fs["instance"] try: @@ -168,7 +139,7 @@ async def post(self): # ==================================================================================== # Handle Move and Copy Requests Across filesystems # ==================================================================================== -class FileTransferHandler(BaseFileSystemHandler): +class FileTransferHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager @@ -177,46 +148,50 @@ async def post(self): """Upload/Download the resource at the input path to destination path. :param [key]: [Query arg string used to retrieve the appropriate filesystem instance] - :param [item_path]: [Query arg string path to file or directory to be retrieved] - :param [action]: [Request body string move or copy] - :param [content]: [Request body property file or directory path] + :param [local_path]: [Request body string path to file/directory to be retrieved] + :param [remote_path]: [Request body string path to file/directory to be modified] + :param [action]: [Request body string upload or download] :return: dict with a status, description and (optionally) error :rtype: dict """ request_data = json.loads(self.request.body.decode("utf-8")) - action = request_data.get("action") - local_path = request_data.get("local_path") - remote_path = request_data.get("remote_path") - dest_fs_key = request_data.get("destination_key") + transfer_request = TransferRequest(**request_data) + key = transfer_request.key + local_path = transfer_request.local_path + remote_path = transfer_request.remote_path + dest_fs_key = transfer_request.destination_key dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key) dest_path = dest_fs_info["canonical_path"] - # if destination is subfolder, need to parse canonical_path for protocol? 
+ fs_info = self.fs_manager.get_filesystem(key) + path = fs_info["canonical_path"] response = {"content": None} - fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path) - fs_instance = fs["instance"] - print(f"fs_instance: {fs_instance}") - try: - if action == "upload": - # upload remote.put(local_path, remote_path) + if transfer_request.action == Direction.UPLOAD: logger.debug("Upload file") - protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key) - if protocol not in remote_path: - remote_path = protocol + remote_path - # TODO: handle creating directories? current: flat item upload - # remote_path = remote_path (root) + 'nested/' - await fs_instance._put(local_path, remote_path, recursive=True) + fs, dest_path = self.fs_manager.validate_fs( + "post", dest_fs_key, dest_path + ) + fs_instance = fs["instance"] + + if fs_instance.async_impl: + await fs_instance._put(local_path, remote_path, recursive=True) + else: + fs_instance.put(local_path, remote_path, recursive=True) + response["description"] = f"Uploaded {local_path} to {remote_path}." else: - # download remote.get(remote_path, local_path) logger.debug("Download file") - protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key) - if protocol not in remote_path: - remote_path = protocol + remote_path - await fs_instance._get(remote_path, local_path, recursive=True) + fs, dest_path = self.fs_manager.validate_fs("post", key, path) + fs_instance = fs["instance"] + + if fs_instance.async_impl: + await fs_instance._get(remote_path, local_path, recursive=True) + else: + fs_instance.get(remote_path, local_path, recursive=True) + response["description"] = f"Downloaded {remote_path} to {local_path}." response["status"] = "success" @@ -234,19 +209,19 @@ async def post(self): # ==================================================================================== # Handle Rename requests (?seperate or not?) 
# ==================================================================================== -class RenameFileHandler(BaseFileSystemHandler): +class RenameFileHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager def post(self): - key = self.get_argument("key") request_data = json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") - content = request_data.get("content") + post_request = PostRequest(**request_data) + key = post_request.key + req_item_path = post_request.item_path + content = post_request.content response = {"content": None} - fs, item_path = self.validate_fs("post", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path) fs_instance = fs["instance"] # expect item path to end with `/` for directories # expect content to be the FULL new path @@ -269,7 +244,7 @@ def post(self): # ==================================================================================== # CRUD for FileSystem # ==================================================================================== -class FileSystemHandler(BaseFileSystemHandler): +class FileSystemHandler(APIHandler): def initialize(self, fs_manager): self.fs_manager = fs_manager @@ -294,11 +269,12 @@ async def get(self): # item_path: /some_directory/file.txt # GET /jupyter_fsspec/files?key=my-key&item_path=/some_directory/file.txt&type=range # content header specifying the byte range - key = self.get_argument("key") - req_item_path = self.get_argument("item_path") - type = self.get_argument("type", default="default") + request_data = {k: self.get_argument(k) for k in self.request.arguments} + get_request = GetRequest(**request_data) + key = get_request.key + req_item_path = get_request.item_path - fs, item_path = self.validate_fs("get", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("get", key, req_item_path) fs_instance = fs["instance"] response = {"content": None} @@ -310,7 +286,7 @@ async def 
get(self): else: isdir = fs_instance.isdir(item_path) - if type == "range": + if get_request.type == "range": range_header = self.request.headers.get("Range") start, end = parse_range(range_header) if is_async: @@ -378,12 +354,13 @@ async def post(self): :return: dict with a status, description and (optionally) error :rtype: dict """ - key = self.get_argument("key") request_data = json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") - content = request_data.get("content") + post_request = PostRequest(**request_data) + key = post_request.key + req_item_path = post_request.item_path + content = post_request.content - fs, item_path = self.validate_fs("post", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path) fs_instance = fs["instance"] response = {"content": None} @@ -433,12 +410,13 @@ async def put(self): :return: dict with a status, description and (optionally) error :rtype: dict """ - key = self.get_argument("key") request_data = json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") - content = request_data.get("content") + post_request = PostRequest(**request_data) + key = post_request.key + req_item_path = post_request.item_path + content = post_request.content - fs, item_path = self.validate_fs("put", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("put", key, req_item_path) fs_instance = fs["instance"] response = {"content": None} @@ -480,7 +458,7 @@ async def patch(self): offset = request_data.get("offset") content = request_data.get("content") - fs, item_path = self.validate_fs("patch", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("patch", key, req_item_path) fs_instance = fs["instance"] # TODO: offset @@ -531,11 +509,12 @@ async def delete(self): :return: dict with a status, description and (optionally) error :rtype: dict """ - key = self.get_argument("key") request_data = 
json.loads(self.request.body.decode("utf-8")) - req_item_path = request_data.get("item_path") + delete_request = DeleteRequest(**request_data) + key = delete_request.key + req_item_path = delete_request.item_path - fs, item_path = self.validate_fs("delete", key, req_item_path) + fs, item_path = self.fs_manager.validate_fs("delete", key, req_item_path) fs_instance = fs["instance"] response = {"content": None} diff --git a/jupyter_fsspec/schemas.py b/jupyter_fsspec/schemas.py new file mode 100644 index 0000000..a212b04 --- /dev/null +++ b/jupyter_fsspec/schemas.py @@ -0,0 +1,43 @@ +from pydantic import BaseModel +from typing import Optional +from enum import Enum + + +class RequestType(str, Enum): + default = "default" + range = "range" + + +class RequestAction(str, Enum): + move = "move" + + +class BaseRequest(BaseModel): + key: str + item_path: str + + +class GetRequest(BaseRequest): + type: Optional[RequestType] = RequestType.default + + +class PostRequest(BaseRequest): + content: Optional[str] = None + action: Optional[RequestAction] = None + + +class DeleteRequest(BaseRequest): + pass + + +class Direction(str, Enum): + UPLOAD = "upload" + DOWNLOAD = "download" + + +class TransferRequest(BaseModel): + key: str + destination_key: str + local_path: str + remote_path: str + action: Direction diff --git a/jupyter_fsspec/tests/test_api.py b/jupyter_fsspec/tests/test_api.py index b2f7b61..635f921 100644 --- a/jupyter_fsspec/tests/test_api.py +++ b/jupyter_fsspec/tests/test_api.py @@ -52,10 +52,10 @@ async def test_empty_config(empty_config, jp_fetch): async def test_get_files_memory(fs_manager_instance, jp_fetch): fs_manager = await fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] - mem_item_path = mem_fs_info["info"]["path"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] + mem_item_path = 
mem_fs_info["path"] assert mem_fs is not None # Read directory @@ -110,16 +110,20 @@ async def test_get_files_memory(fs_manager_instance, jp_fetch): async def test_post_files(fs_manager_instance, jp_fetch): fs_manager = await fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] assert mem_fs is not None # Post new file with content filepath = "/my_mem_dir/test_dir/file2.txt" # File does not already exist assert not mem_fs.exists(filepath) - file_payload = {"item_path": filepath, "content": "This is test file2 content"} + file_payload = { + "key": mem_key, + "item_path": filepath, + "content": "This is test file2 content", + } file_response = await jp_fetch( "jupyter_fsspec", "files", @@ -139,7 +143,7 @@ async def test_post_files(fs_manager_instance, jp_fetch): newdirpath = "/my_mem_dir/test_dir/subdir/" # Directory does not already exist assert not mem_fs.exists(newdirpath) - dir_payload = {"item_path": newdirpath} + dir_payload = {"key": mem_key, "item_path": newdirpath} dir_response = await jp_fetch( "jupyter_fsspec", "files", @@ -157,16 +161,16 @@ async def test_post_files(fs_manager_instance, jp_fetch): async def test_delete_files(fs_manager_instance, jp_fetch): fs_manager = await fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] assert mem_fs is not None # Delete file filepath = "/my_mem_dir/test_dir/file1.txt" assert mem_fs.exists(filepath) - file_payload = {"item_path": filepath} + file_payload = {"key": mem_key, "item_path": filepath} response = await jp_fetch( "jupyter_fsspec", "files", @@ -187,7 +191,7 @@ async def 
test_delete_files(fs_manager_instance, jp_fetch): dirpath = "/my_mem_dir/test_dir" assert mem_fs.exists(dirpath) - dir_payload = {"item_path": dirpath} + dir_payload = {"key": mem_key, "item_path": dirpath} dir_response = await jp_fetch( "jupyter_fsspec", "files", @@ -208,14 +212,18 @@ async def test_delete_files(fs_manager_instance, jp_fetch): async def test_put_files(fs_manager_instance, jp_fetch): # PUT replace entire resource fs_manager = await fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] assert mem_fs is not None # replace entire file content filepath = "/my_mem_dir/test_dir/file1.txt" - file_payload = {"item_path": filepath, "content": "Replaced content"} + file_payload = { + "key": mem_key, + "item_path": filepath, + "content": "Replaced content", + } file_response = await jp_fetch( "jupyter_fsspec", "files", @@ -232,7 +240,7 @@ async def test_put_files(fs_manager_instance, jp_fetch): # replacing directory returns error dirpath = "/my_mem_dir/test_dir" - dir_payload = {"item_path": dirpath, "content": "new_test_dir"} + dir_payload = {"key": mem_key, "item_path": dirpath, "content": "new_test_dir"} with pytest.raises(HTTPClientError) as exc_info: await jp_fetch( "jupyter_fsspec", @@ -246,14 +254,15 @@ async def test_put_files(fs_manager_instance, jp_fetch): async def test_rename_files(fs_manager_instance, jp_fetch): fs_manager = await fs_manager_instance - mem_fs_info = fs_manager.get_filesystem_by_protocol("memory") - mem_key = mem_fs_info["key"] - mem_fs = mem_fs_info["info"]["instance"] + mem_key = "TestMem Source" + mem_fs_info = fs_manager.get_filesystem(mem_key) + mem_fs = mem_fs_info["instance"] assert mem_fs is not None # rename file filepath = "/my_mem_dir/test_dir/file1.txt" file_payload = { + "key": mem_key, "item_path": 
filepath, "content": "/my_mem_dir/test_dir/new_file.txt", } @@ -279,7 +288,11 @@ async def test_rename_files(fs_manager_instance, jp_fetch): # rename directory dirpath = "/my_mem_dir/second_dir" - dir_payload = {"item_path": dirpath, "content": "/my_mem_dir/new_dir"} + dir_payload = { + "key": mem_key, + "item_path": dirpath, + "content": "/my_mem_dir/new_dir", + } dir_response = await jp_fetch( "jupyter_fsspec", "files", @@ -431,25 +444,25 @@ async def xtest_action_same_fs_files(fs_manager_instance, jp_fetch): ) -# TODO: Test count files; Upload/download no more than expected async def test_upload_download(fs_manager_instance, jp_fetch): fs_manager = await fs_manager_instance - remote_fs_info = fs_manager.get_filesystem_by_protocol("s3") - remote_key = remote_fs_info["key"] - remote_fs = remote_fs_info["info"]["instance"] - remote_root_path = remote_fs_info["info"]["path"] + remote_key = "TestSourceAWS" + remote_fs_info = fs_manager.get_filesystem(remote_key) + remote_fs = remote_fs_info["instance"] + remote_root_path = remote_fs_info["path"] assert remote_fs is not None - local_fs_info = fs_manager.get_filesystem_by_protocol("local") - local_key = local_fs_info["key"] - local_fs = local_fs_info["info"]["instance"] - local_root_path = local_fs_info["info"]["path"] + local_key = "TestDir" + local_fs_info = fs_manager.get_filesystem(local_key) + local_fs = local_fs_info["instance"] + local_root_path = local_fs_info["path"] assert local_fs is not None # upload file [local to remote] local_upload_filepath = f"{local_root_path}/file_loc.txt" assert local_fs.exists(local_upload_filepath) upload_file_payload = { + "key": local_key, "local_path": local_upload_filepath, "remote_path": remote_root_path, "destination_key": remote_key, @@ -470,23 +483,29 @@ async def test_upload_download(fs_manager_instance, jp_fetch): assert upfile_body["status"] == "success" assert ( upfile_body["description"] - == f"Uploaded {local_upload_filepath} to s3://{remote_root_path}." 
+ == f"Uploaded {local_upload_filepath} to {remote_root_path}." ) uploaded_filepath = remote_root_path + "/file_loc.txt" remote_file_items = await remote_fs._ls(remote_root_path) assert uploaded_filepath in remote_file_items + assert len(remote_file_items) == 3 + + all_remote = await remote_fs._find(remote_root_path) + assert len(all_remote) == 4 # upload dir [local to remote] upload_dirpath = local_root_path + "/nested/" assert local_fs.exists(upload_dirpath) upload_dir_payload = { + "key": remote_key, "local_path": upload_dirpath, "remote_path": remote_root_path, "destination_key": remote_key, "action": "upload", } + upload_dir_res = await jp_fetch( "jupyter_fsspec", "files", @@ -500,20 +519,20 @@ async def test_upload_download(fs_manager_instance, jp_fetch): updir_body = json.loads(updir_body_json) assert updir_body["status"] == "success" assert ( - updir_body["description"] - == f"Uploaded {upload_dirpath} to s3://{remote_root_path}." + updir_body["description"] == f"Uploaded {upload_dirpath} to {remote_root_path}." ) remote_file_items = await remote_fs._ls(remote_root_path) # TODO: remote_root_path + "/nested" - assert "my-test-bucket/.keep" in remote_file_items - assert "my-test-bucket/.empty" in remote_file_items + assert f"{remote_root_path}/.keep" in remote_file_items + assert f"{remote_root_path}/.empty" in remote_file_items # download file [other to remote] #remote_root_path that we want to download. 
- download_filepath = "my-test-bucket/bucket-filename1.txt" + download_filepath = f"{remote_root_path}/bucket-filename1.txt" file_present = await remote_fs._exists(download_filepath) assert file_present download_file_payload = { + "key": remote_key, "remote_path": download_filepath, "local_path": local_root_path, "destination_key": remote_key, @@ -533,7 +552,7 @@ async def test_upload_download(fs_manager_instance, jp_fetch): assert download_file_body["status"] == "success" assert ( download_file_body["description"] - == f"Downloaded s3://{download_filepath} to {local_root_path}." + == f"Downloaded {download_filepath} to {local_root_path}." ) downloaded_filepath = local_root_path + "/bucket-filename1.txt" @@ -543,6 +562,7 @@ async def test_upload_download(fs_manager_instance, jp_fetch): # download dir [other to local] download_dirpath = f"{remote_root_path}/some/" download_dir_payload = { + "key": remote_key, "remote_path": download_dirpath, "local_path": local_root_path, "destination_key": remote_key, @@ -562,7 +582,7 @@ async def test_upload_download(fs_manager_instance, jp_fetch): assert download_dir_body["status"] == "success" assert ( download_dir_body["description"] - == f"Downloaded s3://{download_dirpath} to {local_root_path}." + == f"Downloaded {download_dirpath} to {local_root_path}." 
) files_in_local = local_fs.ls(local_root_path) @@ -576,3 +596,63 @@ async def test_upload_download(fs_manager_instance, jp_fetch): # downloaded_dirpath = local_root_path + '/some' # new_local_items = local_fs.ls(downloaded_dirpath) # assert downloaded_dirpath in new_local_items + + # download entire remote to local + download_sync_payload = { + "key": remote_key, + "destination_key": local_key, + "remote_path": remote_root_path, + "local_path": local_root_path, + "action": "download", + } + download_sync_res = await jp_fetch( + "jupyter_fsspec", + "files", + "transfer", + method="POST", + params={"key": remote_key}, + body=json.dumps(download_sync_payload), + ) + download_sync_body_json = download_sync_res.body.decode("utf-8") + download_sync_body = json.loads(download_sync_body_json) + assert download_sync_body["status"] == "success" + assert ( + download_sync_body["description"] + == f"Downloaded {remote_root_path} to {local_root_path}." + ) + new_local_files = local_fs.find(local_root_path) + assert ( + len(new_local_files) == 10 + ) # pulls in individual items in remote root dir into local + + current_remote_files = await remote_fs._find(remote_root_path) + print(f"current_remote_files: {current_remote_files}") + + # try uploading entire local to remote + upload_sync_payload = { + "key": local_key, + "destination_key": remote_key, + "remote_path": remote_root_path, + "local_path": local_root_path, + "action": "upload", + } + upload_sync_res = await jp_fetch( + "jupyter_fsspec", + "files", + "transfer", + method="POST", + params={"key": local_key}, + body=json.dumps(upload_sync_payload), + ) + upload_sync_body_json = upload_sync_res.body.decode("utf-8") + upload_sync_body = json.loads(upload_sync_body_json) + assert upload_sync_body["status"] == "success" + assert ( + upload_sync_body["description"] + == f"Uploaded {local_root_path} to {remote_root_path}." 
+ ) + new_remote_files = await remote_fs._find(remote_root_path) + assert len(new_remote_files) == 16 # aggregate- dumps local root dir into remote + + +# TODO: Fix Event loop closed error (unclosed session); Dirty state between tests with s3 diff --git a/src/handler/fileOperations.ts b/src/handler/fileOperations.ts index b847fb4..2b4b7bb 100644 --- a/src/handler/fileOperations.ts +++ b/src/handler/fileOperations.ts @@ -222,8 +222,13 @@ export class FsspecModel { key, item_path }); + const reqBody = JSON.stringify({ + key: key, + item_path + }); const response = await requestAPI(`files?${query.toString()}`, { method: 'DELETE', + body: reqBody, headers: { 'Content-Type': 'application/json' } @@ -366,6 +371,52 @@ export class FsspecModel { } } + async sync_push( + key: string, + remote_path: string, + local_path: string + ): Promise { + try { + const reqBody = JSON.stringify({ + key: key, + remote_path, + local_path + }); + await requestAPI('sync', { + method: 'POST', + body: reqBody, + headers: { + 'Content-Type': 'application/json' + } + }); + } catch (error) { + console.error('Failed to sync local to remote: ', error); + } + } + + async sync_pull( + key: string, + remote_path: string, + local_path: string + ): Promise { + try { + const reqBody = JSON.stringify({ + key: key, + remote_path, + local_path + }); + await requestAPI('sync', { + method: 'GET', + body: reqBody, + headers: { + 'Content-Type': 'application/json' + } + }); + } catch (error) { + console.error('Failed to sync remote to local: ', error); + } + } + // async update( // key: any = 'local%7CSourceDisk%7C.', // item_path = '',