Skip to content

Commit

Permalink
is it gonna be a good move?
Browse files Browse the repository at this point in the history
  • Loading branch information
dpdani committed Sep 25, 2024
1 parent b371305 commit 75ba32d
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 61 deletions.
7 changes: 2 additions & 5 deletions src/cereggii/atomic_dict/lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
#include "atomic_dict.h"
#include "atomic_dict_internal.h"
#include "constants.h"
#define Py_BUILD_CORE
#include "internal/pycore_dict.h"
#undef Py_BUILD_CORE


void
Expand Down Expand Up @@ -316,10 +313,10 @@ AtomicDict_BatchGetItem(AtomicDict *self, PyObject *args, PyObject *kwargs)

assert(_PyDict_GetItem_KnownHash(batch, key, hash) != NULL); // returns a borrowed reference
if (result.found) {
if (_PyDict_SetItem_KnownHash(batch, key, result.entry.value, hash) < 0)
if (PyDict_SetItem(batch, key, result.entry.value) < 0)
goto fail;
} else {
if (_PyDict_SetItem_KnownHash(batch, key, NOT_FOUND, hash) < 0)
if (PyDict_SetItem(batch, key, NOT_FOUND) < 0)
goto fail;
}
}
Expand Down
89 changes: 33 additions & 56 deletions src/cereggii/atomic_ref.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,65 +4,20 @@

#include "atomic_ref.h"
#include "atomic_ops.h"


static inline int
_Py_TryIncRefShared(PyObject *op)
{
// I know, I know
#ifdef Py_GIL_DISABLED
// https://github.com/python/cpython/blob/9e551f9b351440ebae79e07a02d0e4a1b61d139e/Include/internal/pycore_object.h#L499
Py_ssize_t shared = op->ob_ref_shared;
for (;;) {
// If the shared refcount is zero and the object is either merged
// or may not have weak references, then we cannot incref it.
if (shared == 0 || shared == 0x3) {
return 0;
}

if (CereggiiAtomic_CompareExchangeSsize(
(Py_ssize_t *) &op->ob_ref_shared,
shared,
shared + (1 << _Py_REF_SHARED_SHIFT))) {
return 1;
}
return 0; // are they actually using some other function?
}
#else
Py_INCREF(op);
return 1;
#endif
}

static inline void
_PyObject_SetMaybeWeakref(PyObject *op)
{
#ifdef Py_GIL_DISABLED
// https://github.com/python/cpython/blob/9e551f9b351440ebae79e07a02d0e4a1b61d139e/Include/internal/pycore_object.h#L608
for (;;) {
Py_ssize_t shared = CereggiiAtomic_LoadSsize((const Py_ssize_t *) &op->ob_ref_shared);
if ((shared & 0x3) != 0) {
// Nothing to do if it's in WEAKREFS, QUEUED, or MERGED states.
return;
}
if (CereggiiAtomic_CompareExchangeSsize(
(Py_ssize_t *) &op->ob_ref_shared, shared, shared | 0x1)) {
return;
}
}
#endif
}
#include "_internal_py_core.h"


PyObject *
AtomicRef_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject *Py_UNUSED(kwds))
{
AtomicRef *self;
self = (AtomicRef *) type->tp_alloc(type, 0);
if (self != NULL) {
self->reference = Py_None;
_PyObject_SetMaybeWeakref(self->reference);
}

if (self == NULL)
return NULL;

self->reference = Py_None;

if (!PyObject_GC_IsTracked((PyObject *) self)) {
PyObject_GC_Track(self);
}
Expand All @@ -73,14 +28,19 @@ int
AtomicRef_init(AtomicRef *self, PyObject *args, PyObject *Py_UNUSED(kwargs))
{
PyObject *reference = NULL;

if (args != NULL) {
if (!PyArg_ParseTuple(args, "|O", &reference))
goto fail;
}

if (reference != NULL) {
Py_INCREF(reference);
_PyObject_SetMaybeWeakref(reference);
#ifdef Py_GIL_DISABLED
if (!_Py_IsImmortal(reference)) {
_PyObject_SetMaybeWeakref(reference);
}
#endif
// decref'ed in destructor
self->reference = reference;
}
Expand Down Expand Up @@ -110,9 +70,14 @@ PyObject *AtomicRef_Get(AtomicRef *self)
{
PyObject *reference;
reference = self->reference;
while (!_Py_TryIncRefShared(reference)) {

#ifndef Py_GIL_DISABLED
Py_INCREF(reference);
#else
while (!_Py_TryIncref(reference)) {
reference = self->reference;
}
#endif
return reference;
}

Expand All @@ -122,13 +87,17 @@ AtomicRef_Set(AtomicRef *self, PyObject *reference)
assert(reference != NULL);

Py_INCREF(reference);
#ifdef Py_GIL_DISABLED
_PyObject_SetMaybeWeakref(reference);
#endif

PyObject *current_reference;
current_reference = AtomicRef_Get(self);
while (!CereggiiAtomic_CompareExchangePtr((void **) &self->reference, current_reference, reference)) {
Py_DECREF(current_reference);
current_reference = AtomicRef_Get(self);
}

Py_DECREF(current_reference); // decrement for the AtomicRef_Get
Py_DECREF(current_reference); // decrement because not holding it anymore
Py_RETURN_NONE;
Expand All @@ -141,7 +110,11 @@ AtomicRef_CompareAndSet(AtomicRef *self, PyObject *expected, PyObject *new)
assert(new != NULL);

Py_INCREF(new);
_PyObject_SetMaybeWeakref(new);
#ifdef Py_GIL_DISABLED
if (!_Py_IsImmortal(new)) {
_PyObject_SetMaybeWeakref(new);
}
#endif
int retval = CereggiiAtomic_CompareExchangePtr((void **) &self->reference, expected, new);
if (retval) {
Py_DECREF(expected);
Expand Down Expand Up @@ -174,7 +147,11 @@ PyObject *AtomicRef_GetAndSet(AtomicRef *self, PyObject *new)
assert(new != NULL);

Py_INCREF(new);
_PyObject_SetMaybeWeakref(new);
#ifdef Py_GIL_DISABLED
if (!_Py_IsImmortal(new)) {
_PyObject_SetMaybeWeakref(new);
}
#endif
PyObject *current_reference = CereggiiAtomic_ExchangePtr((void **) &self->reference, new);
// don't decref current_reference: passing it to python
return current_reference;
Expand Down
183 changes: 183 additions & 0 deletions src/include/_internal_py_core.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#include "Python.h"


// The shared reference count uses the two least-significant bits to store
// flags. The remaining bits are used to store the reference count.
#define _Py_REF_SHARED_SHIFT 2
#define _Py_REF_SHARED_FLAG_MASK 0x3

// The shared flags are initialized to zero.
#define _Py_REF_SHARED_INIT 0x0
#define _Py_REF_MAYBE_WEAKREF 0x1
#define _Py_REF_QUEUED 0x2
#define _Py_REF_MERGED 0x3


/* Tries to increment an object's reference count
 *
 * This is a specialized version of _Py_TryIncref that only succeeds if the
 * object is immortal or local to this thread. It does not handle the case
 * where the reference count modification requires an atomic operation. This
 * allows call sites to specialize for the immortal/local case.
 *
 * Returns 1 if a reference was taken (or the object is immortal), 0 if the
 * caller must fall back to _Py_TryIncRefShared.
 */
static inline int
_Py_TryIncrefFast(PyObject *op) {
#ifdef Py_GIL_DISABLED
    uint32_t local = _Py_atomic_load_uint32_relaxed(&op->ob_ref_local);
    local += 1;
    if (local == 0) {
        // The local refcount saturates at UINT32_MAX for immortal objects,
        // so the increment wrapped to 0: nothing to do.
        return 1;
    }
    if (_Py_IsOwnedByCurrentThread(op)) {
        _Py_INCREF_STAT_INC();
        _Py_atomic_store_uint32_relaxed(&op->ob_ref_local, local);
#ifdef Py_REF_DEBUG
        _Py_IncRefTotal(_PyThreadState_GET());
#endif
        return 1;
    }
    // Owned by another thread: caller must use the shared refcount path.
    return 0;
#else
    // With the GIL, ob_ref_local does not exist and a plain incref is
    // always safe; this also keeps the header compiling on default builds.
    Py_INCREF(op);
    return 1;
#endif
}

/* Tries to increment the shared (cross-thread) portion of op's refcount.
 *
 * Returns 1 on success. Returns 0 if the shared refcount is zero and the
 * object is merged or may not have weak references — in that state the
 * object may be concurrently freed, so it must not be revived here.
 */
static inline int
_Py_TryIncRefShared(PyObject *op)
{
#ifdef Py_GIL_DISABLED
    Py_ssize_t shared = _Py_atomic_load_ssize_relaxed(&op->ob_ref_shared);
    for (;;) {
        // If the shared refcount is zero and the object is either merged
        // or may not have weak references, then we cannot incref it.
        if (shared == 0 || shared == _Py_REF_MERGED) {
            return 0;
        }

        // On failure the compare-exchange stores the observed value back
        // into `shared`, so the loop retries with fresh state.
        if (_Py_atomic_compare_exchange_ssize(
                &op->ob_ref_shared,
                &shared,
                shared + (1 << _Py_REF_SHARED_SHIFT))) {
#ifdef Py_REF_DEBUG
            _Py_IncRefTotal(_PyThreadState_GET());
#endif
            _Py_INCREF_STAT_INC();
            return 1;
        }
    }
#else
    // With the GIL there is no shared refcount; a plain incref always
    // succeeds (mirrors the fallback previously used in atomic_ref.c).
    Py_INCREF(op);
    return 1;
#endif
}

/* Tries to incref the object op and ensures that *src still points to it.
 *
 * Returns 1 if a reference was acquired and *src still held op afterwards.
 * Returns 0 — with no reference owed by the caller — if op could not be
 * increfed, or if *src was concurrently replaced: in that case the
 * just-acquired reference is released again and the caller must retry
 * with the new value of *src. */
static inline int
_Py_TryIncrefCompare(PyObject **src, PyObject *op)
{
    if (_Py_TryIncrefFast(op)) {
        return 1;
    }
    if (!_Py_TryIncRefShared(op)) {
        return 0;
    }
    if (op != _Py_atomic_load_ptr(src)) {
        // A writer swapped *src between our load and the incref; undo.
        Py_DECREF(op);
        return 0;
    }
    return 1;
}

/* Loads and increfs an object from ptr, which may contain a NULL value.
   Safe with concurrent (atomic) updates to ptr.
   NOTE: The writer must set maybe-weakref on the stored object! */
static inline PyObject *
_Py_XGetRef(PyObject **ptr)
{
    for (;;) {
        PyObject *value = _Py_atomic_load_ptr(ptr);
        if (value == NULL) {
            // Nothing stored: no reference to take.
            return value;
        }
        if (_Py_TryIncrefCompare(ptr, value)) {
            return value;
        }
        // Lost a race with a concurrent writer; reload ptr and retry.
    }
}

/* Attempts to load and incref an object from ptr. Returns NULL
   on failure, which may be due to a NULL value or a concurrent update.
   Unlike _Py_XGetRef, a lost race is reported to the caller rather than
   retried. */
static inline PyObject *
_Py_TryXGetRef(PyObject **ptr)
{
    PyObject *value = _Py_atomic_load_ptr(ptr);
    if (value == NULL) {
        return value;
    }
    if (_Py_TryIncrefCompare(ptr, value)) {
        return value;
    }
    return NULL;
}

/* Like Py_NewRef but also optimistically sets _Py_REF_MAYBE_WEAKREF
   on objects owned by a different thread.
   NOTE(review): body accesses ob_ref_shared unconditionally, so this
   compiles only on free-threaded (Py_GIL_DISABLED) builds — confirm
   callers only use it there. */
static inline PyObject *
_Py_NewRefWithLock(PyObject *op)
{
    // Fast path: immortal or owned by this thread.
    if (_Py_TryIncrefFast(op)) {
        return op;
    }
#ifdef Py_REF_DEBUG
    _Py_IncRefTotal(_PyThreadState_GET());
#endif
    _Py_INCREF_STAT_INC();
    for (;;) {
        Py_ssize_t shared = _Py_atomic_load_ssize_relaxed(&op->ob_ref_shared);
        Py_ssize_t new_shared = shared + (1 << _Py_REF_SHARED_SHIFT);
        if ((shared & _Py_REF_SHARED_FLAG_MASK) == 0) {
            // Flags still in the initial (0x0) state: mark the object as
            // possibly having weak references while we take the reference.
            new_shared |= _Py_REF_MAYBE_WEAKREF;
        }
        if (_Py_atomic_compare_exchange_ssize(
                &op->ob_ref_shared,
                &shared,
                new_shared)) {
            return op;
        }
        // CAS failed: another thread updated ob_ref_shared; retry.
    }
}

/* NULL-tolerant variant of _Py_NewRefWithLock: passes NULL through
   unchanged, otherwise takes a new reference. */
static inline PyObject *
_Py_XNewRefWithLock(PyObject *obj)
{
    return (obj == NULL) ? NULL : _Py_NewRefWithLock(obj);
}

/* Marks op as possibly having weak references, so that concurrent readers
 * (e.g. _Py_XGetRef) can safely try to incref it.
 *
 * No-op for immortal objects, for objects already flagged WEAKREFS/QUEUED/
 * MERGED, and on default (with-GIL) builds, where ob_ref_shared does not
 * exist — the guard mirrors the version this header replaced in
 * atomic_ref.c and keeps the header compiling there.
 */
static inline void
_PyObject_SetMaybeWeakref(PyObject *op)
{
#ifdef Py_GIL_DISABLED
    if (_Py_IsImmortal(op)) {
        return;
    }
    for (;;) {
        Py_ssize_t shared = _Py_atomic_load_ssize_relaxed(&op->ob_ref_shared);
        if ((shared & _Py_REF_SHARED_FLAG_MASK) != 0) {
            // Nothing to do if it's in WEAKREFS, QUEUED, or MERGED states.
            return;
        }
        // On failure the compare-exchange refreshes `shared`; retry.
        if (_Py_atomic_compare_exchange_ssize(
                &op->ob_ref_shared, &shared, shared | _Py_REF_MAYBE_WEAKREF)) {
            return;
        }
    }
#endif
}

/* Tries to incref op and returns 1 if successful or 0 otherwise. */
static inline int
_Py_TryIncref(PyObject *op)
{
#ifdef Py_GIL_DISABLED
    // Fast (immortal/thread-local) path first, then the shared-refcount path.
    if (_Py_TryIncrefFast(op)) {
        return 1;
    }
    return _Py_TryIncRefShared(op);
#else
    if (Py_REFCNT(op) <= 0) {
        // Object is being (or has been) deallocated; do not revive it.
        return 0;
    }
    Py_INCREF(op);
    return 1;
#endif
}
19 changes: 19 additions & 0 deletions tests/test_atomic_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ def thread(result, ref):
assert id(obj_0) == id_0 and id(obj_1) == id_1


def test_counter():
    """Two threads CAS-increment a shared AtomicRef-held int; no update is lost."""
    r = AtomicRef(0)
    increments_per_thread = 1_000
    num_threads = 2

    def worker():
        for _ in range(increments_per_thread):
            current = r.get()
            # Retry until our compare-and-set wins the race.
            while not r.compare_and_set(current, current + 1):
                current = r.get()

    workers = [threading.Thread(target=worker) for _ in range(num_threads)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    assert r.get() == num_threads * increments_per_thread


def test_swap():
r = AtomicRef()
result_0 = Result()
Expand Down

0 comments on commit 75ba32d

Please sign in to comment.