From 7953a64f4da9b54a65884d677ee1c20d1985bfe4 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:00:32 -0700 Subject: [PATCH 01/10] Add PosixPluginFrontendSpec --- .../frontend/OsSpecificFrontendSpec.scala | 56 +++++++++++++++++++ .../frontend/PosixPluginFrontendSpec.scala | 9 +++ .../frontend/WindowsPluginFrontendSpec.scala | 33 +---------- 3 files changed, 67 insertions(+), 31 deletions(-) create mode 100644 bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala create mode 100644 bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala new file mode 100644 index 0000000..5b158ac --- /dev/null +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -0,0 +1,56 @@ +package protocbridge.frontend + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers +import protocbridge.{ExtraEnv, ProtocCodeGenerator} + +import java.io.ByteArrayOutputStream +import scala.sys.process.ProcessIO +import scala.util.Random + +class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { + + protected def testPluginFrontend(frontend: PluginFrontend): Array[Byte] = { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val toReceive = Array.fill(456)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + request mustBe (toSend ++ env.toByteArrayAsField) + toReceive + } + } + val (path, state) = frontend.prepare( + fakeGenerator, + env + ) + val actualOutput = new ByteArrayOutputStream() + val process = sys.process + .Process(path.toAbsolutePath.toString) + .run( + new ProcessIO( + writeInput => { + writeInput.write(toSend) + writeInput.close() + }, + processOutput => { + val buffer = new Array[Byte](4096) + var bytesRead = 0 + while (bytesRead != -1) { + bytesRead = processOutput.read(buffer) + if (bytesRead != -1) { + actualOutput.write(buffer, 0, bytesRead) + } + } + processOutput.close() + }, + _.close() + ) + ) + process.exitValue() + frontend.cleanup(state) + actualOutput.toByteArray + } +} diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala new file mode 100644 index 0000000..0d6d76a --- /dev/null +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -0,0 +1,9 @@ +package protocbridge.frontend + +class PosixPluginFrontendSpec extends OsSpecificFrontendSpec { + if (!PluginFrontend.isWindows) { + it must "execute a program that forwards input and output to given stream" in { + testPluginFrontend(PosixPluginFrontend) + } + } +} diff --git a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala index 6385ad7..7936141 100644 --- a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala @@ -1,38 +1,9 @@ package protocbridge.frontend -import java.io.ByteArrayInputStream - -import protocbridge.{ProtocCodeGenerator, ExtraEnv} - -import scala.sys.process.ProcessLogger -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.must.Matchers - -class WindowsPluginFrontendSpec extends AnyFlatSpec with Matchers { +class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec { if (PluginFrontend.isWindows) { it must "execute a program that forwards input and output to given stream" in { - val toSend = "ping" - val toReceive = "pong" - val env = new ExtraEnv(secondaryOutputDir = "tmp") - - val fakeGenerator = new ProtocCodeGenerator { - override def run(request: Array[Byte]): Array[Byte] = { - request mustBe (toSend.getBytes ++ env.toByteArrayAsField) - toReceive.getBytes - } - } - val (path, state) = WindowsPluginFrontend.prepare( - fakeGenerator, - env - ) - val actualOutput = scala.collection.mutable.Buffer.empty[String] - val process = sys.process - .Process(path.toAbsolutePath.toString) - .#<(new ByteArrayInputStream(toSend.getBytes)) - .run(ProcessLogger(o => actualOutput.append(o))) - process.exitValue() - actualOutput.mkString mustBe toReceive - WindowsPluginFrontend.cleanup(state) + testPluginFrontend(WindowsPluginFrontend) } } } From c576aedae92e4231588d883dba3d49170696c6fa Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 16 Aug 2024 16:54:29 -0700 Subject: [PATCH 02/10] Handle all failures in Future --- .../frontend/PluginFrontend.scala | 20 +++---- .../frontend/PosixPluginFrontend.scala | 5 ++ .../frontend/OsSpecificFrontendSpec.scala | 52 ++++++++++++++----- .../frontend/PosixPluginFrontendSpec.scala | 6 ++- .../frontend/WindowsPluginFrontendSpec.scala | 6 ++- 5 files changed, 63 insertions(+), 26 deletions(-) diff --git a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala index 7415f06..ec7d6ca 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala @@ -5,8 +5,6 @@ import java.nio.file.{Files, Path} import protocbridge.{ProtocCodeGenerator, ExtraEnv} -import scala.util.Try - /** A PluginFrontend instance provides a platform-dependent way for protoc to * communicate with a JVM based ProtocCodeGenerator. * @@ -47,13 +45,7 @@ object PluginFrontend { gen: ProtocCodeGenerator, request: Array[Byte] ): Array[Byte] = { - Try { - gen.run(request) - }.recover { case throwable => - createCodeGeneratorResponseWithError( - throwable.toString + "\n" + getStackTrace(throwable) - ) - }.get + gen.run(request) } def createCodeGeneratorResponseWithError(error: String): Array[Byte] = { @@ -116,9 +108,17 @@ object PluginFrontend { gen: ProtocCodeGenerator, fsin: InputStream, env: ExtraEnv - ): Array[Byte] = { + ): Array[Byte] = try { val bytes = readInputStreamToByteArrayWithEnv(fsin, env) runWithBytes(gen, bytes) + } catch { + // This covers all Throwable including OutOfMemoryError, StackOverflowError, etc. + // We need to make a best effort to return a response to protoc, + // otherwise protoc can hang indefinitely. + case throwable: Throwable => + createCodeGeneratorResponseWithError( + throwable.toString + "\n" + getStackTrace(throwable) + ) } def createTempFile(extension: String, content: String): Path = { diff --git a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala index 5f70120..ef57687 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala @@ -40,6 +40,11 @@ object PosixPluginFrontend extends PluginFrontend { val response = PluginFrontend.runWithInputStream(plugin, fsin, env) fsin.close() + // Note that the output pipe must be opened after the input pipe is consumed. + // Otherwise, there might be a deadlock that + // - The shell script is stuck writing to the input pipe (which has a full buffer), + // and doesn't open the write end of the output pipe. + // - This thread is stuck waiting for the write end of the output pipe to be opened. val fsout = Files.newOutputStream(outputPipe) fsout.write(response) fsout.close() diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala index 5b158ac..1519857 100644 --- a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -10,20 +10,14 @@ import scala.util.Random class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { - protected def testPluginFrontend(frontend: PluginFrontend): Array[Byte] = { - val random = new Random() - val toSend = Array.fill(123)(random.nextInt(256).toByte) - val toReceive = Array.fill(456)(random.nextInt(256).toByte) - val env = new ExtraEnv(secondaryOutputDir = "tmp") - - val fakeGenerator = new ProtocCodeGenerator { - override def run(request: Array[Byte]): Array[Byte] = { - request mustBe (toSend ++ env.toByteArrayAsField) - toReceive - } - } + protected def testPluginFrontend( + frontend: PluginFrontend, + generator: ProtocCodeGenerator, + env: ExtraEnv, + request: Array[Byte] + ): Array[Byte] = { val (path, state) = frontend.prepare( - fakeGenerator, + generator, env ) val actualOutput = new ByteArrayOutputStream() @@ -32,7 +26,7 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { .run( new ProcessIO( writeInput => { - writeInput.write(toSend) + writeInput.write(request) writeInput.close() }, processOutput => { @@ -53,4 +47,34 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { frontend.cleanup(state) actualOutput.toByteArray } + + protected def testSuccess(frontend: PluginFrontend): Unit = { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val toReceive = Array.fill(456)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + request mustBe (toSend ++ env.toByteArrayAsField) + toReceive + } + } + val response = testPluginFrontend(frontend, fakeGenerator, env, toSend) + response mustBe toReceive + } + + protected def testFailure(frontend: PluginFrontend): Unit = { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + throw new OutOfMemoryError("test error") + } + } + val response = testPluginFrontend(frontend, fakeGenerator, env, toSend) + response.length must be > 0 + } } diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala index 0d6d76a..2a3481c 100644 --- a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -3,7 +3,11 @@ package protocbridge.frontend class PosixPluginFrontendSpec extends OsSpecificFrontendSpec { if (!PluginFrontend.isWindows) { it must "execute a program that forwards input and output to given stream" in { - testPluginFrontend(PosixPluginFrontend) + testSuccess(PosixPluginFrontend) + } + + it must "not hang if there is an OOM in generator" in { + testFailure(PosixPluginFrontend) } } } diff --git a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala index 7936141..4bf39de 100644 --- a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala @@ -3,7 +3,11 @@ package protocbridge.frontend class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec { if (PluginFrontend.isWindows) { it must "execute a program that forwards input and output to given stream" in { - testPluginFrontend(WindowsPluginFrontend) + testSuccess(WindowsPluginFrontend) + } + + it must "not hang if there is an OOM in generator" in { + testFailure(WindowsPluginFrontend) } } } From 0ba6fcfe9eb75875e4c55497468916745667e268 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 16 Aug 2024 16:55:52 -0700 Subject: [PATCH 03/10] Switch PluginFrontend to sockets on macOS --- .../frontend/MacPluginFrontend.scala | 36 +++++++++++++ .../frontend/PluginFrontend.scala | 5 ++ .../frontend/PosixPluginFrontend.scala | 7 ++- .../frontend/SocketBasedPluginFrontend.scala | 51 +++++++++++++++++++ .../frontend/WindowsPluginFrontend.scala | 46 ++--------------- .../frontend/MacPluginFrontendSpec.scala | 15 ++++++ .../frontend/OsSpecificFrontendSpec.scala | 20 +++++--- .../frontend/PosixPluginFrontendSpec.scala | 2 +- .../frontend/WindowsPluginFrontendSpec.scala | 6 ++- 9 files changed, 135 insertions(+), 53 deletions(-) create mode 100644 bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala create mode 100644 bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala create mode 100644 bridge/src/test/scala/protocbridge/frontend/MacPluginFrontendSpec.scala diff --git a/bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala new file mode 100644 index 0000000..6f33cf8 --- /dev/null +++ b/bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala @@ -0,0 +1,36 @@ +package protocbridge.frontend + +import java.nio.file.attribute.PosixFilePermission +import java.nio.file.{Files, Path} +import java.{util => ju} + +/** PluginFrontend for macOS. + * + * Creates a server socket and uses `nc` to communicate with the socket. We use + * a server socket instead of named pipes because named pipes are unreliable on + * macOS: https://github.com/scalapb/protoc-bridge/issues/366. Since `nc` is + * widely available on macOS, this is the simplest and most reliable solution + * for macOS. + */ +object MacPluginFrontend extends SocketBasedPluginFrontend { + + protected def createShellScript(port: Int): Path = { + val shell = sys.env.getOrElse("PROTOCBRIDGE_SHELL", "/bin/sh") + // We use 127.0.0.1 instead of localhost for the (very unlikely) case that localhost is missing from /etc/hosts. + val scriptName = PluginFrontend.createTempFile( + "", + s"""|#!$shell + |set -e + |nc 127.0.0.1 $port + """.stripMargin + ) + val perms = new ju.HashSet[PosixFilePermission] + perms.add(PosixFilePermission.OWNER_EXECUTE) + perms.add(PosixFilePermission.OWNER_READ) + Files.setPosixFilePermissions( + scriptName, + perms + ) + scriptName + } +} diff --git a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala index ec7d6ca..3b83cfa 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala @@ -131,8 +131,13 @@ object PluginFrontend { def isWindows: Boolean = sys.props("os.name").startsWith("Windows") + def isMac: Boolean = sys.props("os.name").startsWith("Mac") || sys + .props("os.name") + .startsWith("Darwin") + def newInstance: PluginFrontend = { if (isWindows) WindowsPluginFrontend + else if (isMac) MacPluginFrontend else PosixPluginFrontend } } diff --git a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala index ef57687..65935e0 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala @@ -12,10 +12,13 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.sys.process._ import java.{util => ju} -/** PluginFrontend for Unix-like systems (Linux, Mac, etc) +/** PluginFrontend for Unix-like systems except macOS (Linux, FreeBSD, + * etc) * * Creates a pair of named pipes for input/output and a shell script that - * communicates with them. + * communicates with them. Compared with `SocketBasedPluginFrontend`, this + * frontend doesn't rely on `nc` that might not be available in some + * distributions. */ object PosixPluginFrontend extends PluginFrontend { case class InternalState( diff --git a/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala new file mode 100644 index 0000000..6d1dd59 --- /dev/null +++ b/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala @@ -0,0 +1,51 @@ +package protocbridge.frontend + +import protocbridge.{ExtraEnv, ProtocCodeGenerator} + +import java.net.ServerSocket +import java.nio.file.{Files, Path} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.{Future, blocking} + +/** PluginFrontend for Windows and macOS where a server socket is used. + */ +abstract class SocketBasedPluginFrontend extends PluginFrontend { + case class InternalState(serverSocket: ServerSocket, shellScript: Path) + + override def prepare( + plugin: ProtocCodeGenerator, + env: ExtraEnv + ): (Path, InternalState) = { + val ss = new ServerSocket(0) // Bind to any available port. + val sh = createShellScript(ss.getLocalPort) + + Future { + blocking { + // Accept a single client connection from the shell script. + val client = ss.accept() + try { + val response = + PluginFrontend.runWithInputStream( + plugin, + client.getInputStream, + env + ) + client.getOutputStream.write(response) + } finally { + client.close() + } + } + } + + (sh, InternalState(ss, sh)) + } + + override def cleanup(state: InternalState): Unit = { + state.serverSocket.close() + if (sys.props.get("protocbridge.debug") != Some("1")) { + Files.delete(state.shellScript) + } + } + + protected def createShellScript(port: Int): Path +} diff --git a/bridge/src/main/scala/protocbridge/frontend/WindowsPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/WindowsPluginFrontend.scala index 490211d..adf9486 100644 --- a/bridge/src/main/scala/protocbridge/frontend/WindowsPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/WindowsPluginFrontend.scala @@ -1,53 +1,15 @@ package protocbridge.frontend -import java.net.ServerSocket -import java.nio.file.{Files, Path, Paths} - -import protocbridge.ExtraEnv -import protocbridge.ProtocCodeGenerator - -import scala.concurrent.blocking - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future +import java.nio.file.{Path, Paths} /** A PluginFrontend that binds a server socket to a local interface. The plugin * is a batch script that invokes BridgeApp.main() method, in a new JVM with * the same parameters as the currently running JVM. The plugin will * communicate its stdin and stdout to this socket. */ -object WindowsPluginFrontend extends PluginFrontend { - - case class InternalState(batFile: Path) - - override def prepare( - plugin: ProtocCodeGenerator, - env: ExtraEnv - ): (Path, InternalState) = { - val ss = new ServerSocket(0) - val state = createWindowsScript(ss.getLocalPort) - - Future { - blocking { - val client = ss.accept() - val response = - PluginFrontend.runWithInputStream(plugin, client.getInputStream, env) - client.getOutputStream.write(response) - client.close() - ss.close() - } - } - - (state.batFile, state) - } - - override def cleanup(state: InternalState): Unit = { - if (sys.props.get("protocbridge.debug") != Some("1")) { - Files.delete(state.batFile) - } - } +object WindowsPluginFrontend extends SocketBasedPluginFrontend { - private def createWindowsScript(port: Int): InternalState = { + protected def createShellScript(port: Int): Path = { val classPath = Paths.get(getClass.getProtectionDomain.getCodeSource.getLocation.toURI) val classPathBatchString = classPath.toString.replace("%", "%%") @@ -62,6 +24,6 @@ object WindowsPluginFrontend extends PluginFrontend { ].getName} $port """.stripMargin ) - InternalState(batchFile) + batchFile } } diff --git a/bridge/src/test/scala/protocbridge/frontend/MacPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/MacPluginFrontendSpec.scala new file mode 100644 index 0000000..6e8b972 --- /dev/null +++ b/bridge/src/test/scala/protocbridge/frontend/MacPluginFrontendSpec.scala @@ -0,0 +1,15 @@ +package protocbridge.frontend + +class MacPluginFrontendSpec extends OsSpecificFrontendSpec { + if (PluginFrontend.isMac) { + it must "execute a program that forwards input and output to given stream" in { + val state = testSuccess(MacPluginFrontend) + state.serverSocket.isClosed mustBe true + } + + it must "not hang if there is an error in generator" in { + val state = testFailure(MacPluginFrontend) + state.serverSocket.isClosed mustBe true + } + } +} diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala index 1519857..619d1a8 100644 --- a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -15,7 +15,7 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { generator: ProtocCodeGenerator, env: ExtraEnv, request: Array[Byte] - ): Array[Byte] = { + ): (frontend.InternalState, Array[Byte]) = { val (path, state) = frontend.prepare( generator, env @@ -45,10 +45,12 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { ) process.exitValue() frontend.cleanup(state) - actualOutput.toByteArray + (state, actualOutput.toByteArray) } - protected def testSuccess(frontend: PluginFrontend): Unit = { + protected def testSuccess( + frontend: PluginFrontend + ): frontend.InternalState = { val random = new Random() val toSend = Array.fill(123)(random.nextInt(256).toByte) val toReceive = Array.fill(456)(random.nextInt(256).toByte) @@ -60,11 +62,15 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { toReceive } } - val response = testPluginFrontend(frontend, fakeGenerator, env, toSend) + val (state, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) response mustBe toReceive + state } - protected def testFailure(frontend: PluginFrontend): Unit = { + protected def testFailure( + frontend: PluginFrontend + ): frontend.InternalState = { val random = new Random() val toSend = Array.fill(123)(random.nextInt(256).toByte) val env = new ExtraEnv(secondaryOutputDir = "tmp") @@ -74,7 +80,9 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { throw new OutOfMemoryError("test error") } } - val response = testPluginFrontend(frontend, fakeGenerator, env, toSend) + val (state, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) response.length must be > 0 + state } } diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala index 2a3481c..4a6dd99 100644 --- a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -1,7 +1,7 @@ package protocbridge.frontend class PosixPluginFrontendSpec extends OsSpecificFrontendSpec { - if (!PluginFrontend.isWindows) { + if (!PluginFrontend.isWindows && !PluginFrontend.isMac) { it must "execute a program that forwards input and output to given stream" in { testSuccess(PosixPluginFrontend) } diff --git a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala index 4bf39de..db0bc65 100644 --- a/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/WindowsPluginFrontendSpec.scala @@ -3,11 +3,13 @@ package protocbridge.frontend class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec { if (PluginFrontend.isWindows) { it must "execute a program that forwards input and output to given stream" in { - testSuccess(WindowsPluginFrontend) + val state = testSuccess(WindowsPluginFrontend) + state.serverSocket.isClosed mustBe true } it must "not hang if there is an OOM in generator" in { - testFailure(WindowsPluginFrontend) + val state = testFailure(WindowsPluginFrontend) + state.serverSocket.isClosed mustBe true } } } From f202314d9e881e17c09d987be4c0e23e405240e3 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Mon, 16 Sep 2024 13:10:39 -0700 Subject: [PATCH 04/10] Switch PluginFrontend to domain sockets on macOS --- .../frontend/MacPluginFrontend.scala | 48 ++++++++++++++++--- .../frontend/SocketBasedPluginFrontend.scala | 33 +++++-------- .../frontend/WindowsPluginFrontend.scala | 27 ++++++++++- build.sbt | 3 +- 4 files changed, 82 insertions(+), 29 deletions(-) diff --git a/bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala index 6f33cf8..fd2a686 100644 --- a/bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala @@ -1,5 +1,9 @@ package protocbridge.frontend +import org.newsclub.net.unix.AFUNIXServerSocket +import protocbridge.{ExtraEnv, ProtocCodeGenerator} + +import java.net.ServerSocket import java.nio.file.attribute.PosixFilePermission import java.nio.file.{Files, Path} import java.{util => ju} @@ -8,20 +12,52 @@ import java.{util => ju} * * Creates a server socket and uses `nc` to communicate with the socket. We use * a server socket instead of named pipes because named pipes are unreliable on - * macOS: https://github.com/scalapb/protoc-bridge/issues/366. Since `nc` is - * widely available on macOS, this is the simplest and most reliable solution - * for macOS. + * macOS: https://github.com/scalapb/protoc-bridge/issues/366 + * + * Since `nc` is widely available on macOS, this is the simplest alternative + * for macOS. However, raw `nc` is also not very reliable on macOS: + * https://github.com/scalapb/protoc-bridge/issues/379 + * + * The most reliable way to communicate is found to be with a domain socket and + * a server-side read timeout, which are implemented here. */ object MacPluginFrontend extends SocketBasedPluginFrontend { + case class InternalState( + shellScript: Path, + tempDirPath: Path, + socketPath: Path, + serverSocket: ServerSocket + ) + + override def prepare( + plugin: ProtocCodeGenerator, + env: ExtraEnv + ): (Path, InternalState) = { + val tempDirPath = Files.createTempDirectory("protocbridge") + val socketPath = tempDirPath.resolve("socket") + val serverSocket = AFUNIXServerSocket.bindOn(socketPath, true) + val sh = createShellScript(socketPath) + + runWithSocket(plugin, env, serverSocket) + + (sh, InternalState(sh, tempDirPath, socketPath, serverSocket)) + } + + override def cleanup(state: InternalState): Unit = { + state.serverSocket.close() + if (sys.props.get("protocbridge.debug") != Some("1")) { + Files.delete(state.tempDirPath) + Files.delete(state.shellScript) + } + } - protected def createShellScript(port: Int): Path = { + private def createShellScript(socketPath: Path): Path = { val shell = sys.env.getOrElse("PROTOCBRIDGE_SHELL", "/bin/sh") - // We use 127.0.0.1 instead of localhost for the (very unlikely) case that localhost is missing from /etc/hosts. val scriptName = PluginFrontend.createTempFile( "", s"""|#!$shell |set -e - |nc 127.0.0.1 $port + |nc -U "$socketPath" """.stripMargin ) val perms = new ju.HashSet[PosixFilePermission] diff --git a/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala index 6d1dd59..4ff5a42 100644 --- a/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala @@ -3,26 +3,30 @@ package protocbridge.frontend import protocbridge.{ExtraEnv, ProtocCodeGenerator} import java.net.ServerSocket -import java.nio.file.{Files, Path} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Future, blocking} /** PluginFrontend for Windows and macOS where a server socket is used. */ abstract class SocketBasedPluginFrontend extends PluginFrontend { - case class InternalState(serverSocket: ServerSocket, shellScript: Path) - override def prepare( + protected def runWithSocket( plugin: ProtocCodeGenerator, - env: ExtraEnv - ): (Path, InternalState) = { - val ss = new ServerSocket(0) // Bind to any available port. - val sh = createShellScript(ss.getLocalPort) - + env: ExtraEnv, + serverSocket: ServerSocket + ): Unit = { Future { blocking { // Accept a single client connection from the shell script. - val client = ss.accept() + val client = serverSocket.accept() + // It's found on macOS that a `junixsocket` domain socket server + // might not receive the EOF sent by the other end, leading to a hang: + // https://github.com/scalapb/protoc-bridge/issues/379 + // However, confusingly, adding an arbitrary read timeout resolves the issue. + // We thus add a read timeout of 1 minute here, which should be more than enough. + // It also helps to prevent an infinite hang on both Windows and macOS due to + // unexpected issues. + client.setSoTimeout(60000) try { val response = PluginFrontend.runWithInputStream( @@ -36,16 +40,5 @@ abstract class SocketBasedPluginFrontend extends PluginFrontend { } } } - - (sh, InternalState(ss, sh)) - } - - override def cleanup(state: InternalState): Unit = { - state.serverSocket.close() - if (sys.props.get("protocbridge.debug") != Some("1")) { - Files.delete(state.shellScript) - } } - - protected def createShellScript(port: Int): Path } diff --git a/bridge/src/main/scala/protocbridge/frontend/WindowsPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/WindowsPluginFrontend.scala index adf9486..0f82cca 100644 --- a/bridge/src/main/scala/protocbridge/frontend/WindowsPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/WindowsPluginFrontend.scala @@ -1,6 +1,9 @@ package protocbridge.frontend -import java.nio.file.{Path, Paths} +import protocbridge.{ExtraEnv, ProtocCodeGenerator} + +import java.net.ServerSocket +import java.nio.file.{Files, Path, Paths} /** A PluginFrontend that binds a server socket to a local interface. The plugin * is a batch script that invokes BridgeApp.main() method, in a new JVM with @@ -8,8 +11,28 @@ import java.nio.file.{Path, Paths} * communicate its stdin and stdout to this socket. */ object WindowsPluginFrontend extends SocketBasedPluginFrontend { + case class InternalState(shellScript: Path, serverSocket: ServerSocket) + + override def prepare( + plugin: ProtocCodeGenerator, + env: ExtraEnv + ): (Path, InternalState) = { + val ss = new ServerSocket(0) // Bind to any available port. + val sh = createShellScript(ss.getLocalPort) + + runWithSocket(plugin, env, ss) + + (sh, InternalState(sh, ss)) + } + + override def cleanup(state: InternalState): Unit = { + state.serverSocket.close() + if (sys.props.get("protocbridge.debug") != Some("1")) { + Files.delete(state.shellScript) + } + } - protected def createShellScript(port: Int): Path = { + private def createShellScript(port: Int): Path = { val classPath = Paths.get(getClass.getProtectionDomain.getCodeSource.getLocation.toURI) val classPathBatchString = classPath.toString.replace("%", "%%") diff --git a/build.sbt b/build.sbt index 568684d..6498eb2 100644 --- a/build.sbt +++ b/build.sbt @@ -29,7 +29,8 @@ lazy val bridge: Project = project "org.scalatest" %% "scalatest" % "3.2.17" % "test", "org.scalacheck" %% "scalacheck" % "1.17.0" % "test", "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test", - "io.get-coursier" %% "coursier" % coursierVersion % "test" + "io.get-coursier" %% "coursier" % coursierVersion % "test", + "com.kohlschutter.junixsocket" % "junixsocket-core" % "2.10.0" ), scalacOptions ++= (if (scalaVersion.value.startsWith("2.13.")) Seq("-Wconf:origin=.*JavaConverters.*:s") From 3161283ae5d0d314699a63929727ce44738a15a1 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Mon, 23 Sep 2024 16:51:13 -0700 Subject: [PATCH 05/10] Comment out setSoTimeout --- .../scala/protocbridge/frontend/SocketBasedPluginFrontend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala index 4ff5a42..9f65cb4 100644 --- a/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/SocketBasedPluginFrontend.scala @@ -26,7 +26,7 @@ abstract class SocketBasedPluginFrontend extends PluginFrontend { // We thus add a read timeout of 1 minute here, which should be more than enough. // It also helps to prevent an infinite hang on both Windows and macOS due to // unexpected issues. - client.setSoTimeout(60000) + // client.setSoTimeout(60000) try { val response = PluginFrontend.runWithInputStream( From 36fa0c660883789cd7992154d29d20bfb6f2a2d3 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:00:38 -0700 Subject: [PATCH 06/10] Use IOUtils in PosixPluginFrontendSpec --- .../frontend/OsSpecificFrontendSpec.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala index 619d1a8..906825f 100644 --- a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -1,5 +1,6 @@ package protocbridge.frontend +import org.apache.commons.io.IOUtils import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.must.Matchers import protocbridge.{ExtraEnv, ProtocCodeGenerator} @@ -30,17 +31,13 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { writeInput.close() }, processOutput => { - val buffer = new Array[Byte](4096) - var bytesRead = 0 - while (bytesRead != -1) { - bytesRead = processOutput.read(buffer) - if (bytesRead != -1) { - actualOutput.write(buffer, 0, bytesRead) - } - } + IOUtils.copy(processOutput, actualOutput) processOutput.close() }, - _.close() + processError => { + IOUtils.copy(processError, System.err) + processError.close() + } ) ) process.exitValue() From cec4df13612c7da0428f6afed12b5ecbfd578071 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:02:56 -0700 Subject: [PATCH 07/10] Stress test PosixPluginFrontendSpec --- .../frontend/OsSpecificFrontendSpec.scala | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala index 906825f..5f88fc9 100644 --- a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -1,11 +1,15 @@ package protocbridge.frontend import org.apache.commons.io.IOUtils +import org.scalatest.exceptions.TestFailedException import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.must.Matchers import protocbridge.{ExtraEnv, ProtocCodeGenerator} import java.io.ByteArrayOutputStream +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future, TimeoutException} import scala.sys.process.ProcessIO import scala.util.Random @@ -40,7 +44,13 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { } ) ) - process.exitValue() + try { + Await.result(Future { process.exitValue() }, 5.seconds) + } catch { + case _: TimeoutException => + System.err.println(s"Timeout") + process.destroy() + } frontend.cleanup(state) (state, actualOutput.toByteArray) } @@ -59,9 +69,31 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { toReceive } } + // Repeat 100,000 times since named pipes on macOS are flaky. + val repeatCount = 100000 + for (i <- 1 until repeatCount) { + if (i % 100 == 1) println(s"Running iteration $i of $repeatCount") + val (state, response) = + testPluginFrontend(frontend, fakeGenerator, env, toSend) + try { + response mustBe toReceive + } catch { + case e: TestFailedException => + System.err.println( + s"""Failed on iteration $i of $repeatCount: ${e.getMessage}""" + ) + } + } val (state, response) = testPluginFrontend(frontend, fakeGenerator, env, toSend) - response mustBe toReceive + try { + response mustBe toReceive + } catch { + case e: TestFailedException => + System.err.println( + s"""Failed on iteration $repeatCount of $repeatCount: ${e.getMessage}""" + ) + } state } From 19c3dbbdce47d51cfea8262ef9626720b466d75b Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 13 Sep 2024 10:09:34 -0700 Subject: [PATCH 08/10] Switch to MacPluginFrontend in tests --- .../scala/protocbridge/frontend/PosixPluginFrontendSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala index 4a6dd99..1c615d2 100644 --- a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -3,11 +3,11 @@ package protocbridge.frontend class PosixPluginFrontendSpec extends OsSpecificFrontendSpec { if (!PluginFrontend.isWindows && !PluginFrontend.isMac) { it must "execute a program that forwards input and output to given stream" in { - testSuccess(PosixPluginFrontend) + testSuccess(MacPluginFrontend) } it must "not hang if there is an OOM in generator" in { - testFailure(PosixPluginFrontend) + testFailure(MacPluginFrontend) } } } From cde1d0bd8bc74bfcc20d079b0f46d4222b142710 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Thu, 19 Sep 2024 15:15:28 -0700 Subject: [PATCH 09/10] Change to 100KB messages --- .../frontend/OsSpecificFrontendSpec.scala | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala index 5f88fc9..de79098 100644 --- a/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/OsSpecificFrontendSpec.scala @@ -59,8 +59,8 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { frontend: PluginFrontend ): frontend.InternalState = { val random = new Random() - val toSend = Array.fill(123)(random.nextInt(256).toByte) - val toReceive = Array.fill(456)(random.nextInt(256).toByte) + val toSend = Array.fill(100000)(random.nextInt(256).toByte) + val toReceive = Array.fill(100000)(random.nextInt(256).toByte) val env = new ExtraEnv(secondaryOutputDir = "tmp") val fakeGenerator = new ProtocCodeGenerator { @@ -75,24 +75,18 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers { if (i % 100 == 1) println(s"Running iteration $i of $repeatCount") val (state, response) = testPluginFrontend(frontend, fakeGenerator, env, toSend) - try { - response mustBe toReceive - } catch { - case e: TestFailedException => - System.err.println( - s"""Failed on iteration $i of $repeatCount: ${e.getMessage}""" - ) + if (!(response sameElements toReceive)) { + System.err.println( + s"Failed on iteration $i of $repeatCount ($state): ${response.length} != ${toReceive.length}" + ) } } val (state, response) = testPluginFrontend(frontend, fakeGenerator, env, toSend) - try { - response mustBe toReceive - } catch { - case e: TestFailedException => - System.err.println( - s"""Failed on iteration $repeatCount of $repeatCount: ${e.getMessage}""" - ) + if (!(response sameElements toReceive)) { + System.err.println( + s"Failed on iteration $repeatCount of $repeatCount ($state): ${response.length} != ${toReceive.length}" + ) } state } From 8bbb0a576244a178797b5aae49c6d436084c4b88 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Tue, 17 Sep 2024 15:04:54 -0700 Subject: [PATCH 10/10] Add domain_socket_stress_test.sh --- domain_socket_stress_test.sh | 88 ++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100755 domain_socket_stress_test.sh diff --git a/domain_socket_stress_test.sh b/domain_socket_stress_test.sh new file mode 100755 index 0000000..0747233 --- /dev/null +++ b/domain_socket_stress_test.sh @@ -0,0 +1,88 @@ +#!/usr/bin/env bash + +BYTE_LENGTH="$1" +SERVER_MODE="${2:-nc-save}" +CLIENT_MODE="${3:-nc}" + +TEST_FILE_PATH="/tmp/domain_socket_test_file" +SOCKET_PATH="/tmp/domain_socket_test.sck" +SERVER_RESULT_PATH="$TEST_FILE_PATH.server" +CLIENT_RESULT_PATH="$TEST_FILE_PATH.client" +dd if=/dev/urandom of="$TEST_FILE_PATH" bs=1 count="$BYTE_LENGTH" 2>/dev/null + +if [[ "$SERVER_MODE" == *"-save" ]]; then + TEST_RESULT_PATH="$SERVER_RESULT_PATH" +else + TEST_RESULT_PATH="$CLIENT_RESULT_PATH" +fi + +test_socket() { + # Start a process to consume the data from the socket + if [[ "$SERVER_MODE" == "nc-save" ]]; then + (nc -l -U "$SOCKET_PATH" > "$SERVER_RESULT_PATH" && echo "Completed saving random bytes from the socket") & + elif [[ "$SERVER_MODE" == "ncat-save" ]]; then + (ncat -l -U "$SOCKET_PATH" > "$SERVER_RESULT_PATH" && echo "Completed saving random bytes from the socket") & + elif [[ "$SERVER_MODE" == "socat-save" ]]; then + (socat UNIX-LISTEN:"$SOCKET_PATH" - > "$SERVER_RESULT_PATH" && echo "Completed saving random bytes from the socket") & + elif [[ "$SERVER_MODE" == "socat-echo" ]]; then + (socat UNIX-LISTEN:"$SOCKET_PATH" EXEC:"/bin/cat" && echo "Completed echoing random bytes from the socket") & + else + echo "Invalid server mode: $SERVER_MODE" + exit 1 + fi + SERVER_PID=$! + echo "Starting the server (PID: $SERVER_PID)" + + # Wait for the socket file to be created so that the server has started + while [ ! -e "$SOCKET_PATH" ]; do + sleep 0.001 + done + echo "The server has started and is listening to the socket (PID: $SERVER_PID)" + + # `nc` can fail even if we wait for another second to ensure the server has started + # sleep 1 + + # Start dumping random bytes to the socket in the background + if [[ "$CLIENT_MODE" == "nc" ]]; then + (nc -U "$SOCKET_PATH" < "$TEST_FILE_PATH" > "$CLIENT_RESULT_PATH" && echo "Completed dumping random bytes to the socket") & + elif [[ "$CLIENT_MODE" == "ncat" ]]; then + (ncat -U "$SOCKET_PATH" < "$TEST_FILE_PATH" > "$CLIENT_RESULT_PATH" && echo "Completed dumping random bytes to the socket") & + elif [[ "$CLIENT_MODE" == "socat" ]]; then + (socat - UNIX-CONNECT:"$SOCKET_PATH" < "$TEST_FILE_PATH" > "$CLIENT_RESULT_PATH" && echo "Completed dumping random bytes to the socket") & + else + echo "Invalid client mode: $CLIENT_MODE" + exit 1 + fi + CLIENT_PID=$! + echo "Started dumping random bytes to the socket (PID: $CLIENT_PID)" + + # Ensure the client process is killed + wait $CLIENT_PID 2>/dev/null + echo "The client process has stopped (PID: $CLIENT_PID)" + + # Ensure the server process is killed + wait $SERVER_PID 2>/dev/null + echo "The server process has stopped (PID: $SERVER_PID)" + + # Check the size of the data read from the socket + DATA_SIZE=$(wc -c < "$TEST_RESULT_PATH") + if [ "$DATA_SIZE" -ne "$BYTE_LENGTH" ]; then + echo "Error: Expected $BYTE_LENGTH bytes, but read $DATA_SIZE bytes" + exit 1 + else + echo "Successfully read $BYTE_LENGTH bytes from the socket" + fi + + rm -f "$SOCKET_PATH" + rm -f "$SERVER_RESULT_PATH" + rm -f "$CLIENT_RESULT_PATH" +} + +rm -f "$SOCKET_PATH" + +# Repeat the process +counter=0; +while test_socket; do + ((counter++)); echo "Iterations completed: $counter"; +done +echo "Command failed after $counter successful iterations."