Brandon Maier 4b19a190a2 support/testing/infra/emulator.py: fix qemu prompt detection
The qemu.run() method can break when a command happens to output the
string "# " to stdout. This is because qemu.run() detects when a command
has completed by searching for the shell prompt, which by default is
"# ". It then captures everything before the "# " as the command's
output, causing the rest of the output to be lost.

Instead, use the pexpect library's REPLWrapper to handle running
commands. It has hooks to set a custom prompt and avoid some other
pitfalls of wrapping a shell.

We unfortunately can't reuse replwrap._repl_sh directly, because it
tries to spawn a command while we already have a qemu started. So we
need to copy that code into _repl_sh_child. While we're at it, also
define our own prompt strings.

Signed-off-by: Brandon Maier <brandon.maier@collins.com>
 - Make all arguments to _repl_sh_child non-optional.
 - Move non_printable_insert to a local variable instead of an argument,
   we don't need to override it.
 - Copy the comment from _repl_sh that explains why non_printable_insert
   is needed.
 - Add a comment about timeouts.
 - Rename spawn to child (we don't actually spawn anything so this felt
   more natural, even though the pexpect class is called spawn).
 - Use single quotes instead of triple quotes, and explicitly escape the
   nested quotes.
Signed-off-by: Arnout Vandecappelle <arnout@mind.be>

(cherry picked from commit 0cad947b964be5612a182413da136fcf0dc5a1f2)
Signed-off-by: Peter Korsgaard <peter@korsgaard.com>
2024-10-11 17:03:20 +02:00

172 lines
6.9 KiB

import os
import pexpect
import pexpect.replwrap
import infra
def _repl_sh_child(child, orig_prompt, extra_init_cmd):
"""Wrap the shell prompt to handle command output
Based on pexpect.replwrap._repl_sh() (ISC licensed)
# If the user runs 'env', the value of PS1 will be in the output. To avoid
# replwrap seeing that as the next prompt, we'll embed the marker characters
# for invisible characters in the prompt; these show up when inspecting the
# environment variable, but not when bash displays the prompt.
non_printable_insert = '\\[\\]'
ps1 = BR_PROMPT[:5] + non_printable_insert + BR_PROMPT[5:]
ps2 = (BR_CONTINUATION_PROMPT[:5] + non_printable_insert +
prompt_change = "PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)
# Note: this will run various commands, each with the default timeout defined
# when qemu was spawned.
return pexpect.replwrap.REPLWrapper(
class Emulator(object):
def __init__(self, builddir, downloaddir, logtofile, timeout_multiplier):
    """Record the emulator settings and open the run log.

    qemu itself is not started here; boot() assigns self.qemu and
    login() assigns self.repl.

    Args:
        builddir: forwarded to infra.open_log_file() together with the
            log name "run".
        downloaddir: directory handed to infra.download() by boot().
        logtofile: forwarded to infra.open_log_file().
        timeout_multiplier: factor applied to the timeouts used by
            boot(), login() and run().
    """
    # Neither the qemu process nor the shell wrapper exists yet.
    self.qemu = self.repl = None
    self.downloaddir = downloaddir
    self.logfile = infra.open_log_file(builddir, "run", logtofile)
    # The tests run on elastic cloud runners, which can take a long time
    # to run the emulator. Scaling every timeout by this multiplier
    # avoids sporadic failures.
    self.timeout_multiplier = timeout_multiplier
# Start Qemu to boot the system
# arch: Qemu architecture to use
# kernel: path to the kernel image, or the special string
# 'builtin'. 'builtin' means a pre-built kernel image will be
# downloaded from ARTIFACTS_URL and suitable options are
# automatically passed to qemu and added to the kernel cmdline. So
# far only armv5, armv7 and i386 builtin kernels are available.
# If None, then no kernel is used, and we assume a bootable device
# will be specified.
# kernel_cmdline: array of kernel arguments to pass to Qemu -append option
# options: array of command line options to pass to Qemu
def boot(self, arch, kernel=None, kernel_cmdline=None, options=None):
# Both ARM variants are run with the "arm" qemu system binary name.
if arch in ["armv7", "armv5"]:
qemu_arch = "arm"
qemu_arch = arch
# Base command line: serial port on stdio, no display, "-m 256".
qemu_cmd = ["qemu-system-{}".format(qemu_arch),
"-serial", "stdio",
"-display", "none",
"-m", "256"]
# Caller-supplied options are appended after the base arguments.
if options:
qemu_cmd += options
# Treat a missing kernel_cmdline as an empty argument list.
if kernel_cmdline is None:
kernel_cmdline = []
if kernel:
# 'builtin' selects a downloaded kernel and dtb instead of a local path.
if kernel == "builtin":
if arch in ["armv7", "armv5"]:
# armv7: downloaded kernel and dtb, run on the vexpress-a9 machine.
if arch == "armv7":
kernel = infra.download(self.downloaddir,
dtb = infra.download(self.downloaddir,
qemu_cmd += ["-dtb", dtb]
qemu_cmd += ["-M", "vexpress-a9"]
# armv5: downloaded kernel and dtb, run on the versatilepb machine.
elif arch == "armv5":
kernel = infra.download(self.downloaddir,
dtb = infra.download(self.downloaddir,
qemu_cmd += ["-dtb", dtb]
qemu_cmd += ["-M", "versatilepb"]
qemu_cmd += ["-device", "virtio-rng-pci"]
qemu_cmd += ["-kernel", kernel]
# The kernel arguments are joined with spaces into a single -append value.
if kernel_cmdline:
qemu_cmd += ["-append", " ".join(kernel_cmdline)]
# Write host CPU count, load average and the timeout multiplier to the
# log (presumably to help diagnose timeout-related failures).
self.logfile.write(f"> host cpu count: {os.cpu_count()}\n")
ldavg = os.getloadavg()
ldavg_str = f"{ldavg[0]:.2f}, {ldavg[1]:.2f}, {ldavg[2]:.2f}"
self.logfile.write(f"> host loadavg: {ldavg_str}\n")
self.logfile.write(f"> timeout multiplier: {self.timeout_multiplier}\n")
self.logfile.write("> starting qemu with '%s'\n" % " ".join(qemu_cmd))
# Start qemu under pexpect. The base timeout of 5 is scaled by the
# timeout multiplier, and QEMU_AUDIO_DRV=none is passed in the environment.
self.qemu = pexpect.spawn(qemu_cmd[0], qemu_cmd[1:],
timeout=5 * self.timeout_multiplier,
env={"QEMU_AUDIO_DRV": "none"})
# We want only stdout into the log to avoid double echo
self.qemu.logfile_read = self.logfile
# Wait for the login prompt to appear, and then login as root with
# the provided password, or no password if not specified.
def login(self, password=None, timeout=60):
# The login prompt can take some time to appear when running multiple
# instances in parallel, so set the timeout to a large value
index = self.qemu.expect(["buildroot login:", pexpect.TIMEOUT],
timeout=timeout * self.timeout_multiplier)
# Index 0 is the login prompt; any other index means pexpect.TIMEOUT matched.
if index != 0:
self.logfile.write("==> System does not boot")
raise SystemError("System does not boot")
if password:
# Shell commands chained with ' && ' and handed to _repl_sh_child() as
# its extra_init_cmd argument.
extra_init_cmd = " && ".join([
'export PAGER=cat',
'dmesg -n 1',
# Prevent the shell from wrapping the commands at 80 columns.
'stty columns 29999',
# Fix the prompt of any subshells that get run
'printf "%s\n" "PS1=\'$PS1\'" "PS2=\'$PS2\'" "PROMPT_COMMAND=\'\'" >>/etc/profile'
# Wrap the running qemu child; '# ' is passed as the original shell prompt.
self.repl = _repl_sh_child(self.qemu, '# ', extra_init_cmd)
# A falsy wrapper is reported as a failure to set up the prompt.
if not self.repl:
raise SystemError("Cannot initialize REPL prompt")
# Run the given 'cmd' with a 'timeout' on the target
# return a tuple (output, exit_code)
def run(self, cmd, timeout=-1):
    """Run 'cmd' on the target through the REPL wrapper.

    Args:
        cmd: command line passed to self.repl.run_command().
        timeout: timeout for the command; it is multiplied by
            self.timeout_multiplier unless it is -1, in which case it
            is passed through unchanged.

    Returns:
        A tuple (output, exit_code): output is the list of lines
        returned for the command with the first line dropped, and
        exit_code is the int parsed from the third line of
        self.qemu.before after running "echo $?".
    """
    if timeout == -1:
        effective_timeout = timeout
    else:
        effective_timeout = timeout * self.timeout_multiplier

    raw_output = self.repl.run_command(cmd, timeout=effective_timeout)
    # qemu's stdout carries doubled carriage returns; collapse them so
    # that str.splitlines() works as expected.
    lines = raw_output.replace("\r\r", "\r").splitlines()

    # The return value of this call is not used: the status is read back
    # from the pexpect buffer instead.
    self.repl.run_command("echo $?")
    status_line = self.qemu.before.splitlines()[2]

    return lines[1:], int(status_line)
def stop(self):
if self.qemu is None: