From 01a8119c5759da2c289d5a8742a09a8be3bc0e9a Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 20 Apr 2026 10:07:36 +0800 Subject: [PATCH] [common] Remove MapUtils.newConcurrentMap() to avoid dependencies conflict. --- .../fluss/client/admin/CreateAclsResult.java | 4 +- .../fluss/client/admin/DropAclsResult.java | 4 +- .../apache/fluss/client/admin/FlussAdmin.java | 4 +- .../client/metadata/ClientSchemaGetter.java | 4 +- .../write/DynamicWriteBatchSizeEstimator.java | 3 +- .../fluss/client/admin/FlussAdminITCase.java | 4 +- .../fluss/record/LogRecordReadContext.java | 3 +- .../apache/fluss/row/encode/ValueDecoder.java | 4 +- .../java/org/apache/fluss/utils/MapUtils.java | 57 ------------------- .../org/apache/fluss/fs/TestFileSystem.java | 4 +- .../fluss/fs/azure/MemoryFileSystem.java | 8 +-- .../flink/sink/undo/UndoRecoveryOperator.java | 3 +- .../undo/UndoRecoveryOperatorFactory.java | 5 +- .../enumerator/TieringSourceEnumerator.java | 8 +-- .../FlussMockSplitEnumeratorContext.java | 7 +-- .../fluss/lake/values/TestingValuesLake.java | 7 +-- .../fluss/rpc/metrics/ClientMetricGroup.java | 4 +- .../fluss/rpc/metrics/ConnectionMetrics.java | 4 +- .../fluss/rpc/netty/client/NettyClient.java | 4 +- .../rpc/netty/client/ServerConnection.java | 4 +- .../fluss/server/DynamicServerConfig.java | 6 +- .../CompletedSnapshotStoreManager.java | 3 +- .../lease/KvSnapshotLeaseHandler.java | 4 +- .../lease/KvSnapshotLeaseManager.java | 6 +- .../rebalance/RebalanceManager.java | 6 +- .../org/apache/fluss/server/kv/KvManager.java | 4 +- .../apache/fluss/server/log/LogManager.java | 4 +- .../server/log/remote/RemoteLogManager.java | 6 +- .../server/metadata/ServerSchemaCache.java | 6 +- .../fluss/server/metrics/UserMetrics.java | 4 +- .../metrics/group/CoordinatorMetricGroup.java | 6 +- .../metrics/group/TableMetricGroup.java | 4 +- .../group/TabletServerMetricGroup.java | 5 +- .../server/replica/AdjustIsrManager.java | 5 +- .../apache/fluss/server/replica/Replica.java | 5 +- .../fluss/server/replica/ReplicaManager.java | 4 +- .../delay/DelayedOperationManager.java | 4 +- .../lease/KvSnapshotLeaseHandlerTest.java | 5 +- .../KvSnapshotLeaseMetadataManagerTest.java | 3 +- .../TestingCompletedKvSnapshotCommitter.java | 4 +- tools/maven/checkstyle.xml | 7 +-- tools/maven/suppressions.xml | 2 - 42 files changed, 84 insertions(+), 164 deletions(-) delete mode 100644 fluss-common/src/main/java/org/apache/fluss/utils/MapUtils.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/CreateAclsResult.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/CreateAclsResult.java index 1c6eacdf84..4a272eee2a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/CreateAclsResult.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/CreateAclsResult.java @@ -20,12 +20,12 @@ import org.apache.fluss.rpc.messages.PbCreateAclRespInfo; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; -import org.apache.fluss.utils.MapUtils; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBinding; @@ -43,7 +43,7 @@ public class CreateAclsResult { private final Map> futures; public CreateAclsResult(Collection aclBindings) { - Map> futures = MapUtils.newConcurrentHashMap(); + Map> futures = new ConcurrentHashMap<>(); aclBindings.forEach(aclBinding -> futures.put(aclBinding, new CompletableFuture<>())); this.futures = futures; } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/DropAclsResult.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/DropAclsResult.java index 4401ce26e1..12d38374a8 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/DropAclsResult.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/DropAclsResult.java @@ -24,7 +24,6 @@ import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; -import org.apache.fluss.utils.MapUtils; import javax.annotation.Nullable; @@ -34,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBinding; @@ -80,7 +80,7 @@ public List values() { DropAclsResult(Collection filters) { final Map> futures = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); for (AclBindingFilter filter : filters) { if (!futures.containsKey(filter)) { futures.put(filter, new CompletableFuture<>()); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index a8398af6d7..a1d429c99a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -98,7 +98,6 @@ import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.concurrent.FutureUtils; import javax.annotation.Nullable; @@ -111,6 +110,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterDatabaseRequest; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest; @@ -542,7 +542,7 @@ private ListOffsetsResult listOffsets( buckets, offsetSpec, tableInfo.getTablePath()); - Map> bucketToOffsetMap = MapUtils.newConcurrentHashMap(); + Map> bucketToOffsetMap = new ConcurrentHashMap<>(); for (int bucket : buckets) { bucketToOffsetMap.put(bucket, new CompletableFuture<>()); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java index 486f807449..348fdb8e36 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java @@ -31,10 +31,10 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException; -import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap; /** Schema getter for client. */ @Internal @@ -50,7 +50,7 @@ public ClientSchemaGetter(TablePath tablePath, SchemaInfo latestSchemaInfo, Admi this.tablePath = tablePath; this.latestSchemaInfo = latestSchemaInfo; this.admin = admin; - this.schemasById = newConcurrentHashMap(); + this.schemasById = new ConcurrentHashMap<>(); schemasById.put(latestSchemaInfo.getSchemaId(), latestSchemaInfo.getSchema()); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicWriteBatchSizeEstimator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicWriteBatchSizeEstimator.java index e3d011f995..afa5db88ef 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicWriteBatchSizeEstimator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicWriteBatchSizeEstimator.java @@ -19,7 +19,6 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.metadata.PhysicalTablePath; -import org.apache.fluss.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +47,7 @@ public DynamicWriteBatchSizeEstimator( this.dynamicBatchSizeEnabled = dynamicBatchSizeEnabled; if (dynamicBatchSizeEnabled) { - this.estimatedBatchSizeMap = MapUtils.newConcurrentHashMap(); + this.estimatedBatchSizeMap = new ConcurrentHashMap<>(); } else { this.estimatedBatchSizeMap = null; } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index b9084ca3f2..e129d6a226 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -87,7 +87,6 @@ import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.types.DataTypeChecks; import org.apache.fluss.types.DataTypes; -import org.apache.fluss.utils.MapUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -107,6 +106,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -2261,7 +2261,7 @@ public CompletableFuture listOffsets( Map leaderToRequestMap = new HashMap<>(); leaderToRequestMap.put(1, request); - Map> bucketToOffsetMap = MapUtils.newConcurrentHashMap(); + Map> bucketToOffsetMap = new ConcurrentHashMap<>(); bucketToOffsetMap.put(0, new CompletableFuture<>()); bucketToOffsetMap.put(1, new CompletableFuture<>()); bucketToOffsetMap.put(2, new CompletableFuture<>()); diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index b85547fc10..b65a3ac8e1 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -33,7 +33,6 @@ import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ArrowUtils; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.Projection; import javax.annotation.Nullable; @@ -63,7 +62,7 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo private final boolean projectionPushDowned; private final SchemaGetter schemaGetter; private final ConcurrentHashMap vectorSchemaRootMap = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); public static LogRecordReadContext createReadContext( TableInfo tableInfo, diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java index 794e6ad6be..5e243799cc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java @@ -27,9 +27,9 @@ import org.apache.fluss.types.DataType; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.fluss.row.encode.ValueEncoder.SCHEMA_ID_LENGTH; -import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap; /** * A decoder to decode a schema id and {@link BinaryRow} from a byte array value which is encoded by @@ -42,7 +42,7 @@ public class ValueDecoder { private final KvFormat kvFormat; public ValueDecoder(SchemaGetter schemaGetter, KvFormat kvFormat) { - this.rowDecoders = newConcurrentHashMap(); + this.rowDecoders = new ConcurrentHashMap<>(); this.schemaGetter = schemaGetter; this.kvFormat = kvFormat; } diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/MapUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/MapUtils.java deleted file mode 100644 index 2ab59e9cb2..0000000000 --- a/fluss-common/src/main/java/org/apache/fluss/utils/MapUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.utils; - -import org.apache.commons.lang3.JavaVersion; -import org.apache.commons.lang3.SystemUtils; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; - -/** Utilities that expand the usage of {@link Map}. */ -public class MapUtils { - - public static ConcurrentHashMap newConcurrentHashMap() { - if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { - return new ConcurrentHashMap<>(); - } else { - return new ConcurrentHashMapForJDK8<>(); - } - } - - /** - * For JDK8, there is a bug in {@code ConcurrentHashMap#computeIfAbsent}, checking the key - * existence to speed up. See details in JDK-8161372. - */ - private static class ConcurrentHashMapForJDK8 extends ConcurrentHashMap { - - public ConcurrentHashMapForJDK8() { - super(); - } - - @Override - public V computeIfAbsent(K key, Function mappingFunction) { - V result; - if (null == (result = get(key))) { - result = super.computeIfAbsent(key, mappingFunction); - } - return result; - } - } -} diff --git a/fluss-common/src/test/java/org/apache/fluss/fs/TestFileSystem.java b/fluss-common/src/test/java/org/apache/fluss/fs/TestFileSystem.java index 55e7132a3b..f232e217ef 100644 --- a/fluss-common/src/test/java/org/apache/fluss/fs/TestFileSystem.java +++ b/fluss-common/src/test/java/org/apache/fluss/fs/TestFileSystem.java @@ -21,11 +21,11 @@ import org.apache.fluss.fs.local.LocalDataOutputStream; import org.apache.fluss.fs.local.LocalFileStatus; import org.apache.fluss.fs.local.LocalFileSystem; -import org.apache.fluss.utils.MapUtils; import java.io.IOException; import java.net.URI; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -43,7 +43,7 @@ public class TestFileSystem extends LocalFileSystem { // current number of created, unclosed (output) stream private static final Map currentUnclosedOutputStream = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); private final Configuration configuration; diff --git a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/MemoryFileSystem.java b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/MemoryFileSystem.java index f8e3a05e15..5b24e69e03 100644 --- a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/MemoryFileSystem.java +++ b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/MemoryFileSystem.java @@ -17,8 +17,6 @@ package org.apache.fluss.fs.azure; -import org.apache.fluss.utils.MapUtils; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSInputStream; @@ -37,14 +35,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** Util file system abstraction. */ public class MemoryFileSystem extends FileSystem { private final URI uri; - private final Map files = MapUtils.newConcurrentHashMap(); - private final Set directories = - Collections.newSetFromMap(MapUtils.newConcurrentHashMap()); + private final Map files = new ConcurrentHashMap<>(); + private final Set directories = Collections.newSetFromMap(new ConcurrentHashMap<>()); public MemoryFileSystem(URI uri) { this.uri = uri; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/undo/UndoRecoveryOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/undo/UndoRecoveryOperator.java index 67d215b379..da055f1f56 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/undo/UndoRecoveryOperator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/undo/UndoRecoveryOperator.java @@ -29,7 +29,6 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.types.RowType; -import org.apache.fluss.utils.MapUtils; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -689,7 +688,7 @@ public Map getBucketOffsets() { * @param initialOffsets the initial bucket offsets */ protected void initializeBucketOffsets(Map initialOffsets) { - this.bucketOffsets = MapUtils.newConcurrentHashMap(); + this.bucketOffsets = new ConcurrentHashMap<>(); this.bucketOffsets.putAll(initialOffsets); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/undo/UndoRecoveryOperatorFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/undo/UndoRecoveryOperatorFactory.java index 3f286800fb..5945429a46 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/undo/UndoRecoveryOperatorFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/undo/UndoRecoveryOperatorFactory.java @@ -36,8 +36,7 @@ import java.io.Serializable; import java.util.Map; import java.util.UUID; - -import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap; +import java.util.concurrent.ConcurrentHashMap; /** * Factory for creating {@link UndoRecoveryOperator} instances. @@ -346,7 +345,7 @@ private static class ProducerOffsetReporterHolder * called, it looks up the delegate from this registry. */ private static final Map DELEGATE_REGISTRY = - newConcurrentHashMap(); + new ConcurrentHashMap<>(); /** Unique ID for this holder, used to look up the delegate in the registry. */ private final String holderId; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index e81362fa84..89a10ee898 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -42,7 +42,6 @@ import org.apache.fluss.rpc.messages.PbLakeTieringStats; import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo; import org.apache.fluss.rpc.metrics.ClientMetricGroup; -import org.apache.fluss.utils.MapUtils; import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.SourceEvent; @@ -66,6 +65,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -134,9 +134,9 @@ public TieringSourceEnumerator( this.pollTieringTableIntervalMs = pollTieringTableIntervalMs; this.pendingSplits = Collections.synchronizedList(new ArrayList<>()); this.readersAwaitingSplit = Collections.synchronizedSet(new TreeSet<>()); - this.tieringTableEpochs = MapUtils.newConcurrentHashMap(); - this.finishedTables = MapUtils.newConcurrentHashMap(); - this.failedTableEpochs = MapUtils.newConcurrentHashMap(); + this.tieringTableEpochs = new ConcurrentHashMap<>(); + this.finishedTables = new ConcurrentHashMap<>(); + this.failedTableEpochs = new ConcurrentHashMap<>(); this.tieringReachMaxDurationsTables = Collections.synchronizedSet(new TreeSet<>()); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java index 2bbd11ef6f..8bb7bd3d1e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java @@ -18,8 +18,6 @@ package org.apache.fluss.flink.tiering.source.enumerator; -import org.apache.fluss.utils.MapUtils; - import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; @@ -27,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static org.apache.flink.util.Preconditions.checkState; @@ -44,12 +43,12 @@ class FlussMockSplitEnumeratorContext public FlussMockSplitEnumeratorContext(int parallelism) { super(parallelism); - this.registeredReaders = MapUtils.newConcurrentHashMap(); + this.registeredReaders = new ConcurrentHashMap<>(); } public void registerSourceReader(int subtaskId, int attemptNumber, String location) { final Map attemptReaders = - registeredReaders.computeIfAbsent(subtaskId, k -> MapUtils.newConcurrentHashMap()); + registeredReaders.computeIfAbsent(subtaskId, k -> new ConcurrentHashMap<>()); checkState( !attemptReaders.containsKey(attemptNumber), "ReaderInfo of subtask %s (#%s) already exists.", diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java index 218d7a6f88..77606a47d7 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java @@ -20,7 +20,6 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.InternalRow; -import org.apache.fluss.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -46,10 +46,9 @@ public class TestingValuesLake { private static final Logger LOG = LoggerFactory.getLogger(TestingValuesLake.class); - private static final Map globalTables = - MapUtils.newConcurrentHashMap(); + private static final Map globalTables = new ConcurrentHashMap<>(); private static final Map FAILURE_CONTROLLERS = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); public static TableFailureController failWhen(String tableId) { return FAILURE_CONTROLLERS.computeIfAbsent(tableId, k -> new TableFailureController()); diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java index 02abeaac7d..44143d87f7 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java @@ -21,15 +21,15 @@ import org.apache.fluss.metrics.MetricNames; import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; -import org.apache.fluss.utils.MapUtils; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.ToLongFunction; /** The metric group for clients. */ public class ClientMetricGroup extends AbstractMetricGroup { private final Map nodeToConnectionMetrics = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); private static final String NAME = "client"; diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java index 6b1fd90648..5b5a8b5441 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java @@ -20,13 +20,13 @@ import org.apache.fluss.metrics.Counter; import org.apache.fluss.metrics.ThreadSafeSimpleCounter; import org.apache.fluss.rpc.protocol.ApiKeys; -import org.apache.fluss.utils.MapUtils; import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** Metrics for ServerConnection with {@link ClientMetricGroup} as parent group. */ @@ -38,7 +38,7 @@ public class ConnectionMetrics { private final ClientMetricGroup clientMetricGroup; /** Metrics for different request/response metrics with specify {@link ApiKeys}. */ - final Map metricsByRequestName = MapUtils.newConcurrentHashMap(); + final Map metricsByRequestName = new ConcurrentHashMap<>(); public ConnectionMetrics(String serverId, ClientMetricGroup clientMetricGroup) { this.serverId = serverId; diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java index 2dc284e221..50e714d026 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java @@ -33,7 +33,6 @@ import org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocator; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.concurrent.FutureUtils; import org.slf4j.Logger; @@ -45,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -79,7 +79,7 @@ public final class NettyClient implements RpcClient { private volatile boolean isClosed = false; public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) { - this.connections = MapUtils.newConcurrentHashMap(); + this.connections = new ConcurrentHashMap<>(); // build bootstrap this.eventGroup = diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java index fbb99f65c1..46ec080611 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java @@ -44,7 +44,6 @@ import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFutureListener; import org.apache.fluss.utils.ExponentialBackoff; -import org.apache.fluss.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +58,7 @@ import java.util.ArrayDeque; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -72,7 +72,7 @@ final class ServerConnection { private final ServerNode node; // TODO: add max inflight requests limit like Kafka's "max.in.flight.requests.per.connection" - private final Map inflightRequests = MapUtils.newConcurrentHashMap(); + private final Map inflightRequests = new ConcurrentHashMap<>(); private final CompletableFuture closeFuture = new CompletableFuture<>(); private final ConnectionMetrics connectionMetrics; private final ClientAuthenticator authenticator; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java index 634a91a1c7..dcec31b463 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -24,7 +24,6 @@ import org.apache.fluss.config.cluster.ConfigValidator; import org.apache.fluss.config.cluster.ServerReconfigurable; import org.apache.fluss.exception.ConfigException; -import org.apache.fluss.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +36,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -69,11 +69,11 @@ class DynamicServerConfig { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Map, ServerReconfigurable> - serverReconfigures = MapUtils.newConcurrentHashMap(); + serverReconfigures = new ConcurrentHashMap<>(); /** Registered stateless config validators, organized by config key for efficient lookup. */ private final Map>> configValidatorsByKey = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); /** The initial configuration items when the server starts from server.yaml. */ private final Map initialConfigMap; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java index 1b739f9a2a..6e483853c9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java @@ -31,7 +31,6 @@ import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore; import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; import org.apache.fluss.server.zk.ZooKeeperClient; -import org.apache.fluss.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +95,7 @@ public CompletedSnapshotStoreManager( maxNumberOfSnapshotsToRetain > 0, "maxNumberOfSnapshotsToRetain must be positive"); this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain; this.zooKeeperClient = zooKeeperClient; - this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap(); + this.bucketCompletedSnapshotStores = new ConcurrentHashMap<>(); this.ioExecutor = ioExecutor; this.snapshotInUseChecker = snapshotInUseChecker; this.makeZookeeperCompletedSnapshotHandleStore = makeZookeeperCompletedSnapshotHandleStore; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandler.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandler.java index 8570de048b..95e5ab1f51 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandler.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandler.java @@ -19,13 +19,13 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease; -import org.apache.fluss.utils.MapUtils; import javax.annotation.concurrent.NotThreadSafe; import java.util.Arrays; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; /** handler of kv snapshot lease. */ @NotThreadSafe @@ -36,7 +36,7 @@ public class KvSnapshotLeaseHandler { private final Map tableIdToTableLease; public KvSnapshotLeaseHandler(long expirationTime) { - this(expirationTime, MapUtils.newConcurrentHashMap()); + this(expirationTime, new ConcurrentHashMap<>()); } public KvSnapshotLeaseHandler( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java index 7ae131b0a5..97ec1dd1a0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java @@ -26,7 +26,6 @@ import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; @@ -68,15 +67,14 @@ public class KvSnapshotLeaseManager { /** lease id to kv snapshot lease. */ @GuardedBy("managerLock") private final ConcurrentHashMap kvSnapshotLeaseMap = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); /** * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has * been leased by how many lease id. */ @GuardedBy("managerLock") - private final Map refCount = - MapUtils.newConcurrentHashMap(); + private final Map refCount = new ConcurrentHashMap<>(); /** For metrics. */ private final AtomicInteger leasedBucketCount = new AtomicInteger(0); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java index 30cf40bd39..253070c7c7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java @@ -36,7 +36,6 @@ import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.RebalanceTask; -import org.apache.fluss.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +52,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED; import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; @@ -78,11 +78,11 @@ public class RebalanceManager { /** A mapping from table bucket to rebalance status of pending and running tasks. */ private final Map inProgressRebalanceTasks = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); /** A mapping from table bucket to rebalance status of failed or completed tasks. */ private final Map finishedRebalanceTasks = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); private final GoalOptimizer goalOptimizer; private volatile long registerTime; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index c4701170b1..14617ae1ca 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -47,7 +47,6 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.FlussPaths; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.types.Tuple2; import org.rocksdb.RateLimiter; @@ -64,6 +63,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.fluss.utils.concurrent.LockUtils.inLock; @@ -115,7 +115,7 @@ public static RateLimiter getDefaultRateLimiter() { private final ZooKeeperClient zkClient; - private final Map currentKvs = MapUtils.newConcurrentHashMap(); + private final Map currentKvs = new ConcurrentHashMap<>(); /** * For arrow log format. The buffer allocator to allocate memory for arrow write batch of diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java index 048a2f5424..f7f35f4618 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java @@ -34,7 +34,6 @@ import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.FlussPaths; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.concurrent.Scheduler; import org.apache.fluss.utils.types.Tuple2; @@ -53,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -92,7 +92,7 @@ public final class LogManager extends TabletManagerBase { private final TabletServerMetricGroup serverMetricGroup; private final ReentrantLock logCreationOrDeletionLock = new ReentrantLock(); - private final Map currentLogs = MapUtils.newConcurrentHashMap(); + private final Map currentLogs = new ConcurrentHashMap<>(); private volatile OffsetCheckpointFile recoveryPointCheckpoint; private boolean loadLogsCompletedFlag = false; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java index 85ea47fefc..282ae8d9ee 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java @@ -31,7 +31,6 @@ import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; import org.apache.fluss.utils.IOUtils; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; @@ -48,6 +47,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -78,8 +78,8 @@ public class RemoteLogManager implements Closeable { private final Clock clock; private final ZooKeeperClient zkClient; - private final Map rlmTasks = MapUtils.newConcurrentHashMap(); - private final Map remoteLogs = MapUtils.newConcurrentHashMap(); + private final Map rlmTasks = new ConcurrentHashMap<>(); + private final Map remoteLogs = new ConcurrentHashMap<>(); public RemoteLogManager( Configuration conf, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java index 374e20501e..1ba9c87476 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerSchemaCache.java @@ -27,13 +27,13 @@ import org.apache.fluss.shaded.guava32.com.google.common.cache.Cache; import org.apache.fluss.shaded.guava32.com.google.common.cache.CacheBuilder; import org.apache.fluss.shaded.guava32.com.google.common.util.concurrent.UncheckedExecutionException; -import org.apache.fluss.utils.MapUtils; import javax.annotation.concurrent.ThreadSafe; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; /** @@ -55,8 +55,8 @@ public class ServerSchemaCache { public ServerSchemaCache(MetadataManager metadataManager) { this.metadataManager = metadataManager; // thread safe is guaranteed by subscriberCounters. - this.subscriberCounters = MapUtils.newConcurrentHashMap(); - this.latestSchemaByTableId = MapUtils.newConcurrentHashMap(); + this.subscriberCounters = new ConcurrentHashMap<>(); + this.latestSchemaByTableId = new ConcurrentHashMap<>(); this.schemaCache = CacheBuilder.newBuilder().maximumSize(100).build(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java index 95ae062fc5..256c440b24 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/UserMetrics.java @@ -29,7 +29,6 @@ import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.metrics.group.UserMetricGroup; import org.apache.fluss.server.metrics.group.UserPerTableMetricGroup; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.concurrent.Scheduler; import org.slf4j.Logger; @@ -39,6 +38,7 @@ import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,7 +71,7 @@ public class UserMetrics implements AutoCloseable { private final ScheduledFuture schedule; private final ConcurrentMap metrics = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); private final AtomicBoolean isClosed = new AtomicBoolean(false); public UserMetrics( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java index 5ae0999230..31120962f6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java @@ -26,7 +26,6 @@ import org.apache.fluss.metrics.groups.MetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.server.coordinator.event.CoordinatorEvent; -import org.apache.fluss.utils.MapUtils; import javax.annotation.Nullable; @@ -36,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; @@ -45,14 +45,14 @@ public class CoordinatorMetricGroup extends AbstractMetricGroup { private static final String NAME = "coordinator"; private final Map metricGroupByTable = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); protected final String clusterId; protected final String hostname; protected final String serverId; private final Map, CoordinatorEventMetricGroup> - eventMetricGroups = MapUtils.newConcurrentHashMap(); + eventMetricGroups = new ConcurrentHashMap<>(); public CoordinatorMetricGroup( MetricRegistry registry, String clusterId, String hostname, String serverId) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java index 5f280c8e49..b103dc1596 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java @@ -28,11 +28,11 @@ import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; -import org.apache.fluss.utils.MapUtils; import javax.annotation.Nullable; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope; @@ -43,7 +43,7 @@ */ public class TableMetricGroup extends AbstractMetricGroup { - private final Map buckets = MapUtils.newConcurrentHashMap(); + private final Map buckets = new ConcurrentHashMap<>(); private final TablePath tablePath; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java index f4b2c6b730..22215bc6de 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -31,9 +31,9 @@ import org.apache.fluss.metrics.groups.AbstractMetricGroup; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics; -import org.apache.fluss.utils.MapUtils; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** The metric group for tablet server. */ public class TabletServerMetricGroup extends AbstractMetricGroup { @@ -41,8 +41,7 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { private static final String NAME = "tabletserver"; private static final int WINDOW_SIZE = 1024; - private final Map metricGroupByTable = - MapUtils.newConcurrentHashMap(); + private final Map metricGroupByTable = new ConcurrentHashMap<>(); protected final String clusterId; protected final String rack; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java index 8ca3b90537..342ede724e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java @@ -25,7 +25,6 @@ import org.apache.fluss.rpc.messages.AdjustIsrResponse; import org.apache.fluss.server.entity.AdjustIsrResultForBucket; import org.apache.fluss.server.zk.data.LeaderAndIsr; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.concurrent.Scheduler; import org.slf4j.Logger; @@ -36,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getAdjustIsrResponseData; @@ -61,8 +61,7 @@ public class AdjustIsrManager { private final int serverId; /** Used to allow only one pending adjust Isr request per bucket (visible for testing). */ - protected final Map unsentAdjustIsrMap = - MapUtils.newConcurrentHashMap(); + protected final Map unsentAdjustIsrMap = new ConcurrentHashMap<>(); /** Used to allow only one in-flight request at a time. */ private final AtomicBoolean inflightRequest = new AtomicBoolean(false); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 9e55d65b88..7a48813237 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -102,7 +102,6 @@ import org.apache.fluss.utils.CloseableRegistry; import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.IOUtils; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.types.Tuple2; @@ -128,6 +127,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; @@ -197,8 +197,7 @@ public final class Replica { * *

followerId -> {@link FollowerReplica}. */ - private final Map followerReplicasMap = - MapUtils.newConcurrentHashMap(); + private final Map followerReplicasMap = new ConcurrentHashMap<>(); private volatile IsrState isrState = new IsrState.CommittedIsrState(Collections.emptyList()); private volatile int leaderEpoch = LeaderAndIsr.INITIAL_LEADER_EPOCH - 1; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index d2a417282b..3b847266eb 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -110,7 +110,6 @@ import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.FlussPaths; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.concurrent.Scheduler; @@ -132,6 +131,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; @@ -163,7 +163,7 @@ public class ReplicaManager implements ServerReconfigurable { private final OffsetCheckpointFile highWatermarkCheckpoint; @GuardedBy("replicaStateChangeLock") - private final Map allReplicas = MapUtils.newConcurrentHashMap(); + private final Map allReplicas = new ConcurrentHashMap<>(); private final TabletServerMetadataCache metadataCache; private final ExecutorService ioExecutor; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedOperationManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedOperationManager.java index fe2a5d31a1..38bf2e0745 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedOperationManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedOperationManager.java @@ -22,7 +22,6 @@ import org.apache.fluss.server.utils.timer.DefaultTimer; import org.apache.fluss.server.utils.timer.Timer; import org.apache.fluss.server.utils.timer.TimerTask; -import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.concurrent.ShutdownableThread; import org.slf4j.Logger; @@ -34,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -236,7 +236,7 @@ private class WatcherList { private final Lock watcherLock; public WatcherList() { - this.watchersByKey = MapUtils.newConcurrentHashMap(); + this.watchersByKey = new ConcurrentHashMap<>(); this.watcherLock = new ReentrantLock(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandlerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandlerTest.java index b1905a614a..9baac064cc 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandlerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseHandlerTest.java @@ -19,7 +19,6 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease; -import org.apache.fluss.utils.MapUtils; import org.junit.jupiter.api.Test; @@ -238,12 +237,12 @@ void testEqualsAndHashCode() { // Create two leases with same logical content but different array objects Map map1 = new HashMap<>(); - ConcurrentHashMap partitionSnapshots1 = MapUtils.newConcurrentHashMap(); + ConcurrentHashMap partitionSnapshots1 = new ConcurrentHashMap<>(); partitionSnapshots1.put(2001L, new Long[] {100L, -1L}); partitionSnapshots1.put(2002L, new Long[] {-1L, 101L}); map1.put(1L, new KvSnapshotTableLease(1L, new Long[] {100L, -1L}, partitionSnapshots1)); Map map2 = new HashMap<>(); - ConcurrentHashMap partitionSnapshots2 = MapUtils.newConcurrentHashMap(); + ConcurrentHashMap partitionSnapshots2 = new ConcurrentHashMap<>(); partitionSnapshots2.put(2001L, new Long[] {100L, -1L}); partitionSnapshots2.put(2002L, new Long[] {-1L, 101L}); map2.put(1L, new KvSnapshotTableLease(1L, new Long[] {100L, -1L}, partitionSnapshots2)); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseMetadataManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseMetadataManagerTest.java index 6f10400f6b..9d66613bc2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseMetadataManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseMetadataManagerTest.java @@ -27,7 +27,6 @@ import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLeaseJsonSerde; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.utils.IOUtils; -import org.apache.fluss.utils.MapUtils; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -99,7 +98,7 @@ void testRegisterAndUpdateLease() throws Exception { Map tableIdToTableLease = new HashMap<>(); tableIdToTableLease.put(1L, new KvSnapshotTableLease(1L, new Long[] {100L, -1L})); - ConcurrentHashMap partitionSnapshots = MapUtils.newConcurrentHashMap(); + ConcurrentHashMap partitionSnapshots = new ConcurrentHashMap<>(); partitionSnapshots.put(1000L, new Long[] {111L, 122L}); partitionSnapshots.put(1001L, new Long[] {122L, -1L}); tableIdToTableLease.put(2L, new KvSnapshotTableLease(2L, partitionSnapshots)); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java index 5f2cddf70b..4475d0aa7b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java @@ -18,13 +18,13 @@ package org.apache.fluss.server.kv.snapshot; import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.utils.MapUtils; import java.time.Duration; import java.util.Deque; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue; @@ -36,7 +36,7 @@ public class TestingCompletedKvSnapshotCommitter implements CompletedKvSnapshotCommitter { protected final Map> snapshots = - MapUtils.newConcurrentHashMap(); + new ConcurrentHashMap<>(); protected final Map> bucketSnapshotLeaderEpoch = new HashMap<>(); diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml index c41fa39e63..0827c9e87d 100644 --- a/tools/maven/checkstyle.xml +++ b/tools/maven/checkstyle.xml @@ -558,12 +558,7 @@ This file is based on the checkstyle file of Apache Beam. - - - - - + diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml index ecf71d1c2f..694a6979b2 100644 --- a/tools/maven/suppressions.xml +++ b/tools/maven/suppressions.xml @@ -33,6 +33,4 @@ - -