import os
import re
import sys
import csv
import glob
import shutil
import pickle
import inspect
import textwrap
import itertools
import subprocess
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
from scipy import stats
from jinja2 import Template
from tqdm.auto import tqdm
from matplotlib import transforms
from collections import defaultdict
from matplotlib import pyplot as plt
from matplotlib import lines as mlines
from matplotlib import gridspec as gridspec
from matplotlib.colors import LinearSegmentedColormap
from matplotlib import patches as mpatches
from scipy.interpolate import make_interp_spline
from IPython.display import display as ipy_display
from concurrent.futures import ThreadPoolExecutor, as_completed
[docs]
class Multiverse:
"""
Multiverse analysis class.
Parameters
----------
name : str
Name of the multiverse analysis. Default is "multiverse".
path : str
Path to a multiverse directory (only used by the GUI).
"""
def __init__(self, name="multiverse", path=None):
self.name = name.split('/')[-1].split('.')[0]
self.num_universes = None
self.forking_paths = None
if path is not None:
# Set the directories from the provided path (used by the GUI)
self.multiverse_dir = path
self.results_dir = os.path.join(self.multiverse_dir, "results")
self.script_dir = os.path.join(self.multiverse_dir, "scripts")
else:
# Set the directories based on the calling script
calling_script_dir = os.getcwd() if self._in_notebook() else os.path.dirname(os.path.abspath(sys.modules['__main__'].__file__))
self.multiverse_dir = os.path.join(calling_script_dir, self.name)
self.results_dir = os.path.join(self.multiverse_dir, "results")
self.script_dir = os.path.join(self.multiverse_dir, "scripts")
os.makedirs(self.results_dir, exist_ok=True)
os.makedirs(self.script_dir, exist_ok=True)
# Public methods
[docs]
def create(self, analysis_template, forking_paths, config={}, verbose=False):
"""
Create the individual universe scripts
Parameters
----------
analysis_template : function
Function containing the analysis template
forking_paths : dict
Dictionary containing the forking paths
config : dict
Configuration dictionary with optional combination rules
- order : list of lists specifying the order of decisions
- exclude : list of list[dict or str] (set listed keys to NaN if conditions match)
- remove : list of list[dict or str] (drop universes if conditions match)
- deduplicate : bool (collapse duplicates after exclude/remove; default True)
"""
# If multiverse directory exists, remove all Python files but keep folders and template
if os.path.exists(self.multiverse_dir) and os.path.exists(self.script_dir):
for item in os.listdir(self.script_dir):
item_path = os.path.join(self.script_dir, item)
if os.path.isfile(item_path) and item.endswith(".py") and item != "template.py":
os.remove(item_path)
else:
os.makedirs(self.multiverse_dir)
# Ensure the results directory exists
if not os.path.exists(self.results_dir):
os.makedirs(self.results_dir)
# Template creation
template_code = inspect.getsource(analysis_template) # Extract the source code
template_body = "\n".join(template_code.splitlines()[1:]) # Remove the function definition
# Determine the indentation level of the first line of the function body and remove it from all lines
first_line = template_body.splitlines()[0]
indentation_level = len(first_line) - len(first_line.lstrip())
adjusted_lines = [line[indentation_level:] if len(line) > indentation_level else line
for line in template_body.splitlines()]
adjusted_template_body = "\n".join(adjusted_lines)
# Create jinja template
jinja_template = Template(adjusted_template_body)
# Generate all unique combinations of forking paths
if not forking_paths:
print("No forking paths provided; nothing to create.")
return
keys, values = zip(*forking_paths.items())
# Rules
all_universes = []
exclude_rules = config.get("exclude", [])
removed_rules = config.get("remove", [])
deduplicate = config.get("deduplicate", True)
def _normalise_value(v):
return v["name"] if isinstance(v, dict) and "name" in v else v
def _apply_exclude_to_universe(universe):
if not exclude_rules:
return universe
path_dict = {k: _normalise_value(v) for k, v in universe}
to_nan = set()
for rule in exclude_rules:
matches = True
keys_to_nan = []
for condition in rule:
if isinstance(condition, dict):
for key, val in condition.items():
actual = path_dict.get(key)
no_match = actual not in val if isinstance(val, list) else actual != val
if no_match:
matches = False
break
elif isinstance(condition, str):
keys_to_nan.append(condition)
else:
matches = False
break
if matches:
for k in keys_to_nan:
if k in path_dict:
to_nan.add(k)
if not to_nan:
return universe
updated = []
for k, v in universe:
if k in to_nan:
updated.append((k, float("nan")))
else:
updated.append((k, v))
return updated
def _rule_matches(universe, ruleset):
path_dict = {k: _normalise_value(v) for k, v in universe}
for rule in ruleset:
if not isinstance(rule, list):
continue
matches = True
for condition in rule:
if isinstance(condition, dict):
for key, val in condition.items():
actual = path_dict.get(key)
no_match = actual not in val if isinstance(val, list) else actual != val
if no_match:
matches = False
break
elif isinstance(condition, str):
if condition not in path_dict:
matches = False
break
else:
matches = False
break
if matches:
return True, rule
return False, None
# Build universes (apply exclude during creation)
pre_dedup_count = 0
dedup_removed = 0
if config.get("order"):
for order in config["order"]:
reordered_values = [forking_paths[key] for key in order]
unused_keys = [key for key in keys if key not in order]
order_universes = []
for combination in itertools.product(*reordered_values):
universe = list(zip(order, combination)) + [(k, "unused") for k in unused_keys]
universe = _apply_exclude_to_universe(universe)
order_universes.append(tuple(universe))
pre_dedup_count += len(order_universes)
# Deduplicate **within this order only**
if deduplicate and order_universes:
unique, seen = [], set()
def _canon_sig(d):
def canon(x):
x = _normalise_value(x)
if isinstance(x, float) and (x != x): # NaN
return ("NaN",)
return x
return tuple(sorted((k, canon(v)) for k, v in d.items()))
for u in order_universes:
sig = _canon_sig(dict(u))
if sig not in seen:
seen.add(sig)
unique.append(u)
order_universes = unique
dedup_removed += (len(order_universes) - len(unique))
# Now append; note we do **not** run a global dedup later
all_universes.extend(order_universes)
else:
for combination in itertools.product(*values):
universe = list(zip(keys, combination))
universe = _apply_exclude_to_universe(universe)
all_universes.append(tuple(universe))
pre_dedup_count = len(all_universes)
dedup_removed = len(all_universes)
# Exclusion summary with rule context (post-dedup for accurate counts)
rule_context_counts = defaultdict(int)
def _freeze_conditions(rule):
# Extract dict conditions only; sort for stable printing
conds = []
for c in rule:
if isinstance(c, dict):
for k, v in c.items():
conds.append((k, v))
return tuple(sorted(conds))
for u in all_universes:
u_dict = dict(u)
path_dict = {k: _normalise_value(v) for k, v in u}
for rule in exclude_rules:
conditions, keys_to_nan = [], []
for c in rule:
if isinstance(c, dict):
conditions.append(c)
elif isinstance(c, str):
keys_to_nan.append(c)
# match conditions?
matches = True
for cond in conditions:
for ck, cv in cond.items():
if path_dict.get(ck) != cv:
matches = False
break
if not matches:
break
if not matches:
continue
# count keys that are actually NaN in this universe
cond_sig = _freeze_conditions(rule)
for k in keys_to_nan:
if k in u_dict:
val = u_dict[k]
if isinstance(val, float) and (val != val): # NaN
rule_context_counts[(cond_sig, k)] += 1
if rule_context_counts and verbose:
if config:
if config.get("order"):
print(f"Total number of universes: {pre_dedup_count} (includes ordering permutations)")
else:
print(f"Total number of universes: {pre_dedup_count}")
if dedup_removed:
print(f"Removed {dedup_removed} duplicate universes (set 'deduplicate' to False if you want to keep them)")
for (cond_sig, key), count in rule_context_counts.items():
human_cond = "{" + ", ".join([f"'{k}': {repr(v)}" for k, v in cond_sig]) + "}"
print(f" - Set '{key}' to NaN for universes matching {human_cond} ({count} total).")
# Remove universes that match 'remove' rules (printed last)
if removed_rules:
valid_universes = []
removed_universes = []
for combination in all_universes:
is_invalid, matched_rule = _rule_matches(combination, removed_rules)
if is_invalid:
removed_universes.append((combination, matched_rule))
else:
valid_universes.append(combination)
# Group removed universes by rule
rule_to_universes = defaultdict(list)
for universe, rule in removed_universes:
rule_to_universes[str(rule)].append(dict(universe))
print(f"Removed {len(removed_universes)} out of {len(all_universes)} universes:")
if verbose:
for i, (rule, universes) in enumerate(rule_to_universes.items(), 1):
print(f" Rule {rule} excluded {len(universes)} universes:")
for u in universes:
print(f" {u}")
else:
valid_universes = all_universes
# Final number of universes
if rule_context_counts:
print(f"\n{len(valid_universes)} universes remain for analysis.")
# Create Python scripts for each combination
for i, combination in enumerate(valid_universes, start=1):
combination_dict = dict(combination)
# Smart formatting: only format dicts, pass raw types for strings/numbers
context = {}
for key in forking_paths.keys():
val = combination_dict.get(key, None)
# Map NaN to None for Jinja
if isinstance(val, float) and (val != val):
context[key] = None
else:
context[key] = self._format_type(val)
rendered_content = jinja_template.render(**context)
if config.get("order"):
forking_dict_str = (
"# Ordering information was provided. The ordered decisions for this universe are:\n"
f"forking_paths = {combination_dict}\n\n"
)
rendered_content = forking_dict_str + rendered_content
save_path = os.path.join(self.script_dir, f"universe_{i}.py")
with open(save_path, "w") as file:
file.write(rendered_content)
# Generate CSV file with the decisions of all universes
self._create_summary(valid_universes, keys)
# Save forking paths
with open(f"{self.results_dir}/forking_paths.pkl", "wb") as file:
pickle.dump(forking_paths, file)
# Set some attributes
self.num_universes = len(valid_universes)
self.forking_paths = forking_paths
return
[docs]
def run(self, universe=None, parallel=1, combine_results=True):
"""
Run either an individual universe or the entire multiverse
Parameters
----------
universe : None, int, list, range
Number of the universe to run. Default is None, which runs all universes
parallel : int
Number of universes to run in parallel
"""
# Get all universe scripts
scripts = [f for f in sorted(os.listdir(self.script_dir))
if f.endswith(".py") and not f.startswith("template")]
# Delete previous results (.pkl files)
for item in os.listdir(self.results_dir):
item_path = os.path.join(self.results_dir, item)
if os.path.isfile(item_path) and item_path.endswith('.pkl') and not item_path.endswith('forking_paths.pkl'):
os.remove(item_path)
# Function for parallel processing
def execute_script(file):
env = os.environ.copy()
if parallel > 1:
# Restrict each subprocess to 1 thread in case of parallel execution
# to avoid conflicts in SLURM environments
for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS",
"BLIS_NUM_THREADS", "NUMEXPR_NUM_THREADS"):
env[var] = "1"
subprocess.run([sys.executable, os.path.join(self.script_dir, file)],
check=True, env=env)
def run_parallel(file_list, desc):
if parallel == 1:
for file in tqdm(file_list, desc=desc):
execute_script(file)
else:
# Use threads to avoid conflicts in SLURM environments
with ThreadPoolExecutor(max_workers=parallel) as pool:
futures = {pool.submit(execute_script, f): f for f in file_list}
with tqdm(total=len(futures), desc=desc) as pbar:
for fut in as_completed(futures):
fut.result() # re-raise any subprocess errors
pbar.update(1)
if universe is None:
print("Starting multiverse analysis for all universes...")
run_parallel(scripts, "Performing multiverse analysis:")
else:
# Subset of universes was chosen
if isinstance(universe, int):
universe_numbers = [universe]
elif isinstance(universe, (list, tuple, range)):
universe_numbers = list(universe)
else:
raise ValueError("universe_number should be None, an int, a list, tuple, or a range.")
selected_universes = [f for f in scripts if any(f.endswith(f"universe_{u}.py") for u in universe_numbers)]
print(f"Starting analysis for universe(s): {universe_numbers}...")
run_parallel(selected_universes, "Performing multiverse analysis:")
# Save all results in a single dictionary
if combine_results:
self._combine_results()
self.combined_results = True
else:
self.combined_results = False
print("The multiverse analysis completed without any errors.")
return
[docs]
def summary(self, universe=None, print_df=True, return_df=False):
"""
Print the multiverse summary to the terminal/notebook
Parameters
----------
universe : int, range, or None
The universe number(s) to display. Default is None (prints the head)
"""
multiverse_summary = self._read_summary()
if isinstance(universe, int):
multiverse_selection = multiverse_summary.iloc[universe-1]
elif isinstance(universe, range):
multiverse_selection = multiverse_summary.iloc[universe.start-1:universe.stop]
else:
multiverse_selection = multiverse_summary
if print_df:
if self._in_notebook():
from IPython.display import display
display(multiverse_selection)
else:
print(multiverse_selection)
return multiverse_selection if return_df else None
[docs]
def get_results(self, universe=None, as_df=False, expand_dec=False):
"""
Get the results of the multiverse (or a specific universe).
Parameters
----------
universe : int | None
If given, return results for that specific universe.
as_df : bool
False returns the raw dict (default).
True returns a pandas DataFrame (only valid when universe is None).
"""
if not isinstance(as_df, bool):
raise ValueError("as_df must be a boolean")
if os.path.exists(f"{self.results_dir}/multiverse_results.pkl"):
path = f"{self.results_dir}/multiverse_results.pkl"
with open(path, "rb") as file:
results = pickle.load(file)
if universe is not None:
results = results[f"universe_{universe}"]
return results
if as_df:
df = pd.DataFrame.from_dict(results, orient="index")
# keep string universe label as a column
df.insert(0, "universe", df.index)
# integer index
df.index = (df["universe"].str.replace("universe_", "", regex=False).astype(int))
df.index.name = None
if expand_dec:
# Flatten decisions into columns
flat = df["__decisions"].map(self._flatten_decisions)
dec_only_df = pd.DataFrame(list(flat), index=df.index)
dec_only_df = dec_only_df.add_prefix("__")
if dec_only_df.shape[1] == 0:
raise ValueError("Could not extract any decisions from the 'decisions' dicts.")
# Enforce decision-group order from "Decision 1..N"
decision_order = self._extract_decision_order(df.iloc[0]["__decisions"])
decision_order = [f"__{d}" for d in decision_order if f"__{d}" in dec_only_df.columns]
leftovers = [c for c in dec_only_df.columns if c not in decision_order]
decisions_list = decision_order + leftovers
# Reorder decision columns and merge
dec_only_df = dec_only_df.reindex(columns=decisions_list)
df = pd.concat([df.drop(columns=["__decisions"]), dec_only_df.reindex(columns=decisions_list)], axis=1)
return df.sort_index()
return results
else:
if universe is None:
raise ValueError(
"Multiverse results are not combined. Please specify a universe number."
)
path = f"{self.results_dir}/tmp/universe_{universe}.pkl"
with open(path, "rb") as file:
results = pickle.load(file)
if as_df:
df = pd.DataFrame([results])
df.insert(0, "universe", f"universe_{universe}")
df.index = pd.Index([int(universe)], name="universe_id")
return df
return results
[docs]
def add_results(self, name, values):
"""
Add a derived measure to the saved multiverse results.
The measure is written into ``multiverse_results.pkl`` so it becomes a regular column in
:meth:`get_results` and can be used by :meth:`multiverse_plot`, :meth:`specification_curve`,
and :meth:`integrate`. Useful for quantities computed from existing measures (e.g. the
difference between two measures). An existing measure of the same name is overwritten.
Parameters
----------
name : str
Name of the measure.
values : array-like or pandas.Series
One value per universe. A ``pandas.Series`` is aligned by its index to the universe
numbers (1..N); any other array-like is matched positionally in universe order
(``universe_1`` .. ``universe_N``). Each entry may be a scalar or an array.
Returns
-------
pandas.DataFrame
The updated results, as returned by ``get_results(as_df=True)``.
"""
results_path = os.path.join(self.results_dir, "multiverse_results.pkl")
if not os.path.exists(results_path):
raise FileNotFoundError(
"Combined results not found. Run the multiverse first so that "
"'multiverse_results.pkl' exists."
)
with open(results_path, "rb") as file:
results = pickle.load(file)
# Universe order: universe_1 .. universe_N
universe_keys = sorted(results.keys(), key=lambda k: int(k.split("_")[1]))
universe_ids = [int(k.split("_")[1]) for k in universe_keys]
# Align a Series by its index, otherwise match positionally in universe order.
if isinstance(values, pd.Series):
ordered = [values.loc[uid] for uid in universe_ids]
else:
ordered = list(values)
if len(ordered) != len(universe_keys):
raise ValueError(f"Expected {len(universe_keys)} values (one per universe), got {len(ordered)}.")
for key, val in zip(universe_keys, ordered):
results[key][name] = val
with open(results_path, "wb") as file:
pickle.dump(results, file)
return self.get_results(as_df=True)
[docs]
def remove_results(self, name):
"""
Remove a measure column from the saved multiverse results.
The measure is deleted from ``multiverse_results.pkl`` for every universe. Decision and
internal columns (prefixed ``"__"``, e.g. ``__decisions``) cannot be removed.
Parameters
----------
name : str
Name of the measure to remove.
Returns
-------
pandas.DataFrame
The updated results, as returned by ``get_results(as_df=True)``.
"""
if isinstance(name, str) and name.startswith("__"):
raise ValueError(f"Cannot remove decision/internal columns (prefixed '__'): '{name}'.")
results_path = os.path.join(self.results_dir, "multiverse_results.pkl")
if not os.path.exists(results_path):
raise FileNotFoundError(
"Combined results not found. Run the multiverse first so that "
"'multiverse_results.pkl' exists."
)
with open(results_path, "rb") as file:
results = pickle.load(file)
universe_keys = sorted(results.keys(), key=lambda k: int(k.split("_")[1]))
if not any(name in results[k] for k in universe_keys):
raise ValueError(f"Measure '{name}' was not found in the results.")
for key in universe_keys:
results[key].pop(name, None)
with open(results_path, "wb") as file:
pickle.dump(results, file)
return self.get_results(as_df=True)
[docs]
def integrate(self, measure=None, method="uniform", type="mean", agg="mean"):
"""
Integrate the multiverse results for a specific measure into a single weighted estimate.
Weighting schemes follow Cantone & Tomaselli (2024); the decision-based schemes
(mli, mli_restricted) require the decision columns, which are loaded via ``expand_dec=True``.
Parameters
----------
measure : string
Name of the measure to integrate.
method : string
Weighting scheme. Options are:
- "uniform" (default): equal weights across all universes
- "mli": maximum local influence (specifications whose neighbours -- by
Gower distance over the decisions -- give similar estimates get more weight)
- "mli_restricted": variant of MLI that ignores decisions not freely crossed
across the multiverse, so the local neighbourhood is comparable across
universes. Use this when the design includes ``remove`` rules that confine
some decisions to a subset of levels.
type : string
Type of (weighted) integration. Options are "mean" (default) or "median".
agg : string
Aggregation applied per universe when the measure holds sequences (arrays) rather
than scalars. One of "mean" (default), "median", "first", "last".
Returns
-------
tuple
(integrated_estimate : float, weights : np.ndarray)
"""
results = self.get_results(as_df=True, expand_dec=True)
results.columns = results.columns.str.lower()
measure = measure.lower() if measure else None
if measure is None:
raise ValueError("Please provide a measure to integrate.")
if measure not in results.columns:
raise ValueError(f"The measure '{measure}' was not found in the results.")
x = self._get_measure_values(results, measure, agg=agg)
weights = self._compute_weights(results, measure, method, agg=agg)
if type == "mean":
integrated_estimate = self._weighted_mean(x, weights)
elif type == "median":
integrated_estimate = self._weighted_median(x, weights)
else:
raise ValueError("type must be 'mean' or 'median'")
return integrated_estimate, weights
[docs]
def compare_integration(self, measure, true_value=None, agg="mean", sigfigs=3):
"""
Compare all integration schemes for a measure and return a summary table.
Parameters
----------
measure : string
Name of the measure to integrate.
true_value : float or None
If given, absolute errors of each estimate against this value are added.
agg : string
Per-universe aggregation for sequence-valued measures (see ``integrate``).
sigfigs : int or None
Round the numeric table columns to this many significant figures (default 3).
Use None to keep full precision. Only affects the table; ``weights`` is unrounded.
Returns
-------
tuple
(table : pandas.DataFrame, weights : dict[str, np.ndarray])
The table has one row per scheme with the weighted median/mean and the Gini
coefficient of the weights (0 = uniform, 1 = concentrated). If ``true_value`` is
given, absolute errors of the median/mean are added (``err_median``, ``err_mean``).
"""
results = self.get_results(as_df=True, expand_dec=True)
results.columns = results.columns.str.lower()
measure = measure.lower()
if measure not in results.columns:
raise ValueError(f"The measure '{measure}' was not found in the results.")
y = self._get_measure_values(results, measure, agg=agg)
schemes = [("Uniform", "uniform"), ("MLI", "mli")]
if self._untestable_decisions(results):
schemes.append(("MLI (restricted)", "mli_restricted"))
rows, weights = [], {}
for name, method in schemes:
w = self._compute_weights(results, measure, method, agg=agg)
y_median = self._weighted_median(y, w)
y_mean = self._weighted_mean(y, w)
row = {"Scheme": name, "median": y_median, "mean": y_mean,
"gini_w": self._gini(w)}
if true_value is not None:
row["err_median"] = abs(y_median - true_value)
row["err_mean"] = abs(y_mean - true_value)
rows.append(row)
weights[name] = w
table = pd.DataFrame(rows)
if sigfigs is not None:
num = table.select_dtypes("number").columns
table[num] = table[num].apply(
lambda c: c.map(lambda v: v if pd.isna(v) else float(f"{v:.{sigfigs}g}")))
return table, weights
# Plotting methods
[docs]
def visualize(self,
universe=None,
figsize=(8,5),
node_size=1500,
text_size=12,
max_label_len=15,
label_offset=0.04,
cmap="Set2",
exclude_single=False
):
"""
Visualize the multiverse as a network.
Parameters
----------
universe : int or None
The universe to highlight in the network. If None or if the provided universe number
is higher than available universes, the entire multiverse is shown without highlighting.
Default is None.
figsize : tuple
Size of the figure. Default is (8,5).
node_size : int
Size of the nodes. Default is 1500.
text_size : int
Size of the text labels. Default is 12.
max_label_len : int
Maximum length of decision labels before wrapping.
label_offset : float
Offset multiplier for decision labels.
cmap : str
Colormap to use for the nodes. Default is "Set2".
exclude_single : bool
Whether to exclude parameters with only one unique option.
"""
# Read the CSV summary into a DataFrame.
multiverse_summary = self._read_summary()
# Identify value and decision columns.
value_columns = [col for col in multiverse_summary.columns if col.startswith("Value") or col == "Universe"]
decision_columns = [col for col in multiverse_summary.columns if col.startswith("Decision")]
# Create DataFrames for values and decisions.
multiverse_values = multiverse_summary[value_columns]
multiverse_decision = multiverse_summary[decision_columns]
# Recursive function to add decision nodes and connect them hierarchically.
def add_hierarchical_decision_nodes(G, root_node, parameters, level=0, exclude_single=exclude_single):
if level >= len(parameters):
return G # No more parameters to process
decision, options = parameters[level]
# Only add the node if there are multiple options or if we are not excluding single-option parameters.
if not exclude_single or len(options) > 1:
for option in options:
node_name = f"{decision}: {option}"
G.add_node(node_name, level=level + 1, option=option, decision=decision)
G.add_edge(root_node, node_name)
# Recurse to add the next level.
G = add_hierarchical_decision_nodes(G, node_name, parameters, level + 1, exclude_single)
else:
# If excluding single-option parameters, skip to the next parameter.
G = add_hierarchical_decision_nodes(G, root_node, parameters, level + 1, exclude_single)
return G
# Map each Value column (ignoring Universe) to its corresponding Decision column.
# Assumes the order of Value columns (except Universe) matches the order of Decision columns.
value_to_decision_map = {val: dec for val, dec in zip(value_columns[1:], decision_columns)}
# Build parameters: For each Value column (except Universe) with more than one unique option,
# use the decision name (from the map) and the unique values.
parameters = [
(value_to_decision_map.get(col, col), multiverse_values[col].unique())
for col in multiverse_values.columns[1:]
]
# Remove any NaN values from the unique options.
parameters = [(dec, options[pd.notna(options)]) for dec, options in parameters]
# Initialize the directed graph and add the root node.
G = nx.DiGraph()
root_node = "Start"
G.add_node(root_node, level=0, label="Start")
G = add_hierarchical_decision_nodes(G, root_node, parameters, exclude_single=exclude_single)
# Build the list of node names that correspond to the specified universe.
values = ["Start"]
if universe is not None:
filtered_df = multiverse_values[multiverse_values["Universe"] == f"Universe_{universe}"]
# If the provided universe number is out-of-range, reset universe to None.
if filtered_df.empty:
universe = None
else:
row_dict = filtered_df.iloc[0].to_dict()
for col, value in row_dict.items():
# Skip the Universe column.
if col == "Universe":
continue
# Convert the Value column name to its corresponding decision name.
decision_name = value_to_decision_map.get(col, col)
values.append(f"{decision_name}: {value}")
# Determine which edges are part of the selected universe's path.
universe_edges = [(src, tgt) for src, tgt in G.edges if src in values and tgt in values]
edge_colors = []
edge_widths = []
for edge in G.edges:
if edge in universe_edges:
edge_colors.append("black")
edge_widths.append(2.5)
else:
edge_colors.append("gray")
edge_widths.append(1.0)
# Create the figure and axis.
fig, ax = plt.subplots(figsize=figsize)
title_str = f"Universe {universe}" if universe is not None else "Multiverse"
ax.set_title(title_str, size=14, fontweight="bold")
# Use a multipartite layout based on the 'level' attribute.
pos = nx.multipartite_layout(G, subset_key="level")
# Create a colormap based on levels.
levels = set(nx.get_node_attributes(G, "level").values())
num_levels = len(levels)
first_color = "lightgray"
colormap = plt.cm.get_cmap(cmap, num_levels)
# First level gets a fixed color; subsequent levels are derived from the colormap.
colors = [first_color] + [colormap(i / (num_levels - 1)) for i in range(num_levels)]
level_colors = {level: colors[i] for i, level in enumerate(sorted(levels))}
# Draw edges.
nx.draw(G, pos, with_labels=False, node_size=node_size - 10, node_color="white",
arrows=True, edge_color=edge_colors, width=edge_widths, ax=ax)
# Draw nodes with colors based on their level.
for level in levels:
nodes_at_level = [node for node in G.nodes if G.nodes[node].get("level") == level]
nx.draw_networkx_nodes(G, pos, nodelist=nodes_at_level, node_size=node_size,
node_color=[level_colors[level] for _ in nodes_at_level], ax=ax)
# Prepare labels: For non-root nodes, use the 'option' text; for the root, use its label.
def wrap_label(text, max_len=max_label_len):
return "\n".join(textwrap.wrap(text, max_len))
node_labels = {}
for node in G.nodes:
if node != root_node and "option" in G.nodes[node]:
raw_label = str(G.nodes[node]["option"])
node_labels[node] = wrap_label(raw_label, max_len=max_label_len)
else:
node_labels[node] = G.nodes[node].get("label", node)
nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=text_size, ax=ax)
# Calculate an offset based on the maximum number of nodes at any level.
node_nums = [len([n for n in G.nodes if G.nodes[n].get("level") == level]) for level in levels]
max_nodes = max(node_nums) if node_nums else 1
# Annotate the bottom-most node at each level with the decision value.
for level in levels:
nodes_at_level = [node for node in G.nodes if G.nodes[node].get("level") == level]
if nodes_at_level:
# Choose the node with the lowest y-position (bottom-most)
bottom_node = min(nodes_at_level, key=lambda node: pos[node][1])
if bottom_node != root_node and "decision" in G.nodes[bottom_node]:
decision = G.nodes[bottom_node]["decision"]
if decision in multiverse_decision.columns:
decision_value = multiverse_decision[decision].iloc[0]
else:
decision_value = decision
x, y = pos[bottom_node]
ax.text(x, y - label_offset * max_nodes, decision_value,
horizontalalignment="center", fontsize=text_size, fontweight="bold")
# Save the figure to the results directory.
plt.savefig(f"{self.results_dir}/multiverse.png", bbox_inches="tight")
return self._handle_figure_returns(fig)
[docs]
def specification_curve(self,
measure: str,
baseline: float | None = None,
p_value: float | str | bool | None = None,
ci: int | str | bool | None = None,
smooth_ci: bool = True,
title: str | None = None,
name_map: dict | None = None,
cmap: str = "Set3",
linewidth: float = 2,
figsize: tuple | None = None,
height_ratio: tuple = (2, 1),
fontsize: int = 10,
dotsize: int = 50,
line_pad: float = 0.3,
fname: str = "specification_curve",
ftype: str = "pdf",
dpi: int = 300,
p_threshold: float = 0.05,
ci_level_default: int = 95,
):
"""
Create a specification curve plot from multiverse results.
The figure is saved to the results directory as "specification_curve.{ftype}".
Notes
-----
- If `p_value` is float or True, `measure` must contain list/array samples per universe.
- If `ci` is int or True, `measure` must contain list/array samples per universe.
- If `p_value` is a string, it is interpreted as a p-value column (numeric) or a significance flag (bool).
- If `ci` is a string, it must contain per-universe (lower, upper) bounds.
Returns
-------
Any
The figure if in a .py script.
None if in a .ipynb notebook (the figure is saved and displayed inline)
"""
def _map_name(key: str) -> str:
return name_map.get(key, key) if isinstance(name_map, dict) else key
def _map_level(decision: str, lvl) -> str:
# Option/level labels. Prefer a decision-qualified key ("decision/option") so the same
# option string can be relabelled differently per decision, then fall back to a bare
# key, then the raw value.
if not isinstance(name_map, dict):
return str(lvl)
qualified = f"{decision}/{lvl}"
if qualified in name_map:
return str(name_map[qualified])
return str(name_map.get(lvl, lvl))
def _extract_decision_order(decisions_obj) -> list[str]:
if not isinstance(decisions_obj, dict):
return []
dec_block = decisions_obj.get("__decisions")
d = dec_block if isinstance(dec_block, dict) else decisions_obj
order = []
i = 1
while f"Decision {i}" in d:
order.append(str(d[f"Decision {i}"]))
i += 1
return order
# ------------------------------------------------------------
# Load df and basic checks
df = self.get_results(as_df=True).copy()
if measure not in df.columns:
raise ValueError(f"'{measure}' not found in multiverse results.")
if "__decisions" not in df.columns:
raise ValueError("Expected a 'decisions' column containing decision dictionaries.")
# Keep raw before scalarising (for CI / p-value computation)
raw_measure = df[measure].copy()
# Scalarise outcome for plotting (mean over list/array; scalar -> float)
df[measure] = df[measure].apply(
lambda x: float(np.mean(x)) if isinstance(x, (list, tuple, np.ndarray)) else float(x)
)
if df[measure].isna().any():
raise ValueError(f"NaNs detected in '{measure}' after reduction to mean.")
# ------------------------------------------------------------
# Flatten decisions -> columns
flat = df["__decisions"].map(self._flatten_decisions)
dec_df = pd.DataFrame(list(flat), index=df.index)
if dec_df.shape[1] == 0:
raise ValueError("Could not extract any decisions from the 'decisions' dicts.")
# decision-group order from stored Decision 1..N
decision_order = _extract_decision_order(df.iloc[0]["__decisions"])
decision_order = [d for d in decision_order if d in dec_df.columns]
leftovers = [c for c in dec_df.columns if c not in decision_order]
decision_cols_all = decision_order + leftovers
df = pd.concat([df.drop(columns=["__decisions"]), dec_df], axis=1)
# Keep only decisions with >1 unique option
decision_cols = [c for c in decision_cols_all if df[c].nunique(dropna=True) > 1]
if len(decision_cols) == 0:
raise ValueError("No decisions with more than one option found; nothing to plot in the bottom panel.")
# ------------------------------------------------------------
# Sort by outcome
sort_idx = df[measure].sort_values().index
df = df.loc[sort_idx].reset_index(drop=True)
raw_measure_sorted = raw_measure.loc[sort_idx].reset_index(drop=True) # <-- aligned raw samples
n = len(df)
x_values = np.arange(n)
y_values = df[measure].to_numpy(dtype=float)
# ------------------------------------------------------------
# Significance handling
baseline_for_tests = 0.0 if baseline is None else float(baseline)
sig_color = "#018532"
significant = np.zeros(n, dtype=bool)
pvals = None
if p_value is None:
pass
elif isinstance(p_value, str):
if p_value not in df.columns:
raise ValueError(f"p_value column '{p_value}' not found in results df.")
col = df[p_value]
if pd.api.types.is_bool_dtype(col):
significant = col.fillna(False).to_numpy(dtype=bool)
else:
pvals = pd.to_numeric(col, errors="coerce").to_numpy(dtype=float)
if np.isnan(pvals).any():
raise ValueError(f"NaNs detected in p-value column '{p_value}'.")
significant = pvals < float(p_threshold)
else:
thr = float(p_threshold) if p_value is True else float(p_value)
pvals = self._one_sample_ttest(raw_measure_sorted, baseline_for_tests)
if np.isnan(pvals).any():
raise ValueError(
"Cannot compute p-values for some universes: "
f"'{measure}' must contain list/array samples per universe when p_value is float/True."
)
significant = pvals < thr
top_colors = np.where(significant, sig_color, "black")
# ------------------------------------------------------------
# CI handling
ci_lower = None
ci_upper = None
if ci is None:
pass
elif isinstance(ci, str):
if ci not in df.columns:
raise ValueError(f"CI column '{ci}' not found in results df.")
bounds = df[ci].to_list()
lows, highs = [], []
for b in bounds:
if isinstance(b, (list, tuple, np.ndarray)) and len(b) == 2:
lo, hi = float(b[0]), float(b[1])
if not (np.isfinite(lo) and np.isfinite(hi)):
raise ValueError(f"Non-finite CI bounds in column '{ci}'.")
lows.append(lo)
highs.append(hi)
else:
raise ValueError(f"CI column '{ci}' must contain (lower, upper) tuples/lists per universe.")
ci_lower = np.asarray(lows, dtype=float)
ci_upper = np.asarray(highs, dtype=float)
else:
level = int(ci_level_default) if ci is True else int(ci)
lows, highs = [], []
for rv, mean_val in zip(raw_measure_sorted, y_values):
if not isinstance(rv, (list, tuple, np.ndarray)):
raise ValueError(
f"Cannot compute CI because '{measure}' does not contain per-universe samples. "
"Provide a CI column instead (ci='colname')."
)
arr = np.asarray(rv, dtype=float)
arr = arr[np.isfinite(arr)]
if len(arr) < 4:
raise ValueError(
f"Cannot compute CI for a universe with <4 finite samples in '{measure}'. "
"Provide a CI column instead (ci='colname') or store more samples."
)
sem = np.std(arr, ddof=1) / np.sqrt(len(arr))
half = sem * stats.t.ppf((1.0 + level / 100.0) / 2.0, len(arr) - 1)
lows.append(float(mean_val - half))
highs.append(float(mean_val + half))
ci_lower = np.asarray(lows, dtype=float)
ci_upper = np.asarray(highs, dtype=float)
# ------------------------------------------------------------
# Figure size (auto if None)
if title is None:
title = "Specification Curve"
if figsize is None:
num_options = sum(df[c].nunique(dropna=True) for c in decision_cols)
figsize = (max(8, n * 0.07), max(6, num_options * 0.35))
# ------------------------------------------------------------
# Bottom panel layout (decision order enforced; options order-of-appearance)
decision_positions = {}
display_labels = []
yticks = []
line_ends = []
key_positions = {}
y_max = 0
space_between_groups = 1
num_groups = len(decision_cols)
cmap_obj = plt.cm.get_cmap(cmap, num_groups if num_groups > 0 else 1)
group_colors = [cmap_obj(i) for i in range(num_groups)]
for group_idx, decision in enumerate(decision_cols):
options = pd.unique(df[decision].astype("object"))
options = [o for o in options if pd.notna(o)]
group_label = _map_name(decision)
key_positions[group_label] = y_max + len(options) / 2.0 - 0.5
for opt in options:
yticks.append(y_max)
display_labels.append(_map_level(decision, opt))
decision_positions[(decision, opt)] = (y_max, group_idx)
y_max += 1
line_ends.append(y_max)
y_max += space_between_groups
# ------------------------------------------------------------
# Plot
sns.set_theme(style="whitegrid")
fig, ax = plt.subplots(
2, 1, figsize=figsize, gridspec_kw={"height_ratios": height_ratio}, sharex=True
)
# Bottom axes setup
ax[1].set_yticks(yticks)
ax[1].set_yticklabels(display_labels, fontsize=fontsize)
ax[1].tick_params(axis="y", labelsize=fontsize)
ax[1].set_ylim(-1, y_max)
ax[1].xaxis.grid(False)
ax[1].invert_yaxis()
# Left padding for group labels/lines
fig.canvas.draw()
renderer = fig.canvas.get_renderer()
trans1 = transforms.blended_transform_factory(ax[1].transAxes, ax[1].transData)
tick_extents = [lbl.get_window_extent(renderer=renderer) for lbl in ax[1].get_yticklabels()]
max_extent = max(tick_extents, key=lambda bb: bb.width)
x_start_pixel = max_extent.x0
x_start_axes1 = ax[1].transAxes.inverted().transform((x_start_pixel, 0))[0]
trans0 = transforms.blended_transform_factory(ax[0].transAxes, ax[0].transData)
tick_extents0 = [lbl.get_window_extent(renderer=renderer) for lbl in ax[0].get_yticklabels()]
if tick_extents0:
max_extent0 = max(tick_extents0, key=lambda bb: bb.width)
x_start_pixel0 = max_extent0.x0
x_start_axes0 = ax[0].transAxes.inverted().transform((x_start_pixel0, 0))[0]
else:
x_start_axes0 = x_start_axes1
min_x_start_axes = min(x_start_axes1, x_start_axes0)
padding = -line_pad * min_x_start_axes
line_offset = min_x_start_axes - padding
# Group labels + separators
for key, pos in key_positions.items():
ax[1].text(
line_offset - padding, pos, key, transform=trans1,
ha="right", va="center", fontweight="bold", fontsize=fontsize
)
s = -0.5
for line_end in line_ends:
e = line_end - 0.5
ax[1].add_line(
mlines.Line2D([line_offset, line_offset], [s, e], color="black", lw=1, transform=trans1, clip_on=False)
)
s = line_end + 0.5
# Top scatter
ax[0].scatter(x_values, y_values, c=top_colors, s=dotsize, edgecolors=top_colors, zorder=3)
# CI drawing
if ci_lower is not None and ci_upper is not None:
if smooth_ci and len(ci_lower) >= 4:
spline_lo = make_interp_spline(x_values, ci_lower.astype(float), k=3)
spline_hi = make_interp_spline(x_values, ci_upper.astype(float), k=3)
x_smooth = np.linspace(x_values.min(), x_values.max(), 500)
ax[0].fill_between(x_smooth, spline_lo(x_smooth), spline_hi(x_smooth), color="gray", alpha=0.3)
else:
for i in range(n):
ax[0].plot([i, i], [ci_lower[i], ci_upper[i]], color="gray", linewidth=linewidth, zorder=2)
# Baseline line
legend_items = []
if baseline is not None:
ax[0].hlines(float(baseline), xmin=-2, xmax=n + 1, linestyles="--", lw=2, colors="black", zorder=1)
legend_items.append(mlines.Line2D([], [], linestyle="--", color="black", linewidth=2, label="Baseline"))
# Measure label + left line
ymin, ymax = ax[0].get_ylim()
ycenter = (ymin + ymax) / 2.0
ax[0].text(
line_offset - padding, ycenter, _map_name(measure), transform=trans0,
ha="right", va="center", fontweight="bold", fontsize=fontsize
)
ax[0].add_line(
mlines.Line2D([line_offset, line_offset], [ymin, ymax], color="black", lw=1, transform=trans0, clip_on=False)
)
# Bottom markers (vectorised melt)
long = df[decision_cols].reset_index().melt(
id_vars="index", value_vars=decision_cols, var_name="decision", value_name="option"
).rename(columns={"index": "x"})
bottom_x, bottom_y, bottom_c = [], [], []
for row in long.itertuples(index=False):
opt = row.option
if pd.isna(opt):
continue
key = (row.decision, opt)
if key in decision_positions:
y_pos, grp_idx = decision_positions[key]
bottom_x.append(row.x)
bottom_y.append(y_pos)
bottom_c.append(group_colors[grp_idx] if num_groups > 0 else "black")
if bottom_x:
ax[1].scatter(np.asarray(bottom_x), np.asarray(bottom_y), c=np.asarray(bottom_c), s=dotsize, marker="o")
# Top panel styling
ax[0].set_title(title, fontweight="bold", fontsize=fontsize + 2)
ax[0].xaxis.grid(False)
ax[0].set_xticks([])
ax[0].set_xlim(-1, n)
ax[0].tick_params(axis="y", labelsize=fontsize)
# Legend for CI
if ci_lower is not None and ci_upper is not None:
if isinstance(ci, int):
ci_lab = f"{ci}% CI"
elif ci is True:
ci_lab = f"{ci_level_default}% CI"
else:
ci_lab = "CI"
legend_items.append(mpatches.Patch(facecolor="gray", edgecolor="white", label=ci_lab))
# Legend for significance (only if p_value requested)
if p_value is not None:
if significant.any():
legend_items.append(
mlines.Line2D([], [], linestyle="None", marker="o", markersize=9,
markerfacecolor=sig_color, markeredgecolor=sig_color,
label="significant")
)
if (~significant).any():
legend_items.append(
mlines.Line2D([], [], linestyle="None", marker="o", markersize=9,
markerfacecolor="black", markeredgecolor="black",
label="not significant")
)
if legend_items:
ax[0].legend(handles=legend_items, loc="upper left", fontsize=fontsize, frameon=False)
plt.savefig(f"{self.results_dir}/{fname}.{ftype}", bbox_inches="tight", dpi=dpi)
sns.reset_orig()
return self._handle_figure_returns(fig)
[docs]
def multiverse_plot(self,
measure: str,
n_bins: int = 20,
sig_col: str | None = None,
sig_threshold: float = 0.05,
baseline: float | None = None,
reference: float | None = None,
reference_label: str | None = None,
name_map: dict | None = None,
figsize: tuple = (7, 9),
fname: str = "multiverse_plot",
ftype: str = "pdf",
dpi: int = 300,
):
"""
Multiverse plot as introduced by Krähmer & Young (2026).
This plot visualises the distribution of multiverse outcomes together with
heatmap strips showing how different analytic choices relate to the outcome.
For each decision level, the average change in the outcome relative to the
reference level is shown on the right.
The figure is saved to the results directory as "{fname}.{ftype}".
References
----------
Krähmer, D., & Young, C. (2026). Visualizing vastness: Graphical methods for
multiverse analysis. PLOS One, 21(2). https://doi.org/10.1371/journal.pone.0339452
Parameters
----------
measure : str
Name of the outcome/measure column in the multiverse results.
Entries may be scalars or lists/arrays (in which case the mean is used).
n_bins : int, optional
Number of bins used to discretise the outcome axis for the heatmap strips.
sig_col : str | None, optional
Column indicating statistical significance. If provided:
- boolean values are interpreted directly (True = significant),
- numeric values are compared against ``sig_threshold``.
If None, significance is computed from per-universe sample arrays with a one-sample
t-test against ``baseline`` (identical to ``specification_curve``), thresholded by
``sig_threshold``. This requires ``measure`` to hold array samples per universe;
scalar measures cannot be tested and no overlay is drawn.
sig_threshold : float, optional
Threshold applied to a numeric ``sig_col`` or to the computed p-values when
``sig_col`` is None (default is 0.05).
baseline : float | None, optional
Baseline value for the outcome. If provided, a vertical dashed reference line is
drawn at this value (extending through the strips to the x-axis), and it is the null
used for the one-sample t-test when significance is computed (defaults to 0).
reference : float | None, optional
A single reference outcome value to highlight on the density curve, e.g. the estimate
of one specific universe (such as a previously published single-pipeline study). It is
drawn as a marker sitting on the density at that x-position, showing where that single
result falls within the full multiverse distribution.
reference_label : str | None, optional
Legend label for ``reference`` (e.g. ``"Popp et al."``). Defaults to ``"Reference"``.
name_map : dict | None, optional
Optional mapping for display names. Keys may include the measure name, decision names,
and option/level values. Values are the desired display labels. When the same option
string is used by more than one decision, use a decision-qualified key
``"decision/option"`` (e.g. ``"family_splitting/all"``) to relabel it per decision;
otherwise a bare option key (e.g. ``"spearman"``) is applied wherever it appears.
figsize : tuple, optional
Figure size passed to Matplotlib (width, height) in inches.
ftype : str, optional
File type used when saving the figure (e.g., ``"pdf"``, ``"png"``).
dpi : int, optional
Resolution (dots per inch) used when saving the figure.
Returns
-------
Any
The figure if in a .py script.
None if in a .ipynb notebook (the figure is saved and displayed inline)
"""
# Helpers
def _map_name(key: str) -> str:
return name_map.get(key, key) if isinstance(name_map, dict) else key
def _map_level(decision: str, lvl) -> str:
# Option/level labels. Prefer a decision-qualified key ("decision/option") so the same
# option string can be relabelled differently per decision (e.g. "all" under
# family_splitting vs movie_frames), then fall back to a bare key, then the raw value.
if not isinstance(name_map, dict):
return str(lvl)
qualified = f"{decision}/{lvl}"
if qualified in name_map:
return str(name_map[qualified])
return str(name_map.get(lvl, lvl))
def _kde_density(values, grid, x_min, x_max):
values = np.asarray(values, dtype=float)
values = values[np.isfinite(values)]
if len(values) < 2:
hist, edges = np.histogram(values, bins=20, range=(x_min, x_max), density=True)
mids = (edges[:-1] + edges[1:]) / 2
return np.interp(grid, mids, hist, left=0, right=0)
return stats.gaussian_kde(values)(grid)
def _build_heatmap_data(df_in, varname, outcome_var, breaks):
tmp = df_in[[varname, outcome_var]].copy()
tmp["outcome_bin"] = pd.cut(tmp[outcome_var], bins=breaks, include_lowest=True)
all_bins = tmp["outcome_bin"].cat.categories
levels = tmp[varname].cat.categories
idx = pd.MultiIndex.from_product([all_bins, levels], names=["outcome_bin", varname])
counts = (
tmp.groupby(["outcome_bin", varname], observed=False)
.size()
.reindex(idx, fill_value=0)
.reset_index(name="n")
)
total_by_bin = counts.groupby("outcome_bin")["n"].transform(lambda x: x.sum() if x.sum() > 0 else 1)
counts["prop"] = counts["n"] / total_by_bin
bin_to_idx = {iv: i for i, iv in enumerate(all_bins)}
counts["bin_idx"] = counts["outcome_bin"].map(bin_to_idx)
return counts
###############
# General setup
df = self.get_results(as_df=True) # Load results
if measure not in df.columns:
raise ValueError(
f"'{measure}' not found in multiverse results. Make sure to save it in the multiverse template."
)
if "__decisions" not in df.columns:
raise ValueError("Expected a 'decisions' column containing decision dictionaries.")
if n_bins > len(df):
raise ValueError(f"n_bins ({n_bins}) cannot be higher than the number of universes ({len(df)}).")
# Significance handling
if sig_col is not None:
if sig_col not in df.columns:
raise ValueError(f"sig_col='{sig_col}' not found in results df.")
s = df[sig_col]
if pd.api.types.is_bool_dtype(s):
significant = s.fillna(False).to_numpy(dtype=bool)
else:
s_num = pd.to_numeric(s, errors="coerce")
significant = (s_num < sig_threshold).fillna(False).to_numpy(dtype=bool)
else:
# No column: compute significance from per-universe sample arrays (one-sample t-test
# against `baseline`), identical to specification_curve. Scalar/too-short samples
# yield NaN p-values and are treated as not significant (so scalar measures get no overlay).
pvals = self._one_sample_ttest(df[measure], baseline)
significant = pvals < sig_threshold
df = df.copy()
df["__significant"] = significant
# Scalarise outcome: mean over list/array; scalar -> float
df[measure] = df[measure].apply(
lambda x: float(np.mean(x)) if isinstance(x, (list, tuple, np.ndarray)) else float(x)
)
if df[measure].isna().any():
raise ValueError(f"NaNs detected in '{measure}' after reduction to mean.")
# Flatten decisions into columns
flat = df["__decisions"].map(self._flatten_decisions)
dec_only_df = pd.DataFrame(list(flat), index=df.index)
dec_only_df = dec_only_df.add_prefix("__")
if dec_only_df.shape[1] == 0:
raise ValueError("Could not extract any decisions from the 'decisions' dicts.")
# Enforce decision-group order from "Decision 1..N"
decision_order = self._extract_decision_order(df.iloc[0]["__decisions"])
decision_order = [f"__{d}" for d in decision_order if f"__{d}" in dec_only_df.columns]
leftovers = [c for c in dec_only_df.columns if c not in decision_order]
decisions_list = decision_order + leftovers
# Reorder decision columns and merge
dec_only_df = dec_only_df.reindex(columns=decisions_list)
df = pd.concat([df.drop(columns=["__decisions"]), dec_only_df.reindex(columns=decisions_list)], axis=1)
# Options within each decision: keep order-of-appearance (do NOT reverse)
for d in decisions_list:
s = df[d].astype("object")
cats = list(pd.unique(s))
df[d] = pd.Categorical(s, categories=cats, ordered=True)
# Average increase labels (mean difference vs reference = first option)
avg_diff_lookup = {}
for varname in decisions_list:
levels_all = list(df[varname].cat.categories)
if len(levels_all) == 0:
continue
ref = levels_all[0]
ref_mean = float(df.loc[df[varname] == ref, measure].mean())
lvl_map = {}
for lvl in levels_all:
if lvl == ref:
lvl_map[lvl] = "Ref."
else:
lvl_mean = float(df.loc[df[varname] == lvl, measure].mean())
lvl_map[lvl] = f"{(lvl_mean - ref_mean):+.2f}"
avg_diff_lookup[varname] = lvl_map
##################
# Top density plot
multiverse_outcome = df[measure].to_numpy(dtype=float)
x_min = float(np.min(multiverse_outcome))
x_max = float(np.max(multiverse_outcome))
if not np.isfinite(x_min) or not np.isfinite(x_max) or x_min == x_max:
raise ValueError(f"'{measure}' must have finite, non-constant values.")
x_pad = 0.02 * np.ptp(multiverse_outcome)
common_xlim = (x_min - x_pad, x_max + x_pad)
breaks = np.linspace(x_min, x_max, n_bins + 1)
grid_x = np.linspace(x_min, x_max, 1000)
y_all = _kde_density(multiverse_outcome, grid_x, x_min, x_max)
if df["__significant"].any():
sig_vals = df.loc[df["__significant"], measure].to_numpy(dtype=float)
y_sig = _kde_density(sig_vals, grid_x, x_min, x_max)
sig_share = float(df["__significant"].mean())
y_sig_scaled = np.minimum(y_sig * sig_share, y_all)
else:
y_sig_scaled = None
# Plot layout
DENSITY_RATIO = 0.5
strip_levels_counts = [len(df[v].cat.categories) for v in decisions_list]
total_strip_levels = sum(strip_levels_counts)
density_height = max(1, int(DENSITY_RATIO * total_strip_levels))
n_rows = density_height + sum([2 + c for c in strip_levels_counts]) + 1
fig = plt.figure(figsize=figsize)
gs = gridspec.GridSpec(n_rows, 1, figure=fig, hspace=0) # 0.75 if baseline label
current_row = 0
tab10 = plt.get_cmap("tab10").colors
base_colors = {v: tab10[i % len(tab10)] for i, v in enumerate(decisions_list)}
# Density panel
ax_density = fig.add_subplot(gs[current_row : current_row + density_height, 0])
current_row += density_height
ax_density.plot(grid_x, y_all, label="All", linewidth=1.5)
if y_sig_scaled is not None:
ax_density.fill_between(grid_x, y_sig_scaled, alpha=0.4, label="Significant", color="tomato")
# Reference value (e.g. one published single-pipeline estimate) as a dot on the density
if reference is not None:
y_ref = float(np.interp(reference, grid_x, y_all))
ax_density.plot([reference], [y_ref], marker="o", markersize=8, linestyle="none",
color="black", zorder=5,
label=reference_label if reference_label else "Reference")
ax_density.set_xlim(*common_xlim)
ax_density.set_xticks([])
ax_density.set_xticklabels([])
ax_density.set_ylabel("Density")
ax_density.set_yticks([])
ax_density.legend(frameon=False)
ax_density.grid(False)
ax_density.set_frame_on(False)
ax_density.plot([common_xlim[0], common_xlim[1]], [0, 0], linewidth=1, color="black")
# Baseline
if baseline is not None:
ax_density.plot([baseline, baseline], [0, float(np.max(y_all))], linestyle="--", linewidth=1, color="black")
#ax_density.text(baseline, -0.1, str(baseline),ha="center", va="top") if baseline is not None else None
##############################
# Decision/options strip plots
x_label_pos = common_xlim[1] + 0.01 * (x_max - x_min)
for varname in decisions_list:
levels = list(df[varname].cat.categories) # keep option order
n_levels = len(levels)
n_rows_strip = 2 + n_levels # blank + header + levels
ax = fig.add_subplot(gs[current_row : current_row + n_rows_strip, 0])
current_row += n_rows_strip
counts = _build_heatmap_data(df, varname, measure, breaks)
H = np.zeros((n_levels + 2, len(breaks) - 1))
for i_level, lvl in enumerate(levels):
sub = counts[counts[varname] == lvl]
prop_by_bin = dict(zip(sub["bin_idx"], sub["prop"]))
for b in range(len(breaks) - 1):
H[i_level + 2, b] = prop_by_bin.get(b, 0.0)
cmap = LinearSegmentedColormap.from_list("white_to_color", [(1, 1, 1), base_colors[varname]])
X = breaks
Y = np.arange(n_levels + 3) # edges
ax.pcolormesh(X, Y, H, shading="flat", cmap=cmap, vmin=0.0, vmax=1.0)
ax.set_ylim(n_levels + 2, 0)
# Extend the baseline down through the strip plot)
#if baseline is not None:
# ax.axvline(baseline, linestyle="--", linewidth=1, color="black", zorder=10)
# Right-side mean diffs
for i_level, lvl in enumerate(levels):
lab = avg_diff_lookup.get(varname, {}).get(lvl, "")
ax.text(x_label_pos, i_level + 2.5, lab, va="center", ha="left", clip_on=False)
# y ticks
ytick_pos = [0.5, 1.5] + [i + 2.5 for i in range(n_levels)]
display_name = {c: c.replace("__", "", 1) for c in dec_only_df.columns}
dname = display_name[varname]
ytick_labels = ["", _map_name(dname)] + [_map_level(dname, lvl) for lvl in levels]
ax.set_yticks(ytick_pos)
ax.set_yticklabels(ytick_labels)
ticklbls = ax.get_yticklabels()
if len(ticklbls) > 1:
ticklbls[1].set_fontweight("bold")
ax.tick_params(axis="y", length=0)
ax.set_xlim(*common_xlim)
ax.tick_params(axis="x", which="both", bottom=False, labelbottom=False)
for spine in ["top", "right", "left", "bottom"]:
ax.spines[spine].set_visible(False)
#############################################
# Bottom x-axis (the multiverse plot measure)
bottom_xaxis = fig.add_subplot(gs[current_row : current_row + 1, 0])
current_row += 1
bottom_xaxis.set_xlim(*common_xlim)
bottom_xaxis.set_yticks([])
bottom_xaxis.tick_params(axis="y", left=False, labelleft=False)
bottom_xaxis.set_xlabel(_map_name(measure))
bottom_xaxis.set_xticks([x_min, (x_min + x_max) / 2.0, x_max])
for spine in ["top", "left", "right"]:
bottom_xaxis.spines[spine].set_visible(False)
bottom_xaxis.spines["bottom"].set_visible(True)
bottom_xaxis.tick_params(axis="x", bottom=True, labelbottom=True)
#if baseline is not None:
# bottom_xaxis.axvline(baseline, linestyle="--", linewidth=1, color="black", zorder=10)
# Save and return
plt.savefig(f"{self.results_dir}/{fname}.{ftype}", bbox_inches="tight", dpi=dpi)
return self._handle_figure_returns(fig)
[docs]
def plot_integration(self,
measure,
weights=None,
true_value=None,
agg="mean",
xlim=None,
xlabel=None,
figsize=(7, 3),
title=None,
fname="density",
ftype="pdf",
dpi=300):
"""
Plot weighted density distributions of a measure for each integration scheme.
Parameters
----------
measure : string
Name of the measure to integrate.
weights : dict[str, np.ndarray] or None
Per-scheme weights (e.g. from ``compare_integration``). Computed automatically if None.
true_value : float or None
If given, a vertical reference line is drawn at this value.
agg : string
Per-universe aggregation for sequence-valued measures (see ``integrate``).
xlim, figsize, title, fname, ftype, dpi
Plot/save options. The figure is saved to the results directory as ``fname.ftype``.
Returns
-------
Any
The figure if in a .py script, None if in a notebook (shown inline).
"""
results = self.get_results(as_df=True, expand_dec=True)
results.columns = results.columns.str.lower()
measure_l = measure.lower()
if measure_l not in results.columns:
raise ValueError(f"The measure '{measure}' was not found in the results.")
y = self._get_measure_values(results, measure_l, agg=agg)
if weights is None:
_, weights = self.compare_integration(measure, true_value=true_value, agg=agg)
bw_adjust = {"Uniform": 1, "MLI": 1, "MLI (restricted)": 1}
colors = {"Uniform": "black", "MLI": "green", "MLI (restricted)": "blue"}
fig, ax = plt.subplots(figsize=figsize)
if xlim is None:
margin = (y.max() - y.min()) * 0.1
xlim = (y.min() - margin, y.max() + margin)
ax.set(xlabel=xlabel or measure, ylabel="Density", xlim=xlim)
if title is not None:
ax.set_title(title)
if true_value is not None:
ax.axvline(true_value, color="gray", linestyle="--", linewidth=1.5)
for name, w in weights.items():
sns.kdeplot(x=y, weights=w, label=name, color=colors.get(name, "black"),
bw_adjust=bw_adjust.get(name, 1), ax=ax)
ax.legend()
plt.tight_layout()
plt.savefig(f"{self.results_dir}/{fname}.{ftype}", bbox_inches="tight", dpi=dpi)
return self._handle_figure_returns(fig)
# Internal methods
def _create_summary(self, all_universes, keys):
"""
Internal function: Create a CSV file with the parameters of all universes
Parameters
----------
csv_path : string
Path to save the CSV file
all_universes : list
List of all universes (combinations of parameters)
keys : list
List of keys for the CSV file. Used when the order is consistent.
config : dict
Configuration dictionary. Used to check for the 'order' key.
"""
fieldnames = ['Universe']
for i in range(len(keys)):
fieldnames.append(f"Decision {i+1}")
fieldnames.append(f"Value {i+1}")
# Generate CSV file with the parameters of all universes
file_path = os.path.join(self.multiverse_dir, "multiverse_summary.csv")
with open(file_path, "w", newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for i, combination in enumerate(all_universes, start=1):
context = {'Universe': f"Universe_{i}"}
# Populate the decision and value columns
for j, (key, value) in enumerate(combination):
if isinstance(value, dict):
value = value.get('name', '')
context[f"Decision {j+1}"] = key
context[f"Value {j+1}"] = value
writer.writerow(context)
return
def _read_summary(self):
"""
Internal function: Reads the multiverse_summary.csv file
Returns
-------
summary : pandas.DataFrame
Pandas datframe containing the multiverse summary
"""
summary_path = os.path.join(self.multiverse_dir, "multiverse_summary.csv")
return pd.read_csv(summary_path)
def _render_val(self, v):
"""Render decision value. `$...` → literal inject, else double-quoted string."""
if isinstance(v, bool):
return "True" if v else "False"
if isinstance(v, (int, float)) or v is None:
return repr(v)
if isinstance(v, str):
if v.startswith("$"):
return v[1:] # literal inject (variable, code, collection, call...)
return f"\"{self._escape_double_quotes(v)}\"" # always double-quoted string
if isinstance(v, list):
return "[" + ", ".join(self._render_val(x) for x in v) + "]"
if isinstance(v, tuple):
return "(" + ", ".join(self._render_val(x) for x in v) + ")"
if isinstance(v, dict):
# If it's a function spec, _handle_dict will be used by _format_type.
# If it's a plain dict value, render a Python dict literal.
items = ", ".join(f"{self._render_val(k)}: {self._render_val(val)}" for k, val in v.items())
return "{" + items + "}"
return repr(v)
def _handle_dict(self, value):
"""
Turn a dict spec into a Python call string.
Supports:
{"func": "comet.connectivity.Static_Pearson(ts).estimate()"}
{"func": "comet.connectivity.Static_Partial", "args": {...}}
{"func": "bct.clustering_coef_bu", "args": {"G": "$G"}}
Optional: {"positional": ["$ts", 123]} for explicit positional args.
"""
func = value.get("func", "")
args = value.get("args", {}) or {}
positional = value.get("positional", []) or []
# If func already looks like a full call string, pass it through untouched.
# (Convention: if you need variables here, prefer using "args" with "$var")
if "(" in func:
return func
# Build argument list
parts = []
parts += [self._render_val(p) for p in positional]
parts += [f"{k}={self._render_val(v)}" for k, v in args.items()]
arg_str = ", ".join(parts)
call_expr = f"{func}({arg_str})" if parts else f"{func}()"
# comet.connectivity classes require .estimate()
if func.startswith("comet.connectivity."):
call_expr += ".estimate()"
return call_expr
def _format_type(self, value):
"""
Convert decision values into Python code strings for template injection.
"""
# Function spec dicts go through _handle_dict; everything else through _render_val.
if isinstance(value, dict) and "func" in value:
return self._handle_dict(value)
return self._render_val(value)
def _escape_double_quotes(self, s: str) -> str:
# Keep things robust if the literal happens to contain quotes/backslashes
return s.replace("\\", "\\\\").replace('"', '\\"')
def _combine_results(self):
"""
Internal function: Combines the results in a single dictionary and saves it as a pickle file
"""
file_name = "multiverse_results.pkl"
file_paths = glob.glob(os.path.join(self.script_dir, "temp", "universe_*.pkl"))
if not file_paths:
return
combined_results = {}
for file_path in sorted(file_paths):
universe_key = os.path.splitext(os.path.basename(file_path))[0]
with open(file_path, 'rb') as f:
result_dict = pickle.load(f)
combined_results[universe_key] = result_dict
# Save combined results file
with open(os.path.join(self.results_dir, file_name), 'wb') as f:
pickle.dump(combined_results, f)
# Delete individual results files and folder
for file_path in file_paths:
os.remove(file_path)
os.rmdir(os.path.join(self.script_dir, "temp"))
return
def _one_sample_ttest(self, samples_per_universe, baseline=0.0):
"""One-sample t-test per universe against a baseline."""
baseline = 0.0 if baseline is None else float(baseline)
pvals = []
for rv in samples_per_universe:
if isinstance(rv, (list, tuple, np.ndarray)):
arr = np.asarray(rv, dtype=float)
arr = arr[np.isfinite(arr)]
if len(arr) >= 2:
_, p = stats.ttest_1samp(arr, baseline)
pvals.append(float(p))
continue
pvals.append(np.nan)
return np.asarray(pvals, dtype=float)
def _flatten_decisions(self, dec_block):
"""
Convert {'Decision 1': 'X', 'Value 1': Y, ...} into {'X': 'Y', ...}.
Values are stringified for categorical plotting.
"""
if not isinstance(dec_block, dict):
return {}
out = {}
for k, v in dec_block.items():
m = re.fullmatch(r"Decision\s+(\d+)", str(k))
if not m:
continue
idx = m.group(1)
value_key = f"Value {idx}"
out[str(v)] = str(dec_block.get(value_key, "NA"))
return out
def _extract_decision_order(self, decisions_obj) -> list[str]:
"""
Return ["<Decision 1 name>", "<Decision 2 name>", ...] from COMET decisions storage.
Supports two schemas:
1) row["__decisions"] is the decisions block itself
2) row["__decisions"] is a result dict containing {"__decisions": {...}}
"""
if not isinstance(decisions_obj, dict):
return []
# schema 2: {"__decisions": {...}}
dec_block = decisions_obj.get("__decisions")
if isinstance(dec_block, dict):
d = dec_block
else:
# schema 1: already the block
d = decisions_obj
order = []
i = 1
while f"Decision {i}" in d:
order.append(str(d[f"Decision {i}"]))
i += 1
return order
def _load_and_prepare_data(self, measure=None):
"""
Internal function: Load and prepare the data for the specification curve plotting.
Parameters
----------
measure : str
Name of the measure to plot.
Returns
-------
sorted_universes : list
List of tuples (data, summary_row) sorted by the mean value of the measure.
forking_paths : dict
Dictionary containing the forking paths.
"""
# Load the summary CSV.
summary_path = os.path.join(self.multiverse_dir, "multiverse_summary.csv")
multiverse_summary = pd.read_csv(summary_path)
# Load the forking paths.
with open(os.path.join(self.results_dir, "forking_paths.pkl"), "rb") as file:
forking_paths = pickle.load(file)
# Load the combined results dictionary.
combined_results_path = os.path.join(self.results_dir, "multiverse_results.pkl")
if not os.path.exists(combined_results_path):
raise ValueError("Results file not found. Please run the multiverse analysis first.")
with open(combined_results_path, "rb") as file:
combined_results = pickle.load(file)
# Extract the specified measure for each universe.
universe_data = {}
for universe, result_dict in combined_results.items():
if measure not in result_dict:
raise ValueError(f"Measure '{measure}' not found in results for {universe}.")
universe_data[universe] = result_dict[measure]
# Build a list of tuples pairing each universe's measure data with its summary row.
universes_with_summary = []
for universe, data in universe_data.items():
# Ensure matching is case-insensitive.
summary_row = multiverse_summary[multiverse_summary['Universe'].str.lower() == universe.lower()]
if not summary_row.empty:
universes_with_summary.append((data, summary_row.iloc[0]))
# Sort the universes by the mean of the measure values.
sorted_universes = sorted(universes_with_summary, key=lambda x: np.mean(x[0]))
return sorted_universes, forking_paths
def _in_notebook(self):
"""
Helper function to check if the code is running in a Jupyter notebook
"""
try:
from IPython import get_ipython
if 'IPKernelApp' not in get_ipython().config:
return False
except Exception:
return False
return True
def _handle_figure_returns(self, fig):
"""
Helper function to handle plotting
"""
if self._in_notebook():
ipy_display(fig)
plt.close(fig)
return
else:
return fig
# Multiverse integration
def _compute_weights(self, results: pd.DataFrame, measure: str, method: str,
agg: str = "mean") -> np.ndarray:
"""Compute weights with the requested weighting scheme. Shared by integrate() and compare_integration()."""
if method == "uniform":
return self._uniform_weights(results)
elif method == "mli":
bad = self._untestable_decisions(results)
if bad:
bad_names = [d.replace("__", "", 1) for d in bad]
print(
f"[warn] Decision design is not fully crossed: {bad_names} are untestable for "
f"some universes, so MLI compares neighbourhoods of unequal coverage and is "
f"biased toward universes blind to those dimensions. Consider method='mli_restricted'."
)
return self._mli_weights(results, measure, agg=agg, restricted=False)
elif method == "mli_restricted":
return self._mli_weights(results, measure, agg=agg, restricted=True)
else:
raise ValueError("method must be 'uniform', 'mli', or 'mli_restricted'")
def _weighted_mean(self,x: np.ndarray, w: np.ndarray) -> float:
"""
Compute the weighted mean of x with weights w.
Assumes w >= 0 and w.sum() == 1.
"""
return float(np.sum(w * x))
def _weighted_median(self, x: np.ndarray, w: np.ndarray) -> float:
"""
Compute the weighted median of x with weights w.
Assumes w >= 0 and w.sum() == 1.
"""
order = np.argsort(x)
x_sorted = x[order]
w_sorted = w[order]
cw = np.cumsum(w_sorted)
return float(x_sorted[np.searchsorted(cw, 0.5, side="left")])
def _uniform_weights(self, data: pd.DataFrame) -> np.ndarray:
"""Compute uniform weights for all universes."""
n = len(data)
return np.full(n, 1.0 / n, dtype=float)
def _mli_weights(self, data: pd.DataFrame, measure: str, agg: str = "mean",
restricted: bool = False) -> np.ndarray:
"""
Maximum local influence weights (Cantone & Tomaselli 2024).
For each universe, the local instability is the largest change in the measure among
its nearest neighbours (minimum non-zero Gower distance over the decisions). Weights
are proportional to the complement of that instability (stable specifications weigh more).
When ``restricted=True``, decisions that are not freely crossed across the multiverse
(i.e. at least one universe has no Hamming-1 neighbour along that decision) are dropped
from the distance computation before the nearest-neighbour lookup. This makes the local
neighbourhood comparable across universes; without it MLI is biased toward universes
blind to the constrained dimensions.
"""
y = self._get_measure_values(data, measure, agg=agg)
Q, col_types, col_names = self._get_decision_matrix(data) # (n, k)
if restricted:
bad = self._untestable_decisions(data)
if bad:
keep = np.ones(Q.shape[1], dtype=bool)
for j, nm in enumerate(col_names):
if any(nm == d or nm.startswith(d + "_") for d in bad):
keep[j] = False
if not keep.any():
raise ValueError(
"Restricted MLI cannot proceed: every decision is untestable somewhere "
"in the design."
)
Q = Q[:, keep]
col_types = [t for j, t in enumerate(col_types) if keep[j]]
dist = self._gower_distance(Q, col_types)
n = len(y)
err = np.zeros(n, dtype=float)
for i in range(n):
nonzero = dist[i][dist[i] > 0]
if nonzero.size:
min_dist = nonzero.min()
neigh = np.where(np.abs(dist[i] - min_dist) < 1e-9)[0]
if neigh.size:
err[i] = np.max(np.abs(y[i] - y[neigh]))
num = err.max() - err
s = num.sum()
return np.full(n, 1.0 / n) if s == 0 else num / s
def _untestable_decisions(self, data: pd.DataFrame) -> list:
"""
Return decision columns ('__<name>') for which the design is not freely crossed: i.e.
at least one universe has no Hamming-1 neighbour along that decision (fixing all other
decisions, only one level of this decision occurs). An empty list means the design is
fully crossed and plain MLI is unbiased.
"""
dec_cols = sorted([c for c in data.columns if c.startswith("__")])
if len(dec_cols) < 2:
return []
bad = []
for d in dec_cols:
others = [c for c in dec_cols if c != d]
nunique = data.groupby(others, observed=True)[d].transform("nunique")
if (nunique == 1).any():
bad.append(d)
return bad
def _get_measure_values(self, data: pd.DataFrame, measure: str, agg: str = "mean") -> np.ndarray:
"""
Return the measure as a float array, reducing per-universe sequences via ``agg``.
"""
series = data[measure]
if isinstance(series.iloc[0], (list, np.ndarray)):
reducers = {"mean": np.mean, "median": np.median,
"first": lambda x: x[0], "last": lambda x: x[-1]}
if agg not in reducers:
raise ValueError(f"agg must be one of {list(reducers.keys())}")
return series.apply(reducers[agg]).to_numpy(dtype=float)
return series.to_numpy(dtype=float)
def _get_decision_matrix(self, data: pd.DataFrame) -> tuple:
"""
Build a numeric design matrix from the decision columns (prefixed '__' by expand_dec).
Booleans -> 0/1, numeric -> kept (ratio scale), strings -> one-hot (drop-first).
Returns (matrix, col_types, col_names) where col_types is one of binary/numeric/categorical
per column and col_names labels each column (decision name, or "<decision>_<level>" dummy).
"""
decision_cols = sorted([c for c in data.columns if c.startswith("__")])
if not decision_cols:
raise ValueError("No decision columns found. Decision-based weighting needs the "
"decisions, which are loaded with expand_dec=True.")
parts, col_types, col_names = [], [], []
for c in decision_cols:
series = data[c]
if series.dtype == bool:
parts.append(series.astype(int).to_frame())
col_types.append("binary")
col_names.append(c)
elif pd.api.types.is_numeric_dtype(series):
parts.append(series.to_frame())
col_types.append("numeric")
col_names.append(c)
else:
col_str = series.astype(str)
if set(col_str.unique()) <= {"True", "False"}:
parts.append(col_str.map({"True": 1, "False": 0}).to_frame())
col_types.append("binary")
col_names.append(c)
else:
dummies = pd.get_dummies(col_str, prefix=c, drop_first=True)
parts.append(dummies)
col_types.extend(["categorical"] * dummies.shape[1])
col_names.extend(list(dummies.columns))
matrix = pd.concat(parts, axis=1).to_numpy(dtype=float)
return matrix, col_types, col_names
def _gower_distance(self, Q: np.ndarray, col_types: list) -> np.ndarray:
"""
Gower distance matrix: Hamming for binary/categorical, normalised L1 for numeric.
"""
n = len(Q)
dist = np.zeros((n, n))
for k, ctype in enumerate(col_types):
if ctype in ("binary", "categorical"):
dist += (Q[:, None, k] != Q[None, :, k]).astype(float)
else:
rng = Q[:, k].max() - Q[:, k].min()
if rng > 0:
dist += np.abs(Q[:, None, k] - Q[None, :, k]) / rng
return dist
def _gini(self, w: np.ndarray) -> float:
"""
Gini coefficient of the weights (0 = uniform, 1 = fully concentrated).
"""
w = np.sort(w)
n = len(w)
i = np.arange(1, n + 1)
g = float((2 * np.sum(i * w) / (n * w.sum())) - (n + 1) / n)
return 0.0 if abs(g) < 1e-12 else g # snap uniform's floating-point noise to exactly 0
# Load an existing multiverse
[docs]
def load_multiverse(path=None):
"""
Load a previously created multiverse from disk.
Parameters
----------
path : str
A full/relative path to an existing multiverse folder
"""
if path is not None:
name = path.rstrip("/") # Remove trailing slash if present
mverse = Multiverse(name=name, path=path) # Create the Multiverse object for the given path
else:
raise ValueError("Please provide a name/path to a multiverse directory.")
if not os.path.exists(mverse.multiverse_dir + "/multiverse_summary.csv"):
shutil.rmtree(mverse.multiverse_dir) # clean up created directory
raise ValueError("The specified path does not seem to contain a valid multiverse.")
return mverse