Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import org.apache.fluss.rpc.messages.PbCreateAclRespInfo;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.security.acl.AclBinding;
import org.apache.fluss.utils.MapUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBinding;

Expand All @@ -43,7 +43,7 @@ public class CreateAclsResult {
private final Map<AclBinding, CompletableFuture<Void>> futures;

public CreateAclsResult(Collection<AclBinding> aclBindings) {
Map<AclBinding, CompletableFuture<Void>> futures = MapUtils.newConcurrentHashMap();
Map<AclBinding, CompletableFuture<Void>> futures = new ConcurrentHashMap<>();
aclBindings.forEach(aclBinding -> futures.put(aclBinding, new CompletableFuture<>()));
this.futures = futures;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.security.acl.AclBinding;
import org.apache.fluss.security.acl.AclBindingFilter;
import org.apache.fluss.utils.MapUtils;

import javax.annotation.Nullable;

Expand All @@ -34,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBinding;

Expand Down Expand Up @@ -80,7 +80,7 @@ public List<FilterResult> values() {

DropAclsResult(Collection<AclBindingFilter> filters) {
final Map<AclBindingFilter, CompletableFuture<DropAclsResult.FilterResults>> futures =
MapUtils.newConcurrentHashMap();
new ConcurrentHashMap<>();
for (AclBindingFilter filter : filters) {
if (!futures.containsKey(filter)) {
futures.put(filter, new CompletableFuture<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.security.acl.AclBinding;
import org.apache.fluss.security.acl.AclBindingFilter;
import org.apache.fluss.utils.MapUtils;
import org.apache.fluss.utils.concurrent.FutureUtils;

import javax.annotation.Nullable;
Expand All @@ -111,6 +110,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterDatabaseRequest;
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
Expand Down Expand Up @@ -542,7 +542,7 @@ private ListOffsetsResult listOffsets(
buckets,
offsetSpec,
tableInfo.getTablePath());
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = MapUtils.newConcurrentHashMap();
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = new ConcurrentHashMap<>();
for (int bucket : buckets) {
bucketToOffsetMap.put(bucket, new CompletableFuture<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException;
import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap;

/** Schema getter for client. */
@Internal
Expand All @@ -50,7 +50,7 @@ public ClientSchemaGetter(TablePath tablePath, SchemaInfo latestSchemaInfo, Admi
this.tablePath = tablePath;
this.latestSchemaInfo = latestSchemaInfo;
this.admin = admin;
this.schemasById = newConcurrentHashMap();
this.schemasById = new ConcurrentHashMap<>();
schemasById.put(latestSchemaInfo.getSchemaId(), latestSchemaInfo.getSchema());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.utils.MapUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -48,7 +47,7 @@ public DynamicWriteBatchSizeEstimator(
this.dynamicBatchSizeEnabled = dynamicBatchSizeEnabled;

if (dynamicBatchSizeEnabled) {
this.estimatedBatchSizeMap = MapUtils.newConcurrentHashMap();
this.estimatedBatchSizeMap = new ConcurrentHashMap<>();
} else {
this.estimatedBatchSizeMap = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import org.apache.fluss.server.zk.data.ServerTags;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.utils.MapUtils;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -107,6 +106,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -2261,7 +2261,7 @@ public CompletableFuture<ListOffsetsResponse> listOffsets(
Map<Integer, ListOffsetsRequest> leaderToRequestMap = new HashMap<>();
leaderToRequestMap.put(1, request);

Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = MapUtils.newConcurrentHashMap();
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = new ConcurrentHashMap<>();
bucketToOffsetMap.put(0, new CompletableFuture<>());
bucketToOffsetMap.put(1, new CompletableFuture<>());
bucketToOffsetMap.put(2, new CompletableFuture<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.ArrowUtils;
import org.apache.fluss.utils.MapUtils;
import org.apache.fluss.utils.Projection;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -63,7 +62,7 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo
private final boolean projectionPushDowned;
private final SchemaGetter schemaGetter;
private final ConcurrentHashMap<Integer, VectorSchemaRoot> vectorSchemaRootMap =
MapUtils.newConcurrentHashMap();
new ConcurrentHashMap<>();

public static LogRecordReadContext createReadContext(
TableInfo tableInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.fluss.types.DataType;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.fluss.row.encode.ValueEncoder.SCHEMA_ID_LENGTH;
import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap;

/**
* A decoder to decode a schema id and {@link BinaryRow} from a byte array value which is encoded by
Expand All @@ -42,7 +42,7 @@ public class ValueDecoder {
private final KvFormat kvFormat;

public ValueDecoder(SchemaGetter schemaGetter, KvFormat kvFormat) {
this.rowDecoders = newConcurrentHashMap();
this.rowDecoders = new ConcurrentHashMap<>();
this.schemaGetter = schemaGetter;
this.kvFormat = kvFormat;
}
Expand Down
57 changes: 0 additions & 57 deletions fluss-common/src/main/java/org/apache/fluss/utils/MapUtils.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.apache.fluss.fs.local.LocalDataOutputStream;
import org.apache.fluss.fs.local.LocalFileStatus;
import org.apache.fluss.fs.local.LocalFileSystem;
import org.apache.fluss.utils.MapUtils;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.fluss.utils.Preconditions.checkNotNull;
Expand All @@ -43,7 +43,7 @@ public class TestFileSystem extends LocalFileSystem {

// current number of created, unclosed (output) stream
private static final Map<FsPath, Integer> currentUnclosedOutputStream =
MapUtils.newConcurrentHashMap();
new ConcurrentHashMap<>();

private final Configuration configuration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.fluss.fs.azure;

import org.apache.fluss.utils.MapUtils;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
Expand All @@ -37,14 +35,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Util file system abstraction. */
public class MemoryFileSystem extends FileSystem {

private final URI uri;
private final Map<Path, byte[]> files = MapUtils.newConcurrentHashMap();
private final Set<Path> directories =
Collections.newSetFromMap(MapUtils.newConcurrentHashMap());
private final Map<Path, byte[]> files = new ConcurrentHashMap<>();
private final Set<Path> directories = Collections.newSetFromMap(new ConcurrentHashMap<>());

public MemoryFileSystem(URI uri) {
this.uri = uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.MapUtils;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
Expand Down Expand Up @@ -689,7 +688,7 @@ public Map<TableBucket, Long> getBucketOffsets() {
* @param initialOffsets the initial bucket offsets
*/
protected void initializeBucketOffsets(Map<TableBucket, Long> initialOffsets) {
this.bucketOffsets = MapUtils.newConcurrentHashMap();
this.bucketOffsets = new ConcurrentHashMap<>();
this.bucketOffsets.putAll(initialOffsets);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;

import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap;

/**
* Factory for creating {@link UndoRecoveryOperator} instances.
Expand Down Expand Up @@ -346,7 +345,7 @@ private static class ProducerOffsetReporterHolder
* called, it looks up the delegate from this registry.
*/
private static final Map<String, ProducerOffsetReporter> DELEGATE_REGISTRY =
newConcurrentHashMap();
new ConcurrentHashMap<>();

/** Unique ID for this holder, used to look up the delegate in the registry. */
private final String holderId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.fluss.rpc.messages.PbLakeTieringStats;
import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.utils.MapUtils;

import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
Expand All @@ -66,6 +65,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -134,9 +134,9 @@ public TieringSourceEnumerator(
this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
this.pendingSplits = Collections.synchronizedList(new ArrayList<>());
this.readersAwaitingSplit = Collections.synchronizedSet(new TreeSet<>());
this.tieringTableEpochs = MapUtils.newConcurrentHashMap();
this.finishedTables = MapUtils.newConcurrentHashMap();
this.failedTableEpochs = MapUtils.newConcurrentHashMap();
this.tieringTableEpochs = new ConcurrentHashMap<>();
this.finishedTables = new ConcurrentHashMap<>();
this.failedTableEpochs = new ConcurrentHashMap<>();
this.tieringReachMaxDurationsTables = Collections.synchronizedSet(new TreeSet<>());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@

package org.apache.fluss.flink.tiering.source.enumerator;

import org.apache.fluss.utils.MapUtils;

import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -44,12 +43,12 @@ class FlussMockSplitEnumeratorContext<SplitT extends SourceSplit>

public FlussMockSplitEnumeratorContext(int parallelism) {
super(parallelism);
this.registeredReaders = MapUtils.newConcurrentHashMap();
this.registeredReaders = new ConcurrentHashMap<>();
}

public void registerSourceReader(int subtaskId, int attemptNumber, String location) {
final Map<Integer, ReaderInfo> attemptReaders =
registeredReaders.computeIfAbsent(subtaskId, k -> MapUtils.newConcurrentHashMap());
registeredReaders.computeIfAbsent(subtaskId, k -> new ConcurrentHashMap<>());
checkState(
!attemptReaders.containsKey(attemptNumber),
"ReaderInfo of subtask %s (#%s) already exists.",
Expand Down
Loading