Source code for procfunc.nodes.execute.construct_nodes

import logging
from typing import Any

import bpy

from procfunc import compute_graph as cg
from procfunc import types as pt
from procfunc.nodes import types as nt
from procfunc.nodes.util import bpy_node_info as bni
from procfunc.util import pytree
from procfunc.util.bpy_info import bpy_nocollide_data_name
from procfunc.util.log import add_exception_context_msg

from . import construct_operator, construct_standard
from .util import (
    get_nth_socket,
    normalize_socket_type,
)

logger = logging.getLogger(__name__)


def instantiate_nodegroup(
    node_tree: bpy.types.NodeTree,
    nodegroup: bpy.types.NodeTree,
) -> bpy.types.Node:
    nodegroup_instance_type = bni.NODEGROUPTYPE_TO_INSTANCE_NODE[node_tree.bl_idname]
    bpy_nodegroup_call = node_tree.nodes.new(nodegroup_instance_type)
    bpy_nodegroup_call.node_tree = nodegroup
    return bpy_nodegroup_call


def _construct_subgraph_call(
    node: cg.SubgraphCallNode,
    bl_node_tree: bpy.types.NodeTree,
    input_results: dict[str | int, Any],
) -> bpy.types.Node | bpy.types.NodeSocket | bpy.types.Material:
    node_group_type = bni.NodeGroupType(bl_node_tree.bl_idname)
    nodegroup = as_nodegroup(node.subgraph, node_group_type)
    bpy_nodegroup_call = instantiate_nodegroup(bl_node_tree, nodegroup)

    for input_name, input_py in node.kwargs.items():
        input_val = input_results[input_name]
        if input_val is None:
            continue
        to_socket = bpy_nodegroup_call.inputs[input_name]
        construct_standard.connect_single_input(
            node_tree=bl_node_tree,
            to_socket=to_socket,
            input_val=input_val,
        )

    return bpy_nodegroup_call


def _dispatch_construct_procnode_by_type(
    node: cg.Node,
    bl_node_tree: bpy.types.NodeTree,
    cache: dict[cg.Node, bpy.types.Node],
    input_results: dict[str | int, Any],
):
    match node:
        case cg.GetAttributeNode():
            raise ValueError(
                f"{node=} must be resolved by construct_procnode_to_bpy before dispatch"
            )
        case cg.ProceduralNode():
            return construct_standard._construct_procnode_standard(
                node, bl_node_tree, input_results
            )
        case cg.FunctionCallNode():
            result = construct_operator._construct_operator_call(
                node, bl_node_tree, input_results
            )
            if isinstance(result, cg.Node):
                return construct_procnode_to_bpy(result, bl_node_tree, cache)
            return result
        case cg.SubgraphCallNode():
            return _construct_subgraph_call(node, bl_node_tree, input_results)
        case cg.InputPlaceholderNode():
            input_node = next(
                n for n in bl_node_tree.nodes if n.bl_idname == "NodeGroupInput"
            )
            assert input_node is not None, bl_node_tree.nodes.keys()
            return input_node.outputs[node.input_name]
        case _:
            raise ValueError(f"Got misconfigured {node=}")


def construct_procnode_to_bpy(
    node: cg.Node,
    bl_node_tree: bpy.types.NodeTree,
    cache: dict[cg.Node, bpy.types.Node] | None = None,
) -> bpy.types.NodeSocket | bpy.types.NodeInternal:
    assert node is not None
    assert isinstance(bl_node_tree, bpy.types.NodeTree), bl_node_tree

    if cache is None:
        cache = {}
    elif cached := cache.get(node, None):
        return cached

    if isinstance(node, cg.GetAttributeNode):
        assert len(node.args) == 1, node.args
        assert len(node.kwargs) == 0, node.kwargs
        source = node.args[0]
        if isinstance(source, cg.SubgraphCallNode):
            # an absent subgraph output leaf (None, e.g. a Material with no
            # displacement) has no output socket; resolve to it directly
            for k, v in source.subgraph.outputs.items(nocontainer_name="result"):
                if k == node.attribute_name and not isinstance(v, cg.Node):
                    return v
        base_node = construct_procnode_to_bpy(source, bl_node_tree, cache)
        assert isinstance(base_node, bpy.types.Node), base_node
        socket_name = construct_standard._resolve_output_socket_name(
            source, node.attribute_name, bl_node_tree
        )
        return get_nth_socket(base_node.outputs, socket_name, 0, base_node.type)

    def _construct_leaf(input_val: cg.Node | Any | None):
        if not isinstance(input_val, cg.Node):
            return input_val
        bpy_res = construct_procnode_to_bpy(input_val, bl_node_tree, cache)
        if isinstance(bpy_res, bpy.types.NodeInternal):
            # nodes that need non-socket input (GetAttribute) wont reach here, so we can return the primary socket
            return construct_standard._get_primary_output_socket(input_val, bpy_res)
        return bpy_res

    # resolve pt.Material before PyTree traversal, since Material is a registered
    # PyTree container and would be destructured into shader ProcNodes otherwise
    kwargs_for_tree = {
        k: v.item() if isinstance(v, pt.Material) else v for k, v in node.kwargs.items()
    }

    input_sockets: dict = pytree.PyTree(kwargs_for_tree).map(_construct_leaf).obj()
    input_sockets.update(
        {
            i: v
            for i, v in enumerate(pytree.PyTree(node.args).map(_construct_leaf).obj())
        }
    )

    if (
        lowered := construct_operator._lower_compare_outside_geometry(
            node, bl_node_tree, input_sockets
        )
    ) is not None:
        result = construct_procnode_to_bpy(lowered, bl_node_tree, cache)
        cache[node] = result
        return result

    with add_exception_context_msg(
        f"While instantiating {nt.node_definition_context_message(node)}{node=}:\n"
    ):
        result = _dispatch_construct_procnode_by_type(
            node, bl_node_tree, cache, input_sockets
        )

    assert isinstance(result, (bpy.types.Node, bpy.types.NodeSocket)), result

    logger.debug(
        f"{construct_procnode_to_bpy.__name__} for {node=} produced {type(result)=}"
    )
    cache[node] = result
    return result


def _graph_requires_scene_tree(graph: cg.ComputeGraph) -> bool:
    return any(
        isinstance(node, cg.ProceduralNode)
        and node.node_type in bni.SCENE_BOUND_NODE_TYPES
        for subgraph in cg.traverse_nested_graphs(graph)
        for node in cg.traverse_depth_first(subgraph)
    )


def _construct_nodegroup(
    graph: cg.ComputeGraph,
    node_tree_type: bni.NodeGroupType,
) -> bpy.types.NodeTree:
    # Scene-bound compositor nodes (e.g. Render Layers) poll False inside a
    # standalone node group, so build on the active scene's compositing tree
    # instead, replacing its contents. That tree still accepts group IO nodes
    # and an interface, so the rest of the scaffold below is unchanged.
    if _graph_requires_scene_tree(graph):
        scene = bpy.context.scene
        scene.use_nodes = True
        nodegroup = scene.node_tree
    else:
        name = (
            graph.name
            if graph.name is not None
            else bpy_nocollide_data_name(graph, bpy.data.node_groups)
        )
        nodegroup = bpy.data.node_groups.new(name, node_tree_type.value)

    nodegroup.interface.clear()
    nodegroup.nodes.clear()

    input_node = nodegroup.nodes.new("NodeGroupInput")
    output_node = nodegroup.nodes.new("NodeGroupOutput")

    cache = {}

    for k, v in graph.inputs.items():
        v_type = v.metadata.get("known_value_type", None)
        if v_type is None:
            raise ValueError(f"Got {v=} with no known value type")
        socket_type = bni.PYTHON_TYPE_TO_SOCKET_TYPE[v_type]
        nodegroup.interface.new_socket(
            name=k,
            in_out="INPUT",
            socket_type=socket_type.value,
        )
        cache[v] = input_node.outputs[k]

    for k, v in graph.outputs.items(nocontainer_name="result"):
        # an absent output (None, e.g. a Material with no displacement) gets no
        # output socket
        if not isinstance(v, cg.Node):
            continue

        res = construct_procnode_to_bpy(v, nodegroup, cache)

        # accessor nodes can resolve to an absent output (None, e.g. a
        # Material's missing displacement read through a subgraph boundary)
        if not isinstance(res, (bpy.types.Node, bpy.types.NodeSocket)):
            continue

        if isinstance(res, bpy.types.Node):
            res = construct_standard._get_primary_output_socket(v, res)
        assert isinstance(res, bpy.types.NodeSocket), res

        nodegroup.interface.new_socket(
            name=k,
            in_out="OUTPUT",
            socket_type=normalize_socket_type(res.bl_idname),
        )

        to_socket = output_node.inputs[k]
        construct_standard.connect_single_input(nodegroup, to_socket, res)

    return nodegroup


[docs] def as_nodegroup( graph: cg.ComputeGraph, node_tree_type: bni.NodeGroupType, ) -> bpy.types.NodeTree: # Scene-bound graphs (e.g. Render Layers) are built on the active scene's # compositing tree, replacing its contents. ops = graph.metadata.get("operations", []) use_cache = len(ops) > 0 and ops[0][0].__name__ == "node_function" if use_cache: cached = graph.metadata["bpy_cached_impls"].get(node_tree_type, None) if cached is not None: return cached with add_exception_context_msg( prefix=f"While constructing nodegroup for {graph=} {node_tree_type=}:\n", ): nodegroup = _construct_nodegroup(graph, node_tree_type) if use_cache: graph.metadata["bpy_cached_impls"][node_tree_type] = nodegroup return nodegroup