diff mbox series

[v5,7/7] scripts/ghes_inject: add a script to generate GHES error inject

Message ID 0654a89fe24f4343016b9cecc0752594ad1cd49f.1722634602.git.mchehab+huawei@kernel.org
State New
Headers show
Series Add ACPI CPER firmware first error injection on ARM emulation | expand

Commit Message

Mauro Carvalho Chehab Aug. 2, 2024, 9:44 p.m. UTC
Using the QMP GHESv2 API requires preparing a raw data array
containing a CPER record.

Add a helper script with subcommands to prepare such data.

Currently, only ARM Processor error CPER record is supported.

Signed-off-by: Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
---
 MAINTAINERS                    |   3 +
 scripts/arm_processor_error.py | 352 +++++++++++++++++++++++++++++++++
 scripts/ghes_inject.py         |  59 ++++++
 scripts/qmp_helper.py          | 249 +++++++++++++++++++++++
 4 files changed, 663 insertions(+)
 create mode 100644 scripts/arm_processor_error.py
 create mode 100755 scripts/ghes_inject.py
 create mode 100644 scripts/qmp_helper.py

Comments

Igor Mammedov Aug. 6, 2024, 2:56 p.m. UTC | #1
On Fri,  2 Aug 2024 23:44:02 +0200
Mauro Carvalho Chehab <mchehab+huawei@kernel.org> wrote:

> Using the QMP GHESv2 API requires preparing a raw data array
> containing a CPER record.
> 
> Add a helper script with subcommands to prepare such data.
> 
> Currently, only ARM Processor error CPER record is supported.
> 
> Signed-off-by: Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> ---
>  MAINTAINERS                    |   3 +
>  scripts/arm_processor_error.py | 352 +++++++++++++++++++++++++++++++++
>  scripts/ghes_inject.py         |  59 ++++++
>  scripts/qmp_helper.py          | 249 +++++++++++++++++++++++
>  4 files changed, 663 insertions(+)
>  create mode 100644 scripts/arm_processor_error.py
>  create mode 100755 scripts/ghes_inject.py
>  create mode 100644 scripts/qmp_helper.py
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 655edcb6688c..e490f69da1de 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -2081,6 +2081,9 @@ S: Maintained
>  F: hw/arm/ghes_cper.c
>  F: hw/acpi/ghes_cper_stub.c
>  F: qapi/ghes-cper.json
> +F: scripts/ghes_inject.py
> +F: scripts/arm_processor_error.py
> +F: scripts/qmp_helper.py
>  
>  ppc4xx
>  L: qemu-ppc@nongnu.org
> diff --git a/scripts/arm_processor_error.py b/scripts/arm_processor_error.py
> new file mode 100644
> index 000000000000..df4efa508790
> --- /dev/null
> +++ b/scripts/arm_processor_error.py
> @@ -0,0 +1,352 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114, R0912, R0913, R0914, R0915, W0511
> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +# TODO: current implementation has dummy defaults.
> +#
> +# For a better implementation, a QMP addition/call is needed to
> +# retrieve some data for ARM Processor Error injection:
> +#
> +#   - machine emulation architecture, as ARM current default is
> +#     for AArch64;
> +#   - ARM registers: power_state, midr, mpidr.

I'm not really reviewing the script but here some pointers how to fetch properties

start qemu with QMP connection
  ./qemu-system-aarch64 -M virt -qmp unix:/tmp/s,server,nowait
use script
  ./scripts/qmp/qom-get --socket /tmp/s /machine/unattached/device[0].midr

you can use ./scripts/qmp/qom-tree to explore what's there

see commit e61cc6b5c69 how to add property (DEFINE_PROP_UINT32 part mainly),
as long as it's prefixed with "x-" (meaning internal/unstable) likelihood is
that no one would object adding extra ones

> +
> +import argparse
> +import json
> +
> +from qmp_helper import (qmp_command, get_choice, get_mult_array,
> +                        get_mult_choices, get_mult_int, bit,
> +                        data_add, to_guid)
> +
> +# Arm processor EINJ logic
> +#
> +ACPI_GHES_ARM_CPER_LENGTH = 40
> +ACPI_GHES_ARM_CPER_PEI_LENGTH = 32
> +
> +# TODO: query it from emulation. Current default valid only for Aarch64
> +CONTEXT_AARCH64_EL1 = 5
> +
> +class ArmProcessorEinj:
> +    """
> +    Implements ARM Processor Error injection via GHES
> +    """
> +
> +    def __init__(self):
> +        """Initialize the error injection class"""
> +
> +        # Valid choice values
> +        self.arm_valid_bits = {
> +            "mpidr":    bit(0),
> +            "affinity": bit(1),
> +            "running":  bit(2),
> +            "vendor":   bit(3),
> +        }
> +
> +        self.pei_flags = {
> +            "first":        bit(0),
> +            "last":         bit(1),
> +            "propagated":   bit(2),
> +            "overflow":     bit(3),
> +        }
> +
> +        self.pei_error_types = {
> +            "cache":        bit(1),
> +            "tlb":          bit(2),
> +            "bus":          bit(3),
> +            "micro-arch":   bit(4),
> +        }
> +
> +        self.pei_valid_bits = {
> +            "multiple-error":   bit(0),
> +            "flags":            bit(1),
> +            "error-info":       bit(2),
> +            "virt-addr":        bit(3),
> +            "phy-addr":         bit(4),
> +        }
> +
> +        self.data = bytearray()
> +
> +    def create_subparser(self, subparsers):
> +        """Add a subparser to handle for the error fields"""
> +
> +        parser = subparsers.add_parser("arm",
> +                                       help="Generate an ARM processor CPER")
> +
> +        arm_valid_bits = ",".join(self.arm_valid_bits.keys())
> +        flags = ",".join(self.pei_flags.keys())
> +        error_types = ",".join(self.pei_error_types.keys())
> +        pei_valid_bits = ",".join(self.arm_valid_bits.keys())
> +
> +        # UEFI N.16 ARM Validation bits
> +        g_arm = parser.add_argument_group("ARM processor")
> +        g_arm.add_argument("--arm", "--arm-valid",
> +                           help=f"ARM valid bits: {arm_valid_bits}")
> +        g_arm.add_argument("-a", "--affinity",  "--level", "--affinity-level",
> +                           type=lambda x: int(x, 0),
> +                           help="Affinity level (when multiple levels apply)")
> +        g_arm.add_argument("-l", "--mpidr", type=lambda x: int(x, 0),
> +                           help="Multiprocessor Affinity Register")
> +        g_arm.add_argument("-i", "--midr", type=lambda x: int(x, 0),
> +                           help="Main ID Register")
> +        g_arm.add_argument("-r", "--running",
> +                           action=argparse.BooleanOptionalAction,
> +                           default=None,
> +                           help="Indicates if the processor is running or not")
> +        g_arm.add_argument("--psci", "--psci-state",
> +                           type=lambda x: int(x, 0),
> +                           help="Power State Coordination Interface - PSCI state")
> +
> +        # TODO: Add vendor-specific support
> +
> +        # UEFI N.17 bitmaps (type and flags)
> +        g_pei = parser.add_argument_group("ARM Processor Error Info (PEI)")
> +        g_pei.add_argument("-t", "--type", nargs="+",
> +                        help=f"one or more error types: {error_types}")
> +        g_pei.add_argument("-f", "--flags", nargs="*",
> +                        help=f"zero or more error flags: {flags}")
> +        g_pei.add_argument("-V", "--pei-valid", "--error-valid", nargs="*",
> +                        help=f"zero or more PEI valid bits: {pei_valid_bits}")
> +
> +        # UEFI N.17 Integer values
> +        g_pei.add_argument("-m", "--multiple-error", nargs="+",
> +                        help="Number of errors: 0: Single error, 1: Multiple errors, 2-65535: Error count if known")
> +        g_pei.add_argument("-e", "--error-info", nargs="+",
> +                        help="Error information (UEFI 2.10 tables N.18 to N.20)")
> +        g_pei.add_argument("-p", "--physical-address",  nargs="+",
> +                        help="Physical address")
> +        g_pei.add_argument("-v", "--virtual-address",  nargs="+",
> +                        help="Virtual address")
> +
> +        # UEFI N.21 Context
> +        g_ctx = parser.add_argument_group("Processor Context")
> +        g_ctx.add_argument("--ctx-type", "--context-type", nargs="*",
> +                        help="Type of the context (0=ARM32 GPR, 5=ARM64 EL1, other values supported)")
> +        g_ctx.add_argument("--ctx-size", "--context-size", nargs="*",
> +                        help="Minimal size of the context")
> +        g_ctx.add_argument("--ctx-array", "--context-array", nargs="*",
> +                        help="Comma-separated arrays for each context")
> +
> +        # Vendor-specific data
> +        g_vendor = parser.add_argument_group("Vendor-specific data")
> +        g_vendor.add_argument("--vendor", "--vendor-specific", nargs="+",
> +                        help="Vendor-specific byte arrays of data")
> +
> +    def parse_args(self, args):
> +        """Parse subcommand arguments"""
> +
> +        cper = {}
> +        pei = {}
> +        ctx = {}
> +        vendor = {}
> +
> +        arg = vars(args)
> +
> +        # Handle global parameters
> +        if args.arm:
> +            arm_valid_init = False
> +            cper["valid"] = get_choice(name="valid",
> +                                       value=args.arm,
> +                                       choices=self.arm_valid_bits,
> +                                       suffixes=["-error", "-err"])
> +        else:
> +            cper["valid"] = 0
> +            arm_valid_init = True
> +
> +        if "running" in arg:
> +            if args.running:
> +                cper["running-state"] = bit(0)
> +            else:
> +                cper["running-state"] = 0
> +        else:
> +            cper["running-state"] = 0
> +
> +        if arm_valid_init:
> +            if args.affinity:
> +                cper["valid"] |= self.arm_valid_bits["affinity"]
> +
> +            if args.mpidr:
> +                cper["valid"] |= self.arm_valid_bits["mpidr"]
> +
> +            if "running-state" in cper:
> +                cper["valid"] |= self.arm_valid_bits["running"]
> +
> +            if args.psci:
> +                cper["valid"] |= self.arm_valid_bits["running"]
> +
> +        # Handle PEI
> +        if not args.type:
> +            args.type = ["cache-error"]
> +
> +        get_mult_choices(
> +            pei,
> +            name="valid",
> +            values=args.pei_valid,
> +            choices=self.pei_valid_bits,
> +            suffixes=["-valid", "-info", "--information", "--addr"],
> +        )
> +        get_mult_choices(
> +            pei,
> +            name="type",
> +            values=args.type,
> +            choices=self.pei_error_types,
> +            suffixes=["-error", "-err"],
> +        )
> +        get_mult_choices(
> +            pei,
> +            name="flags",
> +            values=args.flags,
> +            choices=self.pei_flags,
> +            suffixes=["-error", "-cap"],
> +        )
> +        get_mult_int(pei, "error-info", args.error_info)
> +        get_mult_int(pei, "multiple-error", args.multiple_error)
> +        get_mult_int(pei, "phy-addr", args.physical_address)
> +        get_mult_int(pei, "virt-addr", args.virtual_address)
> +
> +        # Handle context
> +        get_mult_int(ctx, "type", args.ctx_type, allow_zero=True)
> +        get_mult_int(ctx, "minimal-size", args.ctx_size, allow_zero=True)
> +        get_mult_array(ctx, "register", args.ctx_array, allow_zero=True)
> +
> +        get_mult_array(vendor, "bytes", args.vendor, max_val=255)
> +
> +        # Store PEI
> +        pei_data = bytearray()
> +        default_flags  = self.pei_flags["first"]
> +        default_flags |= self.pei_flags["last"]
> +
> +        error_info_num = 0
> +
> +        for i, p in pei.items():        # pylint: disable=W0612
> +            error_info_num += 1
> +
> +            # UEFI 2.10 doesn't define how to encode error information
> +            # when multiple types are raised. So, provide a default only
> +            # if a single type is there
> +            if "error-info" not in p:
> +                if p["type"] == bit(1):
> +                    p["error-info"] = 0x0091000F
> +                if p["type"] == bit(2):
> +                    p["error-info"] = 0x0054007F
> +                if p["type"] == bit(3):
> +                    p["error-info"] = 0x80D6460FFF
> +                if p["type"] == bit(4):
> +                    p["error-info"] = 0x78DA03FF
> +
> +            if "valid" not in p:
> +                p["valid"] = 0
> +                if "multiple-error" in p:
> +                    p["valid"] |= self.pei_valid_bits["multiple-error"]
> +
> +                if "flags" in p:
> +                    p["valid"] |= self.pei_valid_bits["flags"]
> +
> +                if "error-info" in p:
> +                    p["valid"] |= self.pei_valid_bits["error-info"]
> +
> +                if "phy-addr" in p:
> +                    p["valid"] |= self.pei_valid_bits["phy-addr"]
> +
> +                if "virt-addr" in p:
> +                    p["valid"] |= self.pei_valid_bits["virt-addr"]
> +
> +            # Version
> +            data_add(pei_data, 0, 1)
> +
> +            data_add(pei_data, ACPI_GHES_ARM_CPER_PEI_LENGTH, 1)
> +
> +            data_add(pei_data, p["valid"], 2)
> +            data_add(pei_data, p["type"], 1)
> +            data_add(pei_data, p.get("multiple-error", 1), 2)
> +            data_add(pei_data, p.get("flags", default_flags), 1)
> +            data_add(pei_data, p.get("error-info", 0), 8)
> +            data_add(pei_data, p.get("virt-addr", 0xDEADBEEF), 8)
> +            data_add(pei_data, p.get("phy-addr", 0xABBA0BAD), 8)
> +
> +        # Store Context
> +        ctx_data = bytearray()
> +        context_info_num = 0
> +
> +        if ctx:
> +            for k in sorted(ctx.keys()):
> +                context_info_num += 1
> +
> +                if "type" not in ctx:
> +                    ctx[k]["type"] = CONTEXT_AARCH64_EL1
> +
> +                if "register" not in ctx:
> +                    ctx[k]["register"] = []
> +
> +                reg_size = len(ctx[k]["register"])
> +                size = 0
> +
> +                if "minimal-size" in ctx:
> +                    size = ctx[k]["minimal-size"]
> +
> +                size = max(size, reg_size)
> +
> +                size = (size + 1) % 0xFFFE
> +
> +                # Version
> +                data_add(ctx_data, 0, 2)
> +
> +                data_add(ctx_data, ctx[k]["type"], 2)
> +
> +                data_add(ctx_data, 8 * size, 4)
> +
> +                for r in ctx[k]["register"]:
> +                    data_add(ctx_data, r, 8)
> +
> +                for i in range(reg_size, size):   # pylint: disable=W0612
> +                    data_add(ctx_data, 0, 8)
> +
> +        # Vendor-specific bytes are not grouped
> +        vendor_data = bytearray()
> +        if vendor:
> +            for k in sorted(vendor.keys()):
> +                for b in vendor[k]["bytes"]:
> +                    data_add(vendor_data, b, 1)
> +
> +        # Encode ARM Processor Error
> +        data = bytearray()
> +
> +        data_add(data, cper["valid"], 4)
> +
> +        data_add(data, error_info_num, 2)
> +        data_add(data, context_info_num, 2)
> +
> +        # Calculate the length of the CPER data
> +        cper_length = ACPI_GHES_ARM_CPER_LENGTH
> +        cper_length += len(pei_data)
> +        cper_length += len(vendor_data)
> +        cper_length += len(ctx_data)
> +        data_add(data, cper_length, 4)
> +
> +        data_add(data, arg.get("affinity-level", 0), 1)
> +
> +        # Reserved
> +        data_add(data, 0, 3)
> +
> +        data_add(data, arg.get("mpidr-el1", 0), 8)
> +        data_add(data, arg.get("midr-el1", 0), 8)
> +        data_add(data, cper["running-state"], 4)
> +        data_add(data, arg.get("psci-state", 0), 4)
> +
> +        # Add PEI
> +        data.extend(pei_data)
> +        data.extend(ctx_data)
> +        data.extend(vendor_data)
> +
> +        self.data = data
> +
> +    def run(self, host, port):
> +        """Execute QMP commands"""
> +
> +        guid = to_guid(0xE19E3D16, 0xBC11, 0x11E4,
> +                       [0x9C, 0xAA, 0xC2, 0x05,
> +                        0x1D, 0x5D, 0x46, 0xB0])
> +
> +        qmp_command(host, port, guid, self.data)
> diff --git a/scripts/ghes_inject.py b/scripts/ghes_inject.py
> new file mode 100755
> index 000000000000..8415ccbbc53d
> --- /dev/null
> +++ b/scripts/ghes_inject.py
> @@ -0,0 +1,59 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114
> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +import argparse
> +
> +from arm_processor_error import ArmProcessorEinj
> +
> +EINJ_DESCRIPTION = """
> +Handle ACPI GHESv2 error injection logic QEMU QMP interface.\n
> +
> +It allows using UEFI BIOS EINJ features to generate GHES records.
> +
> +It helps testing Linux CPER and GHES drivers and to test rasdaemon
> +error handling logic.
> +
> +Currently, it support ARM processor error injection for ARM processor
> +events, being compatible with UEFI 2.9A Errata.
> +
> +This small utility works together with those QEMU additions:
> +- https://gitlab.com/mchehab_kernel/qemu/-/tree/arm-error-inject-v2
> +"""
> +
> +def main():
> +    """Main program"""
> +
> +    # Main parser - handle generic args like QEMU QMP TCP socket options
> +    parser = argparse.ArgumentParser(prog="einj.py",
> +                                     formatter_class=argparse.RawDescriptionHelpFormatter,
> +                                     usage="%(prog)s [options]",
> +                                     description=EINJ_DESCRIPTION,
> +                                     epilog="If a field is not defined, a default value will be applied by QEMU.")
> +
> +    g_options = parser.add_argument_group("QEMU QMP socket options")
> +    g_options.add_argument("-H", "--host", default="localhost", type=str,
> +                           help="host name")
> +    g_options.add_argument("-P", "--port", default=4445, type=int,
> +                           help="TCP port number")
> +
> +    arm_einj = ArmProcessorEinj()
> +
> +    # Call subparsers
> +    subparsers = parser.add_subparsers(dest='command')
> +
> +    arm_einj.create_subparser(subparsers)
> +
> +    args = parser.parse_args()
> +
> +    # Handle subparser commands
> +    if args.command == "arm":
> +        arm_einj.parse_args(args)
> +        arm_einj.run(args.host, args.port)
> +
> +
> +if __name__ == "__main__":
> +    main()
> diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
> new file mode 100644
> index 000000000000..13fae7a7af0e
> --- /dev/null
> +++ b/scripts/qmp_helper.py
> @@ -0,0 +1,249 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114, R0912, R0913, R0915, W0511
> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +import json
> +import socket
> +import sys
> +
> +from base64 import b64encode
> +
> +#
> +# Socket QMP send command
> +#
> +def qmp_command(host, port, guid, data):
> +    """Send commands to QEMU though QMP TCP socket"""
> +
> +    # Fill the commands to be sent
> +    commands = []
> +
> +    # Needed to negotiate QMP and for QEMU to accept the command
> +    commands.append('{ "execute": "qmp_capabilities" } ')
> +
> +    base64_data = b64encode(bytes(data)).decode('ascii')
> +
> +    cmd_arg = {
> +        'cper': {
> +            'notification-type': guid,
> +            "raw-data": base64_data
> +        }
> +    }
> +
> +    command = '{ "execute": "ghes-cper", '
> +    command += '"arguments": ' + json.dumps(cmd_arg) + " }"
> +
> +    commands.append(command)
> +
> +    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +    try:
> +        s.connect((host, port))
> +    except ConnectionRefusedError:
> +        sys.exit(f"Can't connect to QMP host {host}:{port}")
> +
> +    data = s.recv(1024)
> +    try:
> +        obj = json.loads(data.decode("utf-8"))
> +    except json.JSONDecodeError as e:
> +        print(f"Invalid QMP answer: {e}")
> +        s.close()
> +        return
> +
> +    if "QMP" not in obj:
> +        print(f"Invalid QMP answer: {data.decode("utf-8")}")
> +        s.close()
> +        return
> +
> +    for i, command in enumerate(commands):
> +        s.sendall(command.encode("utf-8"))
> +        data = s.recv(1024)
> +        try:
> +            obj = json.loads(data.decode("utf-8"))
> +        except json.JSONDecodeError as e:
> +            print(f"Invalid QMP answer: {e}")
> +            s.close()
> +            return
> +
> +        if isinstance(obj.get("return"), dict):
> +            if obj["return"]:
> +                print(json.dumps(obj["return"]))
> +            elif i > 0:
> +                print("Error injected.")
> +        elif isinstance(obj.get("error"), dict):
> +            error = obj["error"]
> +            print(f'{error["class"]}: {error["desc"]}')
> +        else:
> +            print(json.dumps(obj))
> +
> +    s.shutdown(socket.SHUT_WR)
> +    while 1:
> +        data = s.recv(1024)
> +        if data == b"":
> +            break
> +        try:
> +            obj = json.loads(data.decode("utf-8"))
> +        except json.JSONDecodeError as e:
> +            print(f"Invalid QMP answer: {e}")
> +            s.close()
> +            return
> +
> +        if isinstance(obj.get("return"), dict):
> +            print(json.dumps(obj["return"]))
> +        if isinstance(obj.get("error"), dict):
> +            error = obj["error"]
> +            print(f'{error["class"]}: {error["desc"]}')
> +        else:
> +            print(json.dumps(obj))
> +
> +    s.close()
> +
> +
> +#
> +# Helper routines to handle multiple choice arguments
> +#
> +def get_choice(name, value, choices, suffixes=None):
> +    """Produce a list from multiple choice argument"""
> +
> +    new_values = 0
> +
> +    if not value:
> +        return new_values
> +
> +    for val in value.split(","):
> +        val = val.lower()
> +
> +        if suffixes:
> +            for suffix in suffixes:
> +                val = val.removesuffix(suffix)
> +
> +        if val not in choices.keys():
> +            sys.exit(f"Error on '{name}': choice {val} is invalid.")
> +
> +        val = choices[val]
> +
> +        new_values |= val
> +
> +    return new_values
> +
> +
> +def get_mult_array(mult, name, values, allow_zero=False, max_val=None):
> +    """Add numbered hashes from integer lists"""
> +
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +        if not values:
> +            i = 0
> +            if i not in mult:
> +                mult[i] = {}
> +
> +            mult[i][name] = []
> +            return
> +
> +    i = 0
> +    for value in values:
> +        for val in value.split(","):
> +            try:
> +                val = int(val, 0)
> +            except ValueError:
> +                sys.exit(f"Error on '{name}': {val} is not an integer")
> +
> +            if val < 0:
> +                sys.exit(f"Error on '{name}': {val} is not unsigned")
> +
> +            if max_val and val > max_val:
> +                sys.exit(f"Error on '{name}': {val} is too little")
> +
> +            if i not in mult:
> +                mult[i] = {}
> +
> +            if name not in mult[i]:
> +                mult[i][name] = []
> +
> +            mult[i][name].append(val)
> +
> +        i += 1
> +
> +
> +def get_mult_choices(mult, name, values, choices,
> +                     suffixes=None, allow_zero=False):
> +    """Add numbered hashes from multiple choice arguments"""
> +
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +    i = 0
> +    for val in values:
> +        new_values = get_choice(name, val, choices, suffixes)
> +
> +        if i not in mult:
> +            mult[i] = {}
> +
> +        mult[i][name] = new_values
> +        i += 1
> +
> +
> +def get_mult_int(mult, name, values, allow_zero=False):
> +    """Add numbered hashes from integer arguments"""
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +    i = 0
> +    for val in values:
> +        try:
> +            val = int(val, 0)
> +        except ValueError:
> +            sys.exit(f"Error on '{name}': {val} is not an integer")
> +
> +        if val < 0:
> +            sys.exit(f"Error on '{name}': {val} is not unsigned")
> +
> +        if i not in mult:
> +            mult[i] = {}
> +
> +        mult[i][name] = val
> +        i += 1
> +
> +
> +#
> +# Data encode helper functions
> +#
> +def bit(b):
> +    """Simple macro to define a bit on a bitmask"""
> +    return 1 << b
> +
> +
> +def data_add(data, value, num_bytes):
> +    """Adds bytes from value inside a bitarray"""
> +
> +    data.extend(value.to_bytes(num_bytes, byteorder="little"))
> +
> +def to_guid(time_low, time_mid, time_high, nodes):
> +    """Create an GUID string"""
> +
> +    assert(len(nodes) == 8)
> +
> +    clock = nodes[0] << 8 | nodes[1]
> +
> +    node = 0
> +    for i in range(2, len(nodes)):
> +        node = node << 8 | nodes[i]
> +
> +    s = f"{time_low:08x}-{time_mid:04x}-"
> +    s += f"{time_high:04x}-{clock:04x}-{node:012x}"
> +
> +    return s
John Snow Aug. 8, 2024, 8:58 p.m. UTC | #2
On Fri, Aug 2, 2024 at 5:44 PM Mauro Carvalho Chehab <
mchehab+huawei@kernel.org> wrote:

> Using the QMP GHESv2 API requires preparing a raw data array
> containing a CPER record.
>
> Add a helper script with subcommands to prepare such data.
>
> Currently, only ARM Processor error CPER record is supported.
>
> Signed-off-by: Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> ---
>  MAINTAINERS                    |   3 +
>  scripts/arm_processor_error.py | 352 +++++++++++++++++++++++++++++++++
>  scripts/ghes_inject.py         |  59 ++++++
>  scripts/qmp_helper.py          | 249 +++++++++++++++++++++++
>  4 files changed, 663 insertions(+)
>  create mode 100644 scripts/arm_processor_error.py
>  create mode 100755 scripts/ghes_inject.py
>  create mode 100644 scripts/qmp_helper.py
>
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 655edcb6688c..e490f69da1de 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -2081,6 +2081,9 @@ S: Maintained
>  F: hw/arm/ghes_cper.c
>  F: hw/acpi/ghes_cper_stub.c
>  F: qapi/ghes-cper.json
> +F: scripts/ghes_inject.py
> +F: scripts/arm_processor_error.py
> +F: scripts/qmp_helper.py
>
>  ppc4xx
>  L: qemu-ppc@nongnu.org
> diff --git a/scripts/arm_processor_error.py
> b/scripts/arm_processor_error.py
> new file mode 100644
> index 000000000000..df4efa508790
> --- /dev/null
> +++ b/scripts/arm_processor_error.py
> @@ -0,0 +1,352 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114, R0912, R0913, R0914, R0915, W0511
>

Out of curiosity, what tools are you using to delint your files and how are
you invoking them?

I don't really maintain any strict regime for python files under
qemu.git/scripts (yet), so I am mostly curious as to what regimes others
are using currently. I don't see most QEMU contributors checking in pylint
ignores etc directly into the files, so it caught my eye.

~js


> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +# TODO: current implementation has dummy defaults.
> +#
> +# For a better implementation, a QMP addition/call is needed to
> +# retrieve some data for ARM Processor Error injection:
> +#
> +#   - machine emulation architecture, as ARM current default is
> +#     for AArch64;
> +#   - ARM registers: power_state, midr, mpidr.
> +
> +import argparse
> +import json
> +
> +from qmp_helper import (qmp_command, get_choice, get_mult_array,
> +                        get_mult_choices, get_mult_int, bit,
> +                        data_add, to_guid)
> +
> +# Arm processor EINJ logic
> +#
> +ACPI_GHES_ARM_CPER_LENGTH = 40
> +ACPI_GHES_ARM_CPER_PEI_LENGTH = 32
> +
> +# TODO: query it from emulation. Current default valid only for Aarch64
> +CONTEXT_AARCH64_EL1 = 5
> +
> +class ArmProcessorEinj:
> +    """
> +    Implements ARM Processor Error injection via GHES
> +    """
> +
> +    def __init__(self):
> +        """Initialize the error injection class"""
> +
> +        # Valid choice values
> +        self.arm_valid_bits = {
> +            "mpidr":    bit(0),
> +            "affinity": bit(1),
> +            "running":  bit(2),
> +            "vendor":   bit(3),
> +        }
> +
> +        self.pei_flags = {
> +            "first":        bit(0),
> +            "last":         bit(1),
> +            "propagated":   bit(2),
> +            "overflow":     bit(3),
> +        }
> +
> +        self.pei_error_types = {
> +            "cache":        bit(1),
> +            "tlb":          bit(2),
> +            "bus":          bit(3),
> +            "micro-arch":   bit(4),
> +        }
> +
> +        self.pei_valid_bits = {
> +            "multiple-error":   bit(0),
> +            "flags":            bit(1),
> +            "error-info":       bit(2),
> +            "virt-addr":        bit(3),
> +            "phy-addr":         bit(4),
> +        }
> +
> +        self.data = bytearray()
> +
> +    def create_subparser(self, subparsers):
> +        """Add a subparser to handle for the error fields"""
> +
> +        parser = subparsers.add_parser("arm",
> +                                       help="Generate an ARM processor
> CPER")
> +
> +        arm_valid_bits = ",".join(self.arm_valid_bits.keys())
> +        flags = ",".join(self.pei_flags.keys())
> +        error_types = ",".join(self.pei_error_types.keys())
> +        pei_valid_bits = ",".join(self.arm_valid_bits.keys())
> +
> +        # UEFI N.16 ARM Validation bits
> +        g_arm = parser.add_argument_group("ARM processor")
> +        g_arm.add_argument("--arm", "--arm-valid",
> +                           help=f"ARM valid bits: {arm_valid_bits}")
> +        g_arm.add_argument("-a", "--affinity",  "--level",
> "--affinity-level",
> +                           type=lambda x: int(x, 0),
> +                           help="Affinity level (when multiple levels
> apply)")
> +        g_arm.add_argument("-l", "--mpidr", type=lambda x: int(x, 0),
> +                           help="Multiprocessor Affinity Register")
> +        g_arm.add_argument("-i", "--midr", type=lambda x: int(x, 0),
> +                           help="Main ID Register")
> +        g_arm.add_argument("-r", "--running",
> +                           action=argparse.BooleanOptionalAction,
> +                           default=None,
> +                           help="Indicates if the processor is running or
> not")
> +        g_arm.add_argument("--psci", "--psci-state",
> +                           type=lambda x: int(x, 0),
> +                           help="Power State Coordination Interface -
> PSCI state")
> +
> +        # TODO: Add vendor-specific support
> +
> +        # UEFI N.17 bitmaps (type and flags)
> +        g_pei = parser.add_argument_group("ARM Processor Error Info
> (PEI)")
> +        g_pei.add_argument("-t", "--type", nargs="+",
> +                        help=f"one or more error types: {error_types}")
> +        g_pei.add_argument("-f", "--flags", nargs="*",
> +                        help=f"zero or more error flags: {flags}")
> +        g_pei.add_argument("-V", "--pei-valid", "--error-valid",
> nargs="*",
> +                        help=f"zero or more PEI valid bits:
> {pei_valid_bits}")
> +
> +        # UEFI N.17 Integer values
> +        g_pei.add_argument("-m", "--multiple-error", nargs="+",
> +                        help="Number of errors: 0: Single error, 1:
> Multiple errors, 2-65535: Error count if known")
> +        g_pei.add_argument("-e", "--error-info", nargs="+",
> +                        help="Error information (UEFI 2.10 tables N.18 to
> N.20)")
> +        g_pei.add_argument("-p", "--physical-address",  nargs="+",
> +                        help="Physical address")
> +        g_pei.add_argument("-v", "--virtual-address",  nargs="+",
> +                        help="Virtual address")
> +
> +        # UEFI N.21 Context
> +        g_ctx = parser.add_argument_group("Processor Context")
> +        g_ctx.add_argument("--ctx-type", "--context-type", nargs="*",
> +                        help="Type of the context (0=ARM32 GPR, 5=ARM64
> EL1, other values supported)")
> +        g_ctx.add_argument("--ctx-size", "--context-size", nargs="*",
> +                        help="Minimal size of the context")
> +        g_ctx.add_argument("--ctx-array", "--context-array", nargs="*",
> +                        help="Comma-separated arrays for each context")
> +
> +        # Vendor-specific data
> +        g_vendor = parser.add_argument_group("Vendor-specific data")
> +        g_vendor.add_argument("--vendor", "--vendor-specific", nargs="+",
> +                        help="Vendor-specific byte arrays of data")
> +
> +    def parse_args(self, args):
> +        """Parse subcommand arguments"""
> +
> +        cper = {}
> +        pei = {}
> +        ctx = {}
> +        vendor = {}
> +
> +        arg = vars(args)
> +
> +        # Handle global parameters
> +        if args.arm:
> +            arm_valid_init = False
> +            cper["valid"] = get_choice(name="valid",
> +                                       value=args.arm,
> +                                       choices=self.arm_valid_bits,
> +                                       suffixes=["-error", "-err"])
> +        else:
> +            cper["valid"] = 0
> +            arm_valid_init = True
> +
> +        if "running" in arg:
> +            if args.running:
> +                cper["running-state"] = bit(0)
> +            else:
> +                cper["running-state"] = 0
> +        else:
> +            cper["running-state"] = 0
> +
> +        if arm_valid_init:
> +            if args.affinity:
> +                cper["valid"] |= self.arm_valid_bits["affinity"]
> +
> +            if args.mpidr:
> +                cper["valid"] |= self.arm_valid_bits["mpidr"]
> +
> +            if "running-state" in cper:
> +                cper["valid"] |= self.arm_valid_bits["running"]
> +
> +            if args.psci:
> +                cper["valid"] |= self.arm_valid_bits["running"]
> +
> +        # Handle PEI
> +        if not args.type:
> +            args.type = ["cache-error"]
> +
> +        get_mult_choices(
> +            pei,
> +            name="valid",
> +            values=args.pei_valid,
> +            choices=self.pei_valid_bits,
> +            suffixes=["-valid", "-info", "--information", "--addr"],
> +        )
> +        get_mult_choices(
> +            pei,
> +            name="type",
> +            values=args.type,
> +            choices=self.pei_error_types,
> +            suffixes=["-error", "-err"],
> +        )
> +        get_mult_choices(
> +            pei,
> +            name="flags",
> +            values=args.flags,
> +            choices=self.pei_flags,
> +            suffixes=["-error", "-cap"],
> +        )
> +        get_mult_int(pei, "error-info", args.error_info)
> +        get_mult_int(pei, "multiple-error", args.multiple_error)
> +        get_mult_int(pei, "phy-addr", args.physical_address)
> +        get_mult_int(pei, "virt-addr", args.virtual_address)
> +
> +        # Handle context
> +        get_mult_int(ctx, "type", args.ctx_type, allow_zero=True)
> +        get_mult_int(ctx, "minimal-size", args.ctx_size, allow_zero=True)
> +        get_mult_array(ctx, "register", args.ctx_array, allow_zero=True)
> +
> +        get_mult_array(vendor, "bytes", args.vendor, max_val=255)
> +
> +        # Store PEI
> +        pei_data = bytearray()
> +        default_flags  = self.pei_flags["first"]
> +        default_flags |= self.pei_flags["last"]
> +
> +        error_info_num = 0
> +
> +        for i, p in pei.items():        # pylint: disable=W0612
> +            error_info_num += 1
> +
> +            # UEFI 2.10 doesn't define how to encode error information
> +            # when multiple types are raised. So, provide a default only
> +            # if a single type is there
> +            if "error-info" not in p:
> +                if p["type"] == bit(1):
> +                    p["error-info"] = 0x0091000F
> +                if p["type"] == bit(2):
> +                    p["error-info"] = 0x0054007F
> +                if p["type"] == bit(3):
> +                    p["error-info"] = 0x80D6460FFF
> +                if p["type"] == bit(4):
> +                    p["error-info"] = 0x78DA03FF
> +
> +            if "valid" not in p:
> +                p["valid"] = 0
> +                if "multiple-error" in p:
> +                    p["valid"] |= self.pei_valid_bits["multiple-error"]
> +
> +                if "flags" in p:
> +                    p["valid"] |= self.pei_valid_bits["flags"]
> +
> +                if "error-info" in p:
> +                    p["valid"] |= self.pei_valid_bits["error-info"]
> +
> +                if "phy-addr" in p:
> +                    p["valid"] |= self.pei_valid_bits["phy-addr"]
> +
> +                if "virt-addr" in p:
> +                    p["valid"] |= self.pei_valid_bits["virt-addr"]
> +
> +            # Version
> +            data_add(pei_data, 0, 1)
> +
> +            data_add(pei_data, ACPI_GHES_ARM_CPER_PEI_LENGTH, 1)
> +
> +            data_add(pei_data, p["valid"], 2)
> +            data_add(pei_data, p["type"], 1)
> +            data_add(pei_data, p.get("multiple-error", 1), 2)
> +            data_add(pei_data, p.get("flags", default_flags), 1)
> +            data_add(pei_data, p.get("error-info", 0), 8)
> +            data_add(pei_data, p.get("virt-addr", 0xDEADBEEF), 8)
> +            data_add(pei_data, p.get("phy-addr", 0xABBA0BAD), 8)
> +
> +        # Store Context
> +        ctx_data = bytearray()
> +        context_info_num = 0
> +
> +        if ctx:
> +            for k in sorted(ctx.keys()):
> +                context_info_num += 1
> +
> +                if "type" not in ctx:
> +                    ctx[k]["type"] = CONTEXT_AARCH64_EL1
> +
> +                if "register" not in ctx:
> +                    ctx[k]["register"] = []
> +
> +                reg_size = len(ctx[k]["register"])
> +                size = 0
> +
> +                if "minimal-size" in ctx:
> +                    size = ctx[k]["minimal-size"]
> +
> +                size = max(size, reg_size)
> +
> +                size = (size + 1) % 0xFFFE
> +
> +                # Version
> +                data_add(ctx_data, 0, 2)
> +
> +                data_add(ctx_data, ctx[k]["type"], 2)
> +
> +                data_add(ctx_data, 8 * size, 4)
> +
> +                for r in ctx[k]["register"]:
> +                    data_add(ctx_data, r, 8)
> +
> +                for i in range(reg_size, size):   # pylint: disable=W0612
> +                    data_add(ctx_data, 0, 8)
> +
> +        # Vendor-specific bytes are not grouped
> +        vendor_data = bytearray()
> +        if vendor:
> +            for k in sorted(vendor.keys()):
> +                for b in vendor[k]["bytes"]:
> +                    data_add(vendor_data, b, 1)
> +
> +        # Encode ARM Processor Error
> +        data = bytearray()
> +
> +        data_add(data, cper["valid"], 4)
> +
> +        data_add(data, error_info_num, 2)
> +        data_add(data, context_info_num, 2)
> +
> +        # Calculate the length of the CPER data
> +        cper_length = ACPI_GHES_ARM_CPER_LENGTH
> +        cper_length += len(pei_data)
> +        cper_length += len(vendor_data)
> +        cper_length += len(ctx_data)
> +        data_add(data, cper_length, 4)
> +
> +        data_add(data, arg.get("affinity-level", 0), 1)
> +
> +        # Reserved
> +        data_add(data, 0, 3)
> +
> +        data_add(data, arg.get("mpidr-el1", 0), 8)
> +        data_add(data, arg.get("midr-el1", 0), 8)
> +        data_add(data, cper["running-state"], 4)
> +        data_add(data, arg.get("psci-state", 0), 4)
> +
> +        # Add PEI
> +        data.extend(pei_data)
> +        data.extend(ctx_data)
> +        data.extend(vendor_data)
> +
> +        self.data = data
> +
> +    def run(self, host, port):
> +        """Execute QMP commands"""
> +
> +        guid = to_guid(0xE19E3D16, 0xBC11, 0x11E4,
> +                       [0x9C, 0xAA, 0xC2, 0x05,
> +                        0x1D, 0x5D, 0x46, 0xB0])
> +
> +        qmp_command(host, port, guid, self.data)
> diff --git a/scripts/ghes_inject.py b/scripts/ghes_inject.py
> new file mode 100755
> index 000000000000..8415ccbbc53d
> --- /dev/null
> +++ b/scripts/ghes_inject.py
> @@ -0,0 +1,59 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114
> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +import argparse
> +
> +from arm_processor_error import ArmProcessorEinj
> +
> +EINJ_DESCRIPTION = """
> +Handle ACPI GHESv2 error injection logic QEMU QMP interface.\n
> +
> +It allows using UEFI BIOS EINJ features to generate GHES records.
> +
> +It helps testing Linux CPER and GHES drivers and to test rasdaemon
> +error handling logic.
> +
> +Currently, it support ARM processor error injection for ARM processor
> +events, being compatible with UEFI 2.9A Errata.
> +
> +This small utility works together with those QEMU additions:
> +- https://gitlab.com/mchehab_kernel/qemu/-/tree/arm-error-inject-v2
> +"""
> +
> +def main():
> +    """Main program"""
> +
> +    # Main parser - handle generic args like QEMU QMP TCP socket options
> +    parser = argparse.ArgumentParser(prog="einj.py",
> +
>  formatter_class=argparse.RawDescriptionHelpFormatter,
> +                                     usage="%(prog)s [options]",
> +                                     description=EINJ_DESCRIPTION,
> +                                     epilog="If a field is not defined, a
> default value will be applied by QEMU.")
> +
> +    g_options = parser.add_argument_group("QEMU QMP socket options")
> +    g_options.add_argument("-H", "--host", default="localhost", type=str,
> +                           help="host name")
> +    g_options.add_argument("-P", "--port", default=4445, type=int,
> +                           help="TCP port number")
> +
> +    arm_einj = ArmProcessorEinj()
> +
> +    # Call subparsers
> +    subparsers = parser.add_subparsers(dest='command')
> +
> +    arm_einj.create_subparser(subparsers)
> +
> +    args = parser.parse_args()
> +
> +    # Handle subparser commands
> +    if args.command == "arm":
> +        arm_einj.parse_args(args)
> +        arm_einj.run(args.host, args.port)
> +
> +
> +if __name__ == "__main__":
> +    main()
> diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
> new file mode 100644
> index 000000000000..13fae7a7af0e
> --- /dev/null
> +++ b/scripts/qmp_helper.py
> @@ -0,0 +1,249 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114, R0912, R0913, R0915, W0511
> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +import json
> +import socket
> +import sys
> +
> +from base64 import b64encode
> +
> +#
> +# Socket QMP send command
> +#
> +def qmp_command(host, port, guid, data):
> +    """Send commands to QEMU though QMP TCP socket"""
> +
> +    # Fill the commands to be sent
> +    commands = []
> +
> +    # Needed to negotiate QMP and for QEMU to accept the command
> +    commands.append('{ "execute": "qmp_capabilities" } ')
> +
> +    base64_data = b64encode(bytes(data)).decode('ascii')
> +
> +    cmd_arg = {
> +        'cper': {
> +            'notification-type': guid,
> +            "raw-data": base64_data
> +        }
> +    }
> +
> +    command = '{ "execute": "ghes-cper", '
> +    command += '"arguments": ' + json.dumps(cmd_arg) + " }"
> +
> +    commands.append(command)
> +
> +    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +    try:
> +        s.connect((host, port))
> +    except ConnectionRefusedError:
> +        sys.exit(f"Can't connect to QMP host {host}:{port}")
> +
> +    data = s.recv(1024)
> +    try:
> +        obj = json.loads(data.decode("utf-8"))
> +    except json.JSONDecodeError as e:
> +        print(f"Invalid QMP answer: {e}")
> +        s.close()
> +        return
> +
> +    if "QMP" not in obj:
> +        print(f"Invalid QMP answer: {data.decode("utf-8")}")
> +        s.close()
> +        return
> +
> +    for i, command in enumerate(commands):
> +        s.sendall(command.encode("utf-8"))
> +        data = s.recv(1024)
> +        try:
> +            obj = json.loads(data.decode("utf-8"))
> +        except json.JSONDecodeError as e:
> +            print(f"Invalid QMP answer: {e}")
> +            s.close()
> +            return
> +
> +        if isinstance(obj.get("return"), dict):
> +            if obj["return"]:
> +                print(json.dumps(obj["return"]))
> +            elif i > 0:
> +                print("Error injected.")
> +        elif isinstance(obj.get("error"), dict):
> +            error = obj["error"]
> +            print(f'{error["class"]}: {error["desc"]}')
> +        else:
> +            print(json.dumps(obj))
> +
> +    s.shutdown(socket.SHUT_WR)
> +    while 1:
> +        data = s.recv(1024)
> +        if data == b"":
> +            break
> +        try:
> +            obj = json.loads(data.decode("utf-8"))
> +        except json.JSONDecodeError as e:
> +            print(f"Invalid QMP answer: {e}")
> +            s.close()
> +            return
> +
> +        if isinstance(obj.get("return"), dict):
> +            print(json.dumps(obj["return"]))
> +        if isinstance(obj.get("error"), dict):
> +            error = obj["error"]
> +            print(f'{error["class"]}: {error["desc"]}')
> +        else:
> +            print(json.dumps(obj))
> +
> +    s.close()
> +
> +
> +#
> +# Helper routines to handle multiple choice arguments
> +#
> +def get_choice(name, value, choices, suffixes=None):
> +    """Produce a list from multiple choice argument"""
> +
> +    new_values = 0
> +
> +    if not value:
> +        return new_values
> +
> +    for val in value.split(","):
> +        val = val.lower()
> +
> +        if suffixes:
> +            for suffix in suffixes:
> +                val = val.removesuffix(suffix)
> +
> +        if val not in choices.keys():
> +            sys.exit(f"Error on '{name}': choice {val} is invalid.")
> +
> +        val = choices[val]
> +
> +        new_values |= val
> +
> +    return new_values
> +
> +
> +def get_mult_array(mult, name, values, allow_zero=False, max_val=None):
> +    """Add numbered hashes from integer lists"""
> +
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +        if not values:
> +            i = 0
> +            if i not in mult:
> +                mult[i] = {}
> +
> +            mult[i][name] = []
> +            return
> +
> +    i = 0
> +    for value in values:
> +        for val in value.split(","):
> +            try:
> +                val = int(val, 0)
> +            except ValueError:
> +                sys.exit(f"Error on '{name}': {val} is not an integer")
> +
> +            if val < 0:
> +                sys.exit(f"Error on '{name}': {val} is not unsigned")
> +
> +            if max_val and val > max_val:
> +                sys.exit(f"Error on '{name}': {val} is too little")
> +
> +            if i not in mult:
> +                mult[i] = {}
> +
> +            if name not in mult[i]:
> +                mult[i][name] = []
> +
> +            mult[i][name].append(val)
> +
> +        i += 1
> +
> +
> +def get_mult_choices(mult, name, values, choices,
> +                     suffixes=None, allow_zero=False):
> +    """Add numbered hashes from multiple choice arguments"""
> +
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +    i = 0
> +    for val in values:
> +        new_values = get_choice(name, val, choices, suffixes)
> +
> +        if i not in mult:
> +            mult[i] = {}
> +
> +        mult[i][name] = new_values
> +        i += 1
> +
> +
> +def get_mult_int(mult, name, values, allow_zero=False):
> +    """Add numbered hashes from integer arguments"""
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +    i = 0
> +    for val in values:
> +        try:
> +            val = int(val, 0)
> +        except ValueError:
> +            sys.exit(f"Error on '{name}': {val} is not an integer")
> +
> +        if val < 0:
> +            sys.exit(f"Error on '{name}': {val} is not unsigned")
> +
> +        if i not in mult:
> +            mult[i] = {}
> +
> +        mult[i][name] = val
> +        i += 1
> +
> +
> +#
> +# Data encode helper functions
> +#
> +def bit(b):
> +    """Simple macro to define a bit on a bitmask"""
> +    return 1 << b
> +
> +
> +def data_add(data, value, num_bytes):
> +    """Adds bytes from value inside a bitarray"""
> +
> +    data.extend(value.to_bytes(num_bytes, byteorder="little"))
> +
> +def to_guid(time_low, time_mid, time_high, nodes):
> +    """Create an GUID string"""
> +
> +    assert(len(nodes) == 8)
> +
> +    clock = nodes[0] << 8 | nodes[1]
> +
> +    node = 0
> +    for i in range(2, len(nodes)):
> +        node = node << 8 | nodes[i]
> +
> +    s = f"{time_low:08x}-{time_mid:04x}-"
> +    s += f"{time_high:04x}-{clock:04x}-{node:012x}"
> +
> +    return s
> --
> 2.45.2
>
>
John Snow Aug. 8, 2024, 9:21 p.m. UTC | #3
On Fri, Aug 2, 2024 at 5:44 PM Mauro Carvalho Chehab <
mchehab+huawei@kernel.org> wrote:

> Using the QMP GHESv2 API requires preparing a raw data array
> containing a CPER record.
>
> Add a helper script with subcommands to prepare such data.
>
> Currently, only ARM Processor error CPER record is supported.
>
> Signed-off-by: Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> ---
>  MAINTAINERS                    |   3 +
>  scripts/arm_processor_error.py | 352 +++++++++++++++++++++++++++++++++
>  scripts/ghes_inject.py         |  59 ++++++
>  scripts/qmp_helper.py          | 249 +++++++++++++++++++++++
>  4 files changed, 663 insertions(+)
>  create mode 100644 scripts/arm_processor_error.py
>  create mode 100755 scripts/ghes_inject.py
>  create mode 100644 scripts/qmp_helper.py
>
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 655edcb6688c..e490f69da1de 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -2081,6 +2081,9 @@ S: Maintained
>  F: hw/arm/ghes_cper.c
>  F: hw/acpi/ghes_cper_stub.c
>  F: qapi/ghes-cper.json
> +F: scripts/ghes_inject.py
> +F: scripts/arm_processor_error.py
> +F: scripts/qmp_helper.py
>
>  ppc4xx
>  L: qemu-ppc@nongnu.org
> diff --git a/scripts/arm_processor_error.py
> b/scripts/arm_processor_error.py
> new file mode 100644
> index 000000000000..df4efa508790
> --- /dev/null
> +++ b/scripts/arm_processor_error.py
> @@ -0,0 +1,352 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114, R0912, R0913, R0914, R0915, W0511
> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +# TODO: current implementation has dummy defaults.
> +#
> +# For a better implementation, a QMP addition/call is needed to
> +# retrieve some data for ARM Processor Error injection:
> +#
> +#   - machine emulation architecture, as ARM current default is
> +#     for AArch64;
> +#   - ARM registers: power_state, midr, mpidr.
> +
> +import argparse
> +import json
> +
> +from qmp_helper import (qmp_command, get_choice, get_mult_array,
> +                        get_mult_choices, get_mult_int, bit,
> +                        data_add, to_guid)
> +
> +# Arm processor EINJ logic
> +#
> +ACPI_GHES_ARM_CPER_LENGTH = 40
> +ACPI_GHES_ARM_CPER_PEI_LENGTH = 32
> +
> +# TODO: query it from emulation. Current default valid only for Aarch64
> +CONTEXT_AARCH64_EL1 = 5
> +
> +class ArmProcessorEinj:
> +    """
> +    Implements ARM Processor Error injection via GHES
> +    """
> +
> +    def __init__(self):
> +        """Initialize the error injection class"""
> +
> +        # Valid choice values
> +        self.arm_valid_bits = {
> +            "mpidr":    bit(0),
> +            "affinity": bit(1),
> +            "running":  bit(2),
> +            "vendor":   bit(3),
> +        }
> +
> +        self.pei_flags = {
> +            "first":        bit(0),
> +            "last":         bit(1),
> +            "propagated":   bit(2),
> +            "overflow":     bit(3),
> +        }
> +
> +        self.pei_error_types = {
> +            "cache":        bit(1),
> +            "tlb":          bit(2),
> +            "bus":          bit(3),
> +            "micro-arch":   bit(4),
> +        }
> +
> +        self.pei_valid_bits = {
> +            "multiple-error":   bit(0),
> +            "flags":            bit(1),
> +            "error-info":       bit(2),
> +            "virt-addr":        bit(3),
> +            "phy-addr":         bit(4),
> +        }
> +
> +        self.data = bytearray()
> +
> +    def create_subparser(self, subparsers):
> +        """Add a subparser to handle for the error fields"""
> +
> +        parser = subparsers.add_parser("arm",
> +                                       help="Generate an ARM processor
> CPER")
> +
> +        arm_valid_bits = ",".join(self.arm_valid_bits.keys())
> +        flags = ",".join(self.pei_flags.keys())
> +        error_types = ",".join(self.pei_error_types.keys())
> +        pei_valid_bits = ",".join(self.arm_valid_bits.keys())
> +
> +        # UEFI N.16 ARM Validation bits
> +        g_arm = parser.add_argument_group("ARM processor")
> +        g_arm.add_argument("--arm", "--arm-valid",
> +                           help=f"ARM valid bits: {arm_valid_bits}")
> +        g_arm.add_argument("-a", "--affinity",  "--level",
> "--affinity-level",
> +                           type=lambda x: int(x, 0),
> +                           help="Affinity level (when multiple levels
> apply)")
> +        g_arm.add_argument("-l", "--mpidr", type=lambda x: int(x, 0),
> +                           help="Multiprocessor Affinity Register")
> +        g_arm.add_argument("-i", "--midr", type=lambda x: int(x, 0),
> +                           help="Main ID Register")
> +        g_arm.add_argument("-r", "--running",
> +                           action=argparse.BooleanOptionalAction,
> +                           default=None,
> +                           help="Indicates if the processor is running or
> not")
> +        g_arm.add_argument("--psci", "--psci-state",
> +                           type=lambda x: int(x, 0),
> +                           help="Power State Coordination Interface -
> PSCI state")
> +
> +        # TODO: Add vendor-specific support
> +
> +        # UEFI N.17 bitmaps (type and flags)
> +        g_pei = parser.add_argument_group("ARM Processor Error Info
> (PEI)")
> +        g_pei.add_argument("-t", "--type", nargs="+",
> +                        help=f"one or more error types: {error_types}")
> +        g_pei.add_argument("-f", "--flags", nargs="*",
> +                        help=f"zero or more error flags: {flags}")
> +        g_pei.add_argument("-V", "--pei-valid", "--error-valid",
> nargs="*",
> +                        help=f"zero or more PEI valid bits:
> {pei_valid_bits}")
> +
> +        # UEFI N.17 Integer values
> +        g_pei.add_argument("-m", "--multiple-error", nargs="+",
> +                        help="Number of errors: 0: Single error, 1:
> Multiple errors, 2-65535: Error count if known")
> +        g_pei.add_argument("-e", "--error-info", nargs="+",
> +                        help="Error information (UEFI 2.10 tables N.18 to
> N.20)")
> +        g_pei.add_argument("-p", "--physical-address",  nargs="+",
> +                        help="Physical address")
> +        g_pei.add_argument("-v", "--virtual-address",  nargs="+",
> +                        help="Virtual address")
> +
> +        # UEFI N.21 Context
> +        g_ctx = parser.add_argument_group("Processor Context")
> +        g_ctx.add_argument("--ctx-type", "--context-type", nargs="*",
> +                        help="Type of the context (0=ARM32 GPR, 5=ARM64
> EL1, other values supported)")
> +        g_ctx.add_argument("--ctx-size", "--context-size", nargs="*",
> +                        help="Minimal size of the context")
> +        g_ctx.add_argument("--ctx-array", "--context-array", nargs="*",
> +                        help="Comma-separated arrays for each context")
> +
> +        # Vendor-specific data
> +        g_vendor = parser.add_argument_group("Vendor-specific data")
> +        g_vendor.add_argument("--vendor", "--vendor-specific", nargs="+",
> +                        help="Vendor-specific byte arrays of data")
> +
> +    def parse_args(self, args):
> +        """Parse subcommand arguments"""
> +
> +        cper = {}
> +        pei = {}
> +        ctx = {}
> +        vendor = {}
> +
> +        arg = vars(args)
> +
> +        # Handle global parameters
> +        if args.arm:
> +            arm_valid_init = False
> +            cper["valid"] = get_choice(name="valid",
> +                                       value=args.arm,
> +                                       choices=self.arm_valid_bits,
> +                                       suffixes=["-error", "-err"])
> +        else:
> +            cper["valid"] = 0
> +            arm_valid_init = True
> +
> +        if "running" in arg:
> +            if args.running:
> +                cper["running-state"] = bit(0)
> +            else:
> +                cper["running-state"] = 0
> +        else:
> +            cper["running-state"] = 0
> +
> +        if arm_valid_init:
> +            if args.affinity:
> +                cper["valid"] |= self.arm_valid_bits["affinity"]
> +
> +            if args.mpidr:
> +                cper["valid"] |= self.arm_valid_bits["mpidr"]
> +
> +            if "running-state" in cper:
> +                cper["valid"] |= self.arm_valid_bits["running"]
> +
> +            if args.psci:
> +                cper["valid"] |= self.arm_valid_bits["running"]
> +
> +        # Handle PEI
> +        if not args.type:
> +            args.type = ["cache-error"]
> +
> +        get_mult_choices(
> +            pei,
> +            name="valid",
> +            values=args.pei_valid,
> +            choices=self.pei_valid_bits,
> +            suffixes=["-valid", "-info", "--information", "--addr"],
> +        )
> +        get_mult_choices(
> +            pei,
> +            name="type",
> +            values=args.type,
> +            choices=self.pei_error_types,
> +            suffixes=["-error", "-err"],
> +        )
> +        get_mult_choices(
> +            pei,
> +            name="flags",
> +            values=args.flags,
> +            choices=self.pei_flags,
> +            suffixes=["-error", "-cap"],
> +        )
> +        get_mult_int(pei, "error-info", args.error_info)
> +        get_mult_int(pei, "multiple-error", args.multiple_error)
> +        get_mult_int(pei, "phy-addr", args.physical_address)
> +        get_mult_int(pei, "virt-addr", args.virtual_address)
> +
> +        # Handle context
> +        get_mult_int(ctx, "type", args.ctx_type, allow_zero=True)
> +        get_mult_int(ctx, "minimal-size", args.ctx_size, allow_zero=True)
> +        get_mult_array(ctx, "register", args.ctx_array, allow_zero=True)
> +
> +        get_mult_array(vendor, "bytes", args.vendor, max_val=255)
> +
> +        # Store PEI
> +        pei_data = bytearray()
> +        default_flags  = self.pei_flags["first"]
> +        default_flags |= self.pei_flags["last"]
> +
> +        error_info_num = 0
> +
> +        for i, p in pei.items():        # pylint: disable=W0612
> +            error_info_num += 1
> +
> +            # UEFI 2.10 doesn't define how to encode error information
> +            # when multiple types are raised. So, provide a default only
> +            # if a single type is there
> +            if "error-info" not in p:
> +                if p["type"] == bit(1):
> +                    p["error-info"] = 0x0091000F
> +                if p["type"] == bit(2):
> +                    p["error-info"] = 0x0054007F
> +                if p["type"] == bit(3):
> +                    p["error-info"] = 0x80D6460FFF
> +                if p["type"] == bit(4):
> +                    p["error-info"] = 0x78DA03FF
> +
> +            if "valid" not in p:
> +                p["valid"] = 0
> +                if "multiple-error" in p:
> +                    p["valid"] |= self.pei_valid_bits["multiple-error"]
> +
> +                if "flags" in p:
> +                    p["valid"] |= self.pei_valid_bits["flags"]
> +
> +                if "error-info" in p:
> +                    p["valid"] |= self.pei_valid_bits["error-info"]
> +
> +                if "phy-addr" in p:
> +                    p["valid"] |= self.pei_valid_bits["phy-addr"]
> +
> +                if "virt-addr" in p:
> +                    p["valid"] |= self.pei_valid_bits["virt-addr"]
> +
> +            # Version
> +            data_add(pei_data, 0, 1)
> +
> +            data_add(pei_data, ACPI_GHES_ARM_CPER_PEI_LENGTH, 1)
> +
> +            data_add(pei_data, p["valid"], 2)
> +            data_add(pei_data, p["type"], 1)
> +            data_add(pei_data, p.get("multiple-error", 1), 2)
> +            data_add(pei_data, p.get("flags", default_flags), 1)
> +            data_add(pei_data, p.get("error-info", 0), 8)
> +            data_add(pei_data, p.get("virt-addr", 0xDEADBEEF), 8)
> +            data_add(pei_data, p.get("phy-addr", 0xABBA0BAD), 8)
> +
> +        # Store Context
> +        ctx_data = bytearray()
> +        context_info_num = 0
> +
> +        if ctx:
> +            for k in sorted(ctx.keys()):
> +                context_info_num += 1
> +
> +                if "type" not in ctx:
> +                    ctx[k]["type"] = CONTEXT_AARCH64_EL1
> +
> +                if "register" not in ctx:
> +                    ctx[k]["register"] = []
> +
> +                reg_size = len(ctx[k]["register"])
> +                size = 0
> +
> +                if "minimal-size" in ctx:
> +                    size = ctx[k]["minimal-size"]
> +
> +                size = max(size, reg_size)
> +
> +                size = (size + 1) % 0xFFFE
> +
> +                # Version
> +                data_add(ctx_data, 0, 2)
> +
> +                data_add(ctx_data, ctx[k]["type"], 2)
> +
> +                data_add(ctx_data, 8 * size, 4)
> +
> +                for r in ctx[k]["register"]:
> +                    data_add(ctx_data, r, 8)
> +
> +                for i in range(reg_size, size):   # pylint: disable=W0612
> +                    data_add(ctx_data, 0, 8)
> +
> +        # Vendor-specific bytes are not grouped
> +        vendor_data = bytearray()
> +        if vendor:
> +            for k in sorted(vendor.keys()):
> +                for b in vendor[k]["bytes"]:
> +                    data_add(vendor_data, b, 1)
> +
> +        # Encode ARM Processor Error
> +        data = bytearray()
> +
> +        data_add(data, cper["valid"], 4)
> +
> +        data_add(data, error_info_num, 2)
> +        data_add(data, context_info_num, 2)
> +
> +        # Calculate the length of the CPER data
> +        cper_length = ACPI_GHES_ARM_CPER_LENGTH
> +        cper_length += len(pei_data)
> +        cper_length += len(vendor_data)
> +        cper_length += len(ctx_data)
> +        data_add(data, cper_length, 4)
> +
> +        data_add(data, arg.get("affinity-level", 0), 1)
> +
> +        # Reserved
> +        data_add(data, 0, 3)
> +
> +        data_add(data, arg.get("mpidr-el1", 0), 8)
> +        data_add(data, arg.get("midr-el1", 0), 8)
> +        data_add(data, cper["running-state"], 4)
> +        data_add(data, arg.get("psci-state", 0), 4)
> +
> +        # Add PEI
> +        data.extend(pei_data)
> +        data.extend(ctx_data)
> +        data.extend(vendor_data)
> +
> +        self.data = data
> +
> +    def run(self, host, port):
> +        """Execute QMP commands"""
> +
> +        guid = to_guid(0xE19E3D16, 0xBC11, 0x11E4,
> +                       [0x9C, 0xAA, 0xC2, 0x05,
> +                        0x1D, 0x5D, 0x46, 0xB0])
> +
> +        qmp_command(host, port, guid, self.data)
> diff --git a/scripts/ghes_inject.py b/scripts/ghes_inject.py
> new file mode 100755
> index 000000000000..8415ccbbc53d
> --- /dev/null
> +++ b/scripts/ghes_inject.py
> @@ -0,0 +1,59 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114
> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +import argparse
> +
> +from arm_processor_error import ArmProcessorEinj
> +
> +EINJ_DESCRIPTION = """
> +Handle ACPI GHESv2 error injection logic QEMU QMP interface.\n
> +
> +It allows using UEFI BIOS EINJ features to generate GHES records.
> +
> +It helps testing Linux CPER and GHES drivers and to test rasdaemon
> +error handling logic.
> +
> +Currently, it support ARM processor error injection for ARM processor
> +events, being compatible with UEFI 2.9A Errata.
> +
> +This small utility works together with those QEMU additions:
> +- https://gitlab.com/mchehab_kernel/qemu/-/tree/arm-error-inject-v2
> +"""
> +
> +def main():
> +    """Main program"""
> +
> +    # Main parser - handle generic args like QEMU QMP TCP socket options
> +    parser = argparse.ArgumentParser(prog="einj.py",
> +
>  formatter_class=argparse.RawDescriptionHelpFormatter,
> +                                     usage="%(prog)s [options]",
> +                                     description=EINJ_DESCRIPTION,
> +                                     epilog="If a field is not defined, a
> default value will be applied by QEMU.")
> +
> +    g_options = parser.add_argument_group("QEMU QMP socket options")
> +    g_options.add_argument("-H", "--host", default="localhost", type=str,
> +                           help="host name")
> +    g_options.add_argument("-P", "--port", default=4445, type=int,
> +                           help="TCP port number")
> +
> +    arm_einj = ArmProcessorEinj()
> +
> +    # Call subparsers
> +    subparsers = parser.add_subparsers(dest='command')
> +
> +    arm_einj.create_subparser(subparsers)
> +
> +    args = parser.parse_args()
> +
> +    # Handle subparser commands
> +    if args.command == "arm":
> +        arm_einj.parse_args(args)
> +        arm_einj.run(args.host, args.port)
> +
> +
> +if __name__ == "__main__":
> +    main()
> diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
> new file mode 100644
> index 000000000000..13fae7a7af0e
> --- /dev/null
> +++ b/scripts/qmp_helper.py
>

I'm going to admit I only glanced at this very briefly, but -- is there a
chance you could use qemu.git/python/qemu/qmp instead of writing your own
helpers here?

If *NOT*, is there something that I need to add to our QMP library to
facilitate your script?


> @@ -0,0 +1,249 @@
> +#!/usr/bin/env python3
> +#
> +# pylint: disable=C0301, C0114, R0912, R0913, R0915, W0511
> +# SPDX-License-Identifier: GPL-2.0
> +#
> +# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
> +
> +import json
> +import socket
> +import sys
> +
> +from base64 import b64encode
> +
> +#
> +# Socket QMP send command
> +#
> +def qmp_command(host, port, guid, data):
> +    """Send commands to QEMU though QMP TCP socket"""
> +
> +    # Fill the commands to be sent
> +    commands = []
> +
> +    # Needed to negotiate QMP and for QEMU to accept the command
> +    commands.append('{ "execute": "qmp_capabilities" } ')
> +
> +    base64_data = b64encode(bytes(data)).decode('ascii')
> +
> +    cmd_arg = {
> +        'cper': {
> +            'notification-type': guid,
> +            "raw-data": base64_data
> +        }
> +    }
> +
> +    command = '{ "execute": "ghes-cper", '
> +    command += '"arguments": ' + json.dumps(cmd_arg) + " }"
> +
> +    commands.append(command)
> +
> +    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +    try:
> +        s.connect((host, port))
> +    except ConnectionRefusedError:
> +        sys.exit(f"Can't connect to QMP host {host}:{port}")
>

You should be able to use e.g.

legacy.py's QEMUMonitorProtocol class for synchronous connections, e.g.

from qemu.qmp.legacy import QEMUMonitorProtocol

qmp = QEMUMonitorProtocol((host, port))
qmp.connect(negotiate=True)

If you want to run the script w/o setting up a virtual environment or
installing the package, take a look at the hacks in scripts/qmp/ for how I
support e.g. qom-get directly from the source tree.


> +
> +    data = s.recv(1024)
> +    try:
> +        obj = json.loads(data.decode("utf-8"))
> +    except json.JSONDecodeError as e:
> +        print(f"Invalid QMP answer: {e}")
> +        s.close()
> +        return
> +
> +    if "QMP" not in obj:
> +        print(f"Invalid QMP answer: {data.decode("utf-8")}")
> +        s.close()
> +        return
> +
> +    for i, command in enumerate(commands):
>

Then here you'd use qmp.cmd (raises exception on QMPError) or qmp.cmd_raw
or qmp.cmd_obj (returns the QMP response as the return value even if it was
an error.)

More details:
https://qemu.readthedocs.io/projects/python-qemu-qmp/en/latest/qemu.qmp.legacy.html

There's also an async version, but it doesn't look like you require that
complexity, so you can ignore it.

~~js

+        s.sendall(command.encode("utf-8"))
> +        data = s.recv(1024)
> +        try:
> +            obj = json.loads(data.decode("utf-8"))
> +        except json.JSONDecodeError as e:
> +            print(f"Invalid QMP answer: {e}")
> +            s.close()
> +            return
> +
> +        if isinstance(obj.get("return"), dict):
> +            if obj["return"]:
> +                print(json.dumps(obj["return"]))
> +            elif i > 0:
> +                print("Error injected.")
> +        elif isinstance(obj.get("error"), dict):
> +            error = obj["error"]
> +            print(f'{error["class"]}: {error["desc"]}')
> +        else:
> +            print(json.dumps(obj))
> +
> +    s.shutdown(socket.SHUT_WR)
> +    while 1:
> +        data = s.recv(1024)
> +        if data == b"":
> +            break
> +        try:
> +            obj = json.loads(data.decode("utf-8"))
> +        except json.JSONDecodeError as e:
> +            print(f"Invalid QMP answer: {e}")
> +            s.close()
> +            return
> +
> +        if isinstance(obj.get("return"), dict):
> +            print(json.dumps(obj["return"]))
> +        if isinstance(obj.get("error"), dict):
> +            error = obj["error"]
> +            print(f'{error["class"]}: {error["desc"]}')
> +        else:
> +            print(json.dumps(obj))
> +
> +    s.close()
> +
> +
> +#
> +# Helper routines to handle multiple choice arguments
> +#
> +def get_choice(name, value, choices, suffixes=None):
> +    """Produce a list from multiple choice argument"""
> +
> +    new_values = 0
> +
> +    if not value:
> +        return new_values
> +
> +    for val in value.split(","):
> +        val = val.lower()
> +
> +        if suffixes:
> +            for suffix in suffixes:
> +                val = val.removesuffix(suffix)
> +
> +        if val not in choices.keys():
> +            sys.exit(f"Error on '{name}': choice {val} is invalid.")
> +
> +        val = choices[val]
> +
> +        new_values |= val
> +
> +    return new_values
> +
> +
> +def get_mult_array(mult, name, values, allow_zero=False, max_val=None):
> +    """Add numbered hashes from integer lists"""
> +
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +        if not values:
> +            i = 0
> +            if i not in mult:
> +                mult[i] = {}
> +
> +            mult[i][name] = []
> +            return
> +
> +    i = 0
> +    for value in values:
> +        for val in value.split(","):
> +            try:
> +                val = int(val, 0)
> +            except ValueError:
> +                sys.exit(f"Error on '{name}': {val} is not an integer")
> +
> +            if val < 0:
> +                sys.exit(f"Error on '{name}': {val} is not unsigned")
> +
> +            if max_val and val > max_val:
> +                sys.exit(f"Error on '{name}': {val} is too little")
> +
> +            if i not in mult:
> +                mult[i] = {}
> +
> +            if name not in mult[i]:
> +                mult[i][name] = []
> +
> +            mult[i][name].append(val)
> +
> +        i += 1
> +
> +
> +def get_mult_choices(mult, name, values, choices,
> +                     suffixes=None, allow_zero=False):
> +    """Add numbered hashes from multiple choice arguments"""
> +
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +    i = 0
> +    for val in values:
> +        new_values = get_choice(name, val, choices, suffixes)
> +
> +        if i not in mult:
> +            mult[i] = {}
> +
> +        mult[i][name] = new_values
> +        i += 1
> +
> +
> +def get_mult_int(mult, name, values, allow_zero=False):
> +    """Add numbered hashes from integer arguments"""
> +    if not allow_zero:
> +        if not values:
> +            return
> +    else:
> +        if values is None:
> +            return
> +
> +    i = 0
> +    for val in values:
> +        try:
> +            val = int(val, 0)
> +        except ValueError:
> +            sys.exit(f"Error on '{name}': {val} is not an integer")
> +
> +        if val < 0:
> +            sys.exit(f"Error on '{name}': {val} is not unsigned")
> +
> +        if i not in mult:
> +            mult[i] = {}
> +
> +        mult[i][name] = val
> +        i += 1
> +
> +
> +#
> +# Data encode helper functions
> +#
> +def bit(b):
> +    """Simple macro to define a bit on a bitmask"""
> +    return 1 << b
> +
> +
> +def data_add(data, value, num_bytes):
> +    """Adds bytes from value inside a bitarray"""
> +
> +    data.extend(value.to_bytes(num_bytes, byteorder="little"))
> +
> +def to_guid(time_low, time_mid, time_high, nodes):
> +    """Create an GUID string"""
> +
> +    assert(len(nodes) == 8)
> +
> +    clock = nodes[0] << 8 | nodes[1]
> +
> +    node = 0
> +    for i in range(2, len(nodes)):
> +        node = node << 8 | nodes[i]
> +
> +    s = f"{time_low:08x}-{time_mid:04x}-"
> +    s += f"{time_high:04x}-{clock:04x}-{node:012x}"
> +
> +    return s
> --
> 2.45.2
>
>
Mauro Carvalho Chehab Aug. 8, 2024, 9:51 p.m. UTC | #4
Em Thu, 8 Aug 2024 16:58:38 -0400
John Snow <jsnow@redhat.com> escreveu:

> On Fri, Aug 2, 2024 at 5:44 PM Mauro Carvalho Chehab <
> mchehab+huawei@kernel.org> wrote:  
> 

> > +#!/usr/bin/env python3
> > +#
> > +# pylint: disable=C0301, C0114, R0912, R0913, R0914, R0915, W0511
> >  
> 
> Out of curiosity, what tools are you using to delint your files 

Primarily I use pylint, almost always with disable line(s), as those lint
tools have some warnings that sound too silly (like too many/too low 
functions/branches/arguments...). From time to time, I review the disable
lines, to keep the code as clean as desired.

Sometimes I also use pep8 (now named as pycodestyle) and black, specially 
when I want some autoformat hints (I manually commit the hunks that make
sense), but I prefer pylint as the primary checking tool. I'm not too
found of the black's coding style, though[1].

[1] For instance, black would do this change:

	-        g_arm.add_argument("--arm", "--arm-valid",
	-                           help=f"ARM valid bits: {arm_valid_bits}")
	+        g_arm.add_argument(
	+            "--arm", "--arm-valid", help=f"ARM valid bits: {arm_valid_bits}"
	+        )

IMO, the original coding style I wrote is a lot better than black's
suggestion - and it is closer to the C style I use at the Linux Kernel ;-)

> and how are
> you invoking them?

I don't play much with such tools, though. I usually just invoke them with
the python file names(s) without passing any parameters nor creating any
configuration file.

> I don't really maintain any strict regime for python files under
> qemu.git/scripts (yet), so I am mostly curious as to what regimes others
> are using currently. I don't see most QEMU contributors checking in pylint
> ignores etc directly into the files, so it caught my eye.

Having some verification sounds interesting, as it may help preventing
some hidden bugs (like re-defining a variable that it was already used
globally), if such check is not too picky and if stupid warnings can be 
bypassed.

Regards,
Mauro
Mauro Carvalho Chehab Aug. 8, 2024, 10:41 p.m. UTC | #5
Em Thu, 8 Aug 2024 17:21:33 -0400
John Snow <jsnow@redhat.com> escreveu:

> On Fri, Aug 2, 2024 at 5:44 PM Mauro Carvalho Chehab <
> mchehab+huawei@kernel.org> wrote:  
> 

> > diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
> > new file mode 100644
> > index 000000000000..13fae7a7af0e
> > --- /dev/null
> > +++ b/scripts/qmp_helper.py
> >  
> 
> I'm going to admit I only glanced at this very briefly, but -- is there a
> chance you could use qemu.git/python/qemu/qmp instead of writing your own
> helpers here?
> 
> If *NOT*, is there something that I need to add to our QMP library to
> facilitate your script?

I started writing this script to be hosted outside qemu tree, when
we had a very different API.

I noticed later about the QMP, and even tried to write a patch for it,
but I gave up due to asyncio complexity...

Please notice that, on this file, I actually placed three classes:

- qmp
- util
- cper_guid

I could probably make the first one to be an override of QEMUMonitorProtocol 
(besides normal open/close/cmd communication, it also contains some
methods that are specific to error inject use case:

- to generate a CPER record;
- to search for data via qom-get.

The other two classes are just common code used by ghes_inject commands.
My idea is to have multiple commands to do different kinds of GHES
error injection, each command on a different file/class.

> > +    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > +    try:
> > +        s.connect((host, port))
> > +    except ConnectionRefusedError:
> > +        sys.exit(f"Can't connect to QMP host {host}:{port}")
> >  
> 
> You should be able to use e.g.
> 
> legacy.py's QEMUMonitorProtocol class for synchronous connections, e.g.
> 
> from qemu.qmp.legacy import QEMUMonitorProtocol
> 
> qmp = QEMUMonitorProtocol((host, port))
> qmp.connect(negotiate=True)

That sounds interesting! I give it a try.

> If you want to run the script w/o setting up a virtual environment or
> installing the package, take a look at the hacks in scripts/qmp/ for how I
> support e.g. qom-get directly from the source tree.

Yeah, I saw that already. Doing: 

	sys.path.append(path.join(qemu_dir, 'python'))

the same way qom-get does should do the trick.

> > +
> > +    data = s.recv(1024)
> > +    try:
> > +        obj = json.loads(data.decode("utf-8"))
> > +    except json.JSONDecodeError as e:
> > +        print(f"Invalid QMP answer: {e}")
> > +        s.close()
> > +        return
> > +
> > +    if "QMP" not in obj:
> > +        print(f"Invalid QMP answer: {data.decode("utf-8")}")
> > +        s.close()
> > +        return
> > +
> > +    for i, command in enumerate(commands):
> >  
> 
> Then here you'd use qmp.cmd (raises exception on QMPError) or qmp.cmd_raw
> or qmp.cmd_obj (returns the QMP response as the return value even if it was
> an error.)

Good to know, I'll try and see what fits best.

> More details:
> https://qemu.readthedocs.io/projects/python-qemu-qmp/en/latest/qemu.qmp.legacy.html

I'll take a look. The name "legacy" is a little scary, as it might
imply that this has been deprecated. If there's no plans to deprecate,
then it would be great to use it and simplify the code a little bit.

> There's also an async version, but it doesn't look like you require that
> complexity, so you can ignore it.

Yes, that's the case: a serialized sync send/response logic works perfectly
for this script. No need to be burden with asyncio complexity.

Thanks,
Mauro
John Snow Aug. 8, 2024, 11:33 p.m. UTC | #6
On Thu, Aug 8, 2024 at 6:41 PM Mauro Carvalho Chehab <
mchehab+huawei@kernel.org> wrote:

> Em Thu, 8 Aug 2024 17:21:33 -0400
> John Snow <jsnow@redhat.com> escreveu:
>
> > On Fri, Aug 2, 2024 at 5:44 PM Mauro Carvalho Chehab <
> > mchehab+huawei@kernel.org> wrote:
> >
>
> > > diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
> > > new file mode 100644
> > > index 000000000000..13fae7a7af0e
> > > --- /dev/null
> > > +++ b/scripts/qmp_helper.py
> > >
> >
> > I'm going to admit I only glanced at this very briefly, but -- is there a
> > chance you could use qemu.git/python/qemu/qmp instead of writing your own
> > helpers here?
> >
> > If *NOT*, is there something that I need to add to our QMP library to
> > facilitate your script?
>
> I started writing this script to be hosted outside qemu tree, when
> we had a very different API.
>
> I noticed later about the QMP, and even tried to write a patch for it,
> but I gave up due to asyncio complexity...
>

Sorry :)


>
> Please notice that, on this file, I actually placed three classes:
>
> - qmp
> - util
> - cper_guid
>
> I could probably make the first one to be an override of
> QEMUMonitorProtocol
> (besides normal open/close/cmd communication, it also contains some
> methods that are specific to error inject use case:
>
> - to generate a CPER record;
> - to search for data via qom-get.
>
> The other two classes are just common code used by ghes_inject commands.
> My idea is to have multiple commands to do different kinds of GHES
> error injection, each command on a different file/class.
>

Gotcha! Thanks for the feedback. I would *prefer* that code checked in to
qemu.git use the QMP module where possible so that I don't have to maintain
multiple copies of QMP wrangling code. I think what you want to do should
be easily possible with the existing library; and anything that isn't, I'm
more than happy to meet your needs. Reach out absolutely any time.


>
> > > +    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > > +    try:
> > > +        s.connect((host, port))
> > > +    except ConnectionRefusedError:
> > > +        sys.exit(f"Can't connect to QMP host {host}:{port}")
> > >
> >
> > You should be able to use e.g.
> >
> > legacy.py's QEMUMonitorProtocol class for synchronous connections, e.g.
> >
> > from qemu.qmp.legacy import QEMUMonitorProtocol
> >
> > qmp = QEMUMonitorProtocol((host, port))
> > qmp.connect(negotiate=True)
>
> That sounds interesting! I give it a try.
>
> > If you want to run the script w/o setting up a virtual environment or
> > installing the package, take a look at the hacks in scripts/qmp/ for how
> I
> > support e.g. qom-get directly from the source tree.
>
> Yeah, I saw that already. Doing:
>
>         sys.path.append(path.join(qemu_dir, 'python'))
>
> the same way qom-get does should do the trick.
>
> > > +
> > > +    data = s.recv(1024)
> > > +    try:
> > > +        obj = json.loads(data.decode("utf-8"))
> > > +    except json.JSONDecodeError as e:
> > > +        print(f"Invalid QMP answer: {e}")
> > > +        s.close()
> > > +        return
> > > +
> > > +    if "QMP" not in obj:
> > > +        print(f"Invalid QMP answer: {data.decode("utf-8")}")
> > > +        s.close()
> > > +        return
> > > +
> > > +    for i, command in enumerate(commands):
> > >
> >
> > Then here you'd use qmp.cmd (raises exception on QMPError) or qmp.cmd_raw
> > or qmp.cmd_obj (returns the QMP response as the return value even if it
> was
> > an error.)
>
> Good to know, I'll try and see what fits best.
>

I might *suggest* you try to use the exception-raising interface and catch
exceptions to interrogate expected errors as it aligns better with the
"idiomatic python API" - I have no plans to support an external API that
*returns* error objects except via the exception class. This approach will
be easier to port when I drop the legacy interface in the future, see below.

But, that said, whichever is easiest. We use all three interfaces in many
places in the QEMU tree. I have no grounds to require you to use a specific
one ;)


>
> > More details:
> >
> https://qemu.readthedocs.io/projects/python-qemu-qmp/en/latest/qemu.qmp.legacy.html
>
> I'll take a look. The name "legacy" is a little scary, as it might
> imply that this has been deprecated. If there's no plans to deprecate,
> then it would be great to use it and simplify the code a little bit.
>

I named it legacy to be scary on purpose :)

The truth is that the "legacy" module was designed to be a 1:1 drop-in
replacement for an older version of the synchronous QMP library that
powered our internal iotests. We still use this "legacy" module in
thousands of places in the QEMU tree. I do have plans to replace it with a
"proper" synchronous frontend class, eventually, someday, etc. It's been a
while and I still haven't done it, though. Oops...

When I do eventually replace it, I will convert all users inside of
qemu.git personally, and the design of the "non-legacy" API will be chosen
pretty explicitly to make that task really easy for myself and reviewers.
This would include your script inside the qemu.git tree. It should be
pretty safe to use the legacy module *in qemu.git*, but for external,
out-of-tree scripts, it may indeed disappear someday - but converting to
the new API, when I merge it, should be very, very trivial. How much of a
headache that is for you depends on how you package/distribute the script
and how awful it will be to update the code and dependencies when it
happens.

FYI: I have promised in the readme for the standalone version of qemu.qmp
that legacy.py will not be removed prior to v0.1.0. All versions before
then will still have it, guaranteed.

(Neither here nor there: One of the holdups in this replacement is figuring
out how to structure the API for event listening, which was the main
motivator of the *async* version of the class. We have many users who don't
want full async handling, but still want to listen for and catch events. I
need a proper sit and think for what the API I want to commit to supporting
and maintaining for this should look like. Not your problem, anyway!)


>
> > There's also an async version, but it doesn't look like you require that
> > complexity, so you can ignore it.
>
> Yes, that's the case: a serialized sync send/response logic works perfectly
> for this script. No need to be burden with asyncio complexity.
>
> Thanks,
> Mauro
>
>
Mauro Carvalho Chehab Aug. 9, 2024, 6:26 a.m. UTC | #7
Em Fri, 9 Aug 2024 00:41:37 +0200
Mauro Carvalho Chehab <mchehab+huawei@kernel.org> escreveu:

> > You should be able to use e.g.
> > 
> > legacy.py's QEMUMonitorProtocol class for synchronous connections, e.g.
> > 
> > from qemu.qmp.legacy import QEMUMonitorProtocol
> > 
> > qmp = QEMUMonitorProtocol((host, port))
> > qmp.connect(negotiate=True)  
> 
> That sounds interesting! I give it a try.

I applied the enclosed patch at the end of my patch series, but
somehow it is not working. For whatever reason, connect() is
raising a StateError apparently due to Runstate.CONNECTING.

I tried both as declaring (see enclosed patch):

	class qmp(QEMUMonitorProtocol)

and using:

-        super().__init__(self.host, self.port)
+        self.qmp_monitor = QEMUMonitorProtocol(self.host, self.port)

On both cases, it keeps waiting forever for a connection.

Regards,
Mauro

---

diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
index e9e9388bcb8b..62ca267cdc87 100644
--- a/scripts/qmp_helper.py
+++ b/scripts/qmp_helper.py
@@ -9,9 +9,23 @@
 import socket
 import sys
 
+from os import path
+
+try:
+    qemu_dir = path.abspath(path.dirname(path.dirname(__file__)))
+    sys.path.append(path.join(qemu_dir, 'python'))
+
+    from qemu.qmp.legacy import QEMUMonitorProtocol
+    from qemu.qmp.protocol import StateError
+
+except ModuleNotFoundError as exc:
+    print(f"Module '{exc.name}' not found.")
+    print("Try export PYTHONPATH=top-qemu-dir/python or run from top-qemu-dir")
+    sys.exit(1)
+
 from base64 import b64encode
 
-class qmp:
+class qmp(QEMUMonitorProtocol):
     """
     Opens a connection and send/receive QMP commands.
     """
@@ -21,22 +35,20 @@ def send_cmd(self, command, may_open=False,return_error=True):
 
         if may_open:
             self._connect()
-        elif not self.socket:
-            return None
+        elif not self.connected:
+            return False
 
         if isinstance(command, dict):
             data = json.dumps(command).encode("utf-8")
         else:
             data = command.encode("utf-8")
 
-        self.socket.sendall(data)
-        data = self.socket.recv(1024)
         try:
-            obj = json.loads(data.decode("utf-8"))
-        except json.JSONDecodeError as e:
-            print(f"Invalid QMP answer: {e}")
-            self._close()
-            return None
+            obj = self.cmd_obj(command)
+        except Exception as e:
+            print("Failed to inject error: {e}.")
+
+        print(obj)
 
         if "return" in obj:
             if isinstance(obj.get("return"), dict):
@@ -46,86 +58,47 @@ def send_cmd(self, command, may_open=False,return_error=True):
             else:
                 return obj["return"]
 
-        elif isinstance(obj.get("error"), dict):
-            error = obj["error"]
-            if return_error:
-                print(f'{error["class"]}: {error["desc"]}')
-        else:
-            print(json.dumps(obj))
-
         return None
 
     def _close(self):
         """Shutdown and close the socket, if opened"""
-        if not self.socket:
+        if not self.connected:
             return
 
-        self.socket.shutdown(socket.SHUT_WR)
-        while 1:
-            data = self.socket.recv(1024)
-            if data == b"":
-                break
-            try:
-                obj = json.loads(data.decode("utf-8"))
-            except json.JSONDecodeError as e:
-                print(f"Invalid QMP answer: {e}")
-                self.socket.close()
-                self.socket = None
-                return
-
-            if isinstance(obj.get("return"), dict):
-                print(json.dumps(obj["return"]))
-            if isinstance(obj.get("error"), dict):
-                error = obj["error"]
-                print(f'{error["class"]}: {error["desc"]}')
-            else:
-                print(json.dumps(obj))
-
-        self.socket.close()
-        self.socket = None
+        self.close()
+        self.connected = False
 
     def _connect(self):
         """Connect to a QMP TCP/IP port, if not connected yet"""
 
-        if self.socket:
+        if self.connected:
             return True
 
-        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            self.socket.connect((self.host, self.port))
-        except ConnectionRefusedError:
-            sys.exit(f"Can't connect to QMP host {self.host}:{self.port}")
-
-        data = self.socket.recv(1024)
-        try:
-            obj = json.loads(data.decode("utf-8"))
-        except json.JSONDecodeError as e:
-            print(f"Invalid QMP answer: {e}")
-            self._close()
-            return False
-
-        if "QMP" not in obj:
-            print(f"Invalid QMP answer: {data.decode('utf-8')}")
-            self._close()
-            return False
+        is_connecting = True
+        while is_connecting:
+            try:
+                ret = self.connect(negotiate=True)
+                self.accept()
+                is_connecting = False
+            except ConnectionError:
+                sys.exit(f"Can't connect to QMP host {self.host}:{self.port}")
+                return False
+            except StateError as e:
+                print(f"StateError: {e}")
 
-        result = self.send_cmd('{ "execute": "qmp_capabilities" }')
-        if not result:
-            self._close()
-            return False
+        self.connected = True
 
         return True
 
     def __init__(self, host, port, debug=False):
         """Initialize variables used by the QMP send logic"""
 
-        self.socket = None
+        self.connected = False
         self.host = host
         self.port = port
         self.debug = debug
 
-    def __del__(self):
-        self._close()
+        super().__init__(self.host, self.port)
 
     #
     # Socket QMP send command
@@ -168,8 +141,12 @@ def send_cper(self, guid, data):
 
         self._connect()
 
-        if self.send_cmd(command):
-            print("Error injected.")
+        try:
+            self.cmd_obj(command)
+        except Exception as e:
+            print("Failed to inject error: {e}.")
+
+        print("Error injected.")
 
     def search_qom(self, path, prop, regex):
         """
@@ -180,8 +157,9 @@ def search_qom(self, path, prop, regex):
             ...
         """
 
-        found = []
+        self._connect()
 
+        found = []
         i = 0
         while 1:
             dev = f"{path}[{i}]"
@@ -192,7 +170,11 @@ def search_qom(self, path, prop, regex):
                     'property': prop
                 }
             }
-            ret = self.send_cmd(cmd, may_open=True, return_error=False)
+            try:
+                ret = self.cmd_obj(cmd)
+            except Exception as e:
+                print("Failed to inject error: {e}.")
+
             if not ret:
                 break
 



Thanks,
Mauro
Mauro Carvalho Chehab Aug. 9, 2024, 7:37 a.m. UTC | #8
Em Fri, 9 Aug 2024 08:26:09 +0200
Mauro Carvalho Chehab <mchehab+huawei@kernel.org> escreveu:

> Em Fri, 9 Aug 2024 00:41:37 +0200
> Mauro Carvalho Chehab <mchehab+huawei@kernel.org> escreveu:
> 
> > > You should be able to use e.g.
> > > 
> > > legacy.py's QEMUMonitorProtocol class for synchronous connections, e.g.
> > > 
> > > from qemu.qmp.legacy import QEMUMonitorProtocol
> > > 
> > > qmp = QEMUMonitorProtocol((host, port))
> > > qmp.connect(negotiate=True)  
> > 
> > That sounds interesting! I give it a try.
> 
> I applied the enclosed patch at the end of my patch series, but
> somehow it is not working. For whatever reason, connect() is
> raising a StateError apparently due to Runstate.CONNECTING.
> 
> I tried both as declaring (see enclosed patch):
> 
> 	class qmp(QEMUMonitorProtocol)
> 
> and using:
> 
> -        super().__init__(self.host, self.port)
> +        self.qmp_monitor = QEMUMonitorProtocol(self.host, self.port)
> 
> On both cases, it keeps waiting forever for a connection.

Nevermind, placing host/post on a tuple made it work.

The enclosed patch converts the script to use QEMUMonitorProtocol.

I'll fold it with the script for the next spin of this series.

Regards,
Mauro

---

[PATCH] scripts/qmp_helper.py: use QEMUMonitorProtocol class

Instead of reinventing the wheel, let's use QEMUMonitorProtocol.

Signed-off-by: Mauro Carvalho Chehab <mchehab+huawei@kernel.org>

diff --git a/scripts/arm_processor_error.py b/scripts/arm_processor_error.py
index 756935a2263c..f869f07860b8 100644
--- a/scripts/arm_processor_error.py
+++ b/scripts/arm_processor_error.py
@@ -169,14 +169,11 @@ def send_cper(self, args):
         if args.mpidr:
             cper["mpidr-el1"] = arg["mpidr"]
         elif cpus:
-            get_mpidr = {
-                "execute": "qom-get",
-                "arguments": {
-                    'path': cpus[0],
-                    'property': "x-mpidr"
-                }
+            cmd_arg = {
+                'path': cpus[0],
+                'property': "x-mpidr"
             }
-            ret = qmp_cmd.send_cmd(get_mpidr, may_open=True)
+            ret = qmp_cmd.send_cmd("qom-get", cmd_arg, may_open=True)
             if isinstance(ret, int):
                 cper["mpidr-el1"] = ret
             else:
@@ -291,8 +288,7 @@ def send_cper(self, args):
         context_info_num = 0
 
         if ctx:
-            ret = qmp_cmd.send_cmd('{ "execute": "query-target" }',
-                                   may_open=True)
+            ret = qmp_cmd.send_cmd("query-target", may_open=True)
 
             default_ctx = self.CONTEXT_MISC_REG
 
@@ -363,14 +359,11 @@ def send_cper(self, args):
 
         if "midr-el1" not in arg:
             if cpus:
-                get_mpidr = {
-                    "execute": "qom-get",
-                    "arguments": {
-                        'path': cpus[0],
-                        'property': "midr"
-                    }
+                cmd_arg = {
+                    'path': cpus[0],
+                    'property': "midr"
                 }
-                ret = qmp_cmd.send_cmd(get_mpidr, may_open=True)
+                ret = qmp_cmd.send_cmd("qom-get", cmd_arg, may_open=True)
                 if isinstance(ret, int):
                     arg["midr-el1"] = ret
 
diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
index 7214c15c6718..e2e0a881f6c1 100644
--- a/scripts/qmp_helper.py
+++ b/scripts/qmp_helper.py
@@ -9,6 +9,19 @@
 import socket
 import sys
 
+from os import path
+
+try:
+    qemu_dir = path.abspath(path.dirname(path.dirname(__file__)))
+    sys.path.append(path.join(qemu_dir, 'python'))
+
+    from qemu.qmp.legacy import QEMUMonitorProtocol
+
+except ModuleNotFoundError as exc:
+    print(f"Module '{exc.name}' not found.")
+    print("Try export PYTHONPATH=top-qemu-dir/python or run from top-qemu-dir")
+    sys.exit(1)
+
 from base64 import b64encode
 
 class qmp:
@@ -16,26 +29,23 @@ class qmp:
     Opens a connection and send/receive QMP commands.
     """
 
-    def send_cmd(self, command, may_open=False, return_error=True):
+    def send_cmd(self, command, args=None, may_open=False, return_error=True):
         """Send a command to QMP, optinally opening a connection"""
 
         if may_open:
             self._connect()
-        elif not self.socket:
-            return None
+        elif not self.connected:
+            return False
 
-        if isinstance(command, dict):
-            data = json.dumps(command).encode("utf-8")
-        else:
-            data = command.encode("utf-8")
+        msg = { 'execute': command }
+        if args:
+            msg['arguments'] = args
 
-        self.socket.sendall(data)
-        data = self.socket.recv(1024)
         try:
-            obj = json.loads(data.decode("utf-8"))
-        except json.JSONDecodeError as e:
-            print(f"Invalid QMP answer: {e}")
-            self._close()
+            obj = self.qmp_monitor.cmd_obj(msg)
+        except Exception as e:
+            print(f"Command: {command}")
+            print(f"Failed to inject error: {e}.")
             return None
 
         if "return" in obj:
@@ -49,6 +59,7 @@ def send_cmd(self, command, may_open=False, return_error=True):
         elif isinstance(obj.get("error"), dict):
             error = obj["error"]
             if return_error:
+                print(f"Command: {msg}")
                 print(f'{error["class"]}: {error["desc"]}')
         else:
             print(json.dumps(obj))
@@ -57,75 +68,37 @@ def send_cmd(self, command, may_open=False, return_error=True):
 
     def _close(self):
         """Shutdown and close the socket, if opened"""
-        if not self.socket:
+        if not self.connected:
             return
 
-        self.socket.shutdown(socket.SHUT_WR)
-        while 1:
-            data = self.socket.recv(1024)
-            if data == b"":
-                break
-            try:
-                obj = json.loads(data.decode("utf-8"))
-            except json.JSONDecodeError as e:
-                print(f"Invalid QMP answer: {e}")
-                self.socket.close()
-                self.socket = None
-                return
-
-            if isinstance(obj.get("return"), dict):
-                print(json.dumps(obj["return"]))
-            if isinstance(obj.get("error"), dict):
-                error = obj["error"]
-                print(f'{error["class"]}: {error["desc"]}')
-            else:
-                print(json.dumps(obj))
-
-        self.socket.close()
-        self.socket = None
+        self.qmp_monitor.close()
+        self.connected = False
 
     def _connect(self):
         """Connect to a QMP TCP/IP port, if not connected yet"""
 
-        if self.socket:
+        if self.connected:
             return True
 
-        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         try:
-            self.socket.connect((self.host, self.port))
-        except ConnectionRefusedError:
+            ret = self.qmp_monitor.connect(negotiate=True)
+        except ConnectionError:
             sys.exit(f"Can't connect to QMP host {self.host}:{self.port}")
-
-        data = self.socket.recv(1024)
-        try:
-            obj = json.loads(data.decode("utf-8"))
-        except json.JSONDecodeError as e:
-            print(f"Invalid QMP answer: {e}")
-            self._close()
             return False
 
-        if "QMP" not in obj:
-            print(f"Invalid QMP answer: {data.decode('utf-8')}")
-            self._close()
-            return False
-
-        result = self.send_cmd('{ "execute": "qmp_capabilities" }')
-        if not result:
-            self._close()
-            return False
+        self.connected = True
 
         return True
 
     def __init__(self, host, port, debug=False):
         """Initialize variables used by the QMP send logic"""
 
-        self.socket = None
+        self.connected = False
         self.host = host
         self.port = port
         self.debug = debug
 
-    def __del__(self):
-        self._close()
+        self.qmp_monitor = QEMUMonitorProtocol(address=(self.host, self.port))
 
     #
     # Socket QMP send command
@@ -142,9 +115,6 @@ def send_cper(self, guid, data):
             }
         }
 
-        command = '{ "execute": "ghes-cper", '
-        command += '"arguments": ' + json.dumps(cmd_arg) + " }"
-
         if self.debug:
             print(f"GUID: {guid}")
             print("CPER:")
@@ -168,7 +138,7 @@ def send_cper(self, guid, data):
 
         self._connect()
 
-        if self.send_cmd(command):
+        if self.send_cmd("ghes-cper", cmd_arg):
             print("Error injected.")
 
     def search_qom(self, path, prop, regex):
@@ -185,14 +155,11 @@ def search_qom(self, path, prop, regex):
         i = 0
         while 1:
             dev = f"{path}[{i}]"
-            cmd =  {
-                "execute": "qom-get",
-                "arguments": {
-                    'path': dev,
-                    'property': prop
-                }
+            args = {
+                'path': dev,
+                'property': prop
             }
-            ret = self.send_cmd(cmd, may_open=True, return_error=False)
+            ret = self.send_cmd("qom-get", args, may_open=True, return_error=False)
             if not ret:
                 break
Mauro Carvalho Chehab Aug. 9, 2024, 8:24 a.m. UTC | #9
Em Thu, 8 Aug 2024 19:33:32 -0400
John Snow <jsnow@redhat.com> escreveu:

> > > Then here you'd use qmp.cmd (raises exception on QMPError) or qmp.cmd_raw
> > > or qmp.cmd_obj (returns the QMP response as the return value even if it  
> > was  
> > > an error.)  
> >
> > Good to know, I'll try and see what fits best.
> >  
> 
> I might *suggest* you try to use the exception-raising interface and catch
> exceptions to interrogate expected errors as it aligns better with the
> "idiomatic python API" - I have no plans to support an external API that
> *returns* error objects except via the exception class. This approach will
> be easier to port when I drop the legacy interface in the future, see below.
> 
> But, that said, whichever is easiest. We use all three interfaces in many
> places in the QEMU tree. I have no grounds to require you to use a specific
> one ;)

While a python-style exception handling is cool, I ended opting to use 
cmd_obj(), as the script needs to catch the end of /machine/unattached/device[]
array, and using cmd_obj() made the conversion easier.

One of the things I missed at the documentation is a description of the
possible exceptions that cmd() could raise.

It is probably worth documenting it and placing them on a QMP-specific
error class, but a change like that would probably be incompatible with
the existing applications. Probably something to be considered on your
TODO list to move this from legacy ;-)

Anyway, I already folded the changes at the branch I'll be using as basis
for the next submission (be careful to use it, as I'm always rebasing it):

	https://gitlab.com/mchehab_kernel/qemu/-/commit/62feb8f6037ab762a9848eb601a041fbbbe2a77a#b665bcbc1e5ae3a488f1c0f20f8c29ae640bfa63_0_17


Thanks,
Mauro
John Snow Aug. 9, 2024, 7:26 p.m. UTC | #10
On Fri, Aug 9, 2024, 4:24 AM Mauro Carvalho Chehab <
mchehab+huawei@kernel.org> wrote:

> Em Thu, 8 Aug 2024 19:33:32 -0400
> John Snow <jsnow@redhat.com> escreveu:
>
> > > > Then here you'd use qmp.cmd (raises exception on QMPError) or
> qmp.cmd_raw
> > > > or qmp.cmd_obj (returns the QMP response as the return value even if
> it
> > > was
> > > > an error.)
> > >
> > > Good to know, I'll try and see what fits best.
> > >
> >
> > I might *suggest* you try to use the exception-raising interface and
> catch
> > exceptions to interrogate expected errors as it aligns better with the
> > "idiomatic python API" - I have no plans to support an external API that
> > *returns* error objects except via the exception class. This approach
> will
> > be easier to port when I drop the legacy interface in the future, see
> below.
> >
> > But, that said, whichever is easiest. We use all three interfaces in many
> > places in the QEMU tree. I have no grounds to require you to use a
> specific
> > one ;)
>
> While a python-style exception handling is cool, I ended opting to use
> cmd_obj(), as the script needs to catch the end of
> /machine/unattached/device[]
> array, and using cmd_obj() made the conversion easier.
>
> One of the things I missed at the documentation is a description of the
> possible exceptions that cmd() could raise.
>
> It is probably worth documenting it and placing them on a QMP-specific
> error class, but a change like that would probably be incompatible with
> the existing applications. Probably something to be considered on your
> TODO list to move this from legacy ;-)
>

Good feedback, thanks! I definitely didn't spend much time polishing the
"legacy" interface. I clearly thought it'd be more temporary than it became
;)

I owe the package some updates for 3.13, I'll improve the documentation and
also consider adding some "you forgot to make the address a tuple"
protection so that part is less of a trap. (Without the tuple, I think it
likely used the address as a socket path and the port as a bool to enter
server mode. mypy would catch this, but it's a design goal to not require
or expect script writers to need such things.)

Thank you! :)


> Anyway, I already folded the changes at the branch I'll be using as basis
> for the next submission (be careful to use it, as I'm always rebasing it):
>

Great, I'll review the entire script more thoroughly on v2, if that's OK
with you.

Just got back from a long PTO and an illness and I'm still ramping back up
and handling backlog.


>
> https://gitlab.com/mchehab_kernel/qemu/-/commit/62feb8f6037ab762a9848eb601a041fbbbe2a77a#b665bcbc1e5ae3a488f1c0f20f8c29ae640bfa63_0_17
>
>
> Thanks,
> Mauro
>

~~js

>
diff mbox series

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 655edcb6688c..e490f69da1de 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2081,6 +2081,9 @@  S: Maintained
 F: hw/arm/ghes_cper.c
 F: hw/acpi/ghes_cper_stub.c
 F: qapi/ghes-cper.json
+F: scripts/ghes_inject.py
+F: scripts/arm_processor_error.py
+F: scripts/qmp_helper.py
 
 ppc4xx
 L: qemu-ppc@nongnu.org
diff --git a/scripts/arm_processor_error.py b/scripts/arm_processor_error.py
new file mode 100644
index 000000000000..df4efa508790
--- /dev/null
+++ b/scripts/arm_processor_error.py
@@ -0,0 +1,352 @@ 
+#!/usr/bin/env python3
+#
+# pylint: disable=C0301, C0114, R0912, R0913, R0914, R0915, W0511
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
+
+# TODO: current implementation has dummy defaults.
+#
+# For a better implementation, a QMP addition/call is needed to
+# retrieve some data for ARM Processor Error injection:
+#
+#   - machine emulation architecture, as ARM current default is
+#     for AArch64;
+#   - ARM registers: power_state, midr, mpidr.
+
+import argparse
+import json
+
+from qmp_helper import (qmp_command, get_choice, get_mult_array,
+                        get_mult_choices, get_mult_int, bit,
+                        data_add, to_guid)
+
+# Arm processor EINJ logic
+#
+ACPI_GHES_ARM_CPER_LENGTH = 40
+ACPI_GHES_ARM_CPER_PEI_LENGTH = 32
+
+# TODO: query it from emulation. Current default valid only for Aarch64
+CONTEXT_AARCH64_EL1 = 5
+
+class ArmProcessorEinj:
+    """
+    Implements ARM Processor Error injection via GHES
+    """
+
+    def __init__(self):
+        """Initialize the error injection class"""
+
+        # Valid choice values
+        self.arm_valid_bits = {
+            "mpidr":    bit(0),
+            "affinity": bit(1),
+            "running":  bit(2),
+            "vendor":   bit(3),
+        }
+
+        self.pei_flags = {
+            "first":        bit(0),
+            "last":         bit(1),
+            "propagated":   bit(2),
+            "overflow":     bit(3),
+        }
+
+        self.pei_error_types = {
+            "cache":        bit(1),
+            "tlb":          bit(2),
+            "bus":          bit(3),
+            "micro-arch":   bit(4),
+        }
+
+        self.pei_valid_bits = {
+            "multiple-error":   bit(0),
+            "flags":            bit(1),
+            "error-info":       bit(2),
+            "virt-addr":        bit(3),
+            "phy-addr":         bit(4),
+        }
+
+        self.data = bytearray()
+
+    def create_subparser(self, subparsers):
+        """Add a subparser to handle for the error fields"""
+
+        parser = subparsers.add_parser("arm",
+                                       help="Generate an ARM processor CPER")
+
+        arm_valid_bits = ",".join(self.arm_valid_bits.keys())
+        flags = ",".join(self.pei_flags.keys())
+        error_types = ",".join(self.pei_error_types.keys())
+        pei_valid_bits = ",".join(self.arm_valid_bits.keys())
+
+        # UEFI N.16 ARM Validation bits
+        g_arm = parser.add_argument_group("ARM processor")
+        g_arm.add_argument("--arm", "--arm-valid",
+                           help=f"ARM valid bits: {arm_valid_bits}")
+        g_arm.add_argument("-a", "--affinity",  "--level", "--affinity-level",
+                           type=lambda x: int(x, 0),
+                           help="Affinity level (when multiple levels apply)")
+        g_arm.add_argument("-l", "--mpidr", type=lambda x: int(x, 0),
+                           help="Multiprocessor Affinity Register")
+        g_arm.add_argument("-i", "--midr", type=lambda x: int(x, 0),
+                           help="Main ID Register")
+        g_arm.add_argument("-r", "--running",
+                           action=argparse.BooleanOptionalAction,
+                           default=None,
+                           help="Indicates if the processor is running or not")
+        g_arm.add_argument("--psci", "--psci-state",
+                           type=lambda x: int(x, 0),
+                           help="Power State Coordination Interface - PSCI state")
+
+        # TODO: Add vendor-specific support
+
+        # UEFI N.17 bitmaps (type and flags)
+        g_pei = parser.add_argument_group("ARM Processor Error Info (PEI)")
+        g_pei.add_argument("-t", "--type", nargs="+",
+                        help=f"one or more error types: {error_types}")
+        g_pei.add_argument("-f", "--flags", nargs="*",
+                        help=f"zero or more error flags: {flags}")
+        g_pei.add_argument("-V", "--pei-valid", "--error-valid", nargs="*",
+                        help=f"zero or more PEI valid bits: {pei_valid_bits}")
+
+        # UEFI N.17 Integer values
+        g_pei.add_argument("-m", "--multiple-error", nargs="+",
+                        help="Number of errors: 0: Single error, 1: Multiple errors, 2-65535: Error count if known")
+        g_pei.add_argument("-e", "--error-info", nargs="+",
+                        help="Error information (UEFI 2.10 tables N.18 to N.20)")
+        g_pei.add_argument("-p", "--physical-address",  nargs="+",
+                        help="Physical address")
+        g_pei.add_argument("-v", "--virtual-address",  nargs="+",
+                        help="Virtual address")
+
+        # UEFI N.21 Context
+        g_ctx = parser.add_argument_group("Processor Context")
+        g_ctx.add_argument("--ctx-type", "--context-type", nargs="*",
+                        help="Type of the context (0=ARM32 GPR, 5=ARM64 EL1, other values supported)")
+        g_ctx.add_argument("--ctx-size", "--context-size", nargs="*",
+                        help="Minimal size of the context")
+        g_ctx.add_argument("--ctx-array", "--context-array", nargs="*",
+                        help="Comma-separated arrays for each context")
+
+        # Vendor-specific data
+        g_vendor = parser.add_argument_group("Vendor-specific data")
+        g_vendor.add_argument("--vendor", "--vendor-specific", nargs="+",
+                        help="Vendor-specific byte arrays of data")
+
+    def parse_args(self, args):
+        """Parse subcommand arguments"""
+
+        cper = {}
+        pei = {}
+        ctx = {}
+        vendor = {}
+
+        arg = vars(args)
+
+        # Handle global parameters
+        if args.arm:
+            arm_valid_init = False
+            cper["valid"] = get_choice(name="valid",
+                                       value=args.arm,
+                                       choices=self.arm_valid_bits,
+                                       suffixes=["-error", "-err"])
+        else:
+            cper["valid"] = 0
+            arm_valid_init = True
+
+        if "running" in arg:
+            if args.running:
+                cper["running-state"] = bit(0)
+            else:
+                cper["running-state"] = 0
+        else:
+            cper["running-state"] = 0
+
+        if arm_valid_init:
+            if args.affinity:
+                cper["valid"] |= self.arm_valid_bits["affinity"]
+
+            if args.mpidr:
+                cper["valid"] |= self.arm_valid_bits["mpidr"]
+
+            if "running-state" in cper:
+                cper["valid"] |= self.arm_valid_bits["running"]
+
+            if args.psci:
+                cper["valid"] |= self.arm_valid_bits["running"]
+
+        # Handle PEI
+        if not args.type:
+            args.type = ["cache-error"]
+
+        get_mult_choices(
+            pei,
+            name="valid",
+            values=args.pei_valid,
+            choices=self.pei_valid_bits,
+            suffixes=["-valid", "-info", "--information", "--addr"],
+        )
+        get_mult_choices(
+            pei,
+            name="type",
+            values=args.type,
+            choices=self.pei_error_types,
+            suffixes=["-error", "-err"],
+        )
+        get_mult_choices(
+            pei,
+            name="flags",
+            values=args.flags,
+            choices=self.pei_flags,
+            suffixes=["-error", "-cap"],
+        )
+        get_mult_int(pei, "error-info", args.error_info)
+        get_mult_int(pei, "multiple-error", args.multiple_error)
+        get_mult_int(pei, "phy-addr", args.physical_address)
+        get_mult_int(pei, "virt-addr", args.virtual_address)
+
+        # Handle context
+        get_mult_int(ctx, "type", args.ctx_type, allow_zero=True)
+        get_mult_int(ctx, "minimal-size", args.ctx_size, allow_zero=True)
+        get_mult_array(ctx, "register", args.ctx_array, allow_zero=True)
+
+        get_mult_array(vendor, "bytes", args.vendor, max_val=255)
+
+        # Store PEI
+        pei_data = bytearray()
+        default_flags  = self.pei_flags["first"]
+        default_flags |= self.pei_flags["last"]
+
+        error_info_num = 0
+
+        for i, p in pei.items():        # pylint: disable=W0612
+            error_info_num += 1
+
+            # UEFI 2.10 doesn't define how to encode error information
+            # when multiple types are raised. So, provide a default only
+            # if a single type is there
+            if "error-info" not in p:
+                if p["type"] == bit(1):
+                    p["error-info"] = 0x0091000F
+                if p["type"] == bit(2):
+                    p["error-info"] = 0x0054007F
+                if p["type"] == bit(3):
+                    p["error-info"] = 0x80D6460FFF
+                if p["type"] == bit(4):
+                    p["error-info"] = 0x78DA03FF
+
+            if "valid" not in p:
+                p["valid"] = 0
+                if "multiple-error" in p:
+                    p["valid"] |= self.pei_valid_bits["multiple-error"]
+
+                if "flags" in p:
+                    p["valid"] |= self.pei_valid_bits["flags"]
+
+                if "error-info" in p:
+                    p["valid"] |= self.pei_valid_bits["error-info"]
+
+                if "phy-addr" in p:
+                    p["valid"] |= self.pei_valid_bits["phy-addr"]
+
+                if "virt-addr" in p:
+                    p["valid"] |= self.pei_valid_bits["virt-addr"]
+
+            # Version
+            data_add(pei_data, 0, 1)
+
+            data_add(pei_data, ACPI_GHES_ARM_CPER_PEI_LENGTH, 1)
+
+            data_add(pei_data, p["valid"], 2)
+            data_add(pei_data, p["type"], 1)
+            data_add(pei_data, p.get("multiple-error", 1), 2)
+            data_add(pei_data, p.get("flags", default_flags), 1)
+            data_add(pei_data, p.get("error-info", 0), 8)
+            data_add(pei_data, p.get("virt-addr", 0xDEADBEEF), 8)
+            data_add(pei_data, p.get("phy-addr", 0xABBA0BAD), 8)
+
+        # Store Context
+        ctx_data = bytearray()
+        context_info_num = 0
+
+        if ctx:
+            for k in sorted(ctx.keys()):
+                context_info_num += 1
+
+                if "type" not in ctx:
+                    ctx[k]["type"] = CONTEXT_AARCH64_EL1
+
+                if "register" not in ctx:
+                    ctx[k]["register"] = []
+
+                reg_size = len(ctx[k]["register"])
+                size = 0
+
+                if "minimal-size" in ctx:
+                    size = ctx[k]["minimal-size"]
+
+                size = max(size, reg_size)
+
+                size = (size + 1) % 0xFFFE
+
+                # Version
+                data_add(ctx_data, 0, 2)
+
+                data_add(ctx_data, ctx[k]["type"], 2)
+
+                data_add(ctx_data, 8 * size, 4)
+
+                for r in ctx[k]["register"]:
+                    data_add(ctx_data, r, 8)
+
+                for i in range(reg_size, size):   # pylint: disable=W0612
+                    data_add(ctx_data, 0, 8)
+
+        # Vendor-specific bytes are not grouped
+        vendor_data = bytearray()
+        if vendor:
+            for k in sorted(vendor.keys()):
+                for b in vendor[k]["bytes"]:
+                    data_add(vendor_data, b, 1)
+
+        # Encode ARM Processor Error
+        data = bytearray()
+
+        data_add(data, cper["valid"], 4)
+
+        data_add(data, error_info_num, 2)
+        data_add(data, context_info_num, 2)
+
+        # Calculate the length of the CPER data
+        cper_length = ACPI_GHES_ARM_CPER_LENGTH
+        cper_length += len(pei_data)
+        cper_length += len(vendor_data)
+        cper_length += len(ctx_data)
+        data_add(data, cper_length, 4)
+
+        data_add(data, arg.get("affinity-level", 0), 1)
+
+        # Reserved
+        data_add(data, 0, 3)
+
+        data_add(data, arg.get("mpidr-el1", 0), 8)
+        data_add(data, arg.get("midr-el1", 0), 8)
+        data_add(data, cper["running-state"], 4)
+        data_add(data, arg.get("psci-state", 0), 4)
+
+        # Add PEI
+        data.extend(pei_data)
+        data.extend(ctx_data)
+        data.extend(vendor_data)
+
+        self.data = data
+
+    def run(self, host, port):
+        """Execute QMP commands"""
+
+        guid = to_guid(0xE19E3D16, 0xBC11, 0x11E4,
+                       [0x9C, 0xAA, 0xC2, 0x05,
+                        0x1D, 0x5D, 0x46, 0xB0])
+
+        qmp_command(host, port, guid, self.data)
diff --git a/scripts/ghes_inject.py b/scripts/ghes_inject.py
new file mode 100755
index 000000000000..8415ccbbc53d
--- /dev/null
+++ b/scripts/ghes_inject.py
@@ -0,0 +1,59 @@ 
+#!/usr/bin/env python3
+#
+# pylint: disable=C0301, C0114
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
+
+import argparse
+
+from arm_processor_error import ArmProcessorEinj
+
+EINJ_DESCRIPTION = """
+Handle ACPI GHESv2 error injection logic QEMU QMP interface.\n
+
+It allows using UEFI BIOS EINJ features to generate GHES records.
+
+It helps testing Linux CPER and GHES drivers and to test rasdaemon
+error handling logic.
+
+Currently, it support ARM processor error injection for ARM processor
+events, being compatible with UEFI 2.9A Errata.
+
+This small utility works together with those QEMU additions:
+- https://gitlab.com/mchehab_kernel/qemu/-/tree/arm-error-inject-v2
+"""
+
+def main():
+    """Main program"""
+
+    # Main parser - handle generic args like QEMU QMP TCP socket options
+    parser = argparse.ArgumentParser(prog="einj.py",
+                                     formatter_class=argparse.RawDescriptionHelpFormatter,
+                                     usage="%(prog)s [options]",
+                                     description=EINJ_DESCRIPTION,
+                                     epilog="If a field is not defined, a default value will be applied by QEMU.")
+
+    g_options = parser.add_argument_group("QEMU QMP socket options")
+    g_options.add_argument("-H", "--host", default="localhost", type=str,
+                           help="host name")
+    g_options.add_argument("-P", "--port", default=4445, type=int,
+                           help="TCP port number")
+
+    arm_einj = ArmProcessorEinj()
+
+    # Call subparsers
+    subparsers = parser.add_subparsers(dest='command')
+
+    arm_einj.create_subparser(subparsers)
+
+    args = parser.parse_args()
+
+    # Handle subparser commands
+    if args.command == "arm":
+        arm_einj.parse_args(args)
+        arm_einj.run(args.host, args.port)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/qmp_helper.py b/scripts/qmp_helper.py
new file mode 100644
index 000000000000..13fae7a7af0e
--- /dev/null
+++ b/scripts/qmp_helper.py
@@ -0,0 +1,249 @@ 
+#!/usr/bin/env python3
+#
+# pylint: disable=C0301, C0114, R0912, R0913, R0915, W0511
+# SPDX-License-Identifier: GPL-2.0
+#
+# Copyright (C) 2024 Mauro Carvalho Chehab <mchehab+huawei@kernel.org>
+
+import json
+import socket
+import sys
+
+from base64 import b64encode
+
+#
+# Socket QMP send command
+#
+def qmp_command(host, port, guid, data):
+    """Send commands to QEMU though QMP TCP socket"""
+
+    # Fill the commands to be sent
+    commands = []
+
+    # Needed to negotiate QMP and for QEMU to accept the command
+    commands.append('{ "execute": "qmp_capabilities" } ')
+
+    base64_data = b64encode(bytes(data)).decode('ascii')
+
+    cmd_arg = {
+        'cper': {
+            'notification-type': guid,
+            "raw-data": base64_data
+        }
+    }
+
+    command = '{ "execute": "ghes-cper", '
+    command += '"arguments": ' + json.dumps(cmd_arg) + " }"
+
+    commands.append(command)
+
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    try:
+        s.connect((host, port))
+    except ConnectionRefusedError:
+        sys.exit(f"Can't connect to QMP host {host}:{port}")
+
+    data = s.recv(1024)
+    try:
+        obj = json.loads(data.decode("utf-8"))
+    except json.JSONDecodeError as e:
+        print(f"Invalid QMP answer: {e}")
+        s.close()
+        return
+
+    if "QMP" not in obj:
+        print(f"Invalid QMP answer: {data.decode("utf-8")}")
+        s.close()
+        return
+
+    for i, command in enumerate(commands):
+        s.sendall(command.encode("utf-8"))
+        data = s.recv(1024)
+        try:
+            obj = json.loads(data.decode("utf-8"))
+        except json.JSONDecodeError as e:
+            print(f"Invalid QMP answer: {e}")
+            s.close()
+            return
+
+        if isinstance(obj.get("return"), dict):
+            if obj["return"]:
+                print(json.dumps(obj["return"]))
+            elif i > 0:
+                print("Error injected.")
+        elif isinstance(obj.get("error"), dict):
+            error = obj["error"]
+            print(f'{error["class"]}: {error["desc"]}')
+        else:
+            print(json.dumps(obj))
+
+    s.shutdown(socket.SHUT_WR)
+    while 1:
+        data = s.recv(1024)
+        if data == b"":
+            break
+        try:
+            obj = json.loads(data.decode("utf-8"))
+        except json.JSONDecodeError as e:
+            print(f"Invalid QMP answer: {e}")
+            s.close()
+            return
+
+        if isinstance(obj.get("return"), dict):
+            print(json.dumps(obj["return"]))
+        if isinstance(obj.get("error"), dict):
+            error = obj["error"]
+            print(f'{error["class"]}: {error["desc"]}')
+        else:
+            print(json.dumps(obj))
+
+    s.close()
+
+
+#
+# Helper routines to handle multiple choice arguments
+#
+def get_choice(name, value, choices, suffixes=None):
+    """Produce a list from multiple choice argument"""
+
+    new_values = 0
+
+    if not value:
+        return new_values
+
+    for val in value.split(","):
+        val = val.lower()
+
+        if suffixes:
+            for suffix in suffixes:
+                val = val.removesuffix(suffix)
+
+        if val not in choices.keys():
+            sys.exit(f"Error on '{name}': choice {val} is invalid.")
+
+        val = choices[val]
+
+        new_values |= val
+
+    return new_values
+
+
+def get_mult_array(mult, name, values, allow_zero=False, max_val=None):
+    """Add numbered hashes from integer lists"""
+
+    if not allow_zero:
+        if not values:
+            return
+    else:
+        if values is None:
+            return
+
+        if not values:
+            i = 0
+            if i not in mult:
+                mult[i] = {}
+
+            mult[i][name] = []
+            return
+
+    i = 0
+    for value in values:
+        for val in value.split(","):
+            try:
+                val = int(val, 0)
+            except ValueError:
+                sys.exit(f"Error on '{name}': {val} is not an integer")
+
+            if val < 0:
+                sys.exit(f"Error on '{name}': {val} is not unsigned")
+
+            if max_val and val > max_val:
+                sys.exit(f"Error on '{name}': {val} is too little")
+
+            if i not in mult:
+                mult[i] = {}
+
+            if name not in mult[i]:
+                mult[i][name] = []
+
+            mult[i][name].append(val)
+
+        i += 1
+
+
+def get_mult_choices(mult, name, values, choices,
+                     suffixes=None, allow_zero=False):
+    """Add numbered hashes from multiple choice arguments"""
+
+    if not allow_zero:
+        if not values:
+            return
+    else:
+        if values is None:
+            return
+
+    i = 0
+    for val in values:
+        new_values = get_choice(name, val, choices, suffixes)
+
+        if i not in mult:
+            mult[i] = {}
+
+        mult[i][name] = new_values
+        i += 1
+
+
+def get_mult_int(mult, name, values, allow_zero=False):
+    """Add numbered hashes from integer arguments"""
+    if not allow_zero:
+        if not values:
+            return
+    else:
+        if values is None:
+            return
+
+    i = 0
+    for val in values:
+        try:
+            val = int(val, 0)
+        except ValueError:
+            sys.exit(f"Error on '{name}': {val} is not an integer")
+
+        if val < 0:
+            sys.exit(f"Error on '{name}': {val} is not unsigned")
+
+        if i not in mult:
+            mult[i] = {}
+
+        mult[i][name] = val
+        i += 1
+
+
+#
+# Data encode helper functions
+#
+def bit(b):
+    """Simple macro to define a bit on a bitmask"""
+    return 1 << b
+
+
+def data_add(data, value, num_bytes):
+    """Adds bytes from value inside a bitarray"""
+
+    data.extend(value.to_bytes(num_bytes, byteorder="little"))
+
+def to_guid(time_low, time_mid, time_high, nodes):
+    """Create an GUID string"""
+
+    assert(len(nodes) == 8)
+
+    clock = nodes[0] << 8 | nodes[1]
+
+    node = 0
+    for i in range(2, len(nodes)):
+        node = node << 8 | nodes[i]
+
+    s = f"{time_low:08x}-{time_mid:04x}-"
+    s += f"{time_high:04x}-{clock:04x}-{node:012x}"
+
+    return s