Multireaderfilestream Redesign #4595

Merged 22 commits on Nov 3, 2024
Changes from 19 commits
92 changes: 65 additions & 27 deletions codalab/lib/beam/MultiReaderFileStream.py
@@ -1,21 +1,25 @@
-from io import BytesIO
+from io import BytesIO, SEEK_SET, SEEK_END
 from threading import Lock
 
 from codalab.worker.un_gzip_stream import BytesBuffer
 
 
+import time
 class MultiReaderFileStream(BytesIO):
     """
-    FileStream that support multiple readers
+    FileStream that supports N readers with the following features and constraints:
+    - Each reader's position is tracked
+    - A buffer of bytes() stores everything from the position of the slowest reader,
+      minus a lookback length of 32 MiB, up to the fastest reader
+    - The fastest reader can be at most 64 MiB ahead of the slowest reader; reads
+      further ahead than that sleep until the slowest reader catches up
     """
     NUM_READERS = 2
+    LOOKBACK_LENGTH = 33554432  # 32 MiB
+    MAX_THRESHOLD = LOOKBACK_LENGTH * 2
 
     def __init__(self, fileobj):
-        self._bufs = [BytesBuffer() for _ in range(0, self.NUM_READERS)]
-        self._pos = [0 for _ in range(0, self.NUM_READERS)]
-        self._fileobj = fileobj
-        self._lock = Lock()  # lock to ensure one does not concurrently read self._fileobj / write to the buffers.
+        self._buffer = bytes()  # Buffer of bytes read from the file object within the limits defined
+        self._buffer_pos = 0  # Start position of the buffer in the fileobj (min reader position - LOOKBACK_LENGTH)
+        self._pos = [0 for _ in range(self.NUM_READERS)]  # Position of each reader in the fileobj
+        self._fileobj = fileobj  # The original file object the readers are reading from
+        self._lock = Lock()  # Lock to ensure one does not concurrently read self._fileobj / write to the buffer
         class FileStreamReader(BytesIO):
             def __init__(s, index):
                 s._index = index
@@ -25,33 +29,67 @@ def read(s, num_bytes=None):

             def peek(s, num_bytes):
                 return self.peek(s._index, num_bytes)
 
+            def seek(s, offset, whence=SEEK_SET):
+                return self.seek(s._index, offset, whence)
+
         self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)]
-    def _fill_buf_bytes(self, index: int, num_bytes=None):
-        with self._lock:
-            while num_bytes is None or len(self._bufs[index]) < num_bytes:
-                s = self._fileobj.read(num_bytes)
-                if not s:
-                    break
-                for i in range(0, self.NUM_READERS):
-                    self._bufs[i].write(s)
+    def _fill_buf_bytes(self, num_bytes=0):
+        """
+        Fills the buffer with bytes from the fileobj
+        """
+        s = self._fileobj.read(num_bytes)
+        if not s:
+            return
+        self._buffer += s


     def read(self, index: int, num_bytes=None):  # type: ignore
         """Read the specified number of bytes from the associated file.
         index: index that specifies which reader is reading.
         """
-        self._fill_buf_bytes(index, num_bytes)
-        if num_bytes is None:
-            num_bytes = len(self._bufs[index])
-        s = self._bufs[index].read(num_bytes)
-        self._pos[index] += len(s)
+        if num_bytes is None:
+            # Read the remaining bytes in the buffer
Collaborator: Do we ever call this with None? If not, don't support it.

Contributor Author: Don't think so, but this emulates the default read() behavior of other io streams in Python, where None means read to the end of the fileobj, or in our case the buffer.

Collaborator: End of fileobj is semantically different from end of buffer (which is arbitrary), so I would avoid having None set the wrong expectation of a behavior that we don't support.

Contributor Author: Makes sense, I'll make the change.
+            num_bytes = (self._buffer_pos + len(self._buffer)) - self._pos[index]
+
+        s = self.peek(index, num_bytes)
+        with self._lock:
+            # Advance this reader's position in the fileobj
+            self._pos[index] += len(s)
+
+            # If this reader is the slowest, bytes can be dropped from the start of the buffer:
+            # diff = newly calculated buffer start minus the current buffer start
+            diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_pos
+            # NOTE: diff can be < 0 if a backwards seek occurred
+            if diff > 0:
+                self._buffer = self._buffer[diff:]
+                self._buffer_pos += diff
         return s
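On the None question in the thread above: as the code stands, a bare read() drains whatever is currently buffered ahead of that reader, not the rest of the file. A worked example with hypothetical numbers:

# Hypothetical state: the buffer spans fileobj offsets [0, 1000) and reader 0
# sits at offset 400. read(0, num_bytes=None) computes:
#   num_bytes = (buffer_pos + len(buffer)) - pos[0]
#             = (0 + 1000) - 400 = 600
# so it returns the 600 buffered bytes up to the end of the *buffer*,
# which is not necessarily the end of the underlying file.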

-    def peek(self, index: int, num_bytes):  # type: ignore
-        self._fill_buf_bytes(index, num_bytes)
-        s = self._bufs[index].peek(num_bytes)
+    def peek(self, index: int, num_bytes: int):  # type: ignore
+        new_pos = self._pos[index] + num_bytes
+        # Throttle the fastest reader: sleep until the slowest reader is
+        # within MAX_THRESHOLD of the requested position
+        while new_pos - min(self._pos) > self.MAX_THRESHOLD:
+            time.sleep(.1)  # 100 ms
+
+        with self._lock:
+            # Calculate how many new bytes need to be read past the end of the buffer
+            new_bytes_needed = new_pos - (self._buffer_pos + len(self._buffer))
+            if new_bytes_needed > 0:
+                self._fill_buf_bytes(new_bytes_needed)
+
+            # Get the bytes in the buffer that correspond to this read call
+            buffer_index = self._pos[index] - self._buffer_pos
+            s = self._buffer[buffer_index:buffer_index + num_bytes]
 
         return s

+    def seek(self, index: int, offset: int, whence=SEEK_SET):
+        if whence == SEEK_END:
+            super().seek(offset, whence)
+        else:
+            assert offset >= self._buffer_pos
+            self._pos[index] = offset
+
     def close(self):
         self.__input.close()
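For context, here is a minimal usage sketch of the redesigned stream (hypothetical, not part of the diff; it assumes only the constructor and the readers attribute shown above). Each FileStreamReader can be consumed from its own thread, and the class throttles whichever reader runs ahead:

import tempfile
from threading import Thread

from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream

with tempfile.NamedTemporaryFile() as f:
    f.write(b"\0" * (4 * 1024 * 1024))  # 4 MiB of data
    f.seek(0)

    stream = MultiReaderFileStream(f)
    reader_a, reader_b = stream.readers  # NUM_READERS = 2

    def consume(reader, chunk_size):
        # Reads until the underlying file is exhausted; a reader that gets
        # more than MAX_THRESHOLD ahead of the slowest one sleeps in peek().
        while reader.read(chunk_size):
            pass

    t1 = Thread(target=consume, args=(reader_a, 1024 * 1024))
    t2 = Thread(target=consume, args=(reader_b, 64 * 1024))
    t1.start()
    t2.start()
    t1.join()
    t2.join()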
3 changes: 2 additions & 1 deletion codalab/lib/upload_manager.py
@@ -255,7 +255,8 @@ def write_fileobj(
             conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '')
             os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str
             try:
-                CHUNK_SIZE = 16 * 1024
+                # Chunk size set to 1 MiB for performance
+                CHUNK_SIZE = 1024 * 1024
Member: Why increase?

Contributor Author: The upload speed with the smaller chunk size was too slow due to the sleep behavior that occurs on the faster reader, which is always the index reader.

Contributor Author: It seems like there's no super-meaningful reason to keep the chunk size smallish, since the speed tradeoff is too large. See the sketch below for the intuition.


     def upload_file_content():
         iteration = 0
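To see why the constant matters, here is a rough sketch of the kind of chunked copy loop that upload_file_content drives (hypothetical; the real body is elided in this diff). Once a reader is throttled, each read can pay up to roughly one 100 ms sleep in peek(), so throughput while throttled scales with the chunk size; with 16 KiB chunks the indexing reader's sleeps dominate the upload:

# Hypothetical sketch, not the actual upload_file_content implementation.
def copy_in_chunks(reader, dest, chunk_size=1024 * 1024):
    while True:
        chunk = reader.read(chunk_size)  # may sleep in peek() when too far ahead
        if not chunk:
            break
        dest.write(chunk)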
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -45,3 +45,4 @@ websockets==9.1
kubernetes==12.0.1
google-cloud-storage==2.0.0
httpio==0.3.0
memory_profiler==0.61.0
155 changes: 155 additions & 0 deletions tests/unit/beam/multireaderfilestream_test.py
@@ -0,0 +1,155 @@
import tempfile
import time
import unittest

from threading import Thread

from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream

FILESIZE = 100000000
CHUNKSIZE = FILESIZE // 10  # integer division so read() gets an int, not a float

class MultiReaderFileStreamTest(unittest.TestCase):
def test_reader_distance(self):
"""
    This test verifies that both readers in the MultiReaderFileStream
    stay within the limits defined in the class
"""
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream.readers[0]
reader_2 = m_stream.readers[1]

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

def thread2():
# This reader will only read 4/10 of the file
for _ in range(4):
status = reader_2.read(CHUNKSIZE)

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)

t1.start()

# Sleep a little for thread 1 to start reading
time.sleep(.5)

# Assert that the first reader has not read past the maximum threshold
self.assertGreater(70000000, m_stream._pos[0])

t2.start()

# Sleep a little for thread 2 to start reading
time.sleep(.5)

# Assert that the first reader is at 100000000, second reader is at 40000000
self.assertEqual(100000000, m_stream._pos[0])
self.assertEqual(40000000, m_stream._pos[1])

# Assert that the buffer is at 6445568 (40000000 - LOOKBACK_LENGTH)
self.assertEqual(6445568, m_stream._buffer_pos)

        # Assert that the buffer length is 100000000 - 6445568
self.assertEqual(93554432, len(m_stream._buffer))

t1.join()
t2.join()

def test_backwards_seek(self):
"""
    This test verifies that a backwards seek within the lookback length
    defined in the MultiReaderFileStream class behaves as expected
"""
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream.readers[0]
reader_2 = m_stream.readers[1]

result = None

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

def thread2():
    # This reader will only read 4/10 of the file, then seek to 10000000 and read another 4/10 of the file
    nonlocal result  # so the except clause below writes to the enclosing variable
    for _ in range(4):
        reader_2.read(CHUNKSIZE)

    try:
        reader_2.seek(10000000)
    except AssertionError as e:
        result = e

    for _ in range(4):
        reader_2.read(CHUNKSIZE)

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()

t1.join()
t2.join()

self.assertIsNone(result)

# Check that reader 2 is at 50000000 and buffer position is correct
self.assertEqual(50000000, m_stream._pos[1])
self.assertEqual(16445568, m_stream._buffer_pos)


def test_toofar_seek(self):
"""
    This test verifies that a backwards seek past the lookback length
    defined in the MultiReaderFileStream class fails as expected with
    an AssertionError
"""
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream.readers[0]
reader_2 = m_stream.readers[1]

result = None

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

def thread2():
    # This reader will only read 4/10 of the file, then seek to the beginning
    nonlocal result  # so the except clause below writes to the enclosing variable
    for _ in range(4):
        reader_2.read(CHUNKSIZE)

    try:
        reader_2.seek(0)
    except AssertionError as e:
        result = e

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()

t1.join()
t2.join()

self.assertIsInstance(result, AssertionError)
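These are plain unittest cases, so they should be runnable in isolation (assuming the repository root is on the Python path):

python -m unittest tests.unit.beam.multireaderfilestream_test -v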
57 changes: 55 additions & 2 deletions tests/unit/lib/upload_manager_test.py
@@ -10,6 +10,7 @@
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from io import BytesIO
from memory_profiler import memory_usage
from typing import IO, cast
from unittest.mock import MagicMock
from urllib.response import addinfourl
@@ -18,7 +19,8 @@
from tests.unit.server.bundle_manager import TestBase

urlopen_real = urllib.request.urlopen

LARGE_FILE_SIZE = 16777216  # 16 MiB
EXTRA_LARGE_FILE_SIZE = 134217728  # 128 MiB, for memory profiling only

class UploadManagerTestBase(TestBase):
"""A class that contains the base for an UploadManager test. Subclasses
@@ -42,6 +44,13 @@ def check_file_equals_string(self, file_subpath: str, expected_contents: str):
def listdir(self):
"""List the files in the current bundle location."""
raise NotImplementedError

def check_file_size(self):
"""Check the file sizes in the current bundle location"""
with FileSystems.open(
self.bundle_location, compression_type=CompressionTypes.UNCOMPRESSED
) as f, tarfile.open(fileobj=f, mode='r:gz') as tf:
return [tarinfo.size for tarinfo in tf.getmembers()]

@property
def bundle_location(self):
@@ -78,6 +87,37 @@ def test_fileobj_gz(self):
self.do_upload(('source.gz', BytesIO(gzip_bytestring(b'testing'))))
self.check_file_equals_string('', 'testing')

def test_fileobj_tar_gz(self):
source = os.path.join(self.temp_dir, 'source_dir')
os.mkdir(source)
self.write_file_of_size(10, os.path.join(source, 'file'))
self.do_upload(('source.tar.gz', tar_gzip_directory(source)))
self.assertEqual(['file'], sorted(self.listdir()))
self.assertEqual([0, 10], self.check_file_size())

def test_large_fileobj_tar_gz(self):
"""
Large bundles should not cause issues
"""
source = os.path.join(self.temp_dir, 'source_dir')
os.mkdir(source)
self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile'))
self.write_string_to_file('testing', os.path.join(source, 'README'))
self.do_upload(('source.tar.gz', tar_gzip_directory(source)))
self.assertEqual(['README', 'bigfile'], sorted(self.listdir()))

def test_large_fileobj_tar_gz2(self):
"""
Large bundles should not cause issues
"""
source = os.path.join(self.temp_dir, 'source_dir')
os.mkdir(source)
self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile'))
self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile2'))
self.do_upload(('source.tar.gz', tar_gzip_directory(source)))
self.assertEqual(['bigfile', 'bigfile2'], sorted(self.listdir()))
self.assertEqual([0, LARGE_FILE_SIZE, LARGE_FILE_SIZE], self.check_file_size())

def test_fileobj_tar_gz_should_not_simplify_archives(self):
source = os.path.join(self.temp_dir, 'source_dir')
os.mkdir(source)
@@ -108,7 +148,7 @@ def test_fileobj_tar_gz_with_dsstore_should_not_simplify_archive_2(self):
self.write_string_to_file('testing', os.path.join(source, '.DS_Store'))
self.do_upload(('source.tar.gz', tar_gzip_directory(source)))
self.assertEqual(['.DS_Store', 'README', 'README2'], sorted(self.listdir()))

def mock_url_source(self, fileobj, ext=""):
"""Returns a URL that is mocked to return the contents of fileobj.
The URL will end in the extension "ext", if given.
@@ -149,10 +189,23 @@ def test_url_git(self):
# change, then update this test.
self.check_file_equals_string('testfile.md', '# test\nUsed for testing\n')

    def test_upload_memory(self):
        bigfile_path = os.path.join(self.temp_dir, 'bigfile')
        self.write_file_of_size(LARGE_FILE_SIZE, bigfile_path)
        # memory_usage expects a (callable, args, kwargs) tuple and samples
        # the process every `interval` seconds while the upload runs.
        mem_usage = memory_usage(
            (self.do_upload, (('bigfile', bigfile_path),), {}),
            interval=0.1,
        )
        # memory_usage reports MiB; peak usage should stay under ~100 MiB
        # since the file is streamed rather than held in memory.
        self.assertLess(max(mem_usage), 100)

def write_string_to_file(self, string, file_path):
with open(file_path, 'w') as f:
f.write(string)

def write_file_of_size(self, size: int, file_path: str):
with open(file_path, "wb") as f:
f.seek(size - 1)
f.write(b"\0")

class UploadManagerDiskStorageTest(UploadManagerTestBase, unittest.TestCase):
"""Tests for UploadManager that upload files to disk storage."""