new file mode 100755
@@ -0,0 +1,486 @@
+#!/usr/bin/env python3
+# ---------------------------------------------------------------------------- #
+
+from configparser import ConfigParser
+from contextlib import contextmanager
+from dataclasses import dataclass
+import json
+import os
+import os.path
+import shlex
+import subprocess
+import sys
+import re
+from argparse import ArgumentParser, Namespace, RawDescriptionHelpFormatter
+from multiprocessing import Pool
+from pathlib import Path
+import textwrap
+import time
+from typing import (
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ NoReturn,
+ Sequence,
+ Union,
+)
+
+import clang.cindex # type: ignore
+
+from static_analyzer import CHECKS, CheckContext
+
+# ---------------------------------------------------------------------------- #
+# Usage
+
+
def parse_args() -> Namespace:
    """Parse command-line arguments into a `Namespace`.

    After parsing, `args.check_names` is normalized to a sorted,
    duplicate-free list of check names; when no --check option was given, it
    contains every registered check.
    """

    # One "name summary" line per registered check for the epilog; each
    # summary is the check's docstring with whitespace runs collapsed.
    available_checks = "\n".join(
        f" {name} {' '.join((CHECKS[name].__doc__ or '').split())}"
        for name in sorted(CHECKS)
    )

    parser = ArgumentParser(
        allow_abbrev=False,
        formatter_class=RawDescriptionHelpFormatter,
        description=textwrap.dedent(
            """
            Checks are best-effort, but should never report false positives.

            This only considers translation units enabled under the given QEMU
            build configuration. Note that a single .c file may give rise to
            several translation units.

            You should build QEMU before running this, since some translation
            units depend on files that are generated during the build. If you
            don't, you'll get errors, but should never get false negatives.
            """
        ),
        epilog=textwrap.dedent(
            f"""
            available checks:
            {available_checks}

            exit codes:
            0 No problems found.
            1 Internal failure.
            2 Bad usage.
            3 Problems found in one or more translation units.
            """
        ),
    )

    parser.add_argument(
        "build_dir",
        type=Path,
        help="Path to the build directory.",
    )

    parser.add_argument(
        "translation_units",
        type=Path,
        nargs="*",
        help=(
            "Analyze only translation units whose root source file matches or"
            " is under one of the given paths."
        ),
    )

    # add regular options

    parser.add_argument(
        "-c",
        "--check",
        metavar="CHECK",
        dest="check_names",
        choices=sorted(CHECKS),
        action="append",
        help=(
            "Enable the given check. Can be given multiple times. If not given,"
            " all checks are enabled."
        ),
    )

    parser.add_argument(
        "-j",
        "--jobs",
        dest="threads",
        type=int,
        default=os.cpu_count() or 1,
        help=(
            f"Number of threads to employ. Defaults to {os.cpu_count() or 1} on"
            f" this machine."
        ),
    )

    # add development options

    dev_options = parser.add_argument_group("development options")

    dev_options.add_argument(
        "--profile",
        metavar="SORT_KEY",
        help=(
            "Profile execution. Forces single-threaded execution. The argument"
            " specifies how to sort the results; see"
            " https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats"
        ),
    )

    dev_options.add_argument(
        "--skip-checks",
        action="store_true",
        help="Do everything except actually running the checks.",
    )

    # parse arguments

    args = parser.parse_args()

    # normalize: deduplicate, default to all checks, and make the order stable
    args.check_names = sorted(set(args.check_names or CHECKS))

    return args
+
+
+# ---------------------------------------------------------------------------- #
+# Main
+
+
def main() -> int:
    """Entry point: parse arguments and run the analysis.

    When --profile is given, the analysis runs under cProfile and the stats
    are written to stderr, sorted by the given key.
    """

    args = parse_args()

    if not args.profile:
        return analyze(args)

    import cProfile
    import pstats

    profiler = cProfile.Profile()

    try:
        return profiler.runcall(lambda: analyze(args))
    finally:
        # always dump the profile, even if analyze() raised
        stats = pstats.Stats(profiler, stream=sys.stderr)
        stats.strip_dirs()
        stats.sort_stats(args.profile)
        stats.print_stats()
+
+
def analyze(args: Namespace) -> int:
    """Analyze all selected translation units.

    Returns 0 if no problems were found, 3 otherwise.
    """

    tr_units = get_translation_units(args)

    # analyze translation units

    start_time = time.monotonic()
    results: List[bool] = []

    if len(tr_units) == 1:
        progress_suffix = " of 1 translation unit...\033[0m\r"
    else:
        progress_suffix = f" of {len(tr_units)} translation units...\033[0m\r"

    # The progress line uses ANSI color codes and a trailing carriage return
    # so each update overwrites the previous one. Only emit it when stdout is
    # an interactive terminal; previously the initial progress line was
    # printed unconditionally, polluting piped/redirected output.
    show_progress = sys.stdout.isatty()

    def print_progress() -> None:
        print(f"\033[0;34mAnalyzed {len(results)}" + progress_suffix, end="")

    if show_progress:
        print_progress()

    def collect_results(results_iter: Iterable[bool]) -> None:
        for result in results_iter:
            results.append(result)
            if show_progress:
                print_progress()

    if tr_units:

        if args.threads == 1:

            collect_results(map(analyze_translation_unit, tr_units))

        else:

            # Mimic Python's default pool.map() chunk size, but limit it to
            # 5 to avoid very big chunks when analyzing thousands of
            # translation units.
            chunk_size = min(5, -(-len(tr_units) // (args.threads * 4)))

            with Pool(processes=args.threads) as pool:
                collect_results(
                    pool.imap_unordered(
                        analyze_translation_unit, tr_units, chunk_size
                    )
                )

    end_time = time.monotonic()

    # print summary

    if len(tr_units) == 1:
        message = "Analyzed 1 translation unit"
    else:
        message = f"Analyzed {len(tr_units)} translation units"

    message += f" in {end_time - start_time:.1f} seconds."

    print(f"\033[0;34m{message}\033[0m")

    # exit

    return 0 if all(results) else 3
+
+
+# ---------------------------------------------------------------------------- #
+# Translation units
+
+
@dataclass
class TranslationUnit:
    """Everything needed to load and check one translation unit."""

    # absolute path to the root source file, with symlinks resolved
    absolute_path: str
    # directory in which the build system ran the compilation command
    build_working_dir: str
    # the full compilation command, as a single shell-quoted string
    build_command: str
    # clang's default system include dirs, passed along via -isystem
    system_include_paths: Sequence[str]
    # names of the checks to run against this translation unit
    check_names: Sequence[str]
+
+
def get_translation_units(args: Namespace) -> Sequence["TranslationUnit"]:
    """Return a list of translation units to be analyzed.

    Starts from the build's compilation database, then drops translation
    units that belong to git submodules, and finally keeps only those
    matching the paths given on the command line (if any). Calls `fatal()`
    if no translation unit remains.
    """

    system_include_paths = get_clang_system_include_paths()
    compile_commands = load_compilation_database(args.build_dir)

    # get all translation units (lazily; filters below are also generators)

    tr_units: Iterable[TranslationUnit] = (
        TranslationUnit(
            absolute_path=str(Path(cmd["directory"], cmd["file"]).resolve()),
            build_working_dir=cmd["directory"],
            build_command=cmd["command"],
            system_include_paths=system_include_paths,
            check_names=args.check_names,
        )
        for cmd in compile_commands
    )

    # ignore translation units from git submodules

    # NOTE(review): assumes the build dir's Makefile is a symlink into the
    # source tree, so resolving it yields the repository root -- confirm this
    # holds for all supported build configurations.
    repo_root = (args.build_dir / "Makefile").resolve(strict=True).parent
    module_file = repo_root / ".gitmodules"
    assert module_file.exists()

    modules = ConfigParser()
    modules.read(module_file)

    disallowed_prefixes = [
        # ensure path is slash-terminated
        os.path.join(repo_root, section["path"], "")
        for section in modules.values()
        if "path" in section
    ]

    tr_units = (
        ctx
        for ctx in tr_units
        if all(
            not ctx.absolute_path.startswith(prefix)
            for prefix in disallowed_prefixes
        )
    )

    # filter translation units by command line arguments

    if args.translation_units:

        allowed_prefixes = [
            # ensure path exists and is slash-terminated (even if it is a file)
            os.path.join(path.resolve(strict=True), "")
            for path in args.translation_units
        ]

        tr_units = (
            ctx
            for ctx in tr_units
            # appending "/" lets a path that names the file itself match too
            if any(
                (ctx.absolute_path + "/").startswith(prefix)
                for prefix in allowed_prefixes
            )
        )

    # ensure that at least one translation unit is selected

    tr_unit_list = list(tr_units)

    if not tr_unit_list:
        fatal("No translation units to analyze")

    # disable all checks if --skip-checks was given

    if args.skip_checks:
        for context in tr_unit_list:
            context.check_names = []

    return tr_unit_list
+
+
def get_clang_system_include_paths() -> Sequence[str]:
    """Return clang's default system include directories.

    libclang does not automatically include clang's standard system include
    paths, so we ask clang what they are and include them ourselves.
    """

    clang_stderr = subprocess.run(
        ["clang", "-E", "-", "-v"],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        universal_newlines=True,  # decode output using default encoding
        check=True,
    ).stderr

    # The paths appear between two marker lines, one path per line with a
    # single leading space. Module `re` does not support repeated group
    # captures, so capture the whole run of lines and split it afterwards.
    pattern = (
        r"#include <...> search starts here:\n"
        r"((?: \S*\n)+)"
        r"End of search list."
    )

    paths_match = re.search(pattern, clang_stderr, re.MULTILINE)
    assert paths_match is not None

    # drop the leading space from each captured line
    return [line[1:] for line in paths_match.group(1).splitlines()]
+
+
def load_compilation_database(build_dir: Path) -> Sequence[Mapping[str, str]]:
    """Load and parse `compile_commands.json` from `build_dir`.

    Calls `fatal()` if the file does not exist.
    """

    # clang.cindex.CompilationDatabase.getCompileCommands() apparently produces
    # entries for files not listed in compile_commands.json in a best-effort
    # manner, which we don't want, so we parse the JSON ourselves instead.

    path = build_dir / "compile_commands.json"

    try:
        with path.open("r") as db_file:
            return json.load(db_file)
    except FileNotFoundError:
        fatal(f"{path} does not exist")
+
+
+# ---------------------------------------------------------------------------- #
+# Analysis
+
+
def analyze_translation_unit(tr_unit: TranslationUnit) -> bool:
    """Run every enabled check against `tr_unit`.

    Returns True iff no problems were found.
    """

    context = get_check_context(tr_unit)

    try:
        for check_name in tr_unit.check_names:
            CHECKS[check_name](context)
    except Exception as e:
        # wrap so the failing translation unit is identifiable
        raise RuntimeError(f"Error analyzing {context._rel_path}") from e

    return not context._problems_found
+
+
def get_check_context(tr_unit: TranslationUnit) -> CheckContext:
    """Load `tr_unit` with libclang and wrap it in a `CheckContext`.

    Error/fatal parser diagnostics are printed immediately and mark the
    context as having found problems, since they can compromise the checks.
    """

    # relative to script's original working directory
    rel_path = os.path.relpath(tr_unit.absolute_path)

    # load translation unit

    command = shlex.split(tr_unit.build_command)

    adjusted_command = [
        # keep the original compilation command name
        command[0],
        # ignore unknown GCC warning options
        "-Wno-unknown-warning-option",
        # keep all other arguments but the last, which is the file name
        *command[1:-1],
        # add clang system include paths
        *(
            arg
            for path in tr_unit.system_include_paths
            for arg in ("-isystem", path)
        ),
        # replace relative path to get absolute location information
        tr_unit.absolute_path,
    ]

    # clang can warn about things that GCC doesn't
    # NOTE(review): list.remove() drops only the first -Werror occurrence;
    # presumably build commands never repeat it -- confirm.
    if "-Werror" in adjusted_command:
        adjusted_command.remove("-Werror")

    # We change directory for options like -I to work, but have to change back
    # to have correct relative paths in messages.
    with cwd(tr_unit.build_working_dir):

        try:
            tu = clang.cindex.TranslationUnit.from_source(
                filename=None, args=adjusted_command
            )
        except clang.cindex.TranslationUnitLoadError as e:
            raise RuntimeError(f"Failed to load {rel_path}") from e

    if sys.stdout.isatty():
        # add padding to fully overwrite progress message
        printer = lambda s: print(s.ljust(50))
    else:
        printer = print

    check_context = CheckContext(
        translation_unit=tu,
        translation_unit_path=tr_unit.absolute_path,
        _rel_path=rel_path,
        _build_working_dir=Path(tr_unit.build_working_dir),
        _problems_found=False,
        _printer=printer,
    )

    # check for error/fatal diagnostics

    for diag in tu.diagnostics:
        if diag.severity >= clang.cindex.Diagnostic.Error:
            check_context._problems_found = True
            location = check_context.format_location(diag)
            check_context._printer(
                f"\033[0;33m{location}: {diag.spelling}; this may lead to false"
                f" positives and negatives\033[0m"
            )

    return check_context
+
+
+# ---------------------------------------------------------------------------- #
+# Utilities
+
+
@contextmanager
def cwd(path: Union[str, Path]) -> Iterator[None]:
    """Temporarily change the working directory to `path`.

    The previous working directory is restored on exit, even if the managed
    block raises.
    """

    previous = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)
+
+
def fatal(message: str) -> NoReturn:
    """Print an error message in red and terminate with exit status 1.

    The message goes to stderr (previously stdout) so it is not mixed into
    piped or redirected analysis output.
    """
    print(f"\033[0;31mERROR: {message}\033[0m", file=sys.stderr)
    sys.exit(1)
+
+
+# ---------------------------------------------------------------------------- #
+
# Script entry point: propagate main()'s exit status (0/1/2/3, see --help)
# to the shell.
if __name__ == "__main__":
    sys.exit(main())
+
+# ---------------------------------------------------------------------------- #
new file mode 100644
@@ -0,0 +1,242 @@
+# ---------------------------------------------------------------------------- #
+
+from ctypes import CFUNCTYPE, c_int, py_object
+from dataclasses import dataclass
+from enum import Enum
+import os
+import os.path
+from pathlib import Path
+from importlib import import_module
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ List,
+ Optional,
+ Union,
+)
+
+from clang.cindex import ( # type: ignore
+ Cursor,
+ CursorKind,
+ TranslationUnit,
+ SourceLocation,
+ conf,
+)
+
+# ---------------------------------------------------------------------------- #
+# Monkeypatch clang.cindex
+
# Forward Python's hash() to the `hash` value libclang exposes on cursors.
Cursor.__hash__ = lambda self: self.hash  # so `Cursor`s can be dict keys
+
+# ---------------------------------------------------------------------------- #
+# Traversal
+
+
class VisitorResult(Enum):
    """Return value of `visit()` callbacks, telling the traversal how to
    proceed. The numeric values are passed straight back to libclang's
    `clang_visitChildren()` (see `visit()` below)."""

    BREAK = 0
    """Terminates the cursor traversal."""

    CONTINUE = 1
    """Continues the cursor traversal with the next sibling of the cursor just
    visited, without visiting its children."""

    RECURSE = 2
    """Recursively traverse the children of this cursor."""
+
+
def visit(root: Cursor, visitor: Callable[[Cursor], VisitorResult]) -> bool:
    """
    A simple wrapper around `clang_visitChildren()`.

    The `visitor` callback is called for each visited node, with that node as
    its argument. `root` is NOT visited.

    Unlike a standard `Cursor`, the callback argument will have a `parent` field
    that points to its parent in the AST. The `parent` will also have its own
    `parent` field, and so on, unless it is `root`, in which case its `parent`
    field is `None`. We add this because libclang's `lexical_parent` field is
    almost always `None` for some reason.

    Returns `false` if the visitation was aborted by the callback returning
    `VisitorResult.BREAK`. Returns `true` otherwise.
    """

    tu = root._tu
    root.parent = None

    # Stores the path from `root` to the node being visited. We need this to set
    # `node.parent`.
    path: List[Cursor] = [root]

    # holds the (at most one) exception raised by `visitor`; a list because
    # the nested callback can't rebind an outer local
    exception: List[BaseException] = []

    @CFUNCTYPE(c_int, Cursor, Cursor, py_object)
    def actual_visitor(node: Cursor, parent: Cursor, client_data: None) -> int:

        try:

            # The `node` and `parent` `Cursor` objects are NOT reused in between
            # invocations of this visitor callback, so we can't assume that
            # `parent.parent` is set.

            # Unwind `path` until its top is the current parent; works because
            # the traversal visits a node's children right after the node.
            while path[-1] != parent:
                path.pop()

            node.parent = path[-1]
            path.append(node)

            # several clang.cindex methods need Cursor._tu to be set
            node._tu = tu

            # the enum values match libclang's CXChildVisitResult
            return visitor(node).value

        except BaseException as e:

            # Exceptions can't cross into C. Stash it, abort the visitation, and
            # reraise it.

            exception.append(e)
            return VisitorResult.BREAK.value

    result = conf.lib.clang_visitChildren(root, actual_visitor, None)

    if exception:
        raise exception[0]

    # clang_visitChildren() returns nonzero iff the traversal was aborted
    return result == 0
+
+
+# ---------------------------------------------------------------------------- #
+# Node predicates
+
+
def might_have_attribute(node: Cursor, attr: Union[CursorKind, str]) -> bool:
    """
    Check whether any of `node`'s children are an attribute of the given kind,
    or an attribute of kind `UNEXPOSED_ATTR` with the given `str` spelling.

    This check is best-effort and may erroneously return `True`.
    """

    if isinstance(attr, CursorKind):
        assert attr.is_attribute()
        return any(child.kind == attr for child in node.get_children())

    for child in node.get_children():

        if child.kind != CursorKind.UNEXPOSED_ATTR:
            continue

        tokens = list(child.get_tokens())

        # `tokens` can have 0 or more than 1 element when the attribute
        # comes from a macro expansion. AFAICT, in that case we can't do
        # better here than tell callers that this might be the attribute
        # that they're looking for.
        if len(tokens) != 1 or tokens[0].spelling == attr:
            return True

    return False
+
+
+# ---------------------------------------------------------------------------- #
+# Checks
+
+
@dataclass
class CheckContext:
    """Everything a check needs to analyze one translation unit, plus a
    record of whether any problems were reported."""

    translation_unit: TranslationUnit
    translation_unit_path: str  # exactly as reported by libclang

    # path to the root source file, relative to the original working directory
    _rel_path: str
    # directory in which the compilation command was meant to run
    _build_working_dir: Path
    # set to True by report() (and on parse errors, see get_check_context())
    _problems_found: bool

    # how report() prints messages; may pad lines to overwrite progress output
    _printer: Callable[[str], None]

    def format_location(self, obj: Any) -> str:
        """obj must have a location field/property that is a
        `SourceLocation`."""
        return self._format_location(obj.location)

    def _format_location(self, loc: SourceLocation) -> str:
        """Format `loc` as "path:line:column", with the path made relative to
        the current working directory."""

        if loc.file is None:
            # no file information; fall back to the translation unit's path
            return self._rel_path
        else:
            # loc.file.name may be relative to the build dir, so anchor it there
            abs_path = (self._build_working_dir / loc.file.name).resolve()
            rel_path = os.path.relpath(abs_path)
            return f"{rel_path}:{loc.line}:{loc.column}"

    def report(self, node: Cursor, message: str) -> None:
        """Report a problem at `node`'s location, marking this translation
        unit as failed."""
        self._problems_found = True
        self._printer(f"{self.format_location(node)}: {message}")

    def print_node(self, node: Cursor) -> None:
        """This can be handy when developing checks."""

        print(f"{self.format_location(node)}: kind = {node.kind.name}", end="")

        if node.spelling:
            print(f", spelling = {node.spelling!r}", end="")

        if node.type is not None:
            print(f", type = {node.type.get_canonical().spelling!r}", end="")

        if node.referenced is not None:
            print(f", referenced = {node.referenced.spelling!r}", end="")

        start = self._format_location(node.extent.start)
        end = self._format_location(node.extent.end)
        print(f", extent = {start}--{end}")

    def print_tree(
        self,
        node: Cursor,
        *,
        max_depth: Optional[int] = None,
        indentation_level: int = 0,
    ) -> None:
        """This can be handy when developing checks."""

        # max_depth is None means unlimited depth
        if max_depth is None or max_depth >= 0:

            print(" " * indentation_level, end="")
            self.print_node(node)

            for child in node.get_children():
                self.print_tree(
                    child,
                    max_depth=None if max_depth is None else max_depth - 1,
                    indentation_level=indentation_level + 1,
                )
+
+
Checker = Callable[[CheckContext], None]

# global registry of all known checks, keyed by check name
CHECKS: Dict[str, Checker] = {}


def check(name: str) -> Callable[[Checker], Checker]:
    """Return a decorator that registers a checker function under `name` in
    `CHECKS`."""

    def register(checker: Checker) -> Checker:
        # a duplicate name would silently shadow an existing check
        assert name not in CHECKS
        CHECKS[name] = checker
        return checker

    return register
+
+
+# ---------------------------------------------------------------------------- #
+# Import all checks
+
# Import every module in this package (other than __init__ files) so that
# their `@check` decorators run and populate `CHECKS`.
package_dir = Path(__file__).parent
for path in package_dir.glob("**/*.py"):
    if path.name == "__init__.py":
        continue
    module_parts = path.relative_to(package_dir).with_suffix("").parts
    import_module("." + ".".join(module_parts), __package__)
+
+# ---------------------------------------------------------------------------- #
new file mode 100644
@@ -0,0 +1,117 @@
+# ---------------------------------------------------------------------------- #
+
+from typing import Dict
+
+from clang.cindex import ( # type: ignore
+ Cursor,
+ CursorKind,
+ StorageClass,
+ TypeKind,
+)
+
+from static_analyzer import (
+ CheckContext,
+ VisitorResult,
+ check,
+ might_have_attribute,
+ visit,
+)
+
+# ---------------------------------------------------------------------------- #
+
+
@check("return-value-never-used")
def check_return_value_never_used(context: CheckContext) -> None:
    """Report static functions with a non-void return value that no caller ever
    uses."""

    # Maps canonical function `Cursor`s to whether we found a place that maybe
    # uses their return value. Only includes static functions that don't return
    # void, don't have __attribute__((unused)), and belong to the translation
    # unit's root file (i.e., were not brought in by an #include).
    funcs: Dict[Cursor, bool] = {}

    def visitor(node: Cursor) -> VisitorResult:

        # record candidate function declarations
        if (
            node.kind == CursorKind.FUNCTION_DECL
            and node.storage_class == StorageClass.STATIC
            and node.location.file.name == context.translation_unit_path
            and node.type.get_result().get_canonical().kind != TypeKind.VOID
            and not might_have_attribute(node, "unused")
        ):
            # setdefault() so a redeclaration doesn't reset a True entry
            funcs.setdefault(node.canonical, False)

        # record references to candidates that might use the return value;
        # a single traversal suffices because C requires a function to be
        # declared before any reference to it
        if (
            node.kind == CursorKind.DECL_REF_EXPR
            and node.referenced.kind == CursorKind.FUNCTION_DECL
            and node.referenced.canonical in funcs
            and function_occurrence_might_use_return_value(node)
        ):
            funcs[node.referenced.canonical] = True

        return VisitorResult.RECURSE

    visit(context.translation_unit.cursor, visitor)

    for (cursor, return_value_maybe_used) in funcs.items():
        if not return_value_maybe_used:
            context.report(
                cursor, f"{cursor.spelling}() return value is never used"
            )
+
+
def function_occurrence_might_use_return_value(node: Cursor) -> bool:
    """Given a `DECL_REF_EXPR` referencing a function, return whether this
    particular occurrence might use the function's return value.

    Best-effort in the permissive direction: may return `True` spuriously,
    but a `False` should be reliable.
    """

    # look through an UNEXPOSED_EXPR wrapper (e.g. an implicit cast) around
    # the reference
    parent = get_parent_if_unexposed_expr(node.parent)

    if parent.kind.is_statement():

        # the bare reference is a full statement; its value is discarded
        return False

    elif (
        parent.kind == CursorKind.CALL_EXPR
        and parent.referenced == node.referenced
    ):

        # the reference is the callee of a call; what matters now is whether
        # the *call*'s result is used
        grandparent = get_parent_if_unexposed_expr(parent.parent)

        if not grandparent.kind.is_statement():
            return True

        # a call directly under these statements has its value consumed
        # (as a condition or as the returned value)
        if grandparent.kind in [
            CursorKind.IF_STMT,
            CursorKind.SWITCH_STMT,
            CursorKind.WHILE_STMT,
            CursorKind.DO_STMT,
            CursorKind.RETURN_STMT,
        ]:
            return True

        if grandparent.kind == CursorKind.FOR_STMT:

            # A for statement's children are its (up to 3) header clauses
            # followed by the body; only the condition clause uses the value.
            [*for_parts, for_body] = grandparent.get_children()
            if len(for_parts) == 0:
                return False
            elif len(for_parts) in [1, 2]:
                return True  # call may be in condition part of for loop
            elif len(for_parts) == 3:
                # Comparison doesn't work properly with `Cursor`s originating
                # from nested visitations, so we compare the extent instead.
                return parent.extent == for_parts[1].extent
            else:
                assert False

        return False

    else:

        # might be doing something with a pointer to the function
        return True
+
+
def get_parent_if_unexposed_expr(node: Cursor) -> Cursor:
    """Skip over an UNEXPOSED_EXPR wrapper: return `node`'s parent when
    `node` is an unexposed expression, otherwise `node` itself."""
    if node.kind == CursorKind.UNEXPOSED_EXPR:
        return node.parent
    return node
+
+
+# ---------------------------------------------------------------------------- #
Add a static-analyzer.py script that uses libclang's Python bindings to provide a common framework on which arbitrary static analysis checks can be developed and run against QEMU's code base. As an example, a simple "return-value-never-used" check is included that verifies that the return value of static, non-void functions is used by at least one caller. Signed-off-by: Alberto Faria <afaria@redhat.com> --- static-analyzer.py | 486 +++++++++++++++++++++ static_analyzer/__init__.py | 242 ++++++++++ static_analyzer/return_value_never_used.py | 117 +++++ 3 files changed, 845 insertions(+) create mode 100755 static-analyzer.py create mode 100644 static_analyzer/__init__.py create mode 100644 static_analyzer/return_value_never_used.py