diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java index 9aac4ba08ebb..61813513d1fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java @@ -43,10 +43,8 @@ import java.io.File; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.stream.IntStream; @@ -71,7 +69,7 @@ public class ClusteringCompactManager extends CompactFutureManager { private final RowType keyType; private final RowType valueType; private final ExecutorService executor; - private final BucketedDvMaintainer dvMaintainer; + @Nullable private final BucketedDvMaintainer dvMaintainer; private final boolean lazyGenDeletionFile; @Nullable private final CompactionMetrics.Reporter metricsReporter; @@ -89,7 +87,7 @@ public ClusteringCompactManager( KeyValueFileReaderFactory valueReaderFactory, KeyValueFileWriterFactory writerFactory, ExecutorService executor, - BucketedDvMaintainer dvMaintainer, + @Nullable BucketedDvMaintainer dvMaintainer, boolean lazyGenDeletionFile, List restoreFiles, long targetFileSize, @@ -206,14 +204,8 @@ private CompactResult compact(boolean fullCompaction) throws Exception { // Snapshot sorted files before Phase 1 to avoid including newly created files in Phase 2 List existingSortedFiles = fileLevels.sortedFiles(); for (DataFileMeta file : unsortedFiles) { - Set originalFileNames = Collections.singleton(file.fileName()); List sortedFiles = - fileRewriter.sortAndRewriteFiles( - singletonList(file), - kvSerializer, - kvSchemaType, - keyIndex, - originalFileNames); + fileRewriter.sortAndRewriteFile(file, kvSerializer, kvSchemaType, keyIndex); result.before().add(file); result.after().addAll(sortedFiles); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java index 3689e704833a..7788d4ced9a6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java @@ -48,8 +48,6 @@ import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer; import org.apache.paimon.utils.MutableObjectIterator; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -57,7 +55,6 @@ import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; -import java.util.Set; /** * Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and @@ -115,20 +112,16 @@ public ClusteringFileRewriter( } /** - * Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them + * Sort and rewrite unsorted file by clustering columns. Reads all KeyValue records, sorts them * using an external sort buffer, and writes to new level-1 files. Checks the key index inline * during writing to handle deduplication (FIRST_ROW skips duplicates, DEDUPLICATE marks old - * positions in DV) and updates the index without re-reading the output files. - * - * @param keyIndex the key index for inline checking and batch update, or null to skip - * @param originalFileNames file names of the original files being replaced (for index check) + * positions in DV) and updates the index. */ - public List sortAndRewriteFiles( - List inputFiles, + public List sortAndRewriteFile( + DataFileMeta inputFile, KeyValueSerializer kvSerializer, RowType kvSchemaType, - @Nullable ClusteringKeyIndex keyIndex, - Set originalFileNames) + ClusteringKeyIndex keyIndex) throws Exception { int[] sortFieldsInKeyValue = Arrays.stream(clusteringColumns) @@ -146,21 +139,17 @@ public List sortAndRewriteFiles( MemorySize.MAX_VALUE, false); - for (DataFileMeta file : inputFiles) { - try (RecordReader reader = valueReaderFactory.createRecordReader(file)) { - try (CloseableIterator iterator = reader.toCloseableIterator()) { - while (iterator.hasNext()) { - KeyValue kv = iterator.next(); - InternalRow serializedRow = kvSerializer.toRow(kv); - sortBuffer.write(serializedRow); - } + try (RecordReader reader = valueReaderFactory.createRecordReader(inputFile)) { + try (CloseableIterator iterator = reader.toCloseableIterator()) { + while (iterator.hasNext()) { + KeyValue kv = iterator.next(); + InternalRow serializedRow = kvSerializer.toRow(kv); + sortBuffer.write(serializedRow); } } } - RowCompactedSerializer keySerializer = - keyIndex != null ? new RowCompactedSerializer(keyType) : null; - List collectedKeys = keyIndex != null ? new ArrayList<>() : null; + RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType); RollingFileWriter writer = writerFactory.createRollingClusteringFileWriter(); @@ -173,12 +162,9 @@ public List sortAndRewriteFiles( kv.copy( new InternalRowSerializer(keyType), new InternalRowSerializer(valueType)); - if (keyIndex != null) { - byte[] keyBytes = keySerializer.serializeToBytes(copied.key()); - if (!keyIndex.checkKey(keyBytes, originalFileNames)) { - continue; - } - collectedKeys.add(keyBytes); + byte[] keyBytes = keySerializer.serializeToBytes(copied.key()); + if (!keyIndex.checkKey(keyBytes)) { + continue; } writer.write(copied); } @@ -188,23 +174,13 @@ public List sortAndRewriteFiles( } List newFiles = writer.result(); - for (DataFileMeta file : inputFiles) { - fileLevels.removeFile(file); - } + fileLevels.removeFile(inputFile); for (DataFileMeta newFile : newFiles) { fileLevels.addNewFile(newFile); } - - // Batch update index using collected keys, split by file rowCount - if (keyIndex != null) { - int offset = 0; - for (DataFileMeta newFile : newFiles) { - int count = (int) newFile.rowCount(); - keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, offset + count)); - offset += count; - } + for (DataFileMeta sortedFile : newFiles) { + keyIndex.rebuildIndex(sortedFile); } - return newFiles; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java index 2b4e41272759..fee1abae2b7e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java @@ -39,6 +39,8 @@ import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.MutableObjectIterator; +import javax.annotation.Nullable; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Closeable; @@ -48,9 +50,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.IntStream; +import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt; import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt; @@ -63,7 +65,7 @@ public class ClusteringKeyIndex implements Closeable { private final RowType keyType; private final IOManager ioManager; private final KeyValueFileReaderFactory keyReaderFactory; - private final BucketedDvMaintainer dvMaintainer; + private final @Nullable BucketedDvMaintainer dvMaintainer; private final SimpleLsmKvDb kvDb; private final ClusteringFiles fileLevels; private final boolean firstRow; @@ -76,7 +78,7 @@ public ClusteringKeyIndex( RowType keyType, IOManager ioManager, KeyValueFileReaderFactory keyReaderFactory, - BucketedDvMaintainer dvMaintainer, + @Nullable BucketedDvMaintainer dvMaintainer, SimpleLsmKvDb kvDb, ClusteringFiles fileLevels, boolean firstRow, @@ -216,20 +218,20 @@ public Map.Entry next() { * in deletion vectors, return true (write the new record). * * @param keyBytes serialized key bytes - * @param originalFileNames file names of the original unsorted files being replaced * @return true if the record should be written, false to skip (FIRST_ROW dedup) */ - public boolean checkKey(byte[] keyBytes, Set originalFileNames) throws Exception { + public boolean checkKey(byte[] keyBytes) throws Exception { byte[] oldValue = kvDb.get(keyBytes); if (oldValue != null) { ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue); int oldFileId = decodeInt(valueIn); int oldPosition = decodeInt(valueIn); DataFileMeta oldFile = fileLevels.getFileById(oldFileId); - if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) { + if (oldFile != null) { if (firstRow) { return false; } else { + checkNotNull(dvMaintainer, "DvMaintainer cannot be null for DEDUPLICATE mode."); dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition); } } @@ -237,20 +239,6 @@ public boolean checkKey(byte[] keyBytes, Set originalFileNames) throws E return true; } - /** - * Batch update the key index for a new sorted file using pre-collected key bytes. Avoids - * re-reading the file. - */ - public void batchPutIndex(DataFileMeta sortedFile, List keyBytesList) throws Exception { - int fileId = fileLevels.getFileIdByName(sortedFile.fileName()); - for (int position = 0; position < keyBytesList.size(); position++) { - ByteArrayOutputStream value = new ByteArrayOutputStream(8); - encodeInt(value, fileId); - encodeInt(value, position); - kvDb.put(keyBytesList.get(position), value.toByteArray()); - } - } - /** Delete key index entries for the given file (only if they still point to it). */ public void deleteIndex(DataFileMeta file) throws Exception { RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);