Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
ericsnekbytes committed Oct 28, 2024
2 parents c89c02a + 382dcd6 commit 896485c
Show file tree
Hide file tree
Showing 14 changed files with 1,729 additions and 260 deletions.
91 changes: 91 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from pathlib import Path
from unittest.mock import patch
import pytest
import fsspec
from jupyter_fsspec.file_manager import FileSystemManager


pytest_plugins = ['pytest_jupyter.jupyter_server', 'jupyter_server.pytest_plugin',
'pytest_asyncio']

@pytest.fixture(scope='function', autouse=True)
def setup_config_file(tmp_path: Path):
config_dir = tmp_path / "config"
config_dir.mkdir(exist_ok=True)

yaml_content = """sources:
- name: "TestSourceAWS"
path: "/path/to/set1"
type: "s3"
additional_options:
anon: false
key: "my-access-key"
secret: "my-secret-key"
- name: "TestSourceDisk"
path: "."
type: "local"
- name: "TestDir"
path: "/Users/rosioreyes/Desktop/test_fsspec"
type: "local"
- name: "TestEmptyLocalDir"
path: "/Users/rosioreyes/Desktop/notebooks/sample/nothinghere"
type: "local"
- name: "TestMem Source"
path: "/my_mem_dir"
type: "memory"
- name: "TestDoesntExistDir"
path: "/Users/rosioreyes/Desktop/notebooks/doesnotexist"
type: "local"
"""
yaml_file = config_dir / "jupyter-fsspec.yaml"
yaml_file.write_text(yaml_content)

with patch('jupyter_core.paths.jupyter_config_dir', return_value=str(config_dir)):
print(f"Patching jupyter_config_dir to: {config_dir}")
yield


@pytest.fixture(scope='function')
def fs_manager_instance(setup_config_file):
fs_manager = FileSystemManager(config_file='jupyter-fsspec.yaml')
fs_info = fs_manager.get_filesystem_by_type('memory')
key = fs_info['key']
fs = fs_info['info']['instance']
mem_root_path = fs_info['info']['path']

if fs:
if fs.exists('/my_mem_dir/test_dir'):
fs.rm('/my_mem_dir/test_dir', recursive=True)
if fs.exists('/my_mem_dir/second_dir'):
fs.rm('/my_mem_dir/second_dir', recursive=True)

fs.touch('/my_mem_dir/file_in_root.txt')
with fs.open('/my_mem_dir/file_in_root.txt', 'wb') as f:
f.write("Root file content".encode())

fs.mkdir('/my_mem_dir/test_dir', exist_ok=True)
fs.mkdir('/my_mem_dir/second_dir', exist_ok=True)
# fs.mkdir('/my_mem_dir/second_dir/subdir', exist_ok=True)
fs.touch('/my_mem_dir/test_dir/file1.txt')
with fs.open('/my_mem_dir/test_dir/file1.txt', "wb") as f:
f.write("Test content".encode())
f.close()
else:
print("In memory filesystem NOT FOUND")

if fs.exists('/my_mem_dir/test_dir/file1.txt'):
file_info = fs.info('/my_mem_dir/test_dir/file1.txt')
print(f"File exists. size: {file_info}")
else:
print("File does not exist!")
return fs_manager

@pytest.fixture
def jp_server_config(jp_server_config):
return {
"ServerApp": {
"jpserver_extensions": {
"jupyter_fsspec": True
}
}
}
164 changes: 136 additions & 28 deletions jupyter_fsspec/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import os
import yaml
import urllib.parse
from datetime import datetime
from pathlib import PurePath

class FileSystemManager:
def __init__(self, config_file):
base_dir = jupyter_config_dir()
config_path = os.path.join(base_dir, config_file)
self.config_path = os.path.join(base_dir, config_file)
try:
with open(config_path, 'r') as file:
with open(self.config_path, 'r') as file:
self.config = yaml.safe_load(file)
except Exception as e:
print(f"Error loading configuration file: {e}")
Expand All @@ -22,17 +23,26 @@ def __init__(self, config_file):
def _encode_key(self, fs_config):
fs_path = fs_config['path'].strip('/')

combined = f"{fs_config['type']}|{fs_config['name']}|{fs_path}"
combined = f"{fs_config['type']}|{fs_path}"
encoded_key = urllib.parse.quote(combined, safe='')
return encoded_key

#TODO: verify
def _decode_key(self, encoded_key):
combined = urllib.parse.unquote(encoded_key)
fs_type, fs_name, fs_path = combined.split('|', 2)
return fs_type, fs_name, fs_path
fs_type, fs_path = combined.split('|', 1)
return fs_type, fs_path

def read_config(self):
try:
with open(self.config_path, 'r') as file:
self.config = yaml.safe_load(file)
except Exception as e:
print(f"Error loading configuration file: {e}")
return None

def _initialize_filesystems(self):
self.read_config()

for fs_config in self.config['sources']:
key = self._encode_key(fs_config)

Expand All @@ -43,35 +53,58 @@ def _initialize_filesystems(self):

# Init filesystem
fs = fsspec.filesystem(fs_type, **options)
if fs_type == 'memory':
if not fs.exists(fs_path):
fs.mkdir(fs_path)

# Store the filesystem instance
self.filesystems[key] = {"instance": fs, "name": fs_name, "type": fs_type, "path": fs_path}

def get_all_filesystems(self):
self._initialize_filesystems()

def get_filesystem(self, key):
return self.filesystems.get(key)

def get_filesystem_by_type(self, fs_type):
for encoded_key, fs_info in self.filesystems.items():
if fs_info.get('type') == fs_type:
return {'key': encoded_key, 'info': fs_info}
return None

# ===================================================
# File/Folder Read/Write Operations
# ===================================================
def write(self, key, item_path: str, content): # writePath
# write directory
# write file with content
# write empty file at directory
# write to an existing file
def write(self, key, item_path: str, content, overwrite=False): # writePath
fs_obj = self.get_filesystem(key)
fs = fs_obj['instance']

if fs.exists(item_path) and not fs.isdir(item_path):
return {"status_code": 409, "status": f"Failed: Path {item_path} already exists."}

if fs.isdir(item_path):
content = content.decode('utf-8')
if overwrite:
return {"status_code": 409, "response": {"status": "failed", "description": f"Failed: Path {item_path} already exists."}}
if isinstance(content, bytes):
content = content.decode('utf-8')
new_dir_path = str(PurePath(item_path) / content) + '/'
print(f"new_dir_path is: {new_dir_path}")

if fs.exists(new_dir_path):
return {"status_code": 409, "status": f"Failed: Path {item_path} already exists."}
fs.mkdir(new_dir_path, create_parents=True)
return {"status_code": 200, "response": {"status": "success", "description": f"Wrote {new_dir_path}."}}
return {"status_code": 409, "response": {"status": "failed", "description": f"Failed: Path {item_path} already exists."}}
else:
fs.mkdir(new_dir_path, create_parents=True)
return {"status_code": 200, "response": {"status": "success", "description": f"Wrote {new_dir_path}"}}
else:
# TODO: Process content for different mime types correctly
with fs.open(item_path, 'wb') as file:
file.write(content);
if not isinstance(content, bytes):
content = content.encode()

if fs.exists(item_path) and not overwrite:
return {"status_code": 409, "response": {"status": "failed", "description": f"Failed: Path {item_path} already exists."}}
else:
with fs.open(item_path, 'wb') as file:
file.write(content);
return {"status_code": 200, "response": {"status": "success", "description": f"Wrote {item_path}"}}


Expand All @@ -83,30 +116,37 @@ def read(self, key, item_path: str, find: bool = False): # readPath
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {item_path} does not exist."}}

if fs.isdir(item_path) and find:
# find(): a simple list of files
content = []
dir_ls = fs.find(item_path, maxdepth=None, withdirs=True, detail=False)
for path in dir_ls:
content.append(fs.info(path))
content.append(path)
elif fs.isdir(item_path):
content = []
dir_ls = fs.ls(item_path)
for path in dir_ls:
content.append(fs.info(path))
if not isinstance(path, str): #TODO: improve
path = path['name']

info = fs.info(path)

if isinstance(info.get('created'), datetime):
info['created'] = info['created'].isoformat()
content.append(info)
else:
with fs.open(item_path, 'rb') as file:
content = file.read()
content = content.decode('utf-8')
# TODO: Process content for different mime types for request body eg. application/json
return {"status_code": 200, "response": {"status": "success", "description": f"Read {item_path}", "content": content}}

# TODO:
# TODO: remove
def accessMemoryFS(self, key, item_path):
fs_obj = self.get_filesystem(key)
fs = fs_obj['instance']
content = 'Temporary Content: memory fs accessed'
return {"status_code": 200, "response": {"status": "success", "description": f"Read {item_path}", "content": content}}


def update(self, key, item_path, content): #updateFile
fs_obj = self.get_filesystem(key)
fs = fs_obj['instance']
Expand All @@ -128,38 +168,76 @@ def delete(self, key, item_path): # deletePath
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {item_path} does not exist."}}

if fs.isdir(item_path):
fs.delete(item_path, recursive=True) #TODO: await fs._rm()
fs.delete(item_path) #TODO: await fs._rm() Do not want recursive=True
else:
fs.delete(item_path, recursive=False)
return {"status_code": 200, "response": {"status": "success", "description": f"Deleted {item_path}."}}

def move(self, key, item_path, dest_path): # movePath
fs_obj = self.get_filesystem(key)
fs = fs_obj['instance']

if not fs.exists(item_path):
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {item_path} does not exist."}}

if fs.isdir(item_path):
print(f"directory")
fs.mv(item_path, dest_path, recursive=True)
else:
print(f"file")
_, item_extension = os.path.splitext(item_path)
_, dest_extension = os.path.splitext(dest_path)

if not dest_extension:
dest_path = dest_path + item_extension

fs.mv(item_path, dest_path, recursive=False)
return {"status_code": 200, "response": {"status": "success", "description": f"Moved {item_path} to path: {dest_path}."}}
return {"status_code": 200, "response": {"status": "success", "description": f"Moved {item_path} to {dest_path}"}}

def move_diff_fs(self, key, full_item_path, dest_key, full_dest_path): # movePath
fs_obj = self.get_filesystem(key)
fs = fs_obj['instance']
dest_fs_obj = self.get_filesystem(dest_key)
dest_fs =dest_fs_obj['instance']

if not fs.exists(full_item_path):
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {full_item_path} does not exist"}}

if fs.isdir(full_item_path):
if not dest_fs.exists(full_dest_path):
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {full_dest_path} does not exist"}}
fsspec.mv(full_item_path, full_dest_path, recursive=True)
else:
fsspec.mv(full_item_path, full_dest_path, recursive=False)
return {"status_code": 200, "response": {"status": "success", "description": f"Moved {full_item_path} to path: {full_dest_path}"}}

def copy(self, key, item_path, dest_path): # copyPath
fs_obj = self.get_filesystem(key)
fs = fs_obj['instance']

if not fs.exists(item_path):
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {item_path} does not exist."}}
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {item_path} does not exist"}}

if fs.isdir(item_path):
fs.copy(item_path, dest_path, recursive=True)
else:
_, item_extension = os.path.splitext(item_path)
_, dest_extension = os.path.splitext(dest_path)

if not dest_extension:
dest_path = dest_path + item_extension
fs.copy(item_path, dest_path, recursive=False)
return {"status_code": 200, "response": {"status": "success", "description": f"Copied {item_path} to path: {dest_path}."}}
return {"status_code": 200, "response": {"status": "success", "description": f"Copied {item_path} to {dest_path}"}}

def copy_diff_fs(self, key, full_item_path, dest_key, full_dest_path): # copyPath
fs_obj = self.get_filesystem(key)
fs = fs_obj['instance']

if not fs.exists(full_item_path):
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {full_item_path} does not exist"}}

if fs.isdir(full_item_path):
fs.copy(full_item_path, full_dest_path, recursive=True)
else:
fs.copy(full_item_path, full_dest_path, recursive=False)
return {"status_code": 200, "response": {"status": "success", "description": f"Copied {full_item_path} to path: {full_dest_path}"}}

def open(self, key, item_path, start, end):
fs_obj = self.get_filesystem(key)
Expand All @@ -177,6 +255,36 @@ def open(self, key, item_path, start, end):
content = data.decode('utf-8')
return {"status_code": 206, "response": {"status": "success", "description": f"Partial content read from: {item_path}", "content": content}}


def rename(self, key, item_path, dest_path):
fs_obj = self.get_filesystem(key)
fs = fs_obj['instance']

if not fs.exists(item_path):
return {"status_code": 404, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {item_path} does not exist"}}

dir_root_path = os.path.dirname(item_path)

# directory
if fs.isdir(item_path):
new_dest_path = dir_root_path + '/' + dest_path
if fs.exists(new_dest_path):
return {"status_code": 403, "response": {"status": "failed", "error": "PATH_NOT_FOUND", "description": f"Path {new_dest_path} already exist"}}
else:
fs.rename(item_path, new_dest_path)
# file
else:
# check for dest_path file extension? if not infer, reassign dest_path
_, item_extension = os.path.splitext(item_path)
_, dest_extension = os.path.splitext(dest_path)

if not dest_extension:
dest_path = dest_path + item_extension
new_dest_path = dir_root_path + '/' + dest_path
fs.rename(item_path, new_dest_path)

return {"status_code": 200, "response": {"status": "success", "description": f"Renamed {item_path} to {new_dest_path}"}}

# ===================================================
# File/Folder Management Operations
# ===================================================
Expand Down
Loading

0 comments on commit 896485c

Please sign in to comment.