Unable to write to a BigQuery table after the new Spark connector update
Hi all,
Since the last release of the connector, some of our Spark jobs have started to fail with the following error:
Exception in thread "main" java.lang.RuntimeException: Failed to write to BigQuery
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:70)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.insert(BigQueryInsertableRelation.scala:43)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:111)
[...]
Caused by: java.lang.IllegalArgumentException: com.google.cloud.bigquery.connector.common.BigQueryConnectorException$InvalidSchemaException: Destination table's schema is not compatible with dataframe's schema
at com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:466)
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.scala:92)
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:67)
... 40 more
After some investigation, this seems to be related to PR #613, which added a schema equality check before writing.
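For context, the behavior we relied on is that a dataframe can be written as long as every column it contains exists in the destination table, and any destination-only columns are nullable. The snippet below is only a sketch of that relaxed check (a hypothetical isWriteCompatible helper that ignores type and mode differences for brevity), not the connector's actual code:

import org.apache.spark.sql.types.StructType

// Illustrative only: tolerate columns that exist only in the destination, provided they are nullable
def isWriteCompatible(dataframeSchema: StructType, destinationSchema: StructType): Boolean = {
  val destinationNames = destinationSchema.fieldNames.toSet
  val dataframeNames = dataframeSchema.fieldNames.toSet
  val allDataframeFieldsExist = dataframeNames.subsetOf(destinationNames)
  val missingFieldsAreNullable = destinationSchema.fields
    .filterNot(f => dataframeNames.contains(f.name))
    .forall(_.nullable)
  allDataframeFieldsExist && missingFieldsAreNullable
}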
For example, given a BigQuery table with the following schema (two nullable string columns):
[
  {
    "mode": "NULLABLE",
    "name": "field1",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "field2",
    "type": "STRING"
  }
]
and the following Spark dataframe (one nullable string column):
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

val spark = SparkSession
  .builder()
  .master("local[2]")
  .getOrCreate()

import spark.implicits._

val ds = spark.createDataset(Seq.empty[Option[String]]).toDF("field1")
ds.printSchema()
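For reference, printSchema() on this dataframe prints a single nullable string column:

root
 |-- field1: string (nullable = true)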
With the previous connector version, the following code worked:
ds
  .write
  .format("bigquery")
  .mode(SaveMode.Append)
  .option("writeMethod", "indirect")
  .save("test_dataset.table")
With the new version this no longer works; neither does the following variant with the mode check option disabled. Both raise the exception shown earlier:
ds
  .write
  .format("bigquery")
  .mode(SaveMode.Append)
  .option("writeMethod", "indirect")
  .option("enableModeCheckForSchemaFields", false)
  .save("test_dataset.table")
However, the following one (adding the missing nullable column) works:
ds
  .withColumn("field2", org.apache.spark.sql.functions.lit(null).cast("string"))
  .write
  .format("bigquery")
  .mode(SaveMode.Append)
  .option("writeMethod", "indirect")
  .save("test_dataset.table")
Below I propose a test to be added to spark-bigquery-tests/src/main/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java to ensure the previous behavior is preserved:
@Test
public void testWriteToBigQueryWithMissingNullableField() {
  assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
  // A dataframe with the required column int_req only; the nullable column int_null is missing
  Dataset<Row> df =
      spark
          .createDataset(Arrays.asList(1), Encoders.INT())
          .toDF("int_req");
  // Write to a table with all nullable columns
  df.write()
      .format("bigquery")
      .mode(SaveMode.Append)
      .option("writeMethod", writeMethod.toString())
      .save(testDataset + "." + TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_NAME);
}
Kind regards.
Comments: 8 (3 by maintainers)
It seems this issue affects version 0.25.0? 0.24.2 is OK.
Hello @MARCTELLY, the issue is fixed and the fix will be available in the next release. Thanks