From 201f1372e69cc3540778fc51f3a8603d57910d3d Mon Sep 17 00:00:00 2001 From: chon3806 <93464148+chon3806@users.noreply.github.com> Date: Sat, 25 Apr 2026 15:17:27 -0400 Subject: [PATCH 1/3] fix: accept GeoJSON strings for Edm.GeographyPoint in AzureSearchWriter (#2420) Azure AI Search expects spatial values as GeoJSON objects, but when users supplied a StringType column the writer JSON-escaped the entire string and the service rejected the request with HTTP 400. Convert string GeographyPoint columns into the canonical struct shape via from_json before serialization, mirroring the existing Edm.DateTimeOffset handling. Existing struct-based input is unchanged. --- .../ml/services/search/AzureSearch.scala | 54 +++++++++++++++++-- .../split2/SearchWriterSuitePart2.scala | 33 ++++++++++++ 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala index 5471329997..7a47d45ff8 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala @@ -20,7 +20,8 @@ import org.apache.spark.ml.util._ import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel} import org.apache.spark.ml.linalg.SQLDataTypes.VectorType import org.apache.spark.ml.functions.vector_to_array -import org.apache.spark.sql.functions.{col, expr, struct, to_json, to_utc_timestamp, date_format, when} +import org.apache.spark.sql.functions.{col, expr, from_json, struct, to_json, to_utc_timestamp, + date_format, when} import org.apache.spark.sql.streaming.DataStreamWriter import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -249,6 +250,50 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging } } + /** + * Converts string columns containing GeoJSON to the proper struct shape required for + * Azure Search `Edm.GeographyPoint` fields. + * + * Azure AI Search expects spatial values to be sent as a GeoJSON object + * (e.g. `{"type":"Point","coordinates":[lon, lat]}`), not as a JSON-encoded string. + * Users frequently have their GeoJSON readily available as a string column, and + * passing it as a `StringType` previously caused a `400 Bad Request` + * (see issue #2420) because the writer JSON-escaped the entire string. + * + * For each field declared as `Edm.GeographyPoint` in the index, if the corresponding + * DataFrame column is a `StringType`, parse it into the canonical + * `struct>` so that downstream `to_json` + * emits a proper GeoJSON object. Columns that are already structured are left as-is. + * + * @param df DataFrame with potential GeographyPoint columns + * @param indexJson JSON string containing the index schema + * @return DataFrame with string GeographyPoint columns converted to GeoJSON structs + */ + private def convertGeographyPointToStruct(df: DataFrame, indexJson: String): DataFrame = { + val geoStructType = StructType(Seq( + StructField("type", StringType), + StructField("coordinates", ArrayType(DoubleType)) + )) + val geoFields = parseIndexJson(indexJson).fields + .filter(_.`type` == "Edm.GeographyPoint") + .map(_.name) + geoFields.foldLeft(df) { (currentDF, fieldName) => + if (currentDF.columns.contains(fieldName)) { + currentDF.schema(fieldName).dataType match { + case StringType => + currentDF.withColumn(fieldName, + when(col(fieldName).isNotNull, from_json(col(fieldName), geoStructType)) + ) + case _ => + // Already a struct (or otherwise compatible) — let checkSchemaParity validate it. + currentDF + } + } else { + currentDF + } + } + } + private def dfToIndexJson(schema: StructType, indexName: String, keyCol: String, @@ -328,17 +373,18 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging SearchIndex.createIfNoneExists(subscriptionKey, serviceName, indexJson, apiVersion) val dateConvertedDF = convertDateTimeToISO8601(preppedDF, indexJson) + val geoConvertedDF = convertGeographyPointToStruct(dateConvertedDF, indexJson) logInfo("checking schema parity") - checkSchemaParity(dateConvertedDF.schema, indexJson, actionCol) + checkSchemaParity(geoConvertedDF.schema, indexJson, actionCol) val df1 = if (filterNulls) { val collectionColumns = parseIndexJson(indexJson).fields .filter(_.`type`.startsWith("Collection")) .map(_.name) - collectionColumns.foldLeft(dateConvertedDF) { (ndf, c) => filterOutNulls(ndf, c) } + collectionColumns.foldLeft(geoConvertedDF) { (ndf, c) => filterOutNulls(ndf, c) } } else { - dateConvertedDF + geoConvertedDF } // Convert date/timestamp columns to ISO8601 strings for Azure Search diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala index 54ace956e1..a8ec4d6120 100644 --- a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala @@ -171,4 +171,37 @@ class SearchWriterSuite extends SearchWriterSuiteUtilities { } + test("Handle GeoJSON GeographyPoint fields supplied as strings") { + + val in = generateIndexName() + val df = spark.createDataFrame(Seq( + ("upload", "0", """{"type":"Point","coordinates":[-122.3493, 47.6205]}"""), + ("upload", "1", """{"type":"Point","coordinates":[-122.3351, 47.6080]}""") + )).toDF("searchAction", "id", "location") + + val indexJson = + s""" + |{ + | "name": "$in", + | "fields": [ + | { "name": "id", "type": "Edm.String", "key": true, "searchable": true, "retrievable": true }, + | { "name": "location", "type": "Edm.GeographyPoint", "searchable": false, + | "filterable": true, "retrievable": true, "sortable": true } + | ] + |} + |""".stripMargin + + AzureSearchWriter.write(df, + Map( + "subscriptionKey" -> azureSearchKey, + "actionCol" -> "searchAction", + "serviceName" -> testServiceName, + "indexJson" -> indexJson + ) + ) + + retryWithBackoff(assertSize(in, 2)) + + } + } From a8399f70f317424181bb85c12587c5beed5d5643 Mon Sep 17 00:00:00 2001 From: chon3806 <93464148+chon3806@users.noreply.github.com> Date: Sat, 25 Apr 2026 15:23:29 -0400 Subject: [PATCH 2/3] polish: ASCII-only comments, package-private helper, and unit tests for GeographyPoint conversion - Replace unicode em-dash with ASCII to avoid encoding/scalastyle surprises. - Replace '#2420' in Scaladoc (member-reference syntax) with a full URL. - Make convertGeographyPointToStruct private[ml] so it can be exercised by in-tree unit tests without requiring live Azure Search credentials. - Add two non-network unit tests covering the string->struct rewrite, null preservation, and the no-op path for already-structured columns. --- .../ml/services/search/AzureSearch.scala | 11 ++-- .../split2/SearchWriterSuitePart2.scala | 59 +++++++++++++++++++ 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala index 7a47d45ff8..26284e8f85 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala @@ -258,18 +258,19 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging * (e.g. `{"type":"Point","coordinates":[lon, lat]}`), not as a JSON-encoded string. * Users frequently have their GeoJSON readily available as a string column, and * passing it as a `StringType` previously caused a `400 Bad Request` - * (see issue #2420) because the writer JSON-escaped the entire string. + * (see [[https://github.com/microsoft/SynapseML/issues/2420]]) because the writer + * JSON-escaped the entire string. * * For each field declared as `Edm.GeographyPoint` in the index, if the corresponding * DataFrame column is a `StringType`, parse it into the canonical - * `struct>` so that downstream `to_json` - * emits a proper GeoJSON object. Columns that are already structured are left as-is. + * `StructType(type: StringType, coordinates: ArrayType(DoubleType))` so that downstream + * `to_json` emits a proper GeoJSON object. Columns that are already structured are left as-is. * * @param df DataFrame with potential GeographyPoint columns * @param indexJson JSON string containing the index schema * @return DataFrame with string GeographyPoint columns converted to GeoJSON structs */ - private def convertGeographyPointToStruct(df: DataFrame, indexJson: String): DataFrame = { + private[ml] def convertGeographyPointToStruct(df: DataFrame, indexJson: String): DataFrame = { val geoStructType = StructType(Seq( StructField("type", StringType), StructField("coordinates", ArrayType(DoubleType)) @@ -285,7 +286,7 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging when(col(fieldName).isNotNull, from_json(col(fieldName), geoStructType)) ) case _ => - // Already a struct (or otherwise compatible) — let checkSchemaParity validate it. + // Already a struct (or otherwise compatible); checkSchemaParity will validate. currentDF } } else { diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala index a8ec4d6120..ae71481dfe 100644 --- a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala @@ -204,4 +204,63 @@ class SearchWriterSuite extends SearchWriterSuiteUtilities { } + test("convertGeographyPointToStruct parses GeoJSON strings into structs") { + val df = spark.createDataFrame(Seq( + ("0", """{"type":"Point","coordinates":[-122.3493, 47.6205]}"""), + ("1", null) + )).toDF("id", "location") + + val indexJson = + """ + |{ + | "name": "unit-test-geo", + | "fields": [ + | { "name": "id", "type": "Edm.String", "key": true }, + | { "name": "location", "type": "Edm.GeographyPoint" } + | ] + |} + |""".stripMargin + + val converted = AzureSearchWriter.convertGeographyPointToStruct(df, indexJson) + val expected = StructType(Seq( + StructField("type", StringType), + StructField("coordinates", ArrayType(DoubleType)) + )) + assert(converted.schema("location").dataType == expected) + + val rows = converted.orderBy("id").collect() + val parsed = rows.head.getStruct(rows.head.fieldIndex("location")) + assert(parsed.getString(0) == "Point") + assert(parsed.getSeq[Double](1) == Seq(-122.3493, 47.6205)) + assert(rows(1).isNullAt(rows(1).fieldIndex("location"))) + } + + test("convertGeographyPointToStruct leaves struct columns untouched") { + val schema = StructType(Seq( + StructField("id", StringType), + StructField("location", StructType(Seq( + StructField("type", StringType, nullable = false), + StructField("coordinates", ArrayType(DoubleType, containsNull = false), nullable = false) + ))) + )) + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row("0", Row("Point", Seq(-122.3493, 47.6205))))), + schema + ) + + val indexJson = + """ + |{ + | "name": "unit-test-geo", + | "fields": [ + | { "name": "id", "type": "Edm.String", "key": true }, + | { "name": "location", "type": "Edm.GeographyPoint" } + | ] + |} + |""".stripMargin + + val converted = AzureSearchWriter.convertGeographyPointToStruct(df, indexJson) + assert(converted.schema("location").dataType == schema("location").dataType) + } + } From 95a7ae13e856deaa9baf40f5110dcf9ba7c50c36 Mon Sep 17 00:00:00 2001 From: chon3806 <93464148+chon3806@users.noreply.github.com> Date: Sat, 25 Apr 2026 15:27:44 -0400 Subject: [PATCH 3/3] fix: address Copilot review feedback on GeographyPoint conversion - Use Spark's FAILFAST parsing mode in from_json so malformed GeoJSON surfaces an explicit exception instead of being silently coerced to null and shipped to Azure Search. - Clarify Scaladoc to state the conversion is top-level-only (mirroring convertDateTimeToISO8601), and document the FAILFAST behavior. - Add a unit test asserting malformed GeoJSON fails fast at materialization. - Document why the end-to-end test's count assertion is sufficient: with fatalErrors=true (default) any service-side rejection throws, so a passing count proves the documents were accepted as valid spatial objects. --- .../ml/services/search/AzureSearch.scala | 14 +++++++--- .../split2/SearchWriterSuitePart2.scala | 27 +++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala index 26284e8f85..03cb9d0bdf 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/search/AzureSearch.scala @@ -261,10 +261,15 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging * (see [[https://github.com/microsoft/SynapseML/issues/2420]]) because the writer * JSON-escaped the entire string. * - * For each field declared as `Edm.GeographyPoint` in the index, if the corresponding - * DataFrame column is a `StringType`, parse it into the canonical + * For each '''top-level''' field declared as `Edm.GeographyPoint` in the index, if the + * corresponding DataFrame column is a `StringType`, parse it into the canonical * `StructType(type: StringType, coordinates: ArrayType(DoubleType))` so that downstream - * `to_json` emits a proper GeoJSON object. Columns that are already structured are left as-is. + * `to_json` emits a proper GeoJSON object. Columns that are already structured are + * left as-is. GeographyPoint fields nested inside complex types are not auto-converted + * (mirrors the existing top-level-only handling in `convertDateTimeToISO8601`). + * + * Parsing uses Spark's `FAILFAST` mode so malformed GeoJSON surfaces an explicit + * exception instead of being silently coerced to `null` and shipped to Azure Search. * * @param df DataFrame with potential GeographyPoint columns * @param indexJson JSON string containing the index schema @@ -275,6 +280,7 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging StructField("type", StringType), StructField("coordinates", ArrayType(DoubleType)) )) + val parseOptions = Map("mode" -> "FAILFAST") val geoFields = parseIndexJson(indexJson).fields .filter(_.`type` == "Edm.GeographyPoint") .map(_.name) @@ -283,7 +289,7 @@ object AzureSearchWriter extends IndexParser with IndexJsonGetter with SLogging currentDF.schema(fieldName).dataType match { case StringType => currentDF.withColumn(fieldName, - when(col(fieldName).isNotNull, from_json(col(fieldName), geoStructType)) + when(col(fieldName).isNotNull, from_json(col(fieldName), geoStructType, parseOptions)) ) case _ => // Already a struct (or otherwise compatible); checkSchemaParity will validate. diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala index ae71481dfe..55d161aefd 100644 --- a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/search/split2/SearchWriterSuitePart2.scala @@ -200,6 +200,10 @@ class SearchWriterSuite extends SearchWriterSuiteUtilities { ) ) + // With fatalErrors=true (default) any 400 from Azure Search becomes a thrown + // RuntimeException, so reaching this `assertSize` proves the documents were + // accepted as valid spatial objects -- a count of 2 is only achievable if the + // GeoJSON strings were correctly parsed and serialized as GeoJSON objects. retryWithBackoff(assertSize(in, 2)) } @@ -263,4 +267,27 @@ class SearchWriterSuite extends SearchWriterSuiteUtilities { assert(converted.schema("location").dataType == schema("location").dataType) } + test("convertGeographyPointToStruct fails fast on malformed GeoJSON instead of silently nulling") { + val df = spark.createDataFrame(Seq( + ("0", "{not valid json") + )).toDF("id", "location") + + val indexJson = + """ + |{ + | "name": "unit-test-geo", + | "fields": [ + | { "name": "id", "type": "Edm.String", "key": true }, + | { "name": "location", "type": "Edm.GeographyPoint" } + | ] + |} + |""".stripMargin + + val converted = AzureSearchWriter.convertGeographyPointToStruct(df, indexJson) + // FAILFAST surfaces parse errors when the row is materialized, not at plan time. + intercept[org.apache.spark.SparkException] { + converted.collect() + } + } + }