Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
Expand All @@ -71,7 +69,7 @@ public class ClusteringCompactManager extends CompactFutureManager {
private final RowType keyType;
private final RowType valueType;
private final ExecutorService executor;
private final BucketedDvMaintainer dvMaintainer;
@Nullable private final BucketedDvMaintainer dvMaintainer;
private final boolean lazyGenDeletionFile;
@Nullable private final CompactionMetrics.Reporter metricsReporter;

Expand All @@ -89,7 +87,7 @@ public ClusteringCompactManager(
KeyValueFileReaderFactory valueReaderFactory,
KeyValueFileWriterFactory writerFactory,
ExecutorService executor,
BucketedDvMaintainer dvMaintainer,
@Nullable BucketedDvMaintainer dvMaintainer,
boolean lazyGenDeletionFile,
List<DataFileMeta> restoreFiles,
long targetFileSize,
Expand Down Expand Up @@ -206,14 +204,8 @@ private CompactResult compact(boolean fullCompaction) throws Exception {
// Snapshot sorted files before Phase 1 to avoid including newly created files in Phase 2
List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
for (DataFileMeta file : unsortedFiles) {
Set<String> originalFileNames = Collections.singleton(file.fileName());
List<DataFileMeta> sortedFiles =
fileRewriter.sortAndRewriteFiles(
singletonList(file),
kvSerializer,
kvSchemaType,
keyIndex,
originalFileNames);
fileRewriter.sortAndRewriteFile(file, kvSerializer, kvSchemaType, keyIndex);
result.before().add(file);
result.after().addAll(sortedFiles);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,13 @@
import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
import org.apache.paimon.utils.MutableObjectIterator;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

/**
* Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and
Expand Down Expand Up @@ -115,20 +112,16 @@ public ClusteringFileRewriter(
}

/**
* Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them
* Sort and rewrite unsorted file by clustering columns. Reads all KeyValue records, sorts them
* using an external sort buffer, and writes to new level-1 files. Checks the key index inline
* during writing to handle deduplication (FIRST_ROW skips duplicates, DEDUPLICATE marks old
* positions in DV) and updates the index without re-reading the output files.
*
* @param keyIndex the key index for inline checking and batch update, or null to skip
* @param originalFileNames file names of the original files being replaced (for index check)
* positions in DV) and updates the index.
*/
public List<DataFileMeta> sortAndRewriteFiles(
List<DataFileMeta> inputFiles,
public List<DataFileMeta> sortAndRewriteFile(
DataFileMeta inputFile,
KeyValueSerializer kvSerializer,
RowType kvSchemaType,
@Nullable ClusteringKeyIndex keyIndex,
Set<String> originalFileNames)
ClusteringKeyIndex keyIndex)
throws Exception {
int[] sortFieldsInKeyValue =
Arrays.stream(clusteringColumns)
Expand All @@ -146,21 +139,17 @@ public List<DataFileMeta> sortAndRewriteFiles(
MemorySize.MAX_VALUE,
false);

for (DataFileMeta file : inputFiles) {
try (RecordReader<KeyValue> reader = valueReaderFactory.createRecordReader(file)) {
try (CloseableIterator<KeyValue> iterator = reader.toCloseableIterator()) {
while (iterator.hasNext()) {
KeyValue kv = iterator.next();
InternalRow serializedRow = kvSerializer.toRow(kv);
sortBuffer.write(serializedRow);
}
try (RecordReader<KeyValue> reader = valueReaderFactory.createRecordReader(inputFile)) {
try (CloseableIterator<KeyValue> iterator = reader.toCloseableIterator()) {
while (iterator.hasNext()) {
KeyValue kv = iterator.next();
InternalRow serializedRow = kvSerializer.toRow(kv);
sortBuffer.write(serializedRow);
}
}
}

RowCompactedSerializer keySerializer =
keyIndex != null ? new RowCompactedSerializer(keyType) : null;
List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() : null;
RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);

RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingClusteringFileWriter();
Expand All @@ -173,12 +162,9 @@ public List<DataFileMeta> sortAndRewriteFiles(
kv.copy(
new InternalRowSerializer(keyType),
new InternalRowSerializer(valueType));
if (keyIndex != null) {
byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
continue;
}
collectedKeys.add(keyBytes);
byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
if (!keyIndex.checkKey(keyBytes)) {
continue;
}
writer.write(copied);
}
Expand All @@ -188,23 +174,13 @@ public List<DataFileMeta> sortAndRewriteFiles(
}

List<DataFileMeta> newFiles = writer.result();
for (DataFileMeta file : inputFiles) {
fileLevels.removeFile(file);
}
fileLevels.removeFile(inputFile);
for (DataFileMeta newFile : newFiles) {
fileLevels.addNewFile(newFile);
}

// Batch update index using collected keys, split by file rowCount
if (keyIndex != null) {
int offset = 0;
for (DataFileMeta newFile : newFiles) {
int count = (int) newFile.rowCount();
keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, offset + count));
offset += count;
}
for (DataFileMeta sortedFile : newFiles) {
keyIndex.rebuildIndex(sortedFile);
}

return newFiles;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.MutableObjectIterator;

import javax.annotation.Nullable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
Expand All @@ -48,9 +50,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;

import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;

Expand All @@ -63,7 +65,7 @@ public class ClusteringKeyIndex implements Closeable {
private final RowType keyType;
private final IOManager ioManager;
private final KeyValueFileReaderFactory keyReaderFactory;
private final BucketedDvMaintainer dvMaintainer;
private final @Nullable BucketedDvMaintainer dvMaintainer;
private final SimpleLsmKvDb kvDb;
private final ClusteringFiles fileLevels;
private final boolean firstRow;
Expand All @@ -76,7 +78,7 @@ public ClusteringKeyIndex(
RowType keyType,
IOManager ioManager,
KeyValueFileReaderFactory keyReaderFactory,
BucketedDvMaintainer dvMaintainer,
@Nullable BucketedDvMaintainer dvMaintainer,
SimpleLsmKvDb kvDb,
ClusteringFiles fileLevels,
boolean firstRow,
Expand Down Expand Up @@ -216,41 +218,27 @@ public Map.Entry<byte[], byte[]> next() {
* in deletion vectors, return true (write the new record).
*
* @param keyBytes serialized key bytes
* @param originalFileNames file names of the original unsorted files being replaced
* @return true if the record should be written, false to skip (FIRST_ROW dedup)
*/
public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames) throws Exception {
public boolean checkKey(byte[] keyBytes) throws Exception {
byte[] oldValue = kvDb.get(keyBytes);
if (oldValue != null) {
ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
int oldFileId = decodeInt(valueIn);
int oldPosition = decodeInt(valueIn);
DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
if (oldFile != null) {
if (firstRow) {
return false;
} else {
checkNotNull(dvMaintainer, "DvMaintainer cannot be null for DEDUPLICATE mode.");
dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
}
}
}
return true;
}

/**
* Batch update the key index for a new sorted file using pre-collected key bytes. Avoids
* re-reading the file.
*/
public void batchPutIndex(DataFileMeta sortedFile, List<byte[]> keyBytesList) throws Exception {
int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
for (int position = 0; position < keyBytesList.size(); position++) {
ByteArrayOutputStream value = new ByteArrayOutputStream(8);
encodeInt(value, fileId);
encodeInt(value, position);
kvDb.put(keyBytesList.get(position), value.toByteArray());
}
}

/** Delete key index entries for the given file (only if they still point to it). */
public void deleteIndex(DataFileMeta file) throws Exception {
RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
Expand Down