Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/update-syntax-description.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ jobs:
- name: Update contents of syntax-description
run: |
if ! cmp -s new.json ./src/cfengine_cli/syntax-description.json; then
cat new.json > ./src/cfengine_cli/syntax-description.json
echo "CHANGES_DETECTED=true" >> $GITHUB_ENV
rm new.json
fi
cat new.json > ./src/cfengine_cli/syntax-description.json
echo "CHANGES_DETECTED=true" >> $GITHUB_ENV
rm new.json
fi
- name: Create Pull Request
if: env.CHANGES_DETECTED == 'true'
uses: cfengine/create-pull-request@v6
Expand Down
146 changes: 96 additions & 50 deletions src/cfengine_cli/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,49 +42,54 @@
LINT_EXTENSIONS = (".cf", ".json")


def _load_syntax_description(path: str | None = None) -> dict:
"""Load and return the parsed syntax-description.json file."""
if path is None:
path = os.path.join(os.path.dirname(__file__), "syntax-description.json")
with open(path, "r") as f:
return json.load(f)


def _derive_syntax_sets(data: dict) -> tuple:
"""Derive the four sets used for linting from a loaded syntax-description dict.

Returns: (ALLOWED_BUNDLE_TYPES, BUILTIN_PROMISE_TYPES, BUILTIN_FUNCTIONS, DEPRECATED_PROMISE_TYPES)
"""
builtin_body_types = set(data.get("bodyTypes", {}).keys())

allowed_bundle_types = data.get("bundleTypes", {}).keys()

builtin_promise_types = set(data.get("promiseTypes", {}).keys())
@dataclass
class SyntaxData:
BUILTIN_BODY_TYPES = {}
BUILTIN_BUNDLE_TYPES = {}
BUILTIN_PROMISE_TYPES = {}
BUILTIN_FUNCTIONS = {}

def __init__(self):
self._data_dict = self._load_syntax_description()
self._derive_syntax_dicts(self._data_dict)

assert self.BUILTIN_BODY_TYPES
assert self.BUILTIN_BUNDLE_TYPES
assert self.BUILTIN_PROMISE_TYPES
assert self.BUILTIN_FUNCTIONS

def _load_syntax_description(self, path: str | None = None) -> dict:
"""Load and return the parsed syntax-description.json file."""
if path is None:
path = os.path.join(os.path.dirname(__file__), "syntax-description.json")
with open(path, "r") as f:
return json.load(f)

def _derive_syntax_dicts(self, data: dict):
"""Derive the five dictionaries used for linting from a loaded syntax-description json-file.
sets the (BUILTIN_BODY_TYPES, BUILTIN_BUNDLE_TYPES, BUILTIN_PROMISE_TYPES, BUILTIN_FUNCTIONS, DEPRECATED_PROMISE_TYPES) dicts
"""
builtin_body_types = data.get("bodyTypes", {})

builtin_functions = set(data.get("functions", {}).keys())
builtin_bundle_types = data.get("bundleTypes", {})

deprecated_promise_types = {
"defaults",
"guest_environments",
} # Has to be hardcoded, not tagged in syntax-description.json
builtin_promise_types = data.get("promiseTypes", {})

return (
builtin_body_types,
allowed_bundle_types,
builtin_promise_types,
builtin_functions,
deprecated_promise_types,
)
builtin_functions = data.get("functions", {})

deprecated_promise_types = {
promise: builtin_promise_types.get(promise, {})
for promise in {
"defaults",
"guest_environments",
} # Has to be hardcoded, not tagged in syntax-description.json
}

_SYNTAX_DATA = _load_syntax_description()
(
_,
ALLOWED_BUNDLE_TYPES,
BUILTIN_PROMISE_TYPES,
BUILTIN_FUNCTIONS,
DEPRECATED_PROMISE_TYPES,
) = _derive_syntax_sets(_SYNTAX_DATA)
self.BUILTIN_BODY_TYPES = builtin_body_types
self.BUILTIN_BUNDLE_TYPES = builtin_bundle_types
self.BUILTIN_PROMISE_TYPES = builtin_promise_types
self.BUILTIN_FUNCTIONS = builtin_functions
self.DEPRECATED_PROMISE_TYPES = deprecated_promise_types


def _qualify(name: str, namespace: str) -> str:
Expand Down Expand Up @@ -487,7 +492,9 @@ def _discover(policy_file: PolicyFile, state: State) -> int:
return 0


def _lint_node(node: Node, policy_file: PolicyFile, state: State) -> int:
def _lint_node(
node: Node, policy_file: PolicyFile, state: State, syntax_data: SyntaxData
) -> int:
"""Checks we run on each node in the syntax tree,
utilizes state for checks which require context."""

Expand All @@ -503,15 +510,15 @@ def _lint_node(node: Node, policy_file: PolicyFile, state: State) -> int:
if node.type == "promise_guard":
assert _text(node) and len(_text(node)) > 1 and _text(node)[-1] == ":"
promise_type = _text(node)[0:-1]
if promise_type in DEPRECATED_PROMISE_TYPES:
if promise_type in syntax_data.DEPRECATED_PROMISE_TYPES:
_highlight_range(node, lines)
print(
f"Deprecation: Promise type '{promise_type}' is deprecated {location}"
)
return 1
if (
state.strict
and promise_type not in BUILTIN_PROMISE_TYPES
and promise_type not in syntax_data.BUILTIN_PROMISE_TYPES
and promise_type not in state.custom_promise_types
):
_highlight_range(node, lines)
Expand All @@ -525,21 +532,53 @@ def _lint_node(node: Node, policy_file: PolicyFile, state: State) -> int:
_highlight_range(node, lines)
print(f"Convention: Promise type should be lowercase {location}")
return 1
if node.type == "bundle_block_type" and _text(node) not in ALLOWED_BUNDLE_TYPES:
if (
node.type == "bundle_block_type"
and _text(node) not in syntax_data.BUILTIN_BUNDLE_TYPES
):
_highlight_range(node, lines)
print(
f"Error: Bundle type must be one of ({', '.join(ALLOWED_BUNDLE_TYPES)}), not '{_text(node)}' {location}"
f"Error: Bundle type must be one of ({', '.join(syntax_data.BUILTIN_BUNDLE_TYPES)}), not '{_text(node)}' {location}"
)
return 1
if state.strict and (
node.type in ("bundle_block_name", "body_block_name")
and _text(node) in BUILTIN_FUNCTIONS
and _text(node) in syntax_data.BUILTIN_FUNCTIONS
):
_highlight_range(node, lines)
print(
f"Error: {"Bundle" if "bundle" in node.type else "Body"} '{_text(node)}' conflicts with built-in function with the same name {location}"
)
return 1
if state.promise_type == "vars" and node.type == "promise":
attribute_nodes = [x for x in node.children if x.type == "attribute"]
if not attribute_nodes:
_highlight_range(node, lines)
print(
f"Error: Missing attribute value for promiser "
f"{_text(node)[:-1]} inside vars-promise type {location}"
)
return 1

vars_attrs = syntax_data.BUILTIN_PROMISE_TYPES.get("vars", {}).get(
"attributes", {}
)
promise_type_attrs = [
_text(child)
for attr_node in attribute_nodes
for child in attr_node.children
if child.type == "attribute_name"
and vars_attrs.get(_text(child), {}).get("visibility", "") == "promiseType"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment to explain what visibility means here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SimonThalvorsen AFAICT you are using visibility to try to find out which attributes are mutually exclusive. I don't think that's correct. I don't see anything in the syntax description which covers this, so feel free to add a hardcoded list of the 7 options.

]

if len(promise_type_attrs) > 1:
for n in attribute_nodes:
_highlight_range(n, lines)
print(
f"Error: Mutually exclusive attribute values for promiser "
f"'{promise_type_attrs}' inside vars-promise {location}"
)
return 1
if node.type == "calling_identifier":
name = _text(node)
qualified_name = _qualify(name, state.namespace)
Expand All @@ -556,15 +595,15 @@ def _lint_node(node: Node, policy_file: PolicyFile, state: State) -> int:
if state.strict and (
qualified_name not in state.bundles
and qualified_name not in state.bodies
and name not in BUILTIN_FUNCTIONS
and name not in syntax_data.BUILTIN_FUNCTIONS
):
_highlight_range(node, lines)
print(
f"Error: Call to unknown function / bundle / body '{name}' {location}"
)
return 1
if (
name not in BUILTIN_FUNCTIONS
name not in syntax_data.BUILTIN_FUNCTIONS
and state.promise_type == "vars"
and state.attribute_name not in ("action", "classes")
):
Expand Down Expand Up @@ -605,14 +644,14 @@ def _pass_fail_state(state: State, errors: int) -> str:
return _pass_fail_filename(pretty_filename, errors)


def _lint(policy_file: PolicyFile, state: State) -> int:
def _lint(policy_file: PolicyFile, state: State, syntax_data: SyntaxData) -> int:
"""Run linting rules (checks) on nodes in a policy file syntax tree."""
assert state.mode == Mode.LINT
errors = 0
state.start_file(policy_file)
for node in policy_file.nodes:
state.navigate(node)
errors += _lint_node(node, policy_file, state)
errors += _lint_node(node, policy_file, state, syntax_data)
message = _pass_fail_state(state, errors)
state.end_file()
if state.prefix:
Expand Down Expand Up @@ -738,7 +777,11 @@ def _args_to_filenames(args: list[str]) -> list[str]:


def _lint_main(
args: list[str], strict: bool, state=None, snippet: Snippet | None = None
args: list[str],
strict: bool,
state=None,
snippet: Snippet | None = None,
syntax_data=None,
) -> int:
"""This is the main function used for linting, it does all the steps on all
the arguments (files / folders).
Expand All @@ -765,6 +808,9 @@ def _lint_main(
state.strict = strict
state.mode = Mode.SYNTAX

if syntax_data is None:
syntax_data = SyntaxData()

filenames = _args_to_filenames(args)

if snippet:
Expand Down Expand Up @@ -799,7 +845,7 @@ def _lint_main(
state.mode = Mode.LINT

for policy_file in policy_files:
errors += _lint(policy_file, state)
errors += _lint(policy_file, state, syntax_data)

return errors

Expand Down
Loading