feat(server): adoption for off-heap memory management in kout module#2704
feat(server): adoption for off-heap memory management in kout module#2704Pengzna wants to merge 6 commits intoapache:masterfrom
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2704 +/- ##
============================================
- Coverage 46.99% 37.56% -9.43%
+ Complexity 821 588 -233
============================================
Files 745 756 +11
Lines 60062 62108 +2046
Branches 7670 8068 +398
============================================
- Hits 28225 23330 -4895
- Misses 29014 36266 +7252
+ Partials 2823 2512 -311 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| "'{}', max degree '{}', capacity '{}' and limit '{}'", | ||
| graph, source, direction, edgeLabel, depth, | ||
| nearest, maxDegree, capacity, limit); | ||
| MemoryPool queryPool = MemoryManager.getInstance().addQueryMemoryPool(); |
There was a problem hiding this comment.
can we bind a MemoryPool for each graph, or bind MemoryManager to graph?
| .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), | ||
| (TaskMemoryPool) currentTaskPool); | ||
| MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation"); | ||
| }); |
There was a problem hiding this comment.
add a method like graph.switchToMemoryPool("kout", "main")?
| }); | ||
|
|
||
| ApiMeasurer measure = new ApiMeasurer(); | ||
| try { |
There was a problem hiding this comment.
prefer to add a wrapper method for MemoryPool init-and-gc, then call the original method?
| return QueryResults.emptyIterator(); | ||
| } | ||
| if (needCacheVertex(vertex)) { | ||
| vertex.convertIdToOnHeapIfNeeded(); |
There was a problem hiding this comment.
it's ok to just call convert in HeapCache.update(), at the same time, we avoid modifying the code everywhere
| // NOTE: it's slower performance to use: | ||
| // String.format("%x-%s", type.code(), name) | ||
| return IdGenerator.of(type.string() + "-" + id.asString()); | ||
| return new IdGenerator.StringId(type.string() + "-" + id.asString()); |
There was a problem hiding this comment.
add a OnHeapIdGenerator.of() class?
| Thread.currentThread() | ||
| .getName()); | ||
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? |
There was a problem hiding this comment.
expect this.taskMemoryPool style
| @@ -58,7 +58,9 @@ public BinaryBackendEntry.BinaryId newBinaryId(byte[] bytes, Id id) { | |||
| .getCorrespondingTaskMemoryPool( | |||
| Thread.currentThread() | |||
| .getName()); | |||
There was a problem hiding this comment.
prefer by Thread id instead of Thread name to avoid duplicate names
| return null; | ||
| } | ||
|
|
||
| int count = queryMemoryPools.size(); |
There was a problem hiding this comment.
What is the difference between QueryMemoryPool and MemoryPool? if no difference, just naming MemoryPool is ok.
and can we make some special names for CorrespondingTaskMemoryPool and CurrentWorkingOperatorMemoryPool, such as RequestMemoryPool and RequestStageMemoryPool
| private final ExecutorService arbitrateExecutor; | ||
|
|
||
| private static MemoryMode MEMORY_MODE = MemoryMode.ENABLE_OFF_HEAP_MANAGEMENT; | ||
| private static MemoryMode MEMORY_MODE = MemoryMode.DISABLE_MEMORY_MANAGEMENT; |
There was a problem hiding this comment.
MEMORY_MODE style is only for const var, and if there is only a single MemoryManager, also remove static mark: private MemoryMode memoryMode
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? | ||
| new BinaryBackendEntry.BinaryId(bytes, id) : | ||
| new BinaryIdOffHeap(bytes, null, |
There was a problem hiding this comment.
We can directly get the memory pool at one time MemoryManager.getInstance().currentMemoryPool(), it does the 2 steps:
getCorrespondingTaskMemoryPool, getCurrentWorkingOperatorMemoryPool.
There was a problem hiding this comment.
and can we rename some methods for a more concise and consistent style:
- getCorrespondingTaskMemoryPool => currentTaskMemoryPool
- getCurrentWorkingOperatorMemoryPool => currentOperatorMemoryPool
- MemoryPool.getSnapShot() => MemoryPool.getStats()
There was a problem hiding this comment.
also move MemoryPoolStats out from impl?
There was a problem hiding this comment.
prefer to get MemoryPool from threadlocal
|
Due to the lack of activity, the current pr is marked as stale and will be closed after 180 days, any update will remove the stale label |
javeme
left a comment
There was a problem hiding this comment.
This is a great improvement to our memory performance, looking forward to merging it soon.
| .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), | ||
| (TaskMemoryPool) currentTaskPool); | ||
| MemoryPool currentOperationPool = | ||
| currentTaskPool.addChildPool("kout-consume-operation"); |
There was a problem hiding this comment.
the Consumers is a general class, seems it's not appropriate to put the upper-level kout code here
| this.runningFutures.add( | ||
| this.executor.submit(new ContextCallable<>(this::runAndDone))); | ||
| this.executor.submit( | ||
| new ContextCallable<>(() -> this.runAndDone(MemoryManager.getInstance() |
There was a problem hiding this comment.
should we move this TaskMemoryPool-bind code into ContextCallable?
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? | ||
| new BinaryBackendEntry.BinaryId(bytes, id) : | ||
| new BinaryIdOffHeap(bytes, null, |
There was a problem hiding this comment.
and can we rename some methods for a more concise and consistent style:
- getCorrespondingTaskMemoryPool => currentTaskMemoryPool
- getCurrentWorkingOperatorMemoryPool => currentOperatorMemoryPool
- MemoryPool.getSnapShot() => MemoryPool.getStats()
| MemoryManager.getInstance() | ||
| .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), | ||
| (TaskMemoryPool) currentTaskPool); | ||
| MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation"); |
There was a problem hiding this comment.
we can let:
- MemoryPool.addChildPool return QueryMemoryPool
- QueryMemoryPool.addChildPool return TaskMemoryPool
- TaskMemoryPooll.addChildPool return OperatorMemoryPool
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? | ||
| new BinaryBackendEntry.BinaryId(bytes, id) : | ||
| new BinaryIdOffHeap(bytes, null, |
There was a problem hiding this comment.
also move MemoryPoolStats out from impl?
| } | ||
| return manager.serializer(g, measure.measures()).writeList("vertices", ids); | ||
| } finally { | ||
| Optional.ofNullable(queryPool) |
| return new BinaryIdOffHeap(bytes, null, | ||
| return taskMemoryPool == null ? | ||
| new BinaryBackendEntry.BinaryId(bytes, id) : | ||
| new BinaryIdOffHeap(bytes, null, |
There was a problem hiding this comment.
prefer to get MemoryPool from threadlocal
| this.isOutEdge = true; | ||
| } | ||
|
|
||
| public void convertIdToOnHeapIfNeeded() { |
| return new HugeEdgeProperty<>(this, pkey, val); | ||
| Class<V> valueType = (Class<V>) val.getClass(); | ||
| PropertyFactory<V> propertyFactory = PropertyFactory.getInstance(valueType); | ||
| return propertyFactory.newHugeEdgeProperty(this, pkey, val); |
| new HugeVertexProperty<>(owner, key, value) : | ||
| new HugeVertexPropertyOffHeap<>( | ||
| taskMemoryPool.getCurrentWorkingOperatorMemoryPool(), owner, key, | ||
| value); |
There was a problem hiding this comment.
can we avoid bytes copy in serializeSelfToByteBuf
There was a problem hiding this comment.
- preter to let OffHeapObject.serializeSelfToByteBuf return ByteBuf.
- can we rename OffHeapObject.zeroCopyReadFromByteBuf to readAsHeapObject?
- OffHeapObject.getAllMemoryBlock is useless? will we release an object individually?
- HugeVertexPropertyOffHeap.isPresent optimize with 'valueOffHeap != null'
wip...