Remove coordinator and support forking #1067

Merged: 59 commits, Apr 9, 2024

Commits
3b61fad
WIP: Add ScheduleCollection when requesting GC
wks Dec 28, 2023
18f7bb6
WIP: Rework GC triggering
wks Dec 28, 2023
ffe8f33
Working
wks Jan 2, 2024
b64e2a7
WIP: Remove Controller
wks Jan 3, 2024
4fa8fac
Fix deadlock between GCRequester and GC workers.
wks Jan 3, 2024
8f8c3e0
Remove comments and add back trace points
wks Jan 4, 2024
845eea6
WIP: Updating comments and minor code change.
wks Jan 4, 2024
2d52f00
Fix a case where workers wait forever
wks Jan 4, 2024
fa2aa50
Added comments and removed redundant code.
wks Jan 5, 2024
867eac0
Merge branch 'master' into fix/no-coordinator
wks Jan 8, 2024
2230c3f
Merge branch 'master' into fix/no-coordinator
wks Jan 11, 2024
5bda9ee
Move the gc_start trace point.
wks Jan 11, 2024
70d7b95
Merge branch 'master' into fix/no-coordinator
wks Jan 12, 2024
72e6eb9
Remove the EndOfGC work packet.
wks Jan 12, 2024
742c3a2
WIP: Refactor to support forking
wks Jan 12, 2024
90cb6df
Coding style change
wks Jan 12, 2024
2a25331
Split WorkerGoals and WorkerMonitor
wks Jan 13, 2024
1f9d905
Create worker structs lazily
wks Jan 17, 2024
029bfda
Allow the loop in GCWorker::run to break
wks Jan 18, 2024
d328859
Stop and restart GC workers for forking
wks Jan 18, 2024
0d79d8d
Adjust logging
wks Jan 19, 2024
44f22ba
Update eBPF tracing document
wks Jan 19, 2024
e243776
Merge branch 'master' into feature/fork
wks Jan 31, 2024
54f586b
Revert the change on `GcStatus`.
wks Jan 31, 2024
da19f04
Merge branch 'master' into feature/fork
wks Feb 1, 2024
822f6f5
Fix clippy warning
wks Feb 1, 2024
7cb0b2c
Formatting
wks Feb 1, 2024
176ad47
Replace uninitialize_collection with prepare/after fork.
wks Feb 5, 2024
53309d9
Merge branch 'master' into feature/fork
wks Feb 7, 2024
d3ddf7d
Fix unused warning
wks Feb 7, 2024
5c125fb
Do not wait for all workers to give in their structs
wks Feb 7, 2024
41ee28c
Fix comment
wks Feb 8, 2024
c4ab8b4
Only print TID on linux.
wks Feb 8, 2024
d9b451a
Fix comments
wks Feb 8, 2024
7a2074e
Refactor WorkerRequest and fix comments
wks Feb 8, 2024
eb1d8df
Use EnumMap for requested goals
wks Feb 8, 2024
54d798a
Fix style for stable Rust
wks Feb 8, 2024
f0e2755
Fix MacOS build
wks Feb 8, 2024
6e907e0
Merge branch 'master' into feature/fork
wks Mar 15, 2024
ea53ce1
Minor fix.
wks Mar 19, 2024
2c15ecd
Update comment
wks Mar 19, 2024
9aacb2d
Change type/variable/function names
wks Mar 21, 2024
351d982
Align GCWorkers with GCWorkerShared
wks Mar 21, 2024
9f743a8
Remove comment sentence
wks Mar 22, 2024
fd2f9c3
Adjust eBPF trace points and their docs.
wks Mar 25, 2024
991314a
Update comments and clean up simple wrapper functions
wks Mar 25, 2024
eaa00bf
Refactored state transition.
wks Mar 25, 2024
b791aa7
Minor changes
wks Mar 25, 2024
f89db28
Merge branch 'master' into feature/fork
wks Mar 25, 2024
f049f73
Add a mock test for forking support.
wks Apr 1, 2024
a6d834a
Fix clippy warning
wks Apr 2, 2024
e9da4c2
Fix stale comments
wks Apr 2, 2024
d293c04
Added unit tests
wks Apr 2, 2024
b526ec1
Merge branch 'master' into feature/fork
wks Apr 3, 2024
d9ef6a3
Style fix
wks Apr 3, 2024
a0b2f0b
Merge branch 'master' into feature/fork
wks Apr 3, 2024
31a660c
Fix typo
wks Apr 4, 2024
49c979e
Add hyperlink
wks Apr 4, 2024
a56e38d
Merge branch 'master' into feature/fork
wks Apr 8, 2024
3 changes: 0 additions & 3 deletions docs/header/mmtk.h
@@ -78,9 +78,6 @@ extern void mmtk_scan_region();
 // Request MMTk to trigger a GC. Note that this may not actually trigger a GC
 extern void mmtk_handle_user_collection_request(void* tls);
 
-// Run the main loop for the GC controller thread. Does not return
-extern void mmtk_start_control_collector(void* tls, void* worker);
-
 // Run the main loop for a GC worker. Does not return
 extern void mmtk_start_worker(void* tls, void* worker);
6 changes: 6 additions & 0 deletions src/global_state.rs
@@ -1,5 +1,8 @@
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Mutex;
+use std::time::Instant;
+
+use atomic_refcell::AtomicRefCell;
 
 /// This stores some global states for an MMTK instance.
 /// Some MMTK components like plans and allocators may keep an reference to the struct, and can access it.
@@ -15,6 +18,8 @@ pub struct GlobalState {
     pub(crate) initialized: AtomicBool,
     /// The current GC status.
     pub(crate) gc_status: Mutex<GcStatus>,
+    /// When did the last GC start? Only accessed by the last parked worker.
+    pub(crate) gc_start_time: AtomicRefCell<Option<Instant>>,
     /// Is the current GC an emergency collection? Emergency means we may run out of memory soon, and we should
     /// attempt to collect as much as we can.
     pub(crate) emergency_collection: AtomicBool,
@@ -195,6 +200,7 @@ impl Default for GlobalState {
         Self {
             initialized: AtomicBool::new(false),
             gc_status: Mutex::new(GcStatus::NotInGC),
+            gc_start_time: AtomicRefCell::new(None),
             stacks_prepared: AtomicBool::new(false),
             emergency_collection: AtomicBool::new(false),
             user_triggered_collection: AtomicBool::new(false),
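The new `gc_start_time` field uses `AtomicRefCell` rather than a mutex because, per the comment above, only one thread (the last parked worker) ever touches it. Below is a minimal sketch of that usage pattern, outside mmtk-core; the `on_gc_start`/`on_gc_end` hooks are hypothetical stand-ins for whatever the scheduler actually calls.

```rust
use std::time::{Duration, Instant};

use atomic_refcell::AtomicRefCell;

struct TimingState {
    /// Mirrors `GlobalState::gc_start_time`: only one thread at a time (the "last parked
    /// worker") reads or writes it, so a non-blocking `AtomicRefCell` is enough.
    gc_start_time: AtomicRefCell<Option<Instant>>,
}

impl TimingState {
    /// Hypothetical hook: the last parked worker records the start of a GC.
    fn on_gc_start(&self) {
        *self.gc_start_time.borrow_mut() = Some(Instant::now());
    }

    /// Hypothetical hook: at the end of the GC, the same worker computes the elapsed time.
    fn on_gc_end(&self) -> Option<Duration> {
        self.gc_start_time.borrow().as_ref().map(|start| start.elapsed())
    }
}
```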
49 changes: 8 additions & 41 deletions src/memory_manager.rs
@@ -16,7 +16,7 @@ use crate::mmtk::MMTK;
 use crate::plan::AllocationSemantics;
 use crate::plan::{Mutator, MutatorContext};
 use crate::scheduler::WorkBucketStage;
-use crate::scheduler::{GCController, GCWork, GCWorker};
+use crate::scheduler::{GCWork, GCWorker};
 use crate::util::alloc::allocators::AllocatorSelector;
 use crate::util::constants::{LOG_BYTES_IN_PAGE, MIN_OBJECT_SIZE};
 use crate::util::heap::layout::vm_layout::vm_layout;
@@ -25,7 +25,7 @@ use crate::util::{Address, ObjectReference};
 use crate::vm::edge_shape::MemorySlice;
 use crate::vm::ReferenceGlue;
 use crate::vm::VMBinding;
-use std::sync::atomic::Ordering;
 
 /// Initialize an MMTk instance. A VM should call this method after creating an [`crate::MMTK`]
 /// instance but before using any of the methods provided in MMTk (except `process()` and `process_bulk()`).
 ///
@@ -438,6 +438,7 @@ pub fn free_with_size<VM: VMBinding>(mmtk: &MMTK<VM>, addr: Address, old_size: u
 /// Get the current active malloc'd bytes. Here MMTk only accounts for bytes that are done through those 'counted malloc' functions.
 #[cfg(feature = "malloc_counted_size")]
 pub fn get_malloc_bytes<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
+    use std::sync::atomic::Ordering;
     mmtk.state.malloc_bytes.load(Ordering::SeqCst)
 }
 
@@ -460,53 +461,18 @@ pub fn gc_poll<VM: VMBinding>(mmtk: &MMTK<VM>, tls: VMMutatorThread) {
     }
 }
 
-/// Run the main loop for the GC controller thread. This method does not return.
-///
-/// Arguments:
-/// * `tls`: The thread that will be used as the GC controller.
-/// * `gc_controller`: The execution context of the GC controller threa.
-///   It is the `GCController` passed to `Collection::spawn_gc_thread`.
-/// * `mmtk`: A reference to an MMTk instance.
-pub fn start_control_collector<VM: VMBinding>(
-    _mmtk: &'static MMTK<VM>,
-    tls: VMWorkerThread,
-    gc_controller: &mut GCController<VM>,
-) {
-    gc_controller.run(tls);
-}
-
-/// Run the main loop of a GC worker. This method does not return.
-///
-/// Arguments:
-/// * `tls`: The thread that will be used as the GC worker.
-/// * `worker`: The execution context of the GC worker thread.
-///   It is the `GCWorker` passed to `Collection::spawn_gc_thread`.
-/// * `mmtk`: A reference to an MMTk instance.
+/// Wrapper for [`crate::scheduler::GCWorker::run`].
 pub fn start_worker<VM: VMBinding>(
     mmtk: &'static MMTK<VM>,
     tls: VMWorkerThread,
-    worker: &mut GCWorker<VM>,
+    worker: Box<GCWorker<VM>>,
 ) {
     worker.run(tls, mmtk);
 }
 
-/// Initialize the scheduler and GC workers that are required for doing garbage collections.
-/// This is a mandatory call for a VM during its boot process once its thread system
-/// is ready. This should only be called once. This call will invoke Collection::spawn_gc_thread()
-/// to create GC threads.
-///
-/// Arguments:
-/// * `mmtk`: A reference to an MMTk instance.
-/// * `tls`: The thread that wants to enable the collection. This value will be passed back to the VM in
-///   Collection::spawn_gc_thread() so that the VM knows the context.
+/// Wrapper for [`crate::mmtk::MMTK::initialize_collection`].
 pub fn initialize_collection<VM: VMBinding>(mmtk: &'static MMTK<VM>, tls: VMThread) {
-    assert!(
-        !mmtk.state.is_initialized(),
-        "MMTk collection has been initialized (was initialize_collection() already called before?)"
-    );
-    mmtk.scheduler.spawn_gc_threads(mmtk, tls);
-    mmtk.state.initialized.store(true, Ordering::SeqCst);
-    probe!(mmtk, collection_initialized);
+    mmtk.initialize_collection(tls);
 }
 
@@ -554,6 +520,7 @@ pub fn free_bytes<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
 /// to call this method is at the end of a GC (e.g. when the runtime is about to resume threads).
 #[cfg(feature = "count_live_bytes_in_gc")]
 pub fn live_bytes_in_last_gc<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
+    use std::sync::atomic::Ordering;
     mmtk.state.live_bytes_in_last_gc.load(Ordering::SeqCst)
 }
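With `start_control_collector` gone, a binding only ever receives GC workers from `Collection::spawn_gc_thread`, and `start_worker` now takes the worker by `Box` rather than `&mut`. A rough sketch of what the binding side might look like under those assumptions; `MyVM`, `MMTK_SINGLETON`, and the way the worker thread obtains its `VMWorkerThread` handle are hypothetical, and exact re-export paths may differ from the real mmtk-core API.

```rust
use mmtk::memory_manager;
use mmtk::util::opaque_pointer::{VMThread, VMWorkerThread};
use mmtk::vm::GCThreadContext;

// Called from the binding's `Collection::spawn_gc_thread` implementation.
fn spawn_gc_thread(ctx: GCThreadContext<MyVM>) {
    std::thread::spawn(move || {
        // A real binding would build this handle from its own thread registry.
        let worker_tls = VMWorkerThread(VMThread::UNINITIALIZED);
        match ctx {
            // After this PR there is no Controller variant left; every spawned GC thread
            // is a worker, and the boxed worker is moved into start_worker.
            GCThreadContext::Worker(worker) => {
                memory_manager::start_worker(&MMTK_SINGLETON, worker_tls, worker)
            }
        }
    });
}
```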
91 changes: 90 additions & 1 deletion src/mmtk.rs
@@ -148,7 +148,7 @@ impl<VM: VMBinding> MMTK<VM> {
 
         let state = Arc::new(GlobalState::default());
 
-        let gc_requester = Arc::new(GCRequester::new());
+        let gc_requester = Arc::new(GCRequester::new(scheduler.clone()));
 
         let gc_trigger = Arc::new(GCTrigger::new(
             options.clone(),
@@ -220,6 +220,93 @@ impl<VM: VMBinding> MMTK<VM> {
         }
     }
 
+    /// Initialize the GC worker threads that are required for doing garbage collections.
+    /// This is a mandatory call for a VM during its boot process once its thread system
+    /// is ready.
+    ///
+    /// Internally, this function will invoke [`Collection::spawn_gc_thread()`] to spawn GC worker
+    /// threads.
+    ///
+    /// # Arguments
+    ///
+    /// * `tls`: The thread that wants to enable the collection. This value will be passed back
+    ///   to the VM in [`Collection::spawn_gc_thread()`] so that the VM knows the context.
+    ///
+    /// [`Collection::spawn_gc_thread()`]: crate::vm::Collection::spawn_gc_thread()
+    pub fn initialize_collection(&'static self, tls: VMThread) {
+        assert!(
+            !self.state.is_initialized(),
+            "MMTk collection has been initialized (was initialize_collection() already called before?)"
+        );
+        self.scheduler.spawn_gc_threads(self, tls);
+        self.state.initialized.store(true, Ordering::SeqCst);
+        probe!(mmtk, collection_initialized);
+    }
+
+    /// Prepare an MMTk instance for calling the `fork()` system call.
+    ///
+    /// The `fork()` system call is available on Linux and some UNIX variants, and may be emulated
+    /// on other platforms by libraries such as Cygwin. The properties of the `fork()` system call
+    /// require the users to do some preparation before calling it.
+    ///
+    /// - **Multi-threading**: If `fork()` is called when the process has multiple threads, it
+    ///   will only duplicate the current thread into the child process, and the child process can
+    ///   only call async-signal-safe functions, notably `exec()`. For VMs that use multi-process
+    ///   concurrency, it is imperative that when calling `fork()`, only one thread may exist in
+    ///   the process.
+    ///
+    /// - **File descriptors**: The child process inherits copies of the parent's set of open
+    ///   file descriptors. This may or may not be desired depending on use cases.
+    ///
+    /// This function helps VMs that use `fork()` for multi-process concurrency. It instructs all
+    /// GC threads to save their contexts and return from their entry-point functions. Currently,
+    /// such threads only include GC workers, and the entry point is
+    /// [`crate::memory_manager::start_worker`]. A subsequent call to `MMTK::after_fork()` will
+    /// re-spawn the threads using their saved contexts. The VM must not allocate objects in the
+    /// MMTk heap before calling `MMTK::after_fork()`.
+    ///
+    /// TODO: Currently, the MMTk core does not keep any files open for a long time. In the
+    /// future, this function and the `after_fork` function may be used for handling open file
+    /// descriptors across invocations of `fork()`. One possible use case is logging GC activities
+    /// and statistics to files, such as performing heap dumps across multiple GCs.
+    ///
+    /// If a VM intends to execute another program by calling `fork()` and immediately calling
+    /// `exec`, it may skip this function because the state of the MMTk instance will be irrelevant
+    /// in that case.
+    ///
+    /// # Caution!
+    ///
+    /// This function sends an asynchronous message to GC threads and returns immediately, but it
+    /// is only safe for the VM to call `fork()` after the underlying **native threads** of the GC
+    /// threads have exited. After calling this function, the VM should wait for their underlying
+    /// native threads to exit in a VM-specific manner before calling `fork()`.
+    pub fn prepare_to_fork(&'static self) {
[Inline review thread on `prepare_to_fork`]

qinsoon (Member):
fork is unix specific. We should either avoid using the term, or make it specific to certain targets. As we are not doing anything specific to forking in MMTk (we just stop GC threads), I think it may be a good idea to avoid using the term fork.

wks (Collaborator, Author):
Yes, fork is unix-specific. But I think we should be very explicit about forking because it has very unusual requirements. For example, when calling fork(), the process must have only one thread. If the process has multiple threads, the only thing safe to do is calling exec() or other async-signal-safe functions (see https://man7.org/linux/man-pages/man2/fork.2.html). The implementation of prepare_to_fork is centered around those requirements. The GCThreadJoinHandle trait which I just introduced yesterday is one example: it waits until the underlying native thread of a GC thread quits. It would be very different if we simply wanted to, say, grow or shrink the number of GC threads at run time. In that case, it is sufficient to let the GC threads give up their contexts, and the VM may repurpose those threads as other threads (such as mutators).

And the name should be exposed through the API. If a VM needs forking (Android ART and CRuby), it knows precisely what it is doing. CRuby carefully brings down other helper threads, too, before forking.

I am OK with making it platform-specific. VMs treat forking as platform-specific, too. CRuby only supports Kernel#fork if the underlying OS supports fork(), and Android only runs on its own modified Linux kernel. But I don't know how to do it well now. We would need some kind of platform abstraction layer; then we could label the platforms that support fork(), which include Linux, Mac, some UNIX variants, and Cygwin.

qinsoon (Member):
In this API function, does MMTk core do anything specific to forking other than just stopping GC threads? Is it the binding rather than MMTk core that needs to know the semantics of forking?

wks (Collaborator, Author):
1. It keeps the GCWorker struct instances so that GC workers can be re-spawned later, reusing those structs. If we were adjusting the number of GC workers, we would probably drop those structs instead. This probably doesn't matter because the GC workers are almost stateless when not running any work packets.
2. It needs to ensure the underlying OS threads of the GC workers exit, not just return from the function memory_manager::start_worker. Of course the binding needs to cooperate by implementing GCThreadJoinHandle.

There is an alternative design: MMTk core provides a function to ask all GC threads to return from their entry-point functions, such as memory_manager::start_worker, and the VM bindings implement their own mechanisms to wait until the underlying OS threads have exited. With that design, bindings no longer need to implement GCThreadJoinHandle and give it to mmtk-core.

qinsoon (Member, Feb 6, 2024):
It sounds to me that both are design/implementation choices rather than requirements of fork.

wks (Collaborator, Author):
It is OK to destruct the GCWorker structs when forking; we will just need to construct them again later. It is fair to say this is a design choice, and an appropriate one for forking.

It is OK to move the responsibility of waiting for the OS threads to the binding. But either mmtk-core or the binding must wait for the OS threads to exit. This is a hard requirement for fork.

[End of review thread; the added function continues below.]
+        assert!(
+            self.state.is_initialized(),
+            "MMTk collection has not been initialized, yet (was initialize_collection() called before?)"
+        );
+        probe!(mmtk, prepare_to_fork);
+        self.scheduler.stop_gc_threads_for_forking();
+    }
+
+    /// Call this function after the VM called the `fork()` system call.
+    ///
+    /// This function will re-spawn MMTk threads from saved contexts.
+    ///
+    /// # Arguments
+    ///
+    /// * `tls`: The thread that wants to respawn MMTk threads after forking. This value will be
+    ///   passed back to the VM in `Collection::spawn_gc_thread()` so that the VM knows the
+    ///   context.
+    pub fn after_fork(&'static self, tls: VMThread) {
+        assert!(
+            self.state.is_initialized(),
+            "MMTk collection has not been initialized, yet (was initialize_collection() called before?)"
+        );
+        probe!(mmtk, after_fork);
+        self.scheduler.respawn_gc_threads_after_forking(tls);
+    }
+
     /// Generic hook to allow benchmarks to be harnessed. MMTk will trigger a GC
     /// to clear any residual garbage and start collecting statistics for the benchmark.
     /// This is usually called by the benchmark harness as its last step before the actual benchmark.
@@ -349,6 +436,8 @@ impl<VM: VMBinding> MMTK<VM> {
         self.state
             .internal_triggered_collection
             .store(true, Ordering::Relaxed);
+        // TODO: The current `GCRequester::request()` is probably incorrect for internally triggered GC.
+        // Consider removing functions related to "internal triggered collection".
         self.gc_requester.request();
     }
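Taken together, `prepare_to_fork` and `after_fork` imply a specific call sequence on the binding side. The sketch below shows that sequence under stated assumptions: `mmtk()` (the binding's accessor for its `&'static MMTK` instance), `wait_for_gc_threads_to_exit()`, and `current_vm_thread()` are hypothetical binding-side helpers; only the two MMTk methods come from the diff above.

```rust
fn fork_runtime() -> libc::pid_t {
    // 1. Ask MMTk to bring its GC worker threads down. This only sends a request and
    //    returns immediately.
    mmtk().prepare_to_fork();

    // 2. The binding must wait, in its own way, until the native GC threads have actually
    //    exited; fork() is only safe once the process is effectively single-threaded.
    wait_for_gc_threads_to_exit();

    // 3. Fork. Both parent and child continue from here.
    let pid = unsafe { libc::fork() };

    // 4. Neither process may allocate from the MMTk heap until the GC threads are respawned
    //    from their saved contexts.
    mmtk().after_fork(current_vm_thread());

    pid
}
```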
58 changes: 17 additions & 41 deletions src/plan/gc_requester.rs
@@ -1,66 +1,42 @@
+use crate::scheduler::GCWorkScheduler;
 use crate::vm::VMBinding;
-use std::marker::PhantomData;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Condvar, Mutex};
+use std::sync::Arc;
 
-struct RequestSync {
-    request_count: isize,
-    last_request_count: isize,
-}
-
-/// GC requester. This object allows other threads to request (trigger) GC,
-/// and the GC coordinator thread waits for GC requests using this object.
+/// This data structure lets mutators trigger GC.
 pub struct GCRequester<VM: VMBinding> {
-    request_sync: Mutex<RequestSync>,
-    request_condvar: Condvar,
+    /// Set by mutators to trigger GC. It is atomic so that mutators can check if GC has already
+    /// been requested efficiently in `poll` without acquiring any mutex.
    request_flag: AtomicBool,
-    phantom: PhantomData<VM>,
-}
-
-// Clippy says we need this...
-impl<VM: VMBinding> Default for GCRequester<VM> {
-    fn default() -> Self {
-        Self::new()
-    }
+    scheduler: Arc<GCWorkScheduler<VM>>,
 }
 
 impl<VM: VMBinding> GCRequester<VM> {
-    pub fn new() -> Self {
+    pub fn new(scheduler: Arc<GCWorkScheduler<VM>>) -> Self {
         GCRequester {
-            request_sync: Mutex::new(RequestSync {
-                request_count: 0,
-                last_request_count: -1,
-            }),
-            request_condvar: Condvar::new(),
             request_flag: AtomicBool::new(false),
-            phantom: PhantomData,
+            scheduler,
         }
     }
 
+    /// Request a GC. Called by mutators when polling (during allocation) and when handling user
+    /// GC requests (e.g. `System.gc();` in Java).
     pub fn request(&self) {
        if self.request_flag.load(Ordering::Relaxed) {
            return;
        }
 
-        let mut guard = self.request_sync.lock().unwrap();
-        if !self.request_flag.load(Ordering::Relaxed) {
-            self.request_flag.store(true, Ordering::Relaxed);
-            guard.request_count += 1;
-            self.request_condvar.notify_all();
+        if !self.request_flag.swap(true, Ordering::Relaxed) {
+            // `GCWorkScheduler::request_schedule_collection` needs to hold a mutex to communicate
+            // with GC workers, which is expensive for functions like `poll`. We use the atomic
+            // flag `request_flag` to elide the need to acquire the mutex in subsequent calls.
+            self.scheduler.request_schedule_collection();
        }
     }
 
+    /// Clear the "GC requested" flag so that mutators can trigger the next GC.
+    /// Called by a GC worker when all mutators have come to a stop.
     pub fn clear_request(&self) {
-        let guard = self.request_sync.lock().unwrap();
        self.request_flag.store(false, Ordering::Relaxed);
-        drop(guard);
-    }
-
-    pub fn wait_for_request(&self) {
-        let mut guard = self.request_sync.lock().unwrap();
-        guard.last_request_count += 1;
-        while guard.last_request_count == guard.request_count {
-            guard = self.request_condvar.wait(guard).unwrap();
-        }
     }
 }
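The rewritten `request()` keeps the old fast path (a relaxed load) and replaces the mutex/condvar handshake with a single atomic swap before calling into the scheduler. The snippet below is a standalone illustration of that pattern, not mmtk-core code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Trigger {
    requested: AtomicBool,
}

impl Trigger {
    fn request(&self, expensive_call: impl FnOnce()) {
        // Fast path: mutators poll frequently, so once a request is pending, later callers
        // return without touching any lock.
        if self.requested.load(Ordering::Relaxed) {
            return;
        }
        // Slow path: swap returns the previous value, so exactly one of the racing callers
        // sees `false` and pays for the expensive (mutex-protected) call.
        if !self.requested.swap(true, Ordering::Relaxed) {
            expensive_call();
        }
    }
}
```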