Skip to content

Evolution API

This section documents the evolutionary components of OKAPI.

Population

Functions for initializing and managing populations of trees.

from okapi.population import initialize_individuals, choose_n_best, choose_pareto, choose_pareto_then_sorted

choose_pareto(trees, fitnesses, n, objectives, minimize_node_count=True)

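Select up to n trees based on Pareto optimality. Optimizes for: (1) the fitness objectives passed in objectives (for example, maximizing fitness), and (2) minimizing the number of nodes in the tree when minimize_node_count is True.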

Parameters:

Name Type Description Default
trees List[Tree]

List of Tree objects

required
fitnesses ndarray

Array of fitness values for each tree

required
n int

Maximum number of trees to select

required

Returns:

Type Description

List of selected trees and their corresponding fitness values

Source code in okapi/population.py
def choose_pareto(trees: List[Tree], fitnesses: np.ndarray, n: int, objectives: Sequence[Callable[[float, float], bool]], minimize_node_count=True):
    """
    Select up to n trees based on Pareto optimality.

    The Pareto front is computed over the columns of ``fitnesses`` using ``objectives``.
    When ``minimize_node_count`` is True, each tree's node count is appended as one more
    column that is minimized.

    If the front contains more than n trees, only the Pareto-optimal trees are ranked by
    proximity to the optimal point (fitness columns only) and the first n are kept.

    Args:
        trees: List of Tree objects
        fitnesses: 2D array of fitness values, one row per tree and one column per objective
        n: Maximum number of trees to select
        objectives: Comparison functions (e.g. maximize / minimize), one per fitness column
        minimize_node_count: If True, the tree size is added as an extra objective to minimize

    Returns:
        Tuple of (selected trees, their corresponding fitness values)
    """
    logger.debug(f"Selecting up to {n} Pareto-optimal trees from population of {len(trees)}")

    objectives = list(objectives)
    if minimize_node_count:
        objectives.append(minimize)
        sizes = np.array([tree.nodes_count for tree in trees]).reshape(-1, 1)
        objective_array = np.concatenate([fitnesses, sizes], axis=1)
    else:
        objective_array = fitnesses

    # Pareto mask over the fitness objectives (plus node count when enabled)
    pareto_mask = paretoset(objective_array, objectives)
    pareto_indices = np.where(pareto_mask)[0]
    logger.debug(f"Found {len(pareto_indices)} Pareto-optimal trees")

    if len(pareto_indices) > n:
        logger.debug(f"Too many Pareto-optimal trees ({len(pareto_indices)}), selecting top {n} by proximity")
        # The node-count objective has no column in `fitnesses`, so it is dropped for ranking.
        fitness_objectives = objectives[:-1] if minimize_node_count else objectives
        # Rank only the Pareto-optimal rows; ranking the whole population could pick
        # dominated trees. Assumes the helper returns indices into the array it is
        # given - TODO confirm against sort_by_optimal_point_proximity.
        _, proximity_order = sort_by_optimal_point_proximity(fitnesses[pareto_indices], fitness_objectives)
        selected_indices = pareto_indices[np.asarray(proximity_order)[:n]]
    else:
        selected_indices = pareto_indices
        logger.debug(f"Using all {len(selected_indices)} Pareto-optimal trees")

    # Trees with duplicated fitness values are intentionally kept for now.
    selected_fitnesses = fitnesses[selected_indices]
    selected_trees = [trees[i] for i in selected_indices]

    if len(selected_trees) > 0:
        logger.debug(f"Selected {len(selected_trees)} trees with fitness range: {selected_fitnesses.min():.4f} - {selected_fitnesses.max():.4f}")
    else:
        logger.warning("No trees selected in Pareto optimization")

    return selected_trees, selected_fitnesses

initialize_individuals(tensors_dict, n, exclude_ids=tuple())

Initialize a population of individuals (trees) from a dictionary of tensors.

This function creates simple trees, each with a root node containing a different tensor from the provided dictionary. The tensors are selected randomly from the dictionary.

Parameters:

Name Type Description Default
tensors_dict Dict[str, Tensor]

Dictionary mapping model IDs to their tensor representations

required
n int

Number of individuals (trees) to create

required
exclude_ids

Optional tuple of model IDs to exclude from selection

tuple()

Returns:

Type Description
List[Tree]

List of initialized Tree objects

Raises:

Type Description
Exception

If n is greater than the number of available tensors after exclusions

Source code in okapi/population.py
def initialize_individuals(tensors_dict: Dict[str, Tensor], n: int, exclude_ids=tuple()) -> List[Tree]:
    """
    Build an initial population of single-node trees from a dictionary of tensors.

    The dictionary entries are visited in a random order; every entry whose ID is not
    excluded becomes the root ValueNode of a new tree, until n trees have been created.

    Args:
        tensors_dict: Dictionary mapping model IDs to their tensor representations
        n: Number of individuals (trees) to create
        exclude_ids: Optional tuple of model IDs to exclude from selection

    Returns:
        List of initialized Tree objects

    Raises:
        Exception: If fewer than n trees could be created after exclusions
    """
    logger.info(f"Initializing {n} individuals")
    logger.debug(f"Available tensors: {len(tensors_dict)}, excluded IDs: {len(exclude_ids)}")

    # Random visiting order over the dictionary entries
    permutation = np.arange(len(tensors_dict))
    np.random.shuffle(permutation)
    logger.trace("Shuffled tensor order")

    entries = list(tensors_dict.items())

    population = []
    for position in permutation:
        if len(population) >= n:
            break

        model_id, model_tensor = entries[position]
        if model_id in exclude_ids:
            logger.trace(f"Skipping excluded ID: {model_id}")
            continue

        logger.debug(f"Creating tree with tensor ID: {model_id}")
        root_node: ValueNode = ValueNode(children=None, value=model_tensor, id=model_id)
        population.append(Tree.create_tree_from_root(root_node))

    created = len(population)
    if created < n:
        logger.error(f"Could not generate enough individuals. Requested: {n}, generated: {created}")
        raise Exception("Could not generate as many examples")

    logger.info(f"Successfully initialized {len(population)} individuals")
    return population

Crossover

Functions for performing crossover between trees.

from okapi.crossover import crossover, tournament_selection_indexes

crossover(tree1, tree2, node_type=None)

Performs crossover between two parent trees to produce two offspring trees.

Crossover works by selecting a random node from each parent tree and swapping the subtrees rooted at those nodes. This creates two new offspring trees that contain genetic material from both parents.

Parameters:

Name Type Description Default
tree1 Tree

First parent tree

required
tree2 Tree

Second parent tree

required
node_type

Type of nodes to consider for crossover points ('value_nodes' or 'op_nodes'). If None, a random suitable type will be chosen.

None

Returns:

Type Description

Tuple of two new Tree objects created by crossover

Raises:

Type Description
ValueError

If node_type is 'op_nodes' but one or both trees don't have operator nodes

Source code in okapi/crossover.py
def crossover(tree1: Tree, tree2: Tree, node_type=None):
    """
    Produce two offspring by swapping subtrees between copies of two parent trees.

    A node of the chosen type is picked at random in a copy of each parent, and the
    subtrees rooted at those nodes are exchanged. The parents themselves are not
    modified because the swap is performed on copies.

    Args:
        tree1: First parent tree
        tree2: Second parent tree
        node_type: Type of nodes to consider for crossover points ('value_nodes' or 'op_nodes').
                   If None, a random suitable type will be chosen.

    Returns:
        Tuple of two new Tree objects created by crossover

    Raises:
        ValueError: If node_type is 'op_nodes' but one or both trees don't have operator nodes
    """
    logger.info("Performing crossover between two trees")

    def _both_parents_have_operators():
        # Operator-node crossover needs at least one operator node in each parent.
        first_has_ops = len(tree1.nodes["op_nodes"]) > 0
        second_has_ops = len(tree2.nodes["op_nodes"]) > 0
        return first_has_ops and second_has_ops

    if node_type is not None:
        if node_type == "op_nodes" and not _both_parents_have_operators():
            logger.error("Node type was chosen to be operator nodes but there are no operator nodes in at least one of the parents")
            raise ValueError("Node type was chosen to be operator nodes but there are not operator nodes in at least one of the parents")
        nodes_type = node_type
        logger.debug(f"Using specified node type for crossover: {nodes_type}")
    else:
        # TODO: this may be worth refactoring along with "get_random_node" to not use string but types instead
        candidate_types = ["value_nodes"]

        if _both_parents_have_operators():
            candidate_types.append("op_nodes")
            logger.debug("Both trees have operator nodes, including them in potential crossover points")
        else:
            logger.debug("At least one tree has no operator nodes, using only value nodes for crossover")

        nodes_type = np.random.choice(candidate_types)
        logger.debug(f"Randomly selected node type for crossover: {nodes_type}")

    logger.debug("Creating copies of parent trees")
    child1 = tree1.copy()
    child2 = tree2.copy()

    logger.debug("Selecting random nodes for crossover")
    cut1 = child1.get_random_node(nodes_type)
    cut2 = child2.get_random_node(nodes_type)
    logger.debug(f"Selected nodes: {cut1} from tree1, {cut2} from tree2")

    logger.debug("Creating copies of subtrees")
    subtree1 = cut1.copy_subtree()
    subtree2 = cut2.copy_subtree()

    logger.debug("Swapping subtrees between trees")
    child1.replace_at(cut1, subtree2).recalculate()
    child2.replace_at(cut2, subtree1).recalculate()

    logger.info(f"Crossover complete, created two new trees with {child1.nodes_count} and {child2.nodes_count} nodes")
    return child1, child2

tournament_selection_indexes(fitnesses, tournament_size=5, optimal_point=None)

Selects parent indices for crossover using tournament selection.

In tournament selection, a subset of individuals (of size tournament_size) is randomly selected from the population, and the one whose fitness is closest to optimal_point is chosen as a parent. If optimal_point is not given, a vector of ones is used. This process is repeated to select the second parent.

Parameters:

Name Type Description Default
fitnesses ndarray

Array of fitness values for the entire population

required
tournament_size int

Number of individuals to include in each tournament

5

Returns:

Type Description
ndarray

Array with indices of the two selected parents

Raises:

Type Description
ValueError

If tournament_size is too large relative to population size

Source code in okapi/crossover.py
def tournament_selection_indexes(fitnesses: np.ndarray, tournament_size: int = 5, optimal_point: np.ndarray | None = None) -> np.ndarray:
    """
    Selects parent indices for crossover using tournament selection.

    For each of the two parents, tournament_size individuals are drawn at random
    (without replacement) from the population, and the one whose fitness is closest
    to optimal_point is chosen. The two tournaments are independent, so the same
    index may be selected twice.

    Args:
        fitnesses: Array of fitness values for the entire population; its last dimension
            is the number of objectives
        tournament_size: Number of individuals to include in each tournament
        optimal_point: 1D array with one entry per objective describing the target point.
            If None, a vector of ones is used.

    Returns:
        Array with indices of the two selected parents

    Raises:
        ValueError: If tournament_size is smaller than 1 or larger than the population size
        AssertionError: If optimal_point is not 1D or does not match the number of objectives
    """
    logger.debug(f"Running tournament selection with tournament size {tournament_size}")

    if tournament_size < 1:
        logger.error(f"Tournament size {tournament_size} must be at least 1")
        raise ValueError(f"Size of the tournament should be at least 1 but {tournament_size=}")

    if tournament_size > (len(fitnesses)):
        logger.error(f"Tournament size {tournament_size} is too large for population size {len(fitnesses)}")
        raise ValueError(f"Size of the tournament should be at most equal to number of participans but {len(fitnesses)=} and {tournament_size=}")

    if len(fitnesses) < (2 * tournament_size):
        logger.warning(
            f"Tournament size ({tournament_size}) is large relative to the population size ({len(fitnesses)}). "
            "The population should be at least twice as large as the tournament for more stable parent selection"
        )

    if optimal_point is None:
        optimal_point = np.ones(shape=(fitnesses.shape[-1],))
    assert len(optimal_point.shape) == 1 and fitnesses.shape[-1] == optimal_point.shape[-1], "Shapes for fitnesses and optimal point do not match"

    population_indices = np.arange(len(fitnesses))
    winners = []
    for _ in range(2):
        candidates_idx = np.random.choice(population_indices, size=(tournament_size,), replace=False)
        distances = _euclidean_distances(fitnesses[candidates_idx], optimal_point)
        # The candidate with the smallest distance to the optimal point wins the tournament.
        winners.append(candidates_idx[np.argmin(distances)])
    selected = np.array(winners)

    assert selected.shape == (2,)

    logger.debug(f"Selected parent indices: {selected}")
    return selected

Mutation

Functions for mutating trees.

from okapi.mutation import append_new_node_mutation, lose_branch_mutation, new_tree_from_branch_mutation, get_allowed_mutations

append_new_node_mutation(tree, models, ids=None, allowed_ops=(MeanNode,), **kwargs)

Mutation that adds a new node to the tree.

This mutation randomly selects an existing node in the tree and appends a new node as its child. If the selected node is a ValueNode, a new OperatorNode is created as an intermediary, and the new ValueNode is added as its child. If the selected node is an OperatorNode, a new ValueNode is directly appended to it.

Parameters:

Name Type Description Default
tree Tree

The tree to mutate

required
models Sequence[Tensor]

Sequence of tensor models that can be used as values for the new ValueNode

required
ids None | Sequence[str | int]

Optional sequence of identifiers for the models. If None, indices will be used

None
allowed_ops tuple[Type[OperatorNode], ...]

Tuple of OperatorNode types that can be used when creating a new operator node

(MeanNode,)
**kwargs

Additional keyword arguments (ignored)

{}

Returns:

Type Description

A new Tree with the mutation applied

Source code in okapi/mutation.py
def append_new_node_mutation(
    tree: Tree, models: Sequence[Tensor], ids: None | Sequence[str | int] = None, allowed_ops: tuple[Type[OperatorNode], ...] = (MeanNode,), **kwargs
):
    """
    Mutation that grows a copy of the tree by one new value node.

    A random model and a random node of the copied tree are picked. When the picked node
    is an OperatorNode, the new ValueNode is appended to it directly. When it is a
    ValueNode, a randomly chosen operator from allowed_ops is created with the new
    ValueNode as its child, and that operator node is appended instead.

    Args:
        tree: The tree to mutate
        models: Sequence of tensor models that can be used as values for the new ValueNode
        ids: Optional sequence of identifiers for the models. If None, indices will be used
        allowed_ops: Tuple of OperatorNode types that can be used when creating a new operator node
        **kwargs: Additional keyword arguments (ignored)

    Returns:
        A new Tree with the mutation applied
    """
    logger.debug("Applying append_new_node_mutation")
    mutated = tree.copy()

    if ids is not None:
        assert len(models) == len(ids)
        logger.trace(f"Using provided IDs, confirmed length match: {len(ids)}")
    else:
        ids = list(range(len(models)))
        logger.trace("Using indices as IDs for models")

    chosen = np.random.randint(len(ids))
    chosen_id = ids[chosen]
    logger.debug(f"Selected model ID: {chosen_id}")
    target = mutated.get_random_node()
    logger.debug(f"Selected random node for mutation: {target}")

    new_leaf: ValueNode = ValueNode([], models[chosen], chosen_id)
    logger.trace(f"Created new value node with ID: {chosen_id}")

    if not isinstance(target, ValueNode):
        logger.debug("Appending value node directly to operator node")
        mutated.append_after(target, new_leaf)
    else:
        # A value node gets an intermediate operator node holding the new leaf.
        operator_cls: Type[OperatorNode] = np.random.choice(np.asarray(allowed_ops))
        logger.debug(f"Selected random operator type: {operator_cls.__name__}")
        wrapper: OperatorNode = operator_cls.create_node([new_leaf])
        logger.debug("Appending operator node with value node child after selected node")
        mutated.append_after(target, wrapper)

    logger.info(f"Append node mutation complete, new tree has {mutated.nodes_count} nodes")
    return mutated

get_allowed_mutations(tree)

Determines which mutation operations are valid for a given tree.

This function checks the tree's structure and size to determine which mutations can be safely applied without violating constraints.

Parameters:

Name Type Description Default
tree

The tree to analyze

required

Returns:

Type Description

A list of mutation functions that are valid for the given tree

Source code in okapi/mutation.py
def get_allowed_mutations(tree):
    """
    Collect the mutation functions that may be applied to the given tree.

    append_new_node_mutation is always included. lose_branch_mutation is added when the
    tree has at least 3 nodes, and new_tree_from_branch_mutation when it has more than
    one value node.

    Args:
        tree: The tree to analyze

    Returns:
        A list of mutation functions that are valid for the given tree
    """
    logger.debug(f"Determining allowed mutations for tree with {tree.nodes_count} nodes")

    # Appending a node is always allowed.
    permitted: list[Callable] = [append_new_node_mutation]

    large_enough_to_prune = tree.nodes_count >= 3
    if large_enough_to_prune:
        logger.trace("Tree is large enough for lose_branch_mutation")
        permitted.append(lose_branch_mutation)

    value_node_count = len(tree.nodes["value_nodes"])
    if value_node_count > 1:
        logger.trace("Tree has enough value nodes for new_tree_from_branch_mutation")
        permitted.append(new_tree_from_branch_mutation)

    logger.debug(f"Found {len(permitted)} allowed mutation types")
    return permitted

lose_branch_mutation(tree, **kwargs)

Mutation that removes a branch from the tree.

This mutation randomly selects a non-root, non-leaf node in the tree and removes it along with all its descendants, effectively pruning that branch from the tree.

Parameters:

Name Type Description Default
tree Tree

The tree to mutate

required
**kwargs

Additional keyword arguments (ignored)

{}

Returns:

Type Description

A new Tree with the mutation applied

Raises:

Type Description
AssertionError

If the tree has fewer than 3 nodes

Source code in okapi/mutation.py
def lose_branch_mutation(tree: Tree, **kwargs):
    """
    Mutation that removes a branch from the tree.

    This mutation works on a copy of the tree: it randomly selects a non-root, non-leaf
    node and prunes the tree at that node.

    Args:
        tree: The tree to mutate
        **kwargs: Additional keyword arguments (ignored)

    Returns:
        A new Tree with the mutation applied

    Raises:
        AssertionError: If the tree has fewer than 3 nodes
    """
    logger.debug("Applying lose_branch_mutation")
    tree = tree.copy()

    if tree.nodes_count < 3:
        logger.error(f"Cannot apply lose_branch_mutation - tree is too small: {tree.nodes_count} nodes")
        # Raised explicitly (not via `assert`) so the documented check also runs under `python -O`.
        raise AssertionError("Tree is too small")

    node = tree.get_random_node(allow_leaves=False, allow_root=False)
    logger.debug(f"Selected node for pruning: {node}")

    pruned = tree.prune_at(node)
    logger.info(f"Pruned branch with {len(pruned.get_nodes())} nodes, tree now has {tree.nodes_count} nodes")

    return tree

new_tree_from_branch_mutation(tree, **kwargs)

Mutation that creates a new tree from a branch of the existing tree.

This mutation randomly selects a non-root ValueNode, removes it from the tree along with its descendants, and creates a new tree with the removed node as its root.

Parameters:

Name Type Description Default
tree Tree

The tree to mutate

required
**kwargs

Additional keyword arguments (ignored)

{}

Returns:

Type Description

A new Tree created from the selected branch

Raises:

Type Description
AssertionError

If the tree has only one ValueNode

Source code in okapi/mutation.py
def new_tree_from_branch_mutation(tree: Tree, **kwargs):
    """
    Mutation that creates a new tree from a branch of the existing tree.

    This mutation works on a copy of the tree: it randomly selects a non-root ValueNode,
    prunes the copy at that node, and creates a new tree with the selected node as its root.

    Args:
        tree: The tree to mutate
        **kwargs: Additional keyword arguments (ignored)

    Returns:
        A new Tree created from the selected branch

    Raises:
        AssertionError: If the tree has only one ValueNode
    """
    # Raised explicitly (not via `assert`) so the documented check also runs under `python -O`.
    if len(tree.nodes["value_nodes"]) <= 1:
        raise AssertionError("Tree must have more than one value node")

    logger.debug("Applying new_tree_from_branch_mutation")
    tree = tree.copy()

    node = tree.get_random_node(nodes_type="value_nodes", allow_leaves=True, allow_root=False)
    logger.debug(f"Selected value node for creating new tree: {node}")

    _ = tree.prune_at(node)  # this may return parent op node, so we still want to use the original node.
    logger.debug("Pruned node and its subtree to create new tree")

    # Type narrowing only: nodes_type="value_nodes" was requested above.
    assert isinstance(node, ValueNode)
    new_tree = Tree.create_tree_from_root(node)

    logger.info(f"Created new tree from branch with {new_tree.nodes_count} nodes")
    return new_tree

Pareto Optimization

Functions for Pareto optimization and visualization.

from okapi.pareto import paretoset, minimize, maximize, plot_pareto_frontier

maximize(a, b)

Compare two values for maximization in Pareto optimization.

This function determines if the first value (a) is at least as good as the second value (b) in the context of maximization (higher is better).

Parameters:

Name Type Description Default
a

First value to compare

required
b

Second value to compare

required

Returns:

Type Description

True if a is greater than or equal to b, False otherwise

Source code in okapi/pareto.py
def maximize(a, b):
    """
    Objective comparator for criteria where higher values are better.

    Tells whether the first value is at least as good as the second one
    when the criterion is being maximized.

    Args:
        a: First value to compare
        b: Second value to compare

    Returns:
        Result of ``a >= b``
    """
    a_at_least_as_good = a >= b
    return a_at_least_as_good

minimize(a, b)

Compare two values for minimization in Pareto optimization.

This function determines if the first value (a) is at least as good as the second value (b) in the context of minimization (lower is better).

Parameters:

Name Type Description Default
a

First value to compare

required
b

Second value to compare

required

Returns:

Type Description

True if a is less than or equal to b, False otherwise

Source code in okapi/pareto.py
def minimize(a, b):
    """
    Objective comparator for criteria where lower values are better.

    Tells whether the first value is at least as good as the second one
    when the criterion is being minimized.

    Args:
        a: First value to compare
        b: Second value to compare

    Returns:
        Result of ``a <= b``
    """
    a_at_least_as_good = a <= b
    return a_at_least_as_good

paretoset(array, objectives)

Identify the Pareto-optimal set from a collection of points with multiple objectives.

This function finds points that are not dominated by any other point, where dominance is determined based on the specified objective functions. A point dominates another if it is at least as good in all objectives and strictly better in at least one.

Parameters:

Name Type Description Default
array ndarray

2D array where each row is a point and each column represents a different objective

required
objectives Sequence[Callable[[float, float], bool]]

Sequence of objective functions (maximize or minimize) for each column

required

Returns:

Type Description

Boolean mask where True indicates a point belongs to the Pareto-optimal set

Raises:

Type Description
AssertionError

If dimensions don't match or the array is not 2D

Source code in okapi/pareto.py
def paretoset(array: np.ndarray, objectives: Sequence[Callable[[float, float], bool]]):
    """
    Identify the Pareto-optimal set from a collection of points with multiple objectives.

    A point is removed from the set when some other, non-identical point is at least as
    good in every objective according to the corresponding comparator. Identical points
    never dominate each other, so duplicates of a Pareto-optimal point are all kept.

    Args:
        array: 2D array where each row is a point and each column represents a different objective
        objectives: Sequence of comparators (such as maximize or minimize), one per column.
            Each is called as f(candidate_value, current_value) and should return True when
            the candidate is at least as good.

    Returns:
        List of booleans, one per row, where True indicates a point belongs to the Pareto-optimal set

    Raises:
        AssertionError: If the array is not 2D or the number of objectives does not match
            the number of columns
    """
    assert len(array.shape) == 2, "Array should be two dimensional, where first dimension is number of points, second dimension number of objectives"

    n_points, n_objectives = array.shape

    assert len(objectives) == n_objectives, "Number of objectives should match the number of columns in the array"

    domination_mask = [True] * n_points

    for i in range(n_points):  # checking if ith point should be on the pareto front
        for j in range(n_points):
            # A point cannot dominate itself, and exact duplicates do not dominate each other.
            if i == j or np.array_equal(array[i], array[j]):
                continue

            # j dominates i when it is at least as good in every objective; the rows differ,
            # so for non-strict comparators j is strictly better in at least one of them.
            if all(is_at_least_as_good(array[j, k], array[i, k]) for k, is_at_least_as_good in enumerate(objectives)):
                domination_mask[i] = False
                break
    return domination_mask

plot_pareto_frontier(array, objectives, figsize=(10, 6), title='Pareto Frontier')

Visualize the Pareto frontier for a two-dimensional optimization problem.

Parameters:

array (np.ndarray): Array of points where the first dimension is the number of points and the second dimension must be exactly 2 (two criteria to optimize).

objectives (Sequence[Callable]): Sequence of two objective functions, each should be either maximize or minimize.

figsize (tuple, optional): Size of the figure (width, height) in inches. Default is (10, 6).

title (str, optional): Title of the plot. Default is "Pareto Frontier".

Returns:

fig, ax : tuple Matplotlib figure and axes objects.

Source code in okapi/pareto.py
def plot_pareto_frontier(array: np.ndarray, objectives: Sequence[Callable[[float, float], bool]], figsize=(10, 6), title="Pareto Frontier"):
    """
    Visualize the Pareto frontier for a two-dimensional optimization problem.

    Non-Pareto points are drawn in blue, Pareto points in red, and the Pareto points are
    connected by a red line after being sorted by the first criterion.

    Args:
        array: Array of points where the first dimension is the number of points and
            the second dimension must be exactly 2 (two criteria to optimize)
        objectives: Sequence of two objective functions, each should be either maximize or minimize
        figsize: Size of the figure (width, height) in inches
        title: Title of the plot

    Returns:
        Tuple (fig, ax) with the Matplotlib figure and axes objects

    Raises:
        AssertionError: If the array is not 2D with exactly two columns, or if the number
            of objectives is not two
    """
    assert len(array.shape) == 2, "Array should be two-dimensional"
    assert array.shape[1] == 2, "This function only works for two criteria (array.shape[1] must be 2)"
    assert len(objectives) == 2, "This function only works for two objectives"

    # Boolean mask of the Pareto set (True for points on the Pareto frontier);
    # converted to an ndarray so it is always applied as a boolean index.
    pareto_mask = np.asarray(paretoset(array, objectives), dtype=bool)

    # Create figure and axis
    fig, ax = plt.subplots(figsize=figsize)

    # Extract Pareto and non-Pareto points
    pareto_points = array[pareto_mask]
    non_pareto_points = array[~pareto_mask]

    # Plot non-Pareto points in blue
    if len(non_pareto_points) > 0:
        ax.scatter(non_pareto_points[:, 0], non_pareto_points[:, 1], color="blue", label="Non-Pareto points")

    # Plot Pareto points in red
    if len(pareto_points) > 0:
        ax.scatter(pareto_points[:, 0], pareto_points[:, 1], color="red", label="Pareto frontier points")

        # Sort by the first criterion so the connecting line visits the points in order.
        # Ascending when the first objective is `minimize`, descending otherwise. An
        # isinstance check against type(minimize) would be True for every function,
        # so the objective is compared by identity.
        sort_col = 0
        sort_ascending = objectives[0] is minimize

        # Sort the Pareto points
        sorted_indices = np.argsort(pareto_points[:, sort_col])
        if not sort_ascending:
            sorted_indices = sorted_indices[::-1]

        sorted_pareto = pareto_points[sorted_indices]

        # Draw the line connecting Pareto points
        ax.plot(sorted_pareto[:, 0], sorted_pareto[:, 1], color="red", linestyle="-", linewidth=2)

    # Add labels and title
    ax.set_xlabel("Criterion 1")
    ax.set_ylabel("Criterion 2")
    ax.set_title(title)
    ax.legend()
    ax.grid(True, linestyle="--", alpha=0.7)

    return fig, ax