Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add sync handler base #73

Merged
merged 6 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ def empty_config(tmp_path: Path):
@pytest.fixture(scope="function")
def setup_config_file_fs(tmp_path: Path, setup_tmp_local):
tmp_local = setup_tmp_local[0]
items_tmp_local = list(tmp_local.iterdir())
print(f"items_tmp_local: {items_tmp_local}")
empty_tmp_local = setup_tmp_local[1]
config_dir = tmp_path / "config"
config_dir.mkdir(exist_ok=True)
Expand Down Expand Up @@ -158,9 +156,10 @@ def setup_config_file_fs(tmp_path: Path, setup_tmp_local):
@pytest.fixture(scope="function")
async def fs_manager_instance(setup_config_file_fs, s3_client):
fs_manager = setup_config_file_fs
fs_info = fs_manager.get_filesystem_by_protocol("memory")
mem_fs = fs_info["info"]["instance"]
mem_root_path = fs_info["info"]["path"]
fs_info = fs_manager.get_filesystem("TestMem Source")
print(f"fs_info: {fs_info}")
mem_fs = fs_info["instance"]
mem_root_path = fs_info["path"]

if mem_fs:
if await mem_fs._exists(f"{mem_root_path}/test_dir"):
Expand Down Expand Up @@ -192,7 +191,7 @@ def get_boto3_client():
)


@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def s3_base():
server = ThreadedMotoServer(ip_address="127.0.0.1", port=PORT)
server.start()
Expand All @@ -207,21 +206,24 @@ def s3_base():
server.stop()


@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def s3_client(s3_base):
client = get_boto3_client()
client.create_bucket(Bucket="my-test-bucket", ACL="public-read")

bucket_name = "my-test-bucket"
client.create_bucket(Bucket=bucket_name, ACL="public-read")
client.put_object(
Body=b"Hello, World1!", Bucket="my-test-bucket", Key="bucket-filename1.txt"
Body=b"Hello, World1!", Bucket=bucket_name, Key="bucket-filename1.txt"
)
client.put_object(
Body=b"Hello, World2!", Bucket="my-test-bucket", Key="some/bucket-filename2.txt"
Body=b"Hello, World2!", Bucket=bucket_name, Key="some/bucket-filename2.txt"
)
client.put_object(
Body=b"Hello, World3!", Bucket="my-test-bucket", Key="some/bucket-filename3.txt"
Body=b"Hello, World3!", Bucket=bucket_name, Key="some/bucket-filename3.txt"
)

yield client
client.close()


@pytest.fixture
Expand Down
24 changes: 17 additions & 7 deletions jupyter_fsspec/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,26 @@ def check_reload_config(self):

return new_config_content

def validate_fs(self, request_type, key, item_path):
if not key:
raise ValueError("Missing required parameter `key`")

fs = self.get_filesystem(key)

if not item_path:
if str(type) != "range" and request_type == "get":
item_path = self.fs_manager.filesystems[key]["path"]
else:
raise ValueError("Missing required parameter `item_path`")

if fs is None:
raise ValueError(f"No filesystem found for key: {key}")

return fs, item_path

def get_filesystem(self, key):
return self.filesystems.get(key)

# TODO: Update to pull full dict with all filesystems
def get_filesystem_by_protocol(self, fs_protocol):
for encoded_key, fs_info in self.filesystems.items():
if fs_info.get("protocol") == fs_protocol:
return {"key": encoded_key, "info": fs_info}
return None

def get_filesystem_protocol(self, key):
filesystem_rep = self.filesystems.get(key)
return filesystem_rep["protocol"] + "://"
155 changes: 67 additions & 88 deletions jupyter_fsspec/handlers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .file_manager import FileSystemManager
from jupyter_server.base.handlers import APIHandler
from jupyter_server.utils import url_path_join
from .schemas import GetRequest, PostRequest, DeleteRequest, TransferRequest, Direction
from .utils import parse_range
from .exceptions import ConfigFileException
from contextlib import contextmanager
Expand All @@ -14,38 +15,6 @@
logger = logging.getLogger(__name__)


class BaseFileSystemHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

def validate_fs(self, request_type, key, item_path):
"""Retrieve the filesystem instance and path of the item

:raises [ValueError]: [Missing required key parameter]
:raises [ValueError]: [Missing required parameter item_path]
:raises [ValueError]: [No filesystem identified for provided key]

:return: filesystem instance and item_path
:rtype: fsspec filesystem instance and string
"""

if not key:
raise ValueError("Missing required parameter `key`")

fs = self.fs_manager.get_filesystem(key)

if not item_path:
if str(type) != "range" and request_type == "get":
item_path = self.fs_manager.filesystems[key]["path"]
else:
raise ValueError("Missing required parameter `item_path`")

if fs is None:
raise ValueError(f"No filesystem found for key: {key}")

return fs, item_path


@contextmanager
def handle_exception(
handler,
Expand Down Expand Up @@ -116,7 +85,7 @@ def get(self):
# ====================================================================================
# Handle Move and Copy Requests
# ====================================================================================
class FileActionHandler(BaseFileSystemHandler):
class FileActionHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

Expand All @@ -132,14 +101,16 @@ async def post(self):
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
key = self.get_argument("key")
request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
action = request_data.get("action")
destination = request_data.get("content")
post_request = PostRequest(**request_data)
key = post_request.key
req_item_path = post_request.item_path
action = post_request.action
destination = post_request.content

response = {"content": None}

fs, item_path = self.validate_fs("post", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path)
fs_instance = fs["instance"]

try:
Expand Down Expand Up @@ -168,7 +139,7 @@ async def post(self):
# ====================================================================================
# Handle Move and Copy Requests Across filesystems
# ====================================================================================
class FileTransferHandler(BaseFileSystemHandler):
class FileTransferHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

Expand All @@ -177,46 +148,50 @@ async def post(self):
"""Upload/Download the resource at the input path to destination path.

:param [key]: [Query arg string used to retrieve the appropriate filesystem instance]
:param [item_path]: [Query arg string path to file or directory to be retrieved]
:param [action]: [Request body string move or copy]
:param [content]: [Request body property file or directory path]
:param [local_path]: [Request body string path to file/directory to be retrieved]
:param [remote_path]: [Request body string path to file/directory to be modified]
:param [action]: [Request body string upload or download]

:return: dict with a status, description and (optionally) error
:rtype: dict
"""
request_data = json.loads(self.request.body.decode("utf-8"))
action = request_data.get("action")
local_path = request_data.get("local_path")
remote_path = request_data.get("remote_path")
dest_fs_key = request_data.get("destination_key")
transfer_request = TransferRequest(**request_data)
key = transfer_request.key
local_path = transfer_request.local_path
remote_path = transfer_request.remote_path
dest_fs_key = transfer_request.destination_key
dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key)
dest_path = dest_fs_info["canonical_path"]
# if destination is subfolder, need to parse canonical_path for protocol?
fs_info = self.fs_manager.get_filesystem(key)
path = fs_info["canonical_path"]

response = {"content": None}

fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path)
fs_instance = fs["instance"]
print(f"fs_instance: {fs_instance}")

try:
if action == "upload":
# upload remote.put(local_path, remote_path)
if transfer_request.action == Direction.UPLOAD:
logger.debug("Upload file")
protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key)
if protocol not in remote_path:
remote_path = protocol + remote_path
# TODO: handle creating directories? current: flat item upload
# remote_path = remote_path (root) + 'nested/'
await fs_instance._put(local_path, remote_path, recursive=True)
fs, dest_path = self.fs_manager.validate_fs(
"post", dest_fs_key, dest_path
)
fs_instance = fs["instance"]

if fs_instance.async_impl:
await fs_instance._put(local_path, remote_path, recursive=True)
else:
fs_instance.put(local_path, remote_path, recursive=True)

response["description"] = f"Uploaded {local_path} to {remote_path}."
else:
# download remote.get(remote_path, local_path)
logger.debug("Download file")
protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key)
if protocol not in remote_path:
remote_path = protocol + remote_path
await fs_instance._get(remote_path, local_path, recursive=True)
fs, dest_path = self.fs_manager.validate_fs("post", key, path)
fs_instance = fs["instance"]

if fs_instance.async_impl:
await fs_instance._get(remote_path, local_path, recursive=True)
else:
fs_instance.get(remote_path, local_path, recursive=True)

response["description"] = f"Downloaded {remote_path} to {local_path}."

response["status"] = "success"
Expand All @@ -234,19 +209,19 @@ async def post(self):
# ====================================================================================
# Handle Rename requests (?seperate or not?)
# ====================================================================================
class RenameFileHandler(BaseFileSystemHandler):
class RenameFileHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

def post(self):
key = self.get_argument("key")

request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
content = request_data.get("content")
post_request = PostRequest(**request_data)
key = post_request.key
req_item_path = post_request.item_path
content = post_request.content
response = {"content": None}

fs, item_path = self.validate_fs("post", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path)
fs_instance = fs["instance"]
# expect item path to end with `/` for directories
# expect content to be the FULL new path
Expand All @@ -269,7 +244,7 @@ def post(self):
# ====================================================================================
# CRUD for FileSystem
# ====================================================================================
class FileSystemHandler(BaseFileSystemHandler):
class FileSystemHandler(APIHandler):
def initialize(self, fs_manager):
self.fs_manager = fs_manager

Expand All @@ -294,11 +269,12 @@ async def get(self):
# item_path: /some_directory/file.txt
# GET /jupyter_fsspec/files?key=my-key&item_path=/some_directory/file.txt&type=range
# content header specifying the byte range
key = self.get_argument("key")
req_item_path = self.get_argument("item_path")
type = self.get_argument("type", default="default")
request_data = {k: self.get_argument(k) for k in self.request.arguments}
get_request = GetRequest(**request_data)
key = get_request.key
req_item_path = get_request.item_path

fs, item_path = self.validate_fs("get", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("get", key, req_item_path)

fs_instance = fs["instance"]
response = {"content": None}
Expand All @@ -310,7 +286,7 @@ async def get(self):
else:
isdir = fs_instance.isdir(item_path)

if type == "range":
if get_request.type == "range":
range_header = self.request.headers.get("Range")
start, end = parse_range(range_header)
if is_async:
Expand Down Expand Up @@ -378,12 +354,13 @@ async def post(self):
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
key = self.get_argument("key")
request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
content = request_data.get("content")
post_request = PostRequest(**request_data)
key = post_request.key
req_item_path = post_request.item_path
content = post_request.content

fs, item_path = self.validate_fs("post", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("post", key, req_item_path)
fs_instance = fs["instance"]
response = {"content": None}

Expand Down Expand Up @@ -433,12 +410,13 @@ async def put(self):
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
key = self.get_argument("key")
request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
content = request_data.get("content")
post_request = PostRequest(**request_data)
key = post_request.key
req_item_path = post_request.item_path
content = post_request.content

fs, item_path = self.validate_fs("put", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("put", key, req_item_path)
fs_instance = fs["instance"]
response = {"content": None}

Expand Down Expand Up @@ -480,7 +458,7 @@ async def patch(self):
offset = request_data.get("offset")
content = request_data.get("content")

fs, item_path = self.validate_fs("patch", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("patch", key, req_item_path)
fs_instance = fs["instance"]

# TODO: offset
Expand Down Expand Up @@ -531,11 +509,12 @@ async def delete(self):
:return: dict with a status, description and (optionally) error
:rtype: dict
"""
key = self.get_argument("key")
request_data = json.loads(self.request.body.decode("utf-8"))
req_item_path = request_data.get("item_path")
delete_request = DeleteRequest(**request_data)
key = delete_request.key
req_item_path = delete_request.item_path

fs, item_path = self.validate_fs("delete", key, req_item_path)
fs, item_path = self.fs_manager.validate_fs("delete", key, req_item_path)
fs_instance = fs["instance"]
response = {"content": None}

Expand Down
Loading
Loading