Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import io.airbyte.cdk.load.config.CHECK_STREAM_NAMESPACE
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.micronaut.context.annotation.Factory
Expand Down Expand Up @@ -80,19 +82,27 @@ class SafeDestinationCatalogFactory {
fun checkCatalog(
namespaceMapper: NamespaceMapper,
@Named("checkNamespace") checkNamespace: String?,
config: BigqueryConfiguration,
): DestinationCatalog {
// Copied from DefaultDestinationCatalogFactory to maintain behavior
val date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"))
val random = RandomStringUtils.randomAlphabetic(5).lowercase()
val namespace = checkNamespace ?: "${CHECK_STREAM_NAMESPACE}_$date$random"
val schemaFields = linkedMapOf("test" to FieldType(IntegerType, nullable = true))
if (
!config.defaultPartitioningField.isNullOrBlank() &&
config.defaultPartitioningField != "_airbyte_extracted_at"
) {
schemaFields[config.defaultPartitioningField] =
FieldType(TimestampTypeWithTimezone, nullable = true)
}
return DestinationCatalog(
listOf(
DestinationStream(
unmappedNamespace = namespace,
unmappedName = "test$date$random",
importType = Append,
schema =
ObjectType(linkedMapOf("test" to FieldType(IntegerType, nullable = true))),
schema = ObjectType(schemaFields),
generationId = 1,
minimumGenerationId = 0,
syncId = 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ package io.airbyte.integrations.destination.bigquery

import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.integrations.destination.bigquery.spec.BatchedStandardInsertConfiguration
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
import io.airbyte.integrations.destination.bigquery.spec.BigqueryRegion
import io.airbyte.integrations.destination.bigquery.spec.CdcDeletionMode
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
Expand Down Expand Up @@ -62,10 +67,43 @@ class SafeDestinationCatalogFactoryTest {
val factory = SafeDestinationCatalogFactory()
val namespaceMapper = mockk<NamespaceMapper>(relaxed = true)

val destCatalog = factory.checkCatalog(namespaceMapper, "custom_check_ns")
val destCatalog = factory.checkCatalog(namespaceMapper, "custom_check_ns", config())

assertEquals(1, destCatalog.streams.size)
assertEquals("custom_check_ns", destCatalog.streams.first().unmappedNamespace)
assert(destCatalog.streams.first().unmappedName.startsWith("test"))
}

@Test
fun `test checkCatalog adds default partitioning field to check stream schema`() {
    // Arrange: a relaxed mapper suffices — only the produced catalog shape is under test.
    val catalogFactory = SafeDestinationCatalogFactory()
    val mapper = mockk<NamespaceMapper>(relaxed = true)

    // Act: build the check catalog with a custom partitioning column configured.
    val catalog =
        catalogFactory.checkCatalog(
            mapper,
            "custom_check_ns",
            config(defaultPartitioningField = "created_at"),
        )

    // Assert: the configured partitioning column must appear in the check stream's schema.
    val streamSchema = catalog.streams.first().schema as ObjectType
    assert("created_at" in streamSchema.properties)
}

/**
 * Builds a minimal [BigqueryConfiguration] fixture for these tests. All values are fixed
 * except [defaultPartitioningField], which individual tests override to exercise the
 * partitioning-field behavior of the catalog factory.
 */
private fun config(defaultPartitioningField: String? = null): BigqueryConfiguration {
    return BigqueryConfiguration(
        // Dataset identity / placement.
        projectId = "test-project",
        datasetId = "test_dataset",
        datasetLocation = BigqueryRegion.US,
        internalTableDataset = "airbyte_internal",
        // Load behavior.
        loadingMethod = BatchedStandardInsertConfiguration,
        credentialsJson = null,
        cdcDeletionMode = CdcDeletionMode.HARD_DELETE,
        legacyRawTablesOnly = false,
        // Table defaults — only the partitioning field varies per test.
        defaultPartitioningField = defaultPartitioningField,
        defaultPartitioningGranularity = null,
        defaultClusteringField = null,
        defaultTableSuffix = null,
        streamConfigMap = emptyMap(),
    )
}
}
Loading