Source code for comet.multiverse

import os
import re
import sys
import csv
import glob
import shutil
import pickle
import inspect
import textwrap
import itertools
import subprocess
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
from scipy import stats
from jinja2 import Template
from matplotlib import transforms
from collections import defaultdict
from matplotlib import pyplot as plt
from matplotlib import lines as mlines
from matplotlib import gridspec as gridspec
from matplotlib.colors import LinearSegmentedColormap
from matplotlib import patches as mpatches
from scipy.interpolate import make_interp_spline
from IPython.display import display as ipy_display
from joblib import Parallel, delayed
from tqdm.auto import tqdm
from tqdm_joblib import tqdm_joblib


[docs] class Multiverse: """ Multiverse class for creating, running, and visualizing a multiverse analysis. Parameters ---------- name : str Name of the multiverse analysis. Default is "multiverse". path : str Path to a multiverse directory (only used by the GUI). """ def __init__(self, name="multiverse", path=None): self.name = name.split('/')[-1].split('.')[0] self.num_universes = None self.forking_paths = None if path is not None: # Set the directories from the provided path (used by the GUI) self.multiverse_dir = path self.results_dir = os.path.join(self.multiverse_dir, "results") self.script_dir = os.path.join(self.multiverse_dir, "scripts") else: # Set the directories based on the calling script calling_script_dir = os.getcwd() if self._in_notebook() else os.path.dirname(os.path.abspath(sys.modules['__main__'].__file__)) self.multiverse_dir = os.path.join(calling_script_dir, self.name) self.results_dir = os.path.join(self.multiverse_dir, "results") self.script_dir = os.path.join(self.multiverse_dir, "scripts") os.makedirs(self.results_dir, exist_ok=True) os.makedirs(self.script_dir, exist_ok=True) # Public methods
[docs] def create(self, analysis_template, forking_paths, config={}): """ Create the individual universe scripts Parameters ---------- analysis_template : function Function containing the analysis template forking_paths : dict Dictionary containing the forking paths config : dict Configuration dictionary with optional combination rules - order : list of lists specifying the order of decisions - exclude : list of list[dict or str] (set listed keys to NaN if conditions match) - remove : list of list[dict or str] (drop universes if conditions match) - deduplicate : bool (collapse duplicates after exclude/remove; default True) """ # If multiverse directory exists, remove all Python files but keep folders and template if os.path.exists(self.multiverse_dir) and os.path.exists(self.script_dir): for item in os.listdir(self.script_dir): item_path = os.path.join(self.script_dir, item) if os.path.isfile(item_path) and item.endswith(".py") and item != "template.py": os.remove(item_path) else: os.makedirs(self.multiverse_dir) # Ensure the results directory exists if not os.path.exists(self.results_dir): os.makedirs(self.results_dir) # Template creation template_code = inspect.getsource(analysis_template) # Extract the source code template_body = "\n".join(template_code.splitlines()[1:]) # Remove the function definition # Determine the indentation level of the first line of the function body and remove it from all lines first_line = template_body.splitlines()[0] indentation_level = len(first_line) - len(first_line.lstrip()) adjusted_lines = [line[indentation_level:] if len(line) > indentation_level else line for line in template_body.splitlines()] adjusted_template_body = "\n".join(adjusted_lines) # Create jinja template jinja_template = Template(adjusted_template_body) # Generate all unique combinations of forking paths if not forking_paths: print("No forking paths provided; nothing to create.") return keys, values = zip(*forking_paths.items()) # Rules all_universes = [] exclude_rules = config.get("exclude", []) removed_rules = config.get("remove", []) deduplicate = config.get("deduplicate", True) def _normalise_value(v): return v["name"] if isinstance(v, dict) and "name" in v else v def _apply_exclude_to_universe(universe): if not exclude_rules: return universe path_dict = {k: _normalise_value(v) for k, v in universe} to_nan = set() for rule in exclude_rules: matches = True keys_to_nan = [] for condition in rule: if isinstance(condition, dict): for key, val in condition.items(): if path_dict.get(key) != val: matches = False break elif isinstance(condition, str): keys_to_nan.append(condition) else: matches = False break if matches: for k in keys_to_nan: if k in path_dict: to_nan.add(k) if not to_nan: return universe updated = [] for k, v in universe: if k in to_nan: updated.append((k, float("nan"))) else: updated.append((k, v)) return updated def _rule_matches(universe, ruleset): path_dict = {k: _normalise_value(v) for k, v in universe} for rule in ruleset: if not isinstance(rule, list): continue matches = True for condition in rule: if isinstance(condition, dict): for key, val in condition.items(): if path_dict.get(key) != val: matches = False break elif isinstance(condition, str): if condition not in path_dict: matches = False break else: matches = False break if matches: return True, rule return False, None # Build universes (apply exclude during creation) pre_dedup_count = 0 dedup_removed = 0 if config.get("order"): for order in config["order"]: reordered_values = [forking_paths[key] for key in order] unused_keys = [key for key in keys if key not in order] order_universes = [] for combination in itertools.product(*reordered_values): universe = list(zip(order, combination)) + [(k, "unused") for k in unused_keys] universe = _apply_exclude_to_universe(universe) order_universes.append(tuple(universe)) pre_dedup_count += len(order_universes) # Deduplicate **within this order only** if deduplicate and order_universes: unique, seen = [], set() def _canon_sig(d): def canon(x): x = _normalise_value(x) if isinstance(x, float) and (x != x): # NaN return ("NaN",) return x return tuple(sorted((k, canon(v)) for k, v in d.items())) for u in order_universes: sig = _canon_sig(dict(u)) if sig not in seen: seen.add(sig) unique.append(u) order_universes = unique dedup_removed += (len(order_universes) - len(unique)) # Now append; note we do **not** run a global dedup later all_universes.extend(order_universes) else: for combination in itertools.product(*values): universe = list(zip(keys, combination)) universe = _apply_exclude_to_universe(universe) all_universes.append(tuple(universe)) pre_dedup_count = len(all_universes) dedup_removed = len(all_universes) # Exclusion summary with rule context (post-dedup for accurate counts) rule_context_counts = defaultdict(int) def _freeze_conditions(rule): # Extract dict conditions only; sort for stable printing conds = [] for c in rule: if isinstance(c, dict): for k, v in c.items(): conds.append((k, v)) return tuple(sorted(conds)) for u in all_universes: u_dict = dict(u) path_dict = {k: _normalise_value(v) for k, v in u} for rule in exclude_rules: conditions, keys_to_nan = [], [] for c in rule: if isinstance(c, dict): conditions.append(c) elif isinstance(c, str): keys_to_nan.append(c) # match conditions? matches = True for cond in conditions: for ck, cv in cond.items(): if path_dict.get(ck) != cv: matches = False break if not matches: break if not matches: continue # count keys that are actually NaN in this universe cond_sig = _freeze_conditions(rule) for k in keys_to_nan: if k in u_dict: val = u_dict[k] if isinstance(val, float) and (val != val): # NaN rule_context_counts[(cond_sig, k)] += 1 if rule_context_counts: print("Exclusion summary") print("-----------------") if config: if config.get("order"): print(f"Total number of universes: {pre_dedup_count} (includes ordering permutations)") else: print(f"Total number of universes: {pre_dedup_count}") if dedup_removed: print(f" - Removed {dedup_removed} duplicate universes (set 'deduplicate' to False if you want to keep them)") for (cond_sig, key), count in rule_context_counts.items(): human_cond = "{" + ", ".join([f"'{k}': {repr(v)}" for k, v in cond_sig]) + "}" print(f" - Set '{key}' to NaN for universes matching {human_cond} ({count} total).") # Remove universes that match 'remove' rules (printed last) if removed_rules: valid_universes = [] removed_universes = [] for combination in all_universes: is_invalid, matched_rule = _rule_matches(combination, removed_rules) if is_invalid: removed_universes.append((combination, matched_rule)) else: valid_universes.append(combination) # Group removed universes by rule rule_to_universes = defaultdict(list) for universe, rule in removed_universes: rule_to_universes[str(rule)].append(dict(universe)) print(f" - Removed {len(removed_universes)} out of {len(all_universes)} remaining universes:") for i, (rule, universes) in enumerate(rule_to_universes.items(), 1): print(f" Rule {rule} excluded {len(universes)} universes:") for u in universes: print(f" {u}") else: valid_universes = all_universes # Final number of universes if rule_context_counts: print(f"\n{len(valid_universes)} universes remain for analysis.") # Create Python scripts for each combination for i, combination in enumerate(valid_universes, start=1): combination_dict = dict(combination) # Smart formatting: only format dicts, pass raw types for strings/numbers context = {} for key in forking_paths.keys(): val = combination_dict.get(key, None) # Map NaN to None for Jinja if isinstance(val, float) and (val != val): context[key] = None else: context[key] = self._format_type(val) rendered_content = jinja_template.render(**context) if config.get("order"): forking_dict_str = ( "# Ordering information was provided. The ordered decisions for this universe are:\n" f"forking_paths = {combination_dict}\n\n" ) rendered_content = forking_dict_str + rendered_content save_path = os.path.join(self.script_dir, f"universe_{i}.py") with open(save_path, "w") as file: file.write(rendered_content) # Generate CSV file with the decisions of all universes self._create_summary(valid_universes, keys) # Save forking paths with open(f"{self.results_dir}/forking_paths.pkl", "wb") as file: pickle.dump(forking_paths, file) # Set some attributes self.num_universes = len(valid_universes) self.forking_paths = forking_paths return
[docs] def run(self, universe=None, parallel=1, combine_results=True): """ Run either an individual universe or the entire multiverse Parameters ---------- universe : None, int, list, range Number of the universe to run. Default is None, which runs all universes parallel : int Number of universes to run in parallel """ # Get all universe scripts scripts = [f for f in sorted(os.listdir(self.script_dir)) if f.endswith(".py") and not f.startswith("template")] # Delete previous results (.pkl files) for item in os.listdir(self.results_dir): item_path = os.path.join(self.results_dir, item) if os.path.isfile(item_path) and item_path.endswith('.pkl') and not item_path.endswith('forking_paths.pkl'): os.remove(item_path) # Function for parallel processing, called by joblib.delayed def execute_script(file): subprocess.run([sys.executable, os.path.join(self.script_dir, file)], check=True, env=os.environ.copy()) if universe is None: print("Starting multiverse analysis for all universes...") if parallel == 1: for file in tqdm(scripts): execute_script(file) else: with tqdm_joblib(total=len(scripts), desc="Performing multiverse analysis:") as progress: Parallel(n_jobs=parallel)(delayed(execute_script)(file) for file in scripts) else: # Subset of universes was chosen if isinstance(universe, int): universe_numbers = [universe] elif isinstance(universe, (list, tuple, range)): universe_numbers = list(universe) else: raise ValueError("universe_number should be None, an int, a list, tuple, or a range.") selected_universes = [f for f in scripts if any(f.endswith(f"universe_{u}.py") for u in universe_numbers)] print(f"Starting analysis for universe(s): {universe_numbers}...") if parallel == 1: for file in tqdm(selected_universes): execute_script(file) else: with tqdm_joblib(total=len(selected_universes), desc="Performing multiverse analysis:") as progress: Parallel(n_jobs=parallel)(delayed(execute_script)(file) for file in selected_universes) # Save all results in a single dictionary if combine_results: self._combine_results() self.combined_results = True else: self.combined_results = False print("The multiverse analysis completed without any errors.") return
[docs] def summary(self, universe=None, print_df=True, return_df=False): """ Print the multiverse summary to the terminal/notebook Parameters ---------- universe : int, range, or None The universe number(s) to display. Default is None (prints the head) """ multiverse_summary = self._read_summary() if isinstance(universe, int): multiverse_selection = multiverse_summary.iloc[universe-1] elif isinstance(universe, range): multiverse_selection = multiverse_summary.iloc[universe.start-1:universe.stop] else: multiverse_selection = multiverse_summary if print_df: if self._in_notebook(): from IPython.display import display display(multiverse_selection) else: print(multiverse_selection) return multiverse_selection if return_df else None
[docs] def get_results(self, universe=None, as_df=False, expand_dec=False): """ Get the results of the multiverse (or a specific universe). Parameters ---------- universe : int | None If given, return results for that specific universe. as_df : bool False returns the raw dict (default). True returns a pandas DataFrame (only valid when universe is None). """ if not isinstance(as_df, bool): raise ValueError("as_df must be a boolean") if os.path.exists(f"{self.results_dir}/multiverse_results.pkl"): path = f"{self.results_dir}/multiverse_results.pkl" with open(path, "rb") as file: results = pickle.load(file) if universe is not None: results = results[f"universe_{universe}"] return results if as_df: df = pd.DataFrame.from_dict(results, orient="index") # keep string universe label as a column df.insert(0, "universe", df.index) # integer index df.index = (df["universe"].str.replace("universe_", "", regex=False).astype(int)) df.index.name = None if expand_dec: # Flatten decisions into columns flat = df["__decisions"].map(self._flatten_decisions) dec_only_df = pd.DataFrame(list(flat), index=df.index) dec_only_df = dec_only_df.add_prefix("__") if dec_only_df.shape[1] == 0: raise ValueError("Could not extract any decisions from the 'decisions' dicts.") # Enforce decision-group order from "Decision 1..N" decision_order = self._extract_decision_order(df.iloc[0]["__decisions"]) decision_order = [f"__{d}" for d in decision_order if f"__{d}" in dec_only_df.columns] leftovers = [c for c in dec_only_df.columns if c not in decision_order] decisions_list = decision_order + leftovers # Reorder decision columns and merge dec_only_df = dec_only_df.reindex(columns=decisions_list) df = pd.concat([df.drop(columns=["__decisions"]), dec_only_df.reindex(columns=decisions_list)], axis=1) return df.sort_index() return results else: if universe is None: raise ValueError( "Multiverse results are not combined. Please specify a universe number." ) path = f"{self.results_dir}/tmp/universe_{universe}.pkl" with open(path, "rb") as file: results = pickle.load(file) if as_df: df = pd.DataFrame([results]) df.insert(0, "universe", f"universe_{universe}") df.index = pd.Index([int(universe)], name="universe_id") return df return results
[docs] def visualize(self, universe=None, figsize=(8,5), node_size=1500, text_size=12, max_label_len=15, label_offset=0.04, cmap="Set2", exclude_single=False ): """ Visualize the multiverse as a network. Parameters ---------- universe : int or None The universe to highlight in the network. If None or if the provided universe number is higher than available universes, the entire multiverse is shown without highlighting. Default is None. figsize : tuple Size of the figure. Default is (8,5). node_size : int Size of the nodes. Default is 1500. text_size : int Size of the text labels. Default is 12. max_label_len : int Maximum length of decision labels before wrapping. label_offset : float Offset multiplier for decision labels. cmap : str Colormap to use for the nodes. Default is "Set2". exclude_single : bool Whether to exclude parameters with only one unique option. """ # Read the CSV summary into a DataFrame. multiverse_summary = self._read_summary() # Identify value and decision columns. value_columns = [col for col in multiverse_summary.columns if col.startswith("Value") or col == "Universe"] decision_columns = [col for col in multiverse_summary.columns if col.startswith("Decision")] # Create DataFrames for values and decisions. multiverse_values = multiverse_summary[value_columns] multiverse_decision = multiverse_summary[decision_columns] # Recursive function to add decision nodes and connect them hierarchically. def add_hierarchical_decision_nodes(G, root_node, parameters, level=0, exclude_single=exclude_single): if level >= len(parameters): return G # No more parameters to process decision, options = parameters[level] # Only add the node if there are multiple options or if we are not excluding single-option parameters. if not exclude_single or len(options) > 1: for option in options: node_name = f"{decision}: {option}" G.add_node(node_name, level=level + 1, option=option, decision=decision) G.add_edge(root_node, node_name) # Recurse to add the next level. G = add_hierarchical_decision_nodes(G, node_name, parameters, level + 1, exclude_single) else: # If excluding single-option parameters, skip to the next parameter. G = add_hierarchical_decision_nodes(G, root_node, parameters, level + 1, exclude_single) return G # Map each Value column (ignoring Universe) to its corresponding Decision column. # Assumes the order of Value columns (except Universe) matches the order of Decision columns. value_to_decision_map = {val: dec for val, dec in zip(value_columns[1:], decision_columns)} # Build parameters: For each Value column (except Universe) with more than one unique option, # use the decision name (from the map) and the unique values. parameters = [ (value_to_decision_map.get(col, col), multiverse_values[col].unique()) for col in multiverse_values.columns[1:] ] # Remove any NaN values from the unique options. parameters = [(dec, options[pd.notna(options)]) for dec, options in parameters] # Initialize the directed graph and add the root node. G = nx.DiGraph() root_node = "Start" G.add_node(root_node, level=0, label="Start") G = add_hierarchical_decision_nodes(G, root_node, parameters, exclude_single=exclude_single) # Build the list of node names that correspond to the specified universe. values = ["Start"] if universe is not None: filtered_df = multiverse_values[multiverse_values["Universe"] == f"Universe_{universe}"] # If the provided universe number is out-of-range, reset universe to None. if filtered_df.empty: universe = None else: row_dict = filtered_df.iloc[0].to_dict() for col, value in row_dict.items(): # Skip the Universe column. if col == "Universe": continue # Convert the Value column name to its corresponding decision name. decision_name = value_to_decision_map.get(col, col) values.append(f"{decision_name}: {value}") # Determine which edges are part of the selected universe's path. universe_edges = [(src, tgt) for src, tgt in G.edges if src in values and tgt in values] edge_colors = [] edge_widths = [] for edge in G.edges: if edge in universe_edges: edge_colors.append("black") edge_widths.append(2.5) else: edge_colors.append("gray") edge_widths.append(1.0) # Create the figure and axis. fig, ax = plt.subplots(figsize=figsize) title_str = f"Universe {universe}" if universe is not None else "Multiverse" ax.set_title(title_str, size=14, fontweight="bold") # Use a multipartite layout based on the 'level' attribute. pos = nx.multipartite_layout(G, subset_key="level") # Create a colormap based on levels. levels = set(nx.get_node_attributes(G, "level").values()) num_levels = len(levels) first_color = "lightgray" colormap = plt.cm.get_cmap(cmap, num_levels) # First level gets a fixed color; subsequent levels are derived from the colormap. colors = [first_color] + [colormap(i / (num_levels - 1)) for i in range(num_levels)] level_colors = {level: colors[i] for i, level in enumerate(sorted(levels))} # Draw edges. nx.draw(G, pos, with_labels=False, node_size=node_size - 10, node_color="white", arrows=True, edge_color=edge_colors, width=edge_widths, ax=ax) # Draw nodes with colors based on their level. for level in levels: nodes_at_level = [node for node in G.nodes if G.nodes[node].get("level") == level] nx.draw_networkx_nodes(G, pos, nodelist=nodes_at_level, node_size=node_size, node_color=[level_colors[level] for _ in nodes_at_level], ax=ax) # Prepare labels: For non-root nodes, use the 'option' text; for the root, use its label. def wrap_label(text, max_len=max_label_len): return "\n".join(textwrap.wrap(text, max_len)) node_labels = {} for node in G.nodes: if node != root_node and "option" in G.nodes[node]: raw_label = str(G.nodes[node]["option"]) node_labels[node] = wrap_label(raw_label, max_len=max_label_len) else: node_labels[node] = G.nodes[node].get("label", node) nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=text_size, ax=ax) # Calculate an offset based on the maximum number of nodes at any level. node_nums = [len([n for n in G.nodes if G.nodes[n].get("level") == level]) for level in levels] max_nodes = max(node_nums) if node_nums else 1 # Annotate the bottom-most node at each level with the decision value. for level in levels: nodes_at_level = [node for node in G.nodes if G.nodes[node].get("level") == level] if nodes_at_level: # Choose the node with the lowest y-position (bottom-most) bottom_node = min(nodes_at_level, key=lambda node: pos[node][1]) if bottom_node != root_node and "decision" in G.nodes[bottom_node]: decision = G.nodes[bottom_node]["decision"] if decision in multiverse_decision.columns: decision_value = multiverse_decision[decision].iloc[0] else: decision_value = decision x, y = pos[bottom_node] ax.text(x, y - label_offset * max_nodes, decision_value, horizontalalignment="center", fontsize=text_size, fontweight="bold") # Save the figure to the results directory. plt.savefig(f"{self.results_dir}/multiverse.png", bbox_inches="tight") return self._handle_figure_returns(fig)
[docs] def specification_curve(self, measure: str, baseline: float | None = None, p_value: float | str | bool | None = None, ci: int | str | bool | None = None, smooth_ci: bool = True, title: str | None = None, name_map: dict | None = None, cmap: str = "Set3", linewidth: float = 2, figsize: tuple | None = None, height_ratio: tuple = (2, 1), fontsize: int = 10, dotsize: int = 50, line_pad: float = 0.3, ftype: str = "pdf", dpi: int = 300, p_threshold: float = 0.05, ci_level_default: int = 95, ): """ Create a specification curve plot from multiverse results. The figure is saved to the results directory as "specification curve.{ftype}". Notes ----- - If `p_value` is float or True, `measure` must contain list/array samples per universe. - If `ci` is int or True, `measure` must contain list/array samples per universe. - If `p_value` is a string, it is interpreted as a p-value column (numeric) or a significance flag (bool). - If `ci` is a string, it must contain per-universe (lower, upper) bounds. Returns ------- Any The figure if in a .py script. None if in a .ipynb notebook (the figure is saved and displayed inline) """ def _map_name(key: str) -> str: return name_map.get(key, key) if isinstance(name_map, dict) else key def _extract_decision_order(decisions_obj) -> list[str]: if not isinstance(decisions_obj, dict): return [] dec_block = decisions_obj.get("__decisions") d = dec_block if isinstance(dec_block, dict) else decisions_obj order = [] i = 1 while f"Decision {i}" in d: order.append(str(d[f"Decision {i}"])) i += 1 return order # ------------------------------------------------------------ # Load df and basic checks df = self.get_results(as_df=True).copy() if measure not in df.columns: raise ValueError(f"'{measure}' not found in multiverse results.") if "__decisions" not in df.columns: raise ValueError("Expected a 'decisions' column containing decision dictionaries.") # Keep raw before scalarising (for CI / p-value computation) raw_measure = df[measure].copy() # Scalarise outcome for plotting (mean over list/array; scalar -> float) df[measure] = df[measure].apply( lambda x: float(np.mean(x)) if isinstance(x, (list, tuple, np.ndarray)) else float(x) ) if df[measure].isna().any(): raise ValueError(f"NaNs detected in '{measure}' after reduction to mean.") # ------------------------------------------------------------ # Flatten decisions -> columns flat = df["__decisions"].map(self._flatten_decisions) dec_df = pd.DataFrame(list(flat), index=df.index) if dec_df.shape[1] == 0: raise ValueError("Could not extract any decisions from the 'decisions' dicts.") # decision-group order from stored Decision 1..N decision_order = _extract_decision_order(df.iloc[0]["__decisions"]) decision_order = [d for d in decision_order if d in dec_df.columns] leftovers = [c for c in dec_df.columns if c not in decision_order] decision_cols_all = decision_order + leftovers df = pd.concat([df.drop(columns=["__decisions"]), dec_df], axis=1) # Keep only decisions with >1 unique option decision_cols = [c for c in decision_cols_all if df[c].nunique(dropna=True) > 1] if len(decision_cols) == 0: raise ValueError("No decisions with more than one option found; nothing to plot in the bottom panel.") # ------------------------------------------------------------ # Sort by outcome sort_idx = df[measure].sort_values().index df = df.loc[sort_idx].reset_index(drop=True) raw_measure_sorted = raw_measure.loc[sort_idx].reset_index(drop=True) # <-- aligned raw samples n = len(df) x_values = np.arange(n) y_values = df[measure].to_numpy(dtype=float) # ------------------------------------------------------------ # Significance handling baseline_for_tests = 0.0 if baseline is None else float(baseline) sig_color = "#018532" significant = np.zeros(n, dtype=bool) pvals = None if p_value is None: pass elif isinstance(p_value, str): if p_value not in df.columns: raise ValueError(f"p_value column '{p_value}' not found in results df.") col = df[p_value] if pd.api.types.is_bool_dtype(col): significant = col.fillna(False).to_numpy(dtype=bool) else: pvals = pd.to_numeric(col, errors="coerce").to_numpy(dtype=float) if np.isnan(pvals).any(): raise ValueError(f"NaNs detected in p-value column '{p_value}'.") significant = pvals < float(p_threshold) else: thr = float(p_threshold) if p_value is True else float(p_value) pvals_list = [] for rv in raw_measure_sorted: if isinstance(rv, (list, tuple, np.ndarray)): arr = np.asarray(rv, dtype=float) arr = arr[np.isfinite(arr)] if len(arr) >= 2: _, p = stats.ttest_1samp(arr, baseline_for_tests) pvals_list.append(float(p)) else: pvals_list.append(np.nan) else: pvals_list.append(np.nan) pvals = np.asarray(pvals_list, dtype=float) if np.isnan(pvals).any(): raise ValueError( "Cannot compute p-values for some universes: " f"'{measure}' must contain list/array samples per universe when p_value is float/True." ) significant = pvals < thr top_colors = np.where(significant, sig_color, "black") # ------------------------------------------------------------ # CI handling ci_lower = None ci_upper = None if ci is None: pass elif isinstance(ci, str): if ci not in df.columns: raise ValueError(f"CI column '{ci}' not found in results df.") bounds = df[ci].to_list() lows, highs = [], [] for b in bounds: if isinstance(b, (list, tuple, np.ndarray)) and len(b) == 2: lo, hi = float(b[0]), float(b[1]) if not (np.isfinite(lo) and np.isfinite(hi)): raise ValueError(f"Non-finite CI bounds in column '{ci}'.") lows.append(lo) highs.append(hi) else: raise ValueError(f"CI column '{ci}' must contain (lower, upper) tuples/lists per universe.") ci_lower = np.asarray(lows, dtype=float) ci_upper = np.asarray(highs, dtype=float) else: level = int(ci_level_default) if ci is True else int(ci) lows, highs = [], [] for rv, mean_val in zip(raw_measure_sorted, y_values): if not isinstance(rv, (list, tuple, np.ndarray)): raise ValueError( f"Cannot compute CI because '{measure}' does not contain per-universe samples. " "Provide a CI column instead (ci='colname')." ) arr = np.asarray(rv, dtype=float) arr = arr[np.isfinite(arr)] if len(arr) < 4: raise ValueError( f"Cannot compute CI for a universe with <4 finite samples in '{measure}'. " "Provide a CI column instead (ci='colname') or store more samples." ) sem = np.std(arr, ddof=1) / np.sqrt(len(arr)) half = sem * stats.t.ppf((1.0 + level / 100.0) / 2.0, len(arr) - 1) lows.append(float(mean_val - half)) highs.append(float(mean_val + half)) ci_lower = np.asarray(lows, dtype=float) ci_upper = np.asarray(highs, dtype=float) # ------------------------------------------------------------ # Figure size (auto if None) if title is None: title = "Specification Curve" if figsize is None: num_options = sum(df[c].nunique(dropna=True) for c in decision_cols) figsize = (max(8, n * 0.07), max(6, num_options * 0.35)) # ------------------------------------------------------------ # Bottom panel layout (decision order enforced; options order-of-appearance) decision_positions = {} display_labels = [] yticks = [] line_ends = [] key_positions = {} y_max = 0 space_between_groups = 1 num_groups = len(decision_cols) cmap_obj = plt.cm.get_cmap(cmap, num_groups if num_groups > 0 else 1) group_colors = [cmap_obj(i) for i in range(num_groups)] for group_idx, decision in enumerate(decision_cols): options = pd.unique(df[decision].astype("object")) options = [o for o in options if pd.notna(o)] group_label = _map_name(decision) key_positions[group_label] = y_max + len(options) / 2.0 - 0.5 for opt in options: yticks.append(y_max) display_labels.append(str(opt)) decision_positions[(decision, opt)] = (y_max, group_idx) y_max += 1 line_ends.append(y_max) y_max += space_between_groups # ------------------------------------------------------------ # Plot sns.set_theme(style="whitegrid") fig, ax = plt.subplots( 2, 1, figsize=figsize, gridspec_kw={"height_ratios": height_ratio}, sharex=True ) # Bottom axes setup ax[1].set_yticks(yticks) ax[1].set_yticklabels(display_labels, fontsize=fontsize) ax[1].tick_params(axis="y", labelsize=fontsize) ax[1].set_ylim(-1, y_max) ax[1].xaxis.grid(False) ax[1].invert_yaxis() # Left padding for group labels/lines fig.canvas.draw() renderer = fig.canvas.get_renderer() trans1 = transforms.blended_transform_factory(ax[1].transAxes, ax[1].transData) tick_extents = [lbl.get_window_extent(renderer=renderer) for lbl in ax[1].get_yticklabels()] max_extent = max(tick_extents, key=lambda bb: bb.width) x_start_pixel = max_extent.x0 x_start_axes1 = ax[1].transAxes.inverted().transform((x_start_pixel, 0))[0] trans0 = transforms.blended_transform_factory(ax[0].transAxes, ax[0].transData) tick_extents0 = [lbl.get_window_extent(renderer=renderer) for lbl in ax[0].get_yticklabels()] if tick_extents0: max_extent0 = max(tick_extents0, key=lambda bb: bb.width) x_start_pixel0 = max_extent0.x0 x_start_axes0 = ax[0].transAxes.inverted().transform((x_start_pixel0, 0))[0] else: x_start_axes0 = x_start_axes1 min_x_start_axes = min(x_start_axes1, x_start_axes0) padding = -line_pad * min_x_start_axes line_offset = min_x_start_axes - padding # Group labels + separators for key, pos in key_positions.items(): ax[1].text( line_offset - padding, pos, key, transform=trans1, ha="right", va="center", fontweight="bold", fontsize=fontsize ) s = -0.5 for line_end in line_ends: e = line_end - 0.5 ax[1].add_line( mlines.Line2D([line_offset, line_offset], [s, e], color="black", lw=1, transform=trans1, clip_on=False) ) s = line_end + 0.5 # Top scatter ax[0].scatter(x_values, y_values, c=top_colors, s=dotsize, edgecolors=top_colors, zorder=3) # CI drawing if ci_lower is not None and ci_upper is not None: if smooth_ci and len(ci_lower) >= 4: spline_lo = make_interp_spline(x_values, ci_lower.astype(float), k=3) spline_hi = make_interp_spline(x_values, ci_upper.astype(float), k=3) x_smooth = np.linspace(x_values.min(), x_values.max(), 500) ax[0].fill_between(x_smooth, spline_lo(x_smooth), spline_hi(x_smooth), color="gray", alpha=0.3) else: for i in range(n): ax[0].plot([i, i], [ci_lower[i], ci_upper[i]], color="gray", linewidth=linewidth, zorder=2) # Baseline line legend_items = [] if baseline is not None: ax[0].hlines(float(baseline), xmin=-2, xmax=n + 1, linestyles="--", lw=2, colors="black", zorder=1) legend_items.append(mlines.Line2D([], [], linestyle="--", color="black", linewidth=2, label="Baseline")) # Measure label + left line ymin, ymax = ax[0].get_ylim() ycenter = (ymin + ymax) / 2.0 ax[0].text( line_offset - padding, ycenter, _map_name(measure), transform=trans0, ha="right", va="center", fontweight="bold", fontsize=fontsize ) ax[0].add_line( mlines.Line2D([line_offset, line_offset], [ymin, ymax], color="black", lw=1, transform=trans0, clip_on=False) ) # Bottom markers (vectorised melt) long = df[decision_cols].reset_index().melt( id_vars="index", value_vars=decision_cols, var_name="decision", value_name="option" ).rename(columns={"index": "x"}) bottom_x, bottom_y, bottom_c = [], [], [] for row in long.itertuples(index=False): opt = row.option if pd.isna(opt): continue key = (row.decision, opt) if key in decision_positions: y_pos, grp_idx = decision_positions[key] bottom_x.append(row.x) bottom_y.append(y_pos) bottom_c.append(group_colors[grp_idx] if num_groups > 0 else "black") if bottom_x: ax[1].scatter(np.asarray(bottom_x), np.asarray(bottom_y), c=np.asarray(bottom_c), s=dotsize, marker="o") # Top panel styling ax[0].set_title(title, fontweight="bold", fontsize=fontsize + 2) ax[0].xaxis.grid(False) ax[0].set_xticks([]) ax[0].set_xlim(-1, n) ax[0].tick_params(axis="y", labelsize=fontsize) # Legend for CI if ci_lower is not None and ci_upper is not None: if isinstance(ci, int): ci_lab = f"{ci}% CI" elif ci is True: ci_lab = f"{ci_level_default}% CI" else: ci_lab = "CI" legend_items.append(mpatches.Patch(facecolor="gray", edgecolor="white", label=ci_lab)) # Legend for significance (only if p_value requested) if p_value is not None: if significant.any(): legend_items.append( mlines.Line2D([], [], linestyle="None", marker="o", markersize=9, markerfacecolor=sig_color, markeredgecolor=sig_color, label="significant") ) if (~significant).any(): legend_items.append( mlines.Line2D([], [], linestyle="None", marker="o", markersize=9, markerfacecolor="black", markeredgecolor="black", label="not significant") ) if legend_items: ax[0].legend(handles=legend_items, loc="upper left", fontsize=fontsize, frameon=False) plt.savefig(f"{self.results_dir}/specification_curve2.{ftype}", bbox_inches="tight", dpi=dpi) sns.reset_orig() return self._handle_figure_returns(fig)
[docs] def multiverse_plot(self, measure: str, n_bins: int = 20, sig_col: str | None = None, sig_threshold: float = 0.05, baseline: float | None = None, name_map: dict | None = None, figsize: tuple = (7, 9), ftype: str = "pdf", dpi: int = 300, ): """ Multiverse plot as introduced by Krähmer & Young (2026). This plot visualises the distribution of multiverse outcomes together with heatmap strips showing how different analytic choices relate to the outcome. For each decision level, the average change in the outcome relative to the reference level is shown on the right. The figure is saved to the results directory as "multiverse_plot.{ftype}". References ---------- Krähmer, D., & Young, C. (2026). Visualizing vastness: Graphical methods for multiverse analysis. PLOS One, 21(2). https://doi.org/10.1371/journal.pone.0339452 Parameters ---------- measure : str Name of the outcome/measure column in the multiverse results. Entries may be scalars or lists/arrays (in which case the mean is used). n_bins : int, optional Number of bins used to discretise the outcome axis for the heatmap strips. sig_col : str | None, optional Column indicating statistical significance. If provided: - boolean values are interpreted directly (True = significant), - numeric values are compared against ``sig_threshold``. If None, no significance overlay is drawn. sig_threshold : float, optional Threshold used when ``sig_col`` is numeric (default is 0.05). baseline : float | None, optional Optional baseline value for the outcome. If provided, a vertical dashed reference line is drawn at this value in the density plot. name_map : dict | None, optional Optional mapping for display names. Keys may include the measure name and decision names. Values are the desired display labels. figsize : tuple, optional Figure size passed to Matplotlib (width, height) in inches. ftype : str, optional File type used when saving the figure (e.g., ``"pdf"``, ``"png"``). dpi : int, optional Resolution (dots per inch) used when saving the figure. Returns ------- Any The figure if in a .py script. None if in a .ipynb notebook (the figure is saved and displayed inline) """ # Helpers def _map_name(key: str) -> str: return name_map.get(key, key) if isinstance(name_map, dict) else key def _kde_density(values, grid, x_min, x_max): values = np.asarray(values, dtype=float) values = values[np.isfinite(values)] if len(values) < 2: hist, edges = np.histogram(values, bins=20, range=(x_min, x_max), density=True) mids = (edges[:-1] + edges[1:]) / 2 return np.interp(grid, mids, hist, left=0, right=0) return stats.gaussian_kde(values)(grid) def _build_heatmap_data(df_in, varname, outcome_var, breaks): tmp = df_in[[varname, outcome_var]].copy() tmp["outcome_bin"] = pd.cut(tmp[outcome_var], bins=breaks, include_lowest=True) all_bins = tmp["outcome_bin"].cat.categories levels = tmp[varname].cat.categories idx = pd.MultiIndex.from_product([all_bins, levels], names=["outcome_bin", varname]) counts = ( tmp.groupby(["outcome_bin", varname], observed=False) .size() .reindex(idx, fill_value=0) .reset_index(name="n") ) total_by_bin = counts.groupby("outcome_bin")["n"].transform(lambda x: x.sum() if x.sum() > 0 else 1) counts["prop"] = counts["n"] / total_by_bin bin_to_idx = {iv: i for i, iv in enumerate(all_bins)} counts["bin_idx"] = counts["outcome_bin"].map(bin_to_idx) return counts ############### # General setup df = self.get_results(as_df=True) # Load results if measure not in df.columns: raise ValueError( f"'{measure}' not found in multiverse results. Make sure to save it in the multiverse template." ) if "__decisions" not in df.columns: raise ValueError("Expected a 'decisions' column containing decision dictionaries.") if n_bins > len(df): raise ValueError(f"n_bins ({n_bins}) cannot be higher than the number of universes ({len(df)}).") # Significance handling if sig_col is not None: if sig_col not in df.columns: raise ValueError(f"sig_col='{sig_col}' not found in results df.") s = df[sig_col] if pd.api.types.is_bool_dtype(s): significant = s.fillna(False).astype(bool) else: s_num = pd.to_numeric(s, errors="coerce") significant = (s_num < sig_threshold).fillna(False) df = df.copy() df["__significant"] = significant else: df = df.copy() df["__significant"] = False # Scalarise outcome: mean over list/array; scalar -> float df[measure] = df[measure].apply( lambda x: float(np.mean(x)) if isinstance(x, (list, tuple, np.ndarray)) else float(x) ) if df[measure].isna().any(): raise ValueError(f"NaNs detected in '{measure}' after reduction to mean.") # Flatten decisions into columns flat = df["__decisions"].map(self._flatten_decisions) dec_only_df = pd.DataFrame(list(flat), index=df.index) dec_only_df = dec_only_df.add_prefix("__") if dec_only_df.shape[1] == 0: raise ValueError("Could not extract any decisions from the 'decisions' dicts.") # Enforce decision-group order from "Decision 1..N" decision_order = self._extract_decision_order(df.iloc[0]["__decisions"]) decision_order = [f"__{d}" for d in decision_order if f"__{d}" in dec_only_df.columns] leftovers = [c for c in dec_only_df.columns if c not in decision_order] decisions_list = decision_order + leftovers # Reorder decision columns and merge dec_only_df = dec_only_df.reindex(columns=decisions_list) df = pd.concat([df.drop(columns=["__decisions"]), dec_only_df.reindex(columns=decisions_list)], axis=1) # Options within each decision: keep order-of-appearance (do NOT reverse) for d in decisions_list: s = df[d].astype("object") cats = list(pd.unique(s)) df[d] = pd.Categorical(s, categories=cats, ordered=True) # Average increase labels (mean difference vs reference = first option) avg_diff_lookup = {} for varname in decisions_list: levels_all = list(df[varname].cat.categories) if len(levels_all) == 0: continue ref = levels_all[0] ref_mean = float(df.loc[df[varname] == ref, measure].mean()) lvl_map = {} for lvl in levels_all: if lvl == ref: lvl_map[lvl] = "Ref." else: lvl_mean = float(df.loc[df[varname] == lvl, measure].mean()) lvl_map[lvl] = f"{(lvl_mean - ref_mean):+.2f}" avg_diff_lookup[varname] = lvl_map ################## # Top density plot multiverse_outcome = df[measure].to_numpy(dtype=float) x_min = float(np.min(multiverse_outcome)) x_max = float(np.max(multiverse_outcome)) if not np.isfinite(x_min) or not np.isfinite(x_max) or x_min == x_max: raise ValueError(f"'{measure}' must have finite, non-constant values.") x_pad = 0.02 * np.ptp(multiverse_outcome) common_xlim = (x_min - x_pad, x_max + x_pad) breaks = np.linspace(x_min, x_max, n_bins + 1) grid_x = np.linspace(x_min, x_max, 1000) y_all = _kde_density(multiverse_outcome, grid_x, x_min, x_max) if df["__significant"].any(): sig_vals = df.loc[df["__significant"], measure].to_numpy(dtype=float) y_sig = _kde_density(sig_vals, grid_x, x_min, x_max) sig_share = float(df["__significant"].mean()) y_sig_scaled = np.minimum(y_sig * sig_share, y_all) else: y_sig_scaled = None # Plot layout DENSITY_RATIO = 0.5 strip_levels_counts = [len(df[v].cat.categories) for v in decisions_list] total_strip_levels = sum(strip_levels_counts) density_height = max(1, int(DENSITY_RATIO * total_strip_levels)) n_rows = density_height + sum([2 + c for c in strip_levels_counts]) + 1 fig = plt.figure(figsize=figsize) gs = gridspec.GridSpec(n_rows, 1, figure=fig, hspace=0) # 0.75 if baseline label current_row = 0 tab10 = plt.get_cmap("tab10").colors base_colors = {v: tab10[i % len(tab10)] for i, v in enumerate(decisions_list)} # Density panel ax_density = fig.add_subplot(gs[current_row : current_row + density_height, 0]) current_row += density_height if baseline is not None: ax_density.plot([baseline, baseline], [0, float(np.max(y_all))], linestyle="--", linewidth=1, color="black") ax_density.plot(grid_x, y_all, label="All", linewidth=1.5) if y_sig_scaled is not None: ax_density.fill_between(grid_x, y_sig_scaled, alpha=0.4, label="Significant", color="tomato") ax_density.set_xlim(*common_xlim) ax_density.set_xticks([]) ax_density.set_xticklabels([]) ax_density.set_ylabel("Density") ax_density.set_yticks([]) #ax_density.text(baseline, -0.1, str(baseline),ha="center", va="top") if baseline is not None else None ax_density.legend(frameon=False) ax_density.grid(False) ax_density.set_frame_on(False) ax_density.plot([common_xlim[0], common_xlim[1]], [0, 0], linewidth=1, color="black") ############################## # Decision/options strip plots x_label_pos = common_xlim[1] + 0.01 * (x_max - x_min) for varname in decisions_list: levels = list(df[varname].cat.categories) # keep option order n_levels = len(levels) n_rows_strip = 2 + n_levels # blank + header + levels ax = fig.add_subplot(gs[current_row : current_row + n_rows_strip, 0]) current_row += n_rows_strip counts = _build_heatmap_data(df, varname, measure, breaks) H = np.zeros((n_levels + 2, len(breaks) - 1)) for i_level, lvl in enumerate(levels): sub = counts[counts[varname] == lvl] prop_by_bin = dict(zip(sub["bin_idx"], sub["prop"])) for b in range(len(breaks) - 1): H[i_level + 2, b] = prop_by_bin.get(b, 0.0) cmap = LinearSegmentedColormap.from_list("white_to_color", [(1, 1, 1), base_colors[varname]]) X = breaks Y = np.arange(n_levels + 3) # edges ax.pcolormesh(X, Y, H, shading="flat", cmap=cmap, vmin=0.0, vmax=1.0) ax.set_ylim(n_levels + 2, 0) # Right-side mean diffs for i_level, lvl in enumerate(levels): lab = avg_diff_lookup.get(varname, {}).get(lvl, "") ax.text(x_label_pos, i_level + 2.5, lab, va="center", ha="left", clip_on=False) # y ticks ytick_pos = [0.5, 1.5] + [i + 2.5 for i in range(n_levels)] display_name = {c: c.replace("__", "", 1) for c in dec_only_df.columns} ytick_labels = ["", _map_name(display_name[varname])] + [str(lvl) for lvl in levels] ax.set_yticks(ytick_pos) ax.set_yticklabels(ytick_labels) ticklbls = ax.get_yticklabels() if len(ticklbls) > 1: ticklbls[1].set_fontweight("bold") ax.tick_params(axis="y", length=0) ax.set_xlim(*common_xlim) ax.tick_params(axis="x", which="both", bottom=False, labelbottom=False) for spine in ["top", "right", "left", "bottom"]: ax.spines[spine].set_visible(False) ############################################# # Bottom x-axis (the multiverse plot measure) bottom_xaxis = fig.add_subplot(gs[current_row : current_row + 1, 0]) current_row += 1 bottom_xaxis.set_xlim(*common_xlim) bottom_xaxis.set_yticks([]) bottom_xaxis.tick_params(axis="y", left=False, labelleft=False) bottom_xaxis.set_xlabel(_map_name(measure)) bottom_xaxis.set_xticks([x_min, (x_min + x_max) / 2.0, x_max]) for spine in ["top", "left", "right"]: bottom_xaxis.spines[spine].set_visible(False) bottom_xaxis.spines["bottom"].set_visible(True) bottom_xaxis.tick_params(axis="x", bottom=True, labelbottom=True) # Save and return plt.savefig(f"{self.results_dir}/multiverse_plot.{ftype}", bbox_inches="tight", dpi=dpi) return self._handle_figure_returns(fig)
[docs] def integrate(self, measure=None, method="uniform", type="mean" ): """ Integrate the multiverse results. Parameters ---------- measure : string Name of the measure to integrate. method : string Method to use for integration. Options are: "uniform" (default): Simple mean/median across all universes "bma": Bayesian model averaging (requires BIC values in the results) type : string Type of (weighted) integration. Options are "mean" (default) or "median". """ # Get results dataframe and convert columns to lowercase results = self.get_results(as_df=True) results.columns = results.columns.str.lower() # Initial checks if measure is None: raise ValueError("Please provide a measure to integrate.") if measure not in results.columns: raise ValueError(f"The measure '{measure}' was not found in the results.") if method == "bma" and "bic" not in results.columns: raise ValueError("BMA weights require a 'bic' column in the results.") # Get measure and compute weights x = results[measure].to_numpy() if method == "uniform": weights = self._uniform_weights(x) elif method == "bma": weights = self._bma_weights(results) else: raise ValueError("method must be 'uniform' or 'bma'") # Compute integrated estimate if type == "mean": integrated_estimate = self._weighted_mean(x, weights) elif type == "median": integrated_estimate = self._weighted_median(x, weights) else: raise ValueError("type must be 'mean' or 'median'") return integrated_estimate, weights
# Internal methods def _create_summary(self, all_universes, keys): """ Internal function: Create a CSV file with the parameters of all universes Parameters ---------- csv_path : string Path to save the CSV file all_universes : list List of all universes (combinations of parameters) keys : list List of keys for the CSV file. Used when the order is consistent. config : dict Configuration dictionary. Used to check for the 'order' key. """ fieldnames = ['Universe'] for i in range(len(keys)): fieldnames.append(f"Decision {i+1}") fieldnames.append(f"Value {i+1}") # Generate CSV file with the parameters of all universes file_path = os.path.join(self.multiverse_dir, "multiverse_summary.csv") with open(file_path, "w", newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for i, combination in enumerate(all_universes, start=1): context = {'Universe': f"Universe_{i}"} # Populate the decision and value columns for j, (key, value) in enumerate(combination): if isinstance(value, dict): value = value.get('name', '') context[f"Decision {j+1}"] = key context[f"Value {j+1}"] = value writer.writerow(context) return def _read_summary(self): """ Internal function: Reads the multiverse_summary.csv file Returns ------- summary : pandas.DataFrame Pandas datframe containing the multiverse summary """ summary_path = os.path.join(self.multiverse_dir, "multiverse_summary.csv") return pd.read_csv(summary_path) def _render_val(self, v): """Render decision value. `$...` → literal inject, else double-quoted string.""" if isinstance(v, bool): return "True" if v else "False" if isinstance(v, (int, float)) or v is None: return repr(v) if isinstance(v, str): if v.startswith("$"): return v[1:] # literal inject (variable, code, collection, call...) return f"\"{self._escape_double_quotes(v)}\"" # always double-quoted string if isinstance(v, list): return "[" + ", ".join(self._render_val(x) for x in v) + "]" if isinstance(v, tuple): return "(" + ", ".join(self._render_val(x) for x in v) + ")" if isinstance(v, dict): # If it's a function spec, _handle_dict will be used by _format_type. # If it's a plain dict value, render a Python dict literal. items = ", ".join(f"{self._render_val(k)}: {self._render_val(val)}" for k, val in v.items()) return "{" + items + "}" return repr(v) def _handle_dict(self, value): """ Turn a dict spec into a Python call string. Supports: {"func": "comet.connectivity.Static_Pearson(ts).estimate()"} {"func": "comet.connectivity.Static_Partial", "args": {...}} {"func": "bct.clustering_coef_bu", "args": {"G": "$G"}} Optional: {"positional": ["$ts", 123]} for explicit positional args. """ func = value.get("func", "") args = value.get("args", {}) or {} positional = value.get("positional", []) or [] # If func already looks like a full call string, pass it through untouched. # (Convention: if you need variables here, prefer using "args" with "$var") if "(" in func: return func # Build argument list parts = [] parts += [self._render_val(p) for p in positional] parts += [f"{k}={self._render_val(v)}" for k, v in args.items()] arg_str = ", ".join(parts) call_expr = f"{func}({arg_str})" if parts else f"{func}()" # comet.connectivity classes require .estimate() if func.startswith("comet.connectivity."): call_expr += ".estimate()" return call_expr def _format_type(self, value): """ Convert decision values into Python code strings for template injection. """ # Function spec dicts go through _handle_dict; everything else through _render_val. if isinstance(value, dict) and "func" in value: return self._handle_dict(value) return self._render_val(value) def _escape_double_quotes(self, s: str) -> str: # Keep things robust if the literal happens to contain quotes/backslashes return s.replace("\\", "\\\\").replace('"', '\\"') def _combine_results(self): """ Internal function: Combines the results in a single dictionary and saves it as a pickle file """ file_name = "multiverse_results.pkl" file_paths = glob.glob(os.path.join(self.script_dir, "temp", "universe_*.pkl")) if not file_paths: return combined_results = {} for file_path in sorted(file_paths): universe_key = os.path.splitext(os.path.basename(file_path))[0] with open(file_path, 'rb') as f: result_dict = pickle.load(f) combined_results[universe_key] = result_dict # Save combined results file with open(os.path.join(self.results_dir, file_name), 'wb') as f: pickle.dump(combined_results, f) # Delete individual results files and folder for file_path in file_paths: os.remove(file_path) os.rmdir(os.path.join(self.script_dir, "temp")) return def _flatten_decisions(self, dec_block): """ Convert {'Decision 1': 'X', 'Value 1': Y, ...} into {'X': 'Y', ...}. Values are stringified for categorical plotting. """ if not isinstance(dec_block, dict): return {} out = {} for k, v in dec_block.items(): m = re.fullmatch(r"Decision\s+(\d+)", str(k)) if not m: continue idx = m.group(1) value_key = f"Value {idx}" out[str(v)] = str(dec_block.get(value_key, "NA")) return out def _extract_decision_order(self, decisions_obj) -> list[str]: """ Return ["<Decision 1 name>", "<Decision 2 name>", ...] from COMET decisions storage. Supports two schemas: 1) row["__decisions"] is the decisions block itself 2) row["__decisions"] is a result dict containing {"__decisions": {...}} """ if not isinstance(decisions_obj, dict): return [] # schema 2: {"__decisions": {...}} dec_block = decisions_obj.get("__decisions") if isinstance(dec_block, dict): d = dec_block else: # schema 1: already the block d = decisions_obj order = [] i = 1 while f"Decision {i}" in d: order.append(str(d[f"Decision {i}"])) i += 1 return order def _load_and_prepare_data(self, measure=None): """ Internal function: Load and prepare the data for the specification curve plotting. Parameters ---------- measure : str Name of the measure to plot. Returns ------- sorted_universes : list List of tuples (data, summary_row) sorted by the mean value of the measure. forking_paths : dict Dictionary containing the forking paths. """ # Load the summary CSV. summary_path = os.path.join(self.multiverse_dir, "multiverse_summary.csv") multiverse_summary = pd.read_csv(summary_path) # Load the forking paths. with open(os.path.join(self.results_dir, "forking_paths.pkl"), "rb") as file: forking_paths = pickle.load(file) # Load the combined results dictionary. combined_results_path = os.path.join(self.results_dir, "multiverse_results.pkl") if not os.path.exists(combined_results_path): raise ValueError("Results file not found. Please run the multiverse analysis first.") with open(combined_results_path, "rb") as file: combined_results = pickle.load(file) # Extract the specified measure for each universe. universe_data = {} for universe, result_dict in combined_results.items(): if measure not in result_dict: raise ValueError(f"Measure '{measure}' not found in results for {universe}.") universe_data[universe] = result_dict[measure] # Build a list of tuples pairing each universe's measure data with its summary row. universes_with_summary = [] for universe, data in universe_data.items(): # Ensure matching is case-insensitive. summary_row = multiverse_summary[multiverse_summary['Universe'].str.lower() == universe.lower()] if not summary_row.empty: universes_with_summary.append((data, summary_row.iloc[0])) # Sort the universes by the mean of the measure values. sorted_universes = sorted(universes_with_summary, key=lambda x: np.mean(x[0])) return sorted_universes, forking_paths def _in_notebook(self): """ Helper function to check if the code is running in a Jupyter notebook """ try: from IPython import get_ipython if 'IPKernelApp' not in get_ipython().config: return False except Exception: return False return True def _handle_figure_returns(self, fig): """ Helper function to handle plotting """ if self._in_notebook(): ipy_display(fig) plt.close(fig) return else: return fig # Multiverse integration def _weighted_mean(self,x: np.ndarray, w: np.ndarray) -> float: """ Compute the weighted mean of x with weights w. Assumes w >= 0 and w.sum() == 1. """ return float(np.sum(w * x)) def _weighted_median(self, x: np.ndarray, w: np.ndarray) -> float: """ Compute the weighted median of x with weights w. Assumes w >= 0 and w.sum() == 1. """ order = np.argsort(x) x_sorted = x[order] w_sorted = w[order] cw = np.cumsum(w_sorted) return float(x_sorted[np.searchsorted(cw, 0.5, side="left")]) def _uniform_weights(self, data: pd.DataFrame) -> np.ndarray: """ Compute uniform weights for all universes. """ n = len(data) return np.full(n, 1.0 / n, dtype=float) def _bma_weights(self, data: pd.DataFrame) -> np.ndarray: """ Compute normalised BMA weights from BIC values. """ bic = data["bic"].to_numpy(float) delta = bic - np.min(bic) w = np.exp(-0.5 * delta) return w / w.sum()
# Load an existing multiverse
[docs] def load_multiverse(path=None): """ Load a previously created multiverse from disk. Parameters ---------- path : str A full/relative path to an existing multiverse folder """ if path is not None: name = path.rstrip("/") # Remove trailing slash if present mverse = Multiverse(name=name, path=path) # Create the Multiverse object for the given path else: raise ValueError("Please provide a name/path to a multiverse directory.") if not os.path.exists(mverse.multiverse_dir + "/multiverse_summary.csv"): shutil.rmtree(mverse.multiverse_dir) # clean up created directory raise ValueError("The specified path does not seem to contain a valid multiverse.") return mverse