From 2abaec4fcf67d837b9761fe31692244e05430ed2 Mon Sep 17 00:00:00 2001 From: "alan578.zhao" <956322745@qq.com> Date: Thu, 13 Feb 2025 16:17:08 +0800 Subject: [PATCH 1/2] #2728 --- .../apache/hugegraph/StandardHugeGraph.java | 55 ++++++++++++------- .../backend/cache/CachedGraphTransaction.java | 22 +++++--- .../backend/tx/GraphTransaction.java | 6 ++ 3 files changed, 54 insertions(+), 29 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index eb991c0f68..834811817d 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -17,6 +17,7 @@ package org.apache.hugegraph; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -1615,37 +1616,51 @@ public SysTransaction(HugeGraphParams graph, BackendStore store) { private static class AbstractCacheNotifier implements CacheNotifier { + public static final Logger LOG = Log.logger(AbstractCacheNotifier.class); + private final EventHub hub; private final EventListener cacheEventListener; public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) { this.hub = hub; this.cacheEventListener = event -> { - Object[] args = event.args(); - E.checkArgument(args.length > 0 && args[0] instanceof String, - "Expect event action argument"); - if (Cache.ACTION_INVALIDED.equals(args[0])) { - event.checkArgs(String.class, HugeType.class, Object.class); - HugeType type = (HugeType) args[1]; - Object ids = args[2]; - if (ids instanceof Id[]) { - // argument type mismatch: proxy.invalid2(type,Id[]ids) - proxy.invalid2(type, (Id[]) ids); - } else if (ids instanceof Id) { - proxy.invalid(type, (Id) ids); - } else { - E.checkArgument(false, "Unexpected argument: %s", ids); + try { + LOG.info("Received event: {}", event); + Object[] args = event.args(); + E.checkArgument(args.length > 0 && args[0] instanceof String, + "Expect event action argument"); + String action = (String) args[0]; + LOG.debug("Event action: {}", action); + if (Cache.ACTION_INVALIDED.equals(action)) { + event.checkArgs(String.class, HugeType.class, Object.class); + HugeType type = (HugeType) args[1]; + Object ids = args[2]; + if (ids instanceof Id[]) { + LOG.debug("Calling proxy.invalid2 with type: {}, IDs: {}", type, Arrays.toString((Id[]) ids)); + proxy.invalid2(type, (Id[]) ids); + } else if (ids instanceof Id) { + LOG.debug("Calling proxy.invalid with type: {}, ID: {}", type, ids); + proxy.invalid(type, (Id) ids); + } else { + LOG.error("Unexpected argument: {}", ids); + E.checkArgument(false, "Unexpected argument: %s", ids); + } + return true; + } else if (Cache.ACTION_CLEARED.equals(action)) { + event.checkArgs(String.class, HugeType.class); + HugeType type = (HugeType) args[1]; + LOG.debug("Calling proxy.clear with type: {}", type); + proxy.clear(type); + return true; } - return true; - } else if (Cache.ACTION_CLEARED.equals(args[0])) { - event.checkArgs(String.class, HugeType.class); - HugeType type = (HugeType) args[1]; - proxy.clear(type); - return true; + } catch (Exception e) { + LOG.error("Error processing cache event: {}", e.getMessage(), e); } + LOG.warn("Event {} not handled",event); return false; }; this.hub.listen(Events.CACHE, this.cacheEventListener); + LOG.info("Cache event listener registered successfully. cacheEventListener {}",this.cacheEventListener); } @Override diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java index 83ab7f51ad..cbf23e14d5 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java @@ -133,7 +133,9 @@ private void listenChanges() { } return false; }; - this.store().provider().listen(this.storeEventListener); + if(storeEventListenStatus.putIfAbsent(this.params().name(),true)==null){ + this.store().provider().listen(this.storeEventListener); + } // Listen cache event: "cache"(invalid cache item) this.cacheEventListener = event -> { @@ -182,19 +184,21 @@ private void listenChanges() { } return false; }; - EventHub graphEventHub = this.params().graphEventHub(); - if (!graphEventHub.containsListener(Events.CACHE)) { + if(graphCacheListenStatus.putIfAbsent(this.params().name(),true)==null){ + EventHub graphEventHub = this.params().graphEventHub(); graphEventHub.listen(Events.CACHE, this.cacheEventListener); } } private void unlistenChanges() { - // Unlisten store event - this.store().provider().unlisten(this.storeEventListener); - - // Unlisten cache event - EventHub graphEventHub = this.params().graphEventHub(); - graphEventHub.unlisten(Events.CACHE, this.cacheEventListener); + String graphName = this.params().name(); + if (graphCacheListenStatus.remove(graphName) != null) { + EventHub graphEventHub = this.params().graphEventHub(); + graphEventHub.unlisten(Events.CACHE, this.cacheEventListener); + } + if (storeEventListenStatus.remove(graphName) != null) { + this.store().provider().unlisten(this.storeEventListener); + } } private void notifyChanges(String action, HugeType type, Id[] ids) { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java index 7f441574eb..957249a7c6 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -139,6 +140,11 @@ public class GraphTransaction extends IndexableTransaction { private final int verticesCapacity; private final int edgesCapacity; + protected static final ConcurrentHashMap graphCacheListenStatus = + new ConcurrentHashMap<>(); + protected static final ConcurrentHashMap storeEventListenStatus = + new ConcurrentHashMap<>(); + public GraphTransaction(HugeGraphParams graph, BackendStore store) { super(graph, store); From 3135e170b366f9a773efb0133d40a900ebd4fe08 Mon Sep 17 00:00:00 2001 From: imbajin Date: Tue, 1 Apr 2025 15:57:23 +0800 Subject: [PATCH 2/2] fix some typo & tiny improve --- .../apache/hugegraph/StandardHugeGraph.java | 8 +- .../backend/tx/GraphTransaction.java | 88 +++++++++---------- 2 files changed, 44 insertions(+), 52 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index 834811817d..ed9cd42349 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -578,11 +578,7 @@ private BackendStoreProvider loadStoreProvider() { private AbstractSerializer serializer() { String name = this.configuration.get(CoreOptions.SERIALIZER); LOG.debug("Loading serializer '{}' for graph '{}'", name, this.name); - AbstractSerializer serializer = SerializerFactory.serializer(this.configuration, name); - if (serializer == null) { - throw new HugeException("Can't load serializer with name " + name); - } - return serializer; + return SerializerFactory.serializer(this.configuration, name); } private Analyzer analyzer() { @@ -598,7 +594,7 @@ protected void reloadRamtable() { } protected void reloadRamtable(boolean loadFromFile) { - // Expect triggered manually, like gremlin job + // Expect triggered manually, like a gremlin job if (this.ramtable != null) { this.ramtable.reload(loadFromFile, this.name); } else { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java index 957249a7c6..e50fa5c6f8 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java @@ -145,7 +145,6 @@ public class GraphTransaction extends IndexableTransaction { protected static final ConcurrentHashMap storeEventListenStatus = new ConcurrentHashMap<>(); - public GraphTransaction(HugeGraphParams graph, BackendStore store) { super(graph, store); @@ -406,8 +405,8 @@ protected void prepareDeletions(Map removedVertices, /* * If the backend stores vertex together with edges, it's edges * would be removed after removing vertex. Otherwise, if the - * backend stores vertex which is separated from edges, it's - * edges should be removed manually when removing vertex. + * backend stores vertex which is separated from edges, + * its edges should be removed manually when removing vertex. */ this.doRemove(this.serializer.writeVertex(v.prepareRemoved())); this.indexTx.updateVertexIndex(v, true); @@ -441,7 +440,7 @@ protected void prepareUpdates(Set> addedProps, if (this.store().features().supportsUpdateVertexProperty()) { // Update vertex index without removed property this.indexTx.updateVertexIndex(prop.element(), false); - // Eliminate the property(OUT and IN owner edge) + // Eliminate the property (OUT and IN owner edge) this.doEliminate(this.serializer.writeVertexProperty(prop)); } else { // Override vertex @@ -453,12 +452,12 @@ protected void prepareUpdates(Set> addedProps, if (this.store().features().supportsUpdateEdgeProperty()) { // Update edge index without removed property this.indexTx.updateEdgeIndex(prop.element(), false); - // Eliminate the property(OUT and IN owner edge) + // Eliminate the property (OUT and IN owner edge) this.doEliminate(this.serializer.writeEdgeProperty(prop)); this.doEliminate(this.serializer.writeEdgeProperty( prop.switchEdgeOwner())); } else { - // Override edge(it will be in addedEdges & updatedEdges) + // Override edge (it will be in addedEdges & updatedEdges) this.addEdge(prop.element()); } } @@ -470,7 +469,7 @@ protected void prepareUpdates(Set> addedProps, if (this.store().features().supportsUpdateVertexProperty()) { // Update vertex index with new added property this.indexTx.updateVertexIndex(prop.element(), false); - // Append new property(OUT and IN owner edge) + // Append new property (OUT and IN owner edge) this.doAppend(this.serializer.writeVertexProperty(prop)); } else { // Override vertex @@ -480,9 +479,9 @@ protected void prepareUpdates(Set> addedProps, assert p.element().type().isEdge(); HugeEdgeProperty prop = (HugeEdgeProperty) p; if (this.store().features().supportsUpdateEdgeProperty()) { - // Update edge index with new added property + // Update edge-index with new added property this.indexTx.updateEdgeIndex(prop.element(), false); - // Append new property(OUT and IN owner edge) + // Append new property (OUT and IN owner edge) this.doAppend(this.serializer.writeEdgeProperty(prop)); this.doAppend(this.serializer.writeEdgeProperty( prop.switchEdgeOwner())); @@ -566,12 +565,12 @@ public Number queryNumber(Query query) { QueryList queries = this.optimizeQueries(query, q -> { boolean isIndexQuery = q instanceof IdQuery; assert isIndexQuery || isConditionQuery || q == query; - // Need to fallback if there are uncommitted records + // Need to fall back if there are uncommitted records boolean fallback = hasUpdate; Number result; if (fallback) { - // Here just ignore it, and do fallback later + // Here just ignore it, and do fall back later result = null; } else if (!isIndexQuery || !isConditionQuery) { // It's a sysprop-query, let parent tx do it @@ -584,7 +583,7 @@ public Number queryNumber(Query query) { assert query instanceof ConditionQuery; OptimizedType optimized = ((ConditionQuery) query).optimized(); if (this.optimizeAggrByIndex && optimized == OptimizedType.INDEX) { - // The ids size means results count (assume no left index) + // The id's size means result count (assume no left index) result = q.idsSize(); } else { assert !fallback; @@ -593,7 +592,7 @@ public Number queryNumber(Query query) { } } - // Can't be optimized, then do fallback + // Can't be optimized, then do fall back if (fallback) { assert result == null; assert q.resultType().isVertex() || q.resultType().isEdge(); @@ -635,7 +634,7 @@ public HugeVertex addVertex(HugeVertex vertex) { /* * No need to lock VERTEX_LABEL_ADD_UPDATE, because vertex label * update only can add nullable properties and user data, which is - * unconcerned with add vertex + * unconcerned with added vertex */ this.beforeWrite(); this.addedVertices.put(vertex.id(), vertex); @@ -768,21 +767,16 @@ public Iterator queryServerInfos(Object... vertexIds) { return this.queryVerticesByIds(vertexIds, false, false, HugeType.SERVER); } - return this.queryVerticesByIds(vertexIds, false, false, - HugeType.VERTEX); + return this.queryVerticesByIds(vertexIds, false, false, HugeType.VERTEX); } - protected Iterator queryVerticesByIds(Object[] vertexIds, - boolean adjacentVertex, - boolean checkMustExist) { - return this.queryVerticesByIds(vertexIds, adjacentVertex, checkMustExist, - HugeType.VERTEX); + protected Iterator queryVerticesByIds(Object[] vertexIds, boolean adjacentVertex, + boolean checkMustExist) { + return this.queryVerticesByIds(vertexIds, adjacentVertex, checkMustExist, HugeType.VERTEX); } - protected Iterator queryVerticesByIds(Object[] vertexIds, - boolean adjacentVertex, - boolean checkMustExist, - HugeType type) { + protected Iterator queryVerticesByIds(Object[] vertexIds, boolean adjacentVertex, + boolean checkMustExist, HugeType type) { Query.checkForceCapacity(vertexIds.length); // NOTE: allowed duplicated vertices if query by duplicated ids @@ -896,7 +890,7 @@ public HugeEdge addEdge(HugeEdge edge) { /* * No need to lock EDGE_LABEL_ADD_UPDATE, because edge label * update only can add nullable properties and user data, which is - * unconcerned with add edge + * unconcerned with added edge */ this.beforeWrite(); this.addedEdges.put(edge.id(), edge); @@ -1138,7 +1132,7 @@ public void addVertexProperty(HugeVertexProperty prop) { E.checkState(vertex != null, "No owner for updating property '%s'", prop.key()); - // Add property in memory for new created vertex + // Add property in memory for newly created vertex if (vertex.fresh()) { // The owner will do property update vertex.setProperty(prop); @@ -1183,7 +1177,7 @@ public void removeVertexProperty(HugeVertexProperty prop) { List primaryKeyIds = vertex.schemaLabel().primaryKeys(); E.checkArgument(!primaryKeyIds.contains(propKey.id()), "Can't remove primary key '%s'", prop.key()); - // Remove property in memory for new created vertex + // Remove property in memory for newly created vertex if (vertex.fresh()) { // The owner will do property update vertex.removeProperty(propKey.id()); @@ -1216,7 +1210,7 @@ public void addEdgeProperty(HugeEdgeProperty prop) { E.checkState(edge != null, "No owner for updating property '%s'", prop.key()); - // Add property in memory for new created edge + // Add property in memory for newly created edge if (edge.fresh()) { // The owner will do property update edge.setProperty(prop); @@ -1256,11 +1250,11 @@ public void removeEdgeProperty(HugeEdgeProperty prop) { if (!edge.hasProperty(propKey.id())) { return; } - // Check is removing sort key + // Check is removing a sort key List sortKeyIds = edge.schemaLabel().sortKeys(); E.checkArgument(!sortKeyIds.contains(prop.propertyKey().id()), "Can't remove sort key '%s'", prop.key()); - // Remove property in memory for new created edge + // Remove property in memory for newly created edge if (edge.fresh()) { // The owner will do property update edge.removeProperty(propKey.id()); @@ -1286,7 +1280,7 @@ public void removeEdgeProperty(HugeEdgeProperty prop) { } /** - * Construct one edge condition query based on source vertex, direction and + * Construct one-edge condition query based on source vertex, direction and * edge labels * * @param sourceVertex source vertex of edge @@ -1346,8 +1340,8 @@ public static ConditionQuery constructEdgesQuery(Id sourceVertex, } private static ConditionQuery constructEdgesQuery(Id sourceVertex, - Directions direction, - List edgeLabels) { + Directions direction, + List edgeLabels) { E.checkState(sourceVertex != null, "The edge query must contain source vertex"); E.checkState(direction != null, @@ -1460,7 +1454,7 @@ private static void verifyEdgesConditionQuery(ConditionQuery query) { /* * Supported query: * 1.query just by edge label - * 2.query just by PROPERTIES (like containsKey,containsValue) + * 2.query just by PROPERTIES (like containsKey, containsValue) * 3.query with scan */ if (query.containsCondition(HugeKeys.LABEL) || @@ -1576,8 +1570,8 @@ private Query optimizeQuery(ConditionQuery query) { } if (vertexIdList.size() != filterVertexList.size()) { - // Modify on the copied relation to avoid affecting other query - Condition.Relation relation = + // Modify on the copied relation to avoid affecting another query + Condition.Relation relation = query.copyRelationAndUpdateQuery(HugeKeys.OWNER_VERTEX); relation.value(filterVertexList); } @@ -1609,7 +1603,8 @@ private Query optimizeQuery(ConditionQuery query) { */ query.resetUserpropConditions(); - if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && query.condition(HugeKeys.SUB_LABEL) == null) { + if (this.storeFeatures().supportsFatherAndSubEdgeLabel() && + query.condition(HugeKeys.SUB_LABEL) == null) { query.eq(HugeKeys.SUB_LABEL, el.id()); } LOG.debug("Query edges by sortKeys: {}", query); @@ -1619,7 +1614,7 @@ private Query optimizeQuery(ConditionQuery query) { /* * Query only by sysprops, like: by vertex label, by edge label. - * NOTE: we assume sysprops would be indexed by backend store + * NOTE: we assume sysprops would be indexed by backend store, * but we don't support query edges only by direction/target-vertex. */ if (query.allSysprop()) { @@ -1848,7 +1843,7 @@ private void lockForUpdateProperty(SchemaLabel schemaLabel, } /* * No need to lock INDEX_LABEL_ADD_UPDATE, because index label - * update only can add user data, which is unconcerned with + * update only can add user data, which is unconcerned with * update property */ this.beforeWrite(); @@ -1925,7 +1920,8 @@ private boolean rightResultFromIndexQuery(Query query, HugeElement elem) { } if (cq.optimized() == OptimizedType.INDEX) { // g.E().hasLabel(xxx).has(yyy) - // consider OptimizedType.INDEX_FILTER occurred in org.apache.hugegraph.core.EdgeCoreTest.testQueryCount + // consider OptimizedType.INDEX_FILTER occurred in org.apache.hugegraph.core + // .EdgeCoreTest.testQueryCount try { this.indexTx.asyncRemoveIndexLeft(cq, elem); } catch (Throwable e) { @@ -1940,7 +1936,7 @@ private boolean rightResultFromIndexQuery(Query query, HugeElement elem) { if (cq.existLeftIndex(elem.id())) { /* * Both have correct and left index, wo should return true - * but also needs to cleaned up left index + * but also needs to clean up left index */ try { this.indexTx.asyncRemoveIndexLeft(cq, elem); @@ -2073,8 +2069,8 @@ private Iterator joinTxRecords( Set txResults = InsertionOrderUtil.newSet(); /* - * Collect added/updated records - * Records in memory have higher priority than query from backend store + * Collect added/updated records. + * Records in memory have higher priority than a query from backend store */ for (V elem : addedTxRecords.values()) { if (query.reachLimit(txResults.size())) { @@ -2281,7 +2277,7 @@ private void traverseByLabel(SchemaLabel label, while (iter.hasNext()) { consumer.accept(iter.next()); /* - * Commit per batch to avoid too much data in single commit, + * Commit per batch to avoid too much data in a single commit, * especially for Cassandra backend */ this.commitIfGtSize(GraphTransaction.COMMIT_BATCH); @@ -2306,7 +2302,7 @@ private void traverseByLabel(SchemaLabel label, if (label.equals(elemLabel)) { consumer.accept(e); /* - * Commit per batch to avoid too much data in single + * Commit per batch to avoid too much data in a single * commit, especially for Cassandra backend */ this.commitIfGtSize(GraphTransaction.COMMIT_BATCH);