Skip to content

Commit 54d1fd8

Browse files
committed
fix some bugs: arbitration & suspend
1 parent ef0d629 commit 54d1fd8

File tree

11 files changed

+256
-24
lines changed

11 files changed

+256
-24
lines changed

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/MemoryManager.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package org.apache.hugegraph.memory;
1919

20+
import java.util.Map;
2021
import java.util.PriorityQueue;
2122
import java.util.Queue;
23+
import java.util.concurrent.ConcurrentHashMap;
2224
import java.util.concurrent.ExecutionException;
2325
import java.util.concurrent.ExecutorService;
2426
import java.util.concurrent.Future;
@@ -38,18 +40,22 @@
3840
public class MemoryManager {
3941

4042
private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
43+
private static final int ARBITRATE_MEMORY_THREAD_NUM = 12;
4144
private static final String QUERY_MEMORY_POOL_NAME_PREFIX = "QueryMemoryPool";
4245
private static final String ARBITRATE_MEMORY_POOL_NAME = "ArbitrateMemoryPool";
43-
private static final String DELIMINATOR = "_";
44-
private static final int ARBITRATE_MEMORY_THREAD_NUM = 12;
46+
public static final String DELIMINATOR = "_";
47+
4548
// TODO: read it from conf, current 1G
4649
public static final long MAX_MEMORY_CAPACITY_IN_BYTES = Bytes.GB;
4750
private final AtomicLong currentAvailableMemoryInBytes =
4851
new AtomicLong(MAX_MEMORY_CAPACITY_IN_BYTES);
4952
private final AtomicLong currentOffHeapAllocatedMemoryInBytes = new AtomicLong(0);
5053
private final AtomicLong currentOnHeapAllocatedMemoryInBytes = new AtomicLong(0);
54+
5155
private final Queue<MemoryPool> queryMemoryPools =
5256
new PriorityQueue<>((o1, o2) -> (int) (o2.getFreeBytes() - o1.getFreeBytes()));
57+
private final Map<String, MemoryPool> threadName2TaskMemoryPoolMap = new ConcurrentHashMap<>();
58+
5359
private final MemoryArbitrator memoryArbitrator;
5460
private final ExecutorService arbitrateExecutor;
5561

@@ -128,6 +134,21 @@ public synchronized long handleRequestFromQueryPool(long size) {
128134
return size;
129135
}
130136

137+
/**
138+
* Used by task thread to find its memory pool to release self's memory resource when exiting.
139+
*/
140+
public MemoryPool getCorrespondingTaskMemoryPool(String threadName) {
141+
return threadName2TaskMemoryPoolMap.getOrDefault(threadName, null);
142+
}
143+
144+
public void bindCorrespondingTaskMemoryPool(String threadName, MemoryPool memoryPool) {
145+
threadName2TaskMemoryPoolMap.computeIfAbsent(threadName, key -> memoryPool);
146+
}
147+
148+
public void removeCorrespondingTaskMemoryPool(String threadName) {
149+
threadName2TaskMemoryPoolMap.remove(threadName);
150+
}
151+
131152
public Queue<MemoryPool> getCurrentQueryMemoryPools() {
132153
return new PriorityQueue<>(queryMemoryPools);
133154
}

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/arbitrator/MemoryArbitrator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
public interface MemoryArbitrator {
2323

24-
long MAX_WAIT_TIME_FOR_LOCAL_RECLAIM = 1000;
24+
long MAX_WAIT_TIME_FOR_LOCAL_RECLAIM = 2000;
2525

2626
long MAX_WAIT_TIME_FOR_GLOBAL_RECLAIM = 5000;
2727

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/AbstractMemoryPool.java

+18-7
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,19 @@
2727
import org.apache.hugegraph.memory.MemoryManager;
2828
import org.apache.hugegraph.memory.consumer.MemoryConsumer;
2929
import org.apache.hugegraph.memory.pool.impl.MemoryPoolStats;
30+
import org.jetbrains.annotations.TestOnly;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

3334
public abstract class AbstractMemoryPool implements MemoryPool {
3435

3536
private static final Logger LOG = LoggerFactory.getLogger(AbstractMemoryPool.class);
36-
private final Queue<MemoryPool> children =
37+
protected final Queue<MemoryPool> children =
3738
new PriorityQueue<>((o1, o2) -> (int) (o2.getFreeBytes() - o1.getFreeBytes()));
3839
protected final MemoryManager memoryManager;
39-
protected final ReentrantLock arbitrationLock = new ReentrantLock();
40-
protected final Condition condition = arbitrationLock.newCondition();
40+
// Allocation, deAllocation, arbitration must be serial which is controlled by this lock.
41+
protected final ReentrantLock memoryActionLock = new ReentrantLock();
42+
protected final Condition condition = memoryActionLock.newCondition();
4143
protected final AtomicBoolean isBeingArbitrated = new AtomicBoolean(false);
4244
protected final MemoryPoolStats stats;
4345
protected boolean isClosed = false;
@@ -60,7 +62,7 @@ public long tryToReclaimLocalMemory(long neededBytes) {
6062
long totalReclaimedBytes = 0;
6163
long currentNeededBytes = neededBytes;
6264
try {
63-
this.arbitrationLock.lock();
65+
this.memoryActionLock.lock();
6466
this.isBeingArbitrated.set(true);
6567
for (MemoryPool child : this.children) {
6668
long reclaimedMemory = child.tryToReclaimLocalMemory(currentNeededBytes);
@@ -79,20 +81,23 @@ public long tryToReclaimLocalMemory(long neededBytes) {
7981
totalReclaimedBytes, neededBytes);
8082
return totalReclaimedBytes;
8183
} finally {
82-
this.stats.setNumShrinks(this.stats.getNumShrinks() + 1);
84+
if (totalReclaimedBytes > 0) {
85+
this.stats.setNumShrinks(this.stats.getNumShrinks() + 1);
86+
}
8387
this.stats.setAllocatedBytes(
8488
this.stats.getAllocatedBytes() - totalReclaimedBytes);
8589
this.isBeingArbitrated.set(false);
86-
this.arbitrationLock.unlock();
8790
this.condition.signalAll();
91+
this.memoryActionLock.unlock();
8892
}
8993
}
9094

9195
/**
9296
* called when one layer pool is successfully executed and exited.
9397
*/
9498
@Override
95-
public synchronized void releaseSelf(String reason) {
99+
public void releaseSelf(String reason) {
100+
this.memoryActionLock.lock();
96101
try {
97102
if (this.isBeingArbitrated.get()) {
98103
this.condition.await();
@@ -109,6 +114,7 @@ public synchronized void releaseSelf(String reason) {
109114
LOG.error("Failed to release self because ", e);
110115
Thread.currentThread().interrupt();
111116
} finally {
117+
this.memoryActionLock.unlock();
112118
// Make these objs be GCed by JVM quickly.
113119
this.parent = null;
114120
this.children.clear();
@@ -198,4 +204,9 @@ public MemoryPool findRootQueryPool() {
198204
}
199205
return getParentPool().findRootQueryPool();
200206
}
207+
208+
@TestOnly
209+
public int getChildrenCount() {
210+
return this.children.size();
211+
}
201212
}

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/MemoryPool.java

+6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.hugegraph.memory.consumer.MemoryConsumer;
2121
import org.apache.hugegraph.memory.pool.impl.MemoryPoolStats;
2222
import org.apache.hugegraph.memory.util.QueryOutOfMemoryException;
23+
import org.jetbrains.annotations.TestOnly;
2324

2425
public interface MemoryPool {
2526

@@ -51,5 +52,10 @@ public interface MemoryPool {
5152

5253
MemoryPool findRootQueryPool();
5354

55+
MemoryPool addChildPool();
56+
5457
void bindMemoryConsumer(MemoryConsumer memoryConsumer);
58+
59+
@TestOnly
60+
int getChildrenCount();
5561
}

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/impl/MemoryPoolStats.java

+4
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@ public class MemoryPoolStats {
2222
private final String memoryPoolName;
2323
private long maxCapacity;
2424
private long usedBytes;
25+
// it represents the cumulative used bytes.
2526
private long cumulativeBytes;
2627
private long allocatedBytes;
2728

29+
// it represents the shrinking num of allocatedBytes
2830
private long numShrinks;
31+
// for query pool, it represents the enlarging num of maxCapacity; for other pools, it
32+
// represents the enlarging num of allocatedBytes
2933
private long numExpands;
3034
private long numAborts;
3135

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/impl/OperatorMemoryPool.java

+21-10
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,27 @@ public OperatorMemoryPool(MemoryPool parent, String poolName,
4343
this.memoryConsumers = new HashSet<>();
4444
}
4545

46+
@Override
47+
public MemoryPool addChildPool() {
48+
throw new UnsupportedOperationException();
49+
}
50+
4651
@Override
4752
public void bindMemoryConsumer(MemoryConsumer memoryConsumer) {
4853
this.memoryConsumers.add(memoryConsumer);
4954
}
5055

5156
@Override
52-
public synchronized void releaseSelf(String reason) {
57+
public void releaseSelf(String reason) {
58+
super.releaseSelf(reason);
59+
// since it is already closed, its stats will not be updated. so here we can use its
60+
// stats out of memoryActionLock.
5361
this.memoryAllocator.returnMemoryToManager(getAllocatedBytes());
62+
// release memory consumer, release byte buffer.
5463
this.memoryConsumers.forEach(memoryConsumer -> {
5564
memoryConsumer.getAllMemoryBlock().forEach(memoryAllocator::releaseMemoryBlock);
5665
});
5766
this.memoryConsumers.clear();
58-
super.releaseSelf(reason);
59-
// TODO: release memory consumer, release byte buffer.
6067
}
6168

6269
@Override
@@ -66,11 +73,12 @@ public long tryToReclaimLocalMemory(long neededBytes) {
6673
return 0;
6774
}
6875
LOG.info("[{}] tryToReclaimLocalMemory: neededBytes={}", this, neededBytes);
76+
long reclaimableBytes = 0;
6977
try {
70-
this.arbitrationLock.lock();
78+
this.memoryActionLock.lock();
7179
this.isBeingArbitrated.set(true);
7280
// 1. try to reclaim self free memory
73-
long reclaimableBytes = getFreeBytes();
81+
reclaimableBytes = getFreeBytes();
7482
// try its best to reclaim memory
7583
if (reclaimableBytes <= neededBytes) {
7684
// 2. update stats
@@ -93,9 +101,12 @@ public long tryToReclaimLocalMemory(long neededBytes) {
93101

94102
return neededBytes;
95103
} finally {
104+
if (reclaimableBytes > 0) {
105+
this.stats.setNumShrinks(this.stats.getNumShrinks() + 1);
106+
}
96107
this.isBeingArbitrated.set(false);
97-
this.arbitrationLock.unlock();
98108
this.condition.signalAll();
109+
this.memoryActionLock.unlock();
99110
}
100111
}
101112

@@ -106,9 +117,9 @@ public long tryToReclaimLocalMemory(long neededBytes) {
106117
public Object requireMemory(long bytes) {
107118
try {
108119
// use lock to ensure the atomicity of the two-step operation
109-
this.arbitrationLock.lock();
110-
long realBytes = requestMemoryInternal(bytes);
111-
return tryToAcquireMemoryInternal(realBytes);
120+
this.memoryActionLock.lock();
121+
long ignoredRealAllocatedBytes = requestMemoryInternal(bytes);
122+
return tryToAcquireMemoryInternal(bytes);
112123
} catch (QueryOutOfMemoryException e) {
113124
// Abort this query
114125
LOG.warn("[{}] detected an OOM exception when request memory, will ABORT this " +
@@ -117,7 +128,7 @@ public Object requireMemory(long bytes) {
117128
findRootQueryPool().releaseSelf(String.format(e.getMessage()));
118129
return null;
119130
} finally {
120-
this.arbitrationLock.unlock();
131+
this.memoryActionLock.unlock();
121132
}
122133
}
123134

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/impl/QueryMemoryPool.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
package org.apache.hugegraph.memory.pool.impl;
1919

20+
import static org.apache.hugegraph.memory.MemoryManager.DELIMINATOR;
21+
2022
import org.apache.hugegraph.memory.MemoryManager;
2123
import org.apache.hugegraph.memory.pool.AbstractMemoryPool;
24+
import org.apache.hugegraph.memory.pool.MemoryPool;
2225
import org.apache.hugegraph.memory.util.MemoryManageUtils;
2326
import org.apache.hugegraph.util.Bytes;
2427
import org.slf4j.Logger;
@@ -27,6 +30,7 @@
2730
public class QueryMemoryPool extends AbstractMemoryPool {
2831

2932
private static final Logger LOG = LoggerFactory.getLogger(QueryMemoryPool.class);
33+
private static final String TASK_MEMORY_POOL_NAME_PREFIX = "TaskMemoryPool";
3034
// TODO: read from conf
3135
private static final long QUERY_POOL_MAX_CAPACITY = Bytes.MB * 100;
3236

@@ -35,6 +39,18 @@ public QueryMemoryPool(String poolName, MemoryManager memoryManager) {
3539
this.stats.setMaxCapacity(QUERY_POOL_MAX_CAPACITY);
3640
}
3741

42+
@Override
43+
public MemoryPool addChildPool() {
44+
int count = this.children.size();
45+
String poolName =
46+
TASK_MEMORY_POOL_NAME_PREFIX + DELIMINATOR + count + DELIMINATOR +
47+
System.currentTimeMillis();
48+
MemoryPool taskMemoryPool = new TaskMemoryPool(this, poolName, this.memoryManager);
49+
this.children.add(taskMemoryPool);
50+
LOG.info("QueryPool-{} added task memory pool {}", this, taskMemoryPool);
51+
return taskMemoryPool;
52+
}
53+
3854
@Override
3955
public long requestMemoryInternal(long bytes) {
4056
if (this.isClosed) {
@@ -83,8 +99,10 @@ private long tryToExpandSelfCapacity(long size) {
8399
private long requestMemoryThroughArbitration(long bytes) {
84100
LOG.info("[{}] try to request memory from manager through arbitration: size={}", this,
85101
bytes);
86-
this.stats.setNumExpands(this.stats.getNumExpands() + 1);
87102
long reclaimedBytes = this.memoryManager.triggerLocalArbitration(this, bytes);
103+
if (reclaimedBytes > 0) {
104+
this.stats.setNumExpands(this.stats.getNumExpands() + 1);
105+
}
88106
// 1. if arbitrate successes, update stats and return success
89107
if (reclaimedBytes - bytes >= 0) {
90108
// here we don't update capacity & reserved & allocated, because memory is

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/impl/TaskMemoryPool.java

+26
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
package org.apache.hugegraph.memory.pool.impl;
1919

20+
import static org.apache.hugegraph.memory.MemoryManager.DELIMINATOR;
21+
2022
import org.apache.hugegraph.memory.MemoryManager;
23+
import org.apache.hugegraph.memory.allocator.NettyMemoryAllocator;
2124
import org.apache.hugegraph.memory.pool.AbstractMemoryPool;
2225
import org.apache.hugegraph.memory.pool.MemoryPool;
2326
import org.apache.hugegraph.memory.util.QueryOutOfMemoryException;
@@ -27,11 +30,33 @@
2730
public class TaskMemoryPool extends AbstractMemoryPool {
2831

2932
private static final Logger LOG = LoggerFactory.getLogger(TaskMemoryPool.class);
33+
private static final String OPERATOR_MEMORY_POOL_NAME_PREFIX = "OperatorMemoryPool";
3034

3135
public TaskMemoryPool(MemoryPool parent, String poolName, MemoryManager memoryManager) {
3236
super(parent, poolName, memoryManager);
3337
}
3438

39+
@Override
40+
public synchronized void releaseSelf(String reason) {
41+
this.memoryManager.removeCorrespondingTaskMemoryPool(Thread.currentThread().getName());
42+
super.releaseSelf(reason);
43+
}
44+
45+
@Override
46+
public MemoryPool addChildPool() {
47+
int count = this.children.size();
48+
String poolName =
49+
OPERATOR_MEMORY_POOL_NAME_PREFIX + DELIMINATOR + count + DELIMINATOR +
50+
System.currentTimeMillis();
51+
MemoryPool operatorPool =
52+
new OperatorMemoryPool(this, poolName,
53+
new NettyMemoryAllocator(this.memoryManager),
54+
this.memoryManager);
55+
this.children.add(operatorPool);
56+
LOG.info("TaskPool-{} added operator memory pool {}", this, operatorPool);
57+
return operatorPool;
58+
}
59+
3560
@Override
3661
public long requestMemoryInternal(long bytes) throws QueryOutOfMemoryException {
3762
if (this.isClosed) {
@@ -45,6 +70,7 @@ public long requestMemoryInternal(long bytes) throws QueryOutOfMemoryException {
4570
long parentRes = getParentPool().requestMemoryInternal(bytes);
4671
if (parentRes > 0) {
4772
this.stats.setAllocatedBytes(this.stats.getAllocatedBytes() + parentRes);
73+
this.stats.setNumExpands(this.stats.getNumExpands() + 1);
4874
}
4975
return parentRes;
5076
} catch (InterruptedException e) {
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,22 @@
1-
package org.apache.hugegraph.core.memory;public class MemoryConsumerTest {
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hugegraph.core.memory;
19+
20+
public class MemoryConsumerTest extends MemoryManageTest {
21+
222
}

0 commit comments

Comments
 (0)