Skip to content

Commit

Permalink
Merge pull request #73 from RRosio/sync
Browse files Browse the repository at this point in the history
[WIP] Add sync handler base
  • Loading branch information
RRosio authored Feb 24, 2025
2 parents 0860388 + b951bac commit 76b3292
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 146 deletions.
24 changes: 13 additions & 11 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ def empty_config(tmp_path: Path):
@pytest.fixture(scope="function")
def setup_config_file_fs(tmp_path: Path, setup_tmp_local):
tmp_local = setup_tmp_local[0]
items_tmp_local = list(tmp_local.iterdir())
print(f"items_tmp_local: {items_tmp_local}")
empty_tmp_local = setup_tmp_local[1]
config_dir = tmp_path / "config"
config_dir.mkdir(exist_ok=True)
Expand Down Expand Up @@ -158,9 +156,10 @@ def setup_config_file_fs(tmp_path: Path, setup_tmp_local):
@pytest.fixture(scope="function")
async def fs_manager_instance(setup_config_file_fs, s3_client):
fs_manager = setup_config_file_fs
fs_info = fs_manager.get_filesystem_by_protocol("memory")
mem_fs = fs_info["info"]["instance"]
mem_root_path = fs_info["info"]["path"]
fs_info = fs_manager.get_filesystem("TestMem Source")
print(f"fs_info: {fs_info}")
mem_fs = fs_info["instance"]
mem_root_path = fs_info["path"]

if mem_fs:
if await mem_fs._exists(f"{mem_root_path}/test_dir"):
Expand Down Expand Up @@ -192,7 +191,7 @@ def get_boto3_client():
)


@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def s3_base():
server = ThreadedMotoServer(ip_address="127.0.0.1", port=PORT)
server.start()
Expand All @@ -207,21 +206,24 @@ def s3_base():
server.stop()


@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def s3_client(s3_base):
client = get_boto3_client()
client.create_bucket(Bucket="my-test-bucket", ACL="public-read")

bucket_name = "my-test-bucket"
client.create_bucket(Bucket=bucket_name, ACL="public-read")
client.put_object(
Body=b"Hello, World1!", Bucket="my-test-bucket", Key="bucket-filename1.txt"
Body=b"Hello, World1!", Bucket=bucket_name, Key="bucket-filename1.txt"
)
client.put_object(
Body=b"Hello, World2!", Bucket="my-test-bucket", Key="some/bucket-filename2.txt"
Body=b"Hello, World2!", Bucket=bucket_name, Key="some/bucket-filename2.txt"
)
client.put_object(
Body=b"Hello, World3!", Bucket="my-test-bucket", Key="some/bucket-filename3.txt"
Body=b"Hello, World3!", Bucket=bucket_name, Key="some/bucket-filename3.txt"
)

yield client
client.close()


@pytest.fixture
Expand Down
24 changes: 17 additions & 7 deletions jupyter_fsspec/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,26 @@ def check_reload_config(self):

return new_config_content

def validate_fs(self, request_type, key, item_path):
if not key:
raise ValueError("Missing required parameter `key`")

fs = self.get_filesystem(key)

if not item_path:
if str(type) != "range" and request_type == "get":
item_path = self.fs_manager.filesystems[key]["path"]
else:
raise ValueError("Missing required parameter `item_path`")

if fs is None:
raise ValueError(f"No filesystem found for key: {key}")

return fs, item_path

def get_filesystem(self, key):
return self.filesystems.get(key)

# TODO: Update to pull full dict with all filesystems
def get_filesystem_by_protocol(self, fs_protocol):
for encoded_key, fs_info in self.filesystems.items():
if fs_info.get("protocol") == fs_protocol:
return {"key": encoded_key, "info": fs_info}
return None

def get_filesystem_protocol(self, key):
filesystem_rep = self.filesystems.get(key)
return filesystem_rep["protocol"] + "://"
155 changes: 67 additions & 88 deletions jupyter_fsspec/handlers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .file_manager import FileSystemManager
from jupyter_server.base.handlers import APIHandler
from jupyter_server.utils import url_path_join
from .schemas import GetRequest, PostRequest, DeleteRequest, TransferRequest, Direction
from .utils import parse_range
from .exceptions import ConfigFileException
from contextlib import contextmanager
Expand All @@ -14,38 +15,6 @@
logger = logging.getLogger(__name__)


class BaseFileSystemHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

def validate_fs(self, request_type, key, item_path):
"""Retrieve the filesystem instance and path of the item
:raises [ValueError]: [Missing required key parameter]
:raises [ValueError]: [Missing required parameter item_path]
:raises [ValueError]: [No filesystem identified for provided key]
:return: filesystem instance and item_path
:rtype: fsspec filesystem instance and string
"""

if not key:
raise ValueError("Missing required parameter `key`")

fs = self.fs_manager.get_filesystem(key)

if not item_path:
if str(type) != "range" and request_type == "get":
item_path = self.fs_manager.filesystems[key]["path"]
else:
raise ValueError("Missing required parameter `item_path`")

if fs is None:
raise ValueError(f"No filesystem found for key: {key}")

return fs, item_path


@contextmanager
def handle_exception(
handler,
Expand Down Expand Up @@ -116,7 +85,7 @@ def get(self):
# ====================================================================================
# Handle Move and Copy Requests
# ====================================================================================
class FileActionHandler(BaseFileSystemHandler):
class FileActionHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

Expand All @@ -132,14 +101,16 @@ async def post(self):
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
key = self.get_argument("key")
request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
action = request_data.get("action")
destination = request_data.get("content")
post_request = PostRequest(**request_data)
key = post_request.key
req_item_path = post_request.item_path
action = post_request.action
destination = post_request.content

response = {"content": None}

fs, item_path = self.validate_fs("post", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path)
fs_instance = fs["instance"]

try:
Expand Down Expand Up @@ -168,7 +139,7 @@ async def post(self):
# ====================================================================================
# Handle Move and Copy Requests Across filesystems
# ====================================================================================
class FileTransferHandler(BaseFileSystemHandler):
class FileTransferHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

Expand All @@ -177,46 +148,50 @@ async def post(self):
"""Upload/Download the resource at the input path to destination path.
:param [key]: [Query arg string used to retrieve the appropriate filesystem instance]
:param [item_path]: [Query arg string path to file or directory to be retrieved]
:param [action]: [Request body string move or copy]
:param [content]: [Request body property file or directory path]
:param [local_path]: [Request body string path to file/directory to be retrieved]
:param [remote_path]: [Request body string path to file/directory to be modified]
:param [action]: [Request body string upload or download]
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
request_data = json.loads(self.request.body.decode("utf-8"))
action = request_data.get("action")
local_path = request_data.get("local_path")
remote_path = request_data.get("remote_path")
dest_fs_key = request_data.get("destination_key")
transfer_request = TransferRequest(**request_data)
key = transfer_request.key
local_path = transfer_request.local_path
remote_path = transfer_request.remote_path
dest_fs_key = transfer_request.destination_key
dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key)
dest_path = dest_fs_info["canonical_path"]
# if destination is subfolder, need to parse canonical_path for protocol?
fs_info = self.fs_manager.get_filesystem(key)
path = fs_info["canonical_path"]

response = {"content": None}

fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path)
fs_instance = fs["instance"]
print(f"fs_instance: {fs_instance}")

try:
if action == "upload":
# upload remote.put(local_path, remote_path)
if transfer_request.action == Direction.UPLOAD:
logger.debug("Upload file")
protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key)
if protocol not in remote_path:
remote_path = protocol + remote_path
# TODO: handle creating directories? current: flat item upload
# remote_path = remote_path (root) + 'nested/'
await fs_instance._put(local_path, remote_path, recursive=True)
fs, dest_path = self.fs_manager.validate_fs(
"post", dest_fs_key, dest_path
)
fs_instance = fs["instance"]

if fs_instance.async_impl:
await fs_instance._put(local_path, remote_path, recursive=True)
else:
fs_instance.put(local_path, remote_path, recursive=True)

response["description"] = f"Uploaded {local_path} to {remote_path}."
else:
# download remote.get(remote_path, local_path)
logger.debug("Download file")
protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key)
if protocol not in remote_path:
remote_path = protocol + remote_path
await fs_instance._get(remote_path, local_path, recursive=True)
fs, dest_path = self.fs_manager.validate_fs("post", key, path)
fs_instance = fs["instance"]

if fs_instance.async_impl:
await fs_instance._get(remote_path, local_path, recursive=True)
else:
fs_instance.get(remote_path, local_path, recursive=True)

response["description"] = f"Downloaded {remote_path} to {local_path}."

response["status"] = "success"
Expand All @@ -234,19 +209,19 @@ async def post(self):
# ====================================================================================
# Handle Rename requests (?seperate or not?)
# ====================================================================================
class RenameFileHandler(BaseFileSystemHandler):
class RenameFileHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

def post(self):
key = self.get_argument("key")

request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
content = request_data.get("content")
post_request = PostRequest(**request_data)
key = post_request.key
req_item_path = post_request.item_path
content = post_request.content
response = {"content": None}

fs, item_path = self.validate_fs("post", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path)
fs_instance = fs["instance"]
# expect item path to end with `/` for directories
# expect content to be the FULL new path
Expand All @@ -269,7 +244,7 @@ def post(self):
# ====================================================================================
# CRUD for FileSystem
# ====================================================================================
class FileSystemHandler(BaseFileSystemHandler):
class FileSystemHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

Expand All @@ -294,11 +269,12 @@ async def get(self):
# item_path: /some_directory/file.txt
# GET /jupyter_fsspec/files?key=my-key&item_path=/some_directory/file.txt&type=range
# content header specifying the byte range
key = self.get_argument("key")
req_item_path = self.get_argument("item_path")
type = self.get_argument("type", default="default")
request_data = {k: self.get_argument(k) for k in self.request.arguments}
get_request = GetRequest(**request_data)
key = get_request.key
req_item_path = get_request.item_path

fs, item_path = self.validate_fs("get", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("get", key, req_item_path)

fs_instance = fs["instance"]
response = {"content": None}
Expand All @@ -310,7 +286,7 @@ async def get(self):
else:
isdir = fs_instance.isdir(item_path)

if type == "range":
if get_request.type == "range":
range_header = self.request.headers.get("Range")
start, end = parse_range(range_header)
if is_async:
Expand Down Expand Up @@ -378,12 +354,13 @@ async def post(self):
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
key = self.get_argument("key")
request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
content = request_data.get("content")
post_request = PostRequest(**request_data)
key = post_request.key
req_item_path = post_request.item_path
content = post_request.content

fs, item_path = self.validate_fs("post", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path)
fs_instance = fs["instance"]
response = {"content": None}

Expand Down Expand Up @@ -433,12 +410,13 @@ async def put(self):
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
key = self.get_argument("key")
request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
content = request_data.get("content")
post_request = PostRequest(**request_data)
key = post_request.key
req_item_path = post_request.item_path
content = post_request.content

fs, item_path = self.validate_fs("put", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("put", key, req_item_path)
fs_instance = fs["instance"]
response = {"content": None}

Expand Down Expand Up @@ -480,7 +458,7 @@ async def patch(self):
offset = request_data.get("offset")
content = request_data.get("content")

fs, item_path = self.validate_fs("patch", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("patch", key, req_item_path)
fs_instance = fs["instance"]

# TODO: offset
Expand Down Expand Up @@ -531,11 +509,12 @@ async def delete(self):
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
key = self.get_argument("key")
request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
delete_request = DeleteRequest(**request_data)
key = delete_request.key
req_item_path = delete_request.item_path

fs, item_path = self.validate_fs("delete", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("delete", key, req_item_path)
fs_instance = fs["instance"]
response = {"content": None}

Expand Down
Loading

0 comments on commit 76b3292

Please sign in to comment.