""" mppnp data management tool v.0.0.0, 20JULY2025: NuGrid collaboration (Joshua Issa) multizone.py provides tools to read and manage your mppnp data output in a single Python object. mppnp can create a number of files, and it can become unwieldly to keep track of what is related to who. multizone.py is designed to keep track of the multizone output h5 files, the mass-averaged surface abundance output h5 files, the cross-section output DAT files, initial composition, ingested composition, and solar composition. """ import numpy as np import pandas as pd import h5py import os from nugridpy import nugridse as nuse from nugridpy import utils from concurrent.futures import ProcessPoolExecutor import functools from collections import defaultdict from astropy import units as u from astropy import constants as cc def read_single_file(file_path): '''Helper function to read a single file's 'v' column.''' return pd.read_csv(file_path, skiprows=1, delimiter='\t', usecols=['v'])['v'].values def read_flux_files(directory): '''Parallel processing to read all DAT files for cross-section information.''' # Get sorted list of files files = sorted([f for f in os.listdir(directory) if not os.path.isdir(os.path.join(directory, f))]) # Get dimensions for pre-allocation first_file = os.path.join(directory, files[0]) num_rows = len(pd.read_csv(first_file, skiprows=1, delimiter='\t', usecols=['v'])) num_zones = len(files) # Preallocate array all_vs = np.empty((num_zones, num_rows)) # Generate full file paths file_paths = [os.path.join(directory, f) for f in files] # Use process pool for parallel reading with ProcessPoolExecutor() as executor: # Map the reading function to all files for idx, result in enumerate(executor.map(read_single_file, file_paths)): if idx % 50 == 0: # Progress indicator print(f"Processed {idx}/{num_zones} files") all_vs[idx, :] = result return all_vs class mppnp_reader: '''management tool for mppnp Parameters ---------- multizonepath : string Directory with the *.cycle.out.h5 files. surfpath: string Directory with the *.cycle.surf.h5 files. same_surf: boolean Whether the out and surf files are in different directories. If they are, no need to provide surfpath. Default: False solarpath: string Path to the solar composition as a .ppn file. xsectionpath: string Directory with the .DAT files modified to have a row 'v'. networksetuppath: string Path to the networksetup.txt file. initialpath: string Path to the initial composition as a .dat file. ingestionpath: string Path to the ingested material as a .pnn file. ingestspeed: float The rate of ingestion in Msun/second. isomers: list The isomers used in the mppnp run. Default: ['Al-26m1', 'Kr-86m1', 'Cd-115m1', 'Lu-176m1', 'Ta-180m1'] ''' mppnp_code_units = {'dcoeff': u.cm**2 * u.s**-1, 'radius': cc.R_sun * u.cm/u.Mm, 'mass': (u.g).to(u.M_sun) * u.M_sun, 'rho': u.g * u.cm**-3, 'temperature': u.K} all_elements = np.array(['H', 'He', 'Li', 'Be', 'B', 'C', 'N', 'O', 'F', 'Ne', 'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'Cl', 'Ar', 'K', 'Ca', 'Sc', 'Ti', 'V', 'Cr', 'Mn', 'Fe', 'Co', 'Ni', 'Cu', 'Zn', 'Ga', 'Ge', 'As', 'Se', 'Br', 'Kr', 'Rb', 'Sr', 'Y', 'Zr', 'Nb', 'Mo', 'Tc', 'Ru', 'Rh', 'Pd', 'Ag', 'Cd', 'In', 'Sn', 'Sb', 'Te', 'I', 'Xe', 'Cs', 'Ba', 'La', 'Ce', 'Pr', 'Nd', 'Pm', 'Sm', 'Eu', 'Gd', 'Tb', 'Dy', 'Ho', 'Er', 'Tm', 'Yb', 'Lu', 'Hf', 'Ta', 'W', 'Re', 'Os', 'Ir', 'Pt', 'Au', 'Hg', 'Tl', 'Pb', 'Bi', 'Po', 'At']) all_stable_species = ['Neutron-999', 'H-1', 'H-2', 'He-3', 'He-4', 'Li-6', 'Li-7', 'Be-9', 'B-10', 'B-11', 'C-12', 'C-13', 'N-14', 'N-15', 'O-16', 'O-17', 'O-18', 'F-19', 'Ne-20', 'Ne-21', 'Ne-22', 'Na-23', 'Mg-24', 'Mg-25', 'Mg-26', 'Al-27', 'Si-28', 'Si-29', 'Si-30', 'P-31', 'S-32', 'S-33', 'S-34', 'S-36', 'Cl-35', 'Cl-37', 'Ar-36', 'Ar-38', 'Ar-40', 'K-39', 'K-40', 'K-41', 'Ca-40', 'Ca-42', 'Ca-43', 'Ca-44', 'Ca-46', 'Ca-48', 'Sc-45', 'Ti-46', 'Ti-47', 'Ti-48', 'Ti-49', 'Ti-50', 'V-50', 'V-51', 'Cr-50', 'Cr-52', 'Cr-53', 'Cr-54', 'Mn-55', 'Fe-54', 'Fe-56', 'Fe-57', 'Fe-58', 'Co-59', 'Ni-58', 'Ni-60', 'Ni-61', 'Ni-62', 'Ni-64', 'Cu-63', 'Cu-65', 'Zn-64', 'Zn-66', 'Zn-67', 'Zn-68', 'Zn-70', 'Ga-69', 'Ga-71', 'Ge-70', 'Ge-72', 'Ge-73', 'Ge-74', 'Ge-76', 'As-75', 'Se-74', 'Se-76', 'Se-77', 'Se-78', 'Se-80', 'Se-82', 'Br-79', 'Br-81', 'Kr-78', 'Kr-80', 'Kr-82', 'Kr-83', 'Kr-84', 'Kr-86', 'Rb-85', 'Rb-87', 'Sr-84', 'Sr-86', 'Sr-87', 'Sr-88', 'Y-89', 'Zr-90', 'Zr-91', 'Zr-92', 'Zr-94', 'Zr-96', 'Nb-93', 'Mo-92', 'Mo-94', 'Mo-95', 'Mo-96', 'Mo-97', 'Mo-98', 'Mo-100', 'Tc-999', 'Ru-96', 'Ru-98', 'Ru-99', 'Ru-100', 'Ru-101', 'Ru-102', 'Ru-104', 'Rh-103', 'Pd-102', 'Pd-104', 'Pd-105', 'Pd-106', 'Pd-108', 'Pd-110', 'Ag-107', 'Ag-109', 'Cd-106', 'Cd-108', 'Cd-110', 'Cd-111', 'Cd-112', 'Cd-113', 'Cd-114', 'Cd-116', 'In-113', 'In-115', 'Sn-112', 'Sn-114', 'Sn-115', 'Sn-116', 'Sn-117', 'Sn-118', 'Sn-119', 'Sn-120', 'Sn-122', 'Sn-124', 'Sb-121', 'Sb-123', 'Te-120', 'Te-122', 'Te-123', 'Te-124', 'Te-125', 'Te-126', 'Te-128', 'Te-130', 'I-127', 'Xe-124', 'Xe-126', 'Xe-128', 'Xe-129', 'Xe-130', 'Xe-131', 'Xe-132', 'Xe-134', 'Xe-136', 'Cs-133', 'Ba-130', 'Ba-132', 'Ba-134', 'Ba-135', 'Ba-136', 'Ba-137', 'Ba-138', 'La-138', 'La-139', 'Ce-136', 'Ce-138', 'Ce-140', 'Ce-142', 'Pr-141', 'Nd-142', 'Nd-143', 'Nd-144', 'Nd-145', 'Nd-146', 'Nd-148', 'Nd-150', 'Pm-999', 'Sm-144', 'Sm-147', 'Sm-148', 'Sm-149', 'Sm-150', 'Sm-152', 'Sm-154', 'Eu-151', 'Eu-153', 'Gd-152', 'Gd-154', 'Gd-155', 'Gd-156', 'Gd-157', 'Gd-158', 'Gd-160', 'Tb-159', 'Dy-156', 'Dy-158', 'Dy-160', 'Dy-161', 'Dy-162', 'Dy-163', 'Dy-164', 'Ho-165', 'Er-162', 'Er-164', 'Er-166', 'Er-167', 'Er-168', 'Er-170', 'Tm-169', 'Yb-168', 'Yb-170', 'Yb-171', 'Yb-172', 'Yb-173', 'Yb-174', 'Yb-176', 'Lu-175', 'Lu-176', 'Hf-174', 'Hf-176', 'Hf-177', 'Hf-178', 'Hf-179', 'Hf-180', 'Ta-180', 'Ta-181', 'W-180', 'W-182', 'W-183', 'W-184', 'W-186', 'Re-185', 'Re-187', 'Os-184', 'Os-186', 'Os-187', 'Os-188', 'Os-189', 'Os-190', 'Os-192', 'Ir-191', 'Ir-193', 'Pt-190', 'Pt-192', 'Pt-194', 'Pt-195', 'Pt-196', 'Pt-198', 'Au-197', 'Hg-196', 'Hg-198', 'Hg-199', 'Hg-200', 'Hg-201', 'Hg-202', 'Hg-204', 'Tl-203', 'Tl-205', 'Pb-204', 'Pb-206', 'Pb-207', 'Pb-208', 'Bi-209', 'Th-232', 'U-235', 'U-238', 'Po-999', 'At-999', 'Rn-999', 'Fr-999', 'Ra-999', 'Ac-999', 'Pa-999', 'Np-999', 'Pu-999', 'Am-999', 'Cm-999', 'Bk-999', 'Cf-999'] def __init__(self, multizonepath=None, surfpath=None, same_surf=False, \ solarpath=None, xsectionpath=None, networksetuppath=None, \ initialpath=None, ingestionpath=None, ingestspeed=np.nan, \ isomers=['Al-26m1', 'Kr-86m1', 'Cd-115m1', 'Lu-176m1', 'Ta-180m1']): self.multizonepath = multizonepath self.same_surf = same_surf self.surfpath = multizonepath if self.same_surf else surfpath self.solarpath = solarpath self.xsectionpath = xsectionpath self.networksetuppath = networksetuppath self.initialpath = initialpath self.ingestionpath = ingestionpath self.ingestspeed = ingestspeed self.isomers = isomers self.read_multizone() self.read_surf() self.read_solar() self.read_crosssection() self.read_ingestion() #to update: I am pretty sure that utils has the ability to read .dat files. self.read_initial() self.current_cycle = -1 def read_multizone(self): '''Reads the multizone *.out.h5 files''' self.h5_data = {} if self.multizonepath is not None: possible_files = sorted(list(os.listdir(self.multizonepath))) for f in possible_files: if f.endswith('.out.h5'): cycle = f.split('.')[1] print(f'Reading in data for cycle block {cycle}. This may take a while.') self.h5_data[cycle] = nuse.se(self.multizonepath, pattern=str(cycle)+'.out.h5', rewrite=True) self.all_isos = np.array(list(self.h5_data[cycle].se.isotopes)) def read_surf(self): '''Reads the surface *.surf.h5 files''' self.surf_data = {} if self.same_surf: self.surfpath = self.multizonepath if self.surfpath is not None: possible_files = sorted(list(os.listdir(self.surfpath))) for f in possible_files: if f.endswith('.surf.h5'): cycle = f.split('.')[1] self.surf_data[cycle] = os.path.join(self.surfpath, f) if not hasattr(self, "all_isos"): with h5py.File(os.path.join(self.surf_data[cycle]), 'r') as openfile: dataset_path = f"/cycle{str(cycle).zfill(10)}/SE_DATASET" surfabus = openfile[dataset_path]['iso_massf_decay'][0] n_iso = len(surfabus) iso_z = np.linspace(0,0,n_iso) iso_a = np.linspace(0,0,n_iso) iso_z[:] = openfile["Z"][:] iso_a[:] = openfile["A"][:] surfisos = [" "] * n_iso surfisos[:2] = ['n', 'H-1'] surfisos[-len(self.isomers):] = self.isomers idx1, idx2 = 2, n_iso - len(self.isomers) surfisos[idx1:idx2] = [f"{utils.get_el_from_z(int(z))}-{int(a)}" for z, a in zip(iso_z[idx1:idx2], iso_a[idx1:idx2])] self.all_isos = np.array(surfisos) def read_solar(self): '''Reads the solar abundance data.''' if self.solarpath is not None: self.solar = utils.iniabu(self.solarpath) def read_crosssection(self): '''Reads the cross-section data.''' if self.xsectionpath is not None and self.networksetuppath is not None: print("Reading reaction cross-section information. This may take a while.") self.all_xsections = read_flux_files(self.xsectionpath) elif self.xsectionpath is not None and self.networksetuppath is None: print("Cross-sections will not be read in because no networksetup.txt path was provided.") elif self.xsectionpath is None and self.networksetuppath is not None: print("Cross-sections will not be read in because no folder with cross-sections was provided.") def read_ingestion(self): '''Reads in the ingested data.''' if self.ingestionpath is not None: self.ingest = utils.iniabu(self.ingestionpath) print("Ingestion rate is understood to be in Msun/second") self.dmdtIngest = self.ingestspeed * u.Msun / u.s def read_initial(self): '''Reads in the initial values.''' if self.initialpath is not None: isotopes, massfracs = [], [] with open(self.initialpath, 'r') as f: for line in f: if not line.startswith("D"): continue _, element, A, massfrac = line.split() isotope = element[0] + element[1:].lower() + '-' + str(A) isotopes.append(isotope) massfracs.append(float(massfrac)) self.initial_massfrac = pd.DataFrame({"Isotope": isotopes, "X": massfracs}) def exists_in_sim(self, isotopes): '''Checks if isotopes exist in the simulation. Parameters ---------- isotopes: string or list Isotopes to check if they are in the simulation. Returns ------- insim: boolean or np.array Whether the isotope are in the simulation in order of request. ''' if not isinstance(isotopes, list): isotopes = [isotopes] insim = [iso in self.all_isos for iso in isotopes] return insim[0] if len(isotopes) == 1 else insim def get_key(self, filetype, cycle): '''Gets the key for the out and surf data. Parameters ---------- filetype: str Possible inputs: out and surf cycle: float Cycle you want the data for. Returns ------- identified_key: string String to identify which cycle block to get the data from. ''' identified_key = '0000001' if cycle < 1: return identified_key if filetype == 'surf': identified_key = [key for key in self.surf_data.keys() if int(key) < cycle][-1] if filetype == 'out': identified_key = [key for key in self.h5_data.keys() if int(key) < cycle][-1] return identified_key def get(self, filetype, cycle, thing, decay=True): '''Returns requested "thing" from "filetype" Parameters ---------- filetype: string Possible inputs: out: returns values from the out.h5 files surf: returns values from the surf.h5 files solar: returns values from the solar composition initial: returns values from the initial composition ingest: returns values from the ingested composition cycle: float Cycle you want the data for. thing: string or np.array Thing you want (eg. isotope, mass, etc) ''' # Grab from the solar abundance using utils if filetype == 'solar': if isinstance(thing,str): return self.solar.iso_abundance(thing) else: res = np.array(self.solar.iso_abundance(thing)) return res[0] if len(res) == 1 else res if filetype == 'ingest': if isinstance(thing,str): return self.ingest.iso_abundance(thing) else: res = np.array(self.ingest.iso_abundance(thing)) return res[0] if len(res) == 1 else res if isinstance(thing, str): thing = np.array([thing]) # Grab from the initial mass fraction dataframe if filetype == 'initial': res = self.initial_massfrac.loc[self.initial_massfrac.Isotope.isin(thing), 'X'].to_numpy() return res[0] if len(thing) == 1 else res # Grab from the H5_surf file if filetype == 'surf': key = self.get_key(filetype, cycle) surffile = self.surf_data[key] with h5py.File(os.path.join(surffile), 'r') as openfile: dataset_path = f"/cycle{str(cycle).zfill(10)}/SE_DATASET" if decay: surfabus = openfile[dataset_path]['iso_massf_decay'][0] else: surfabus = openfile[dataset_path]['iso_massf'][0] n_iso = len(surfabus) iso_z = np.linspace(0,0,n_iso) iso_a = np.linspace(0,0,n_iso) iso_z[:] = openfile["Z"][:] iso_a[:] = openfile["A"][:] surfisos = [" "] * n_iso surfisos[:2] = ['n', 'H-1'] surfisos[-len(self.isomers):] = self.isomers idx1, idx2 = 2, n_iso - len(self.isomers) surfisos[idx1:idx2] = [f"{utils.get_el_from_z(int(z))}-{int(a)}" for z, a in zip(iso_z[idx1:idx2], iso_a[idx1:idx2])] surfisos = np.array(surfisos) idxs = np.where(np.isin(surfisos, thing))[0] return surfabus[idxs][0] if len(idxs) == 1 else surfabus[idxs] # Grab from the H5_out file if filetype == 'out': key = self.get_key(filetype, cycle) if thing[0] in ['mass', 'rho', 'radius', 'temperature', 'dcoeff']: h5thing = self.h5_data[key].se.get(cycle, thing) unitinfo = thing[0] + '_unit' units = self.h5_data[key].se.get(unitinfo) h5thing = h5thing * units * self.mppnp_code_units[thing[0]] elif thing[0] == 'iso_massf': h5thing = self.h5_data[key].se.get(cycle, 'iso_massf') else: h5thing = self.h5_data[key].se.get(cycle, 'iso_massf') h5thing = h5thing[:, np.isin(self.all_isos, thing)] h5thing = np.squeeze(h5thing) # only so we don't get (mass zones, 1) and instead get (mass zones, ) return h5thing def setupnetwork(self, cycle): '''Calculates f_ij for all isotopes. Parameters ---------- cycle: float The cycle to calculate the fluxes for. ''' def grab_info(isotope, diag=''): def potentials(iso): if iso == 'PROT': return 'H-1' elif iso == 'NEUT': return 'Neutron-1' elif iso == 'OOOOO': return 'OOOOO' elif iso == 'AL*26': return 'Al-26m1' elif iso == 'KR*85': return 'Kr-85m1' elif iso == 'CD*15': return 'Cd-115m1' elif iso == 'LU*76': return 'Lu-176m1' elif iso == 'TAg80': return 'Ta-180m1' else: iso = iso[0] + iso[1].lower() + '-' + iso[2:] return iso try: if len(isotope) == 2: ele, num = isotope iso = ele[0] + ele[1:].lower() + '-' + str(num) return iso elif len(isotope) > 3: if len(isotope) == 3: part1, part2, part3 = isotope elif len(isotope) == 4: part1, part2, part3, part4 = isotope if part1 in ['0', '1', '2']: return potentials(part2) elif part2.find('E+') != -1 or part2.find('E-') != -1: return potentials(part1) elif part3.find('E+') != -1 or part3.find('E-') != -1: return potentials(part1+part2) else: iso = isotope[0] return potentials(iso) except: print('Error: ', isotope, len(isotope), diag) return None def calculate_flux(row): X_R = iso_massf[:, row.idx1] if row.idx1 != -99 else np.ones(iso_massf.shape[0]) A_R = mass_numbers.get(row.INPUT1, 1) X_T = iso_massf[:, row.idx2] if row.idx2 != -99 else np.ones(iso_massf.shape[0]) A_T = mass_numbers.get(row.INPUT2, 1) v_RT = row.V var_RT = row.VAR return (X_R / A_R) * (X_T / A_T) * v_RT * var_RT # figure out where the actual reaction list start in networksetup.txt skip = 2 # starts at 2 because the line we are looking for is two above the reactions with open(self.networksetuppath, 'r') as f: lines = f.readlines() for line in lines: skip+=1 if line.find('REACTION NETWORK') != -1 and line.find('V(i) Nasv(*rho)') != -1: break networksetup = pd.read_csv(self.networksetuppath, skiprows=skip, header=None, delimiter='->', names=['INPUT', 'OUTPUT']) networksetup['V'] = list(self.all_xsections.T) networksetup = networksetup[ networksetup['INPUT'].str.split(' ').apply(lambda row: row[1] == 'T') ] networksetup['INPUT'] = networksetup['INPUT'].str.split(' ').apply(lambda x: [item for item in x if item]) networksetup['OUTPUT'] = networksetup['OUTPUT'].str.split(' ').apply(lambda x: [item for item in x if item]) networksetup_processed = pd.DataFrame() networksetup_processed['REACNUM'] = networksetup['INPUT'].apply(lambda x: x[0]) networksetup_processed['NUM1'] = networksetup['INPUT'].apply(lambda x: x[2]) networksetup_processed['INPUT1'] = networksetup['INPUT'].apply(lambda x: grab_info(x[3:5]) if x[4] != '+' else grab_info([x[3]])) networksetup_processed['NUM2'] = networksetup['INPUT'].apply(lambda x: x[6] if x[4] != '+' else x[5]) networksetup_processed['INPUT2'] = networksetup['INPUT'].apply(lambda x: grab_info(x[7:]) if x[4] != '+' else grab_info(x[6:])) networksetup_processed['NUM3'] = networksetup['OUTPUT'].apply(lambda x: x[0]) networksetup_processed['OUTPUT1'] = networksetup['OUTPUT'].apply(lambda x: grab_info(x[1:3]) if x[2] != '+' else grab_info([x[1]])) networksetup_processed['NUM4'] = networksetup['OUTPUT'].apply(lambda x: x[4] if x[2] != '+' else x[3]) networksetup_processed['OUTPUT2'] = networksetup['OUTPUT'].apply(lambda x: grab_info(x[5:7]) if x[2] != '+' and (x[6].find('E+') == -1 and x[6].find('E-') == -1) else grab_info(x[4:8], x)) networksetup_processed['TYPE'] = networksetup['OUTPUT'].apply(lambda x: x[-4]) networksetup_processed['V'] = networksetup['V'] networksetup_processed['VAR'] = networksetup['OUTPUT'].apply(lambda x: x[-2]) for col in ['NUM1', 'NUM2', 'NUM3', 'NUM4', 'VAR']: networksetup_processed[col] = networksetup_processed[col].astype(float) networksetup_processed.REACNUM = networksetup_processed.REACNUM.astype(int) iso_massf = self.get('out', cycle, 'iso_massf') all_isos = self.all_isos isotope_index = {iso: idx for idx, iso in enumerate(all_isos)} mass_numbers = {iso: int(iso.split('-')[1].replace('m1', '')) for iso in all_isos} networksetup_processed['idx1'] = networksetup_processed.INPUT1.map(isotope_index).fillna(-99).astype(int) networksetup_processed['idx2'] = networksetup_processed.INPUT2.map(isotope_index).fillna(-99).astype(int) networksetup_processed.FLUXES = networksetup_processed.apply(calculate_flux, axis=1) self.networksetup_processed = networksetup_processed inputs = self.networksetup_processed.INPUT1.values outputs = self.networksetup_processed.OUTPUT1.values fluxes = self.networksetup_processed.FLUXES.values balance_dict = defaultdict(float) for i in range(len(inputs)): key = (inputs[i], outputs[i]) reverse_key = (outputs[i], inputs[i]) if reverse_key in balance_dict: balance_dict[reverse_key] -= fluxes[i] else: balance_dict[key] += fluxes[i] self.balanced_fluxes = dict(balance_dict) self.balanced_fluxes = pd.DataFrame([(k[0], k[1], v) for k, v in self.balanced_fluxes.items()], columns=['ISO1', 'ISO2', 'f_ij']) def get_balanced_fluxes(self, cycle): '''Gets the balanced f_ij for all isotopes. If the current cycle has not changed, it is not recalculated. Parameters ---------- cycle: float The cycle to calculate the fluxes for. ''' if cycle != self.current_cycle: self.setupnetwork(cycle) self.current_cycle = cycle return self.balanced_fluxes def get_relevant_fluxes(self, isotope, cycle): '''Gets the fluxes for a specific isotope. Parameters ---------- isotope: string The isotope you want the reactions for. cycle: The cycle you want the reactions for. Returns ------- relevant_f_ij_m: Pandas DataFrame Dataframe with the isotopes reacting with "isotope" and the f_ij as a function of mass. The reactions are ISO1 --> ISO2. If f_ij is negative that would mean ISO1 <-- ISO2. ''' # Note: the structure of the balanced fluxes is # ISO1, ISO2, [f_ij_m=0, ... f_ij_m=max] # where it assumes the fluxes are ISO1 --> ISO2. Negative fluxes imply that ISO2 --> ISO1. f_ij_m = self.get_balanced_fluxes(cycle) filter1 = f_ij_m['ISO1'].str.contains(isotope, regex=True) filter2 = f_ij_m['ISO2'].str.contains(isotope, regex=True) relevant_f_ij_m = f_ij_m[filter1 | filter2].copy() # now we will make sure that ISO2 == isotope so we can talk about it's production consistently relevant_f_ij_m.loc[filter1, "f_ij"] = -relevant_f_ij_m.loc[filter1, "f_ij"] relevant_f_ij_m.loc[filter1, ["ISO1", "ISO2"]] = relevant_f_ij_m.loc[filter1, ["ISO2", "ISO1"]].values # Warning: run get_balanced_fluxes before asking for the network (dependent on the time step) network = self.networksetup_processed filtered_network = network[ network[['INPUT1', 'OUTPUT1']].apply(tuple, axis=1).isin(relevant_f_ij_m[['ISO1', 'ISO2']].apply(tuple, axis=1)) ] relevant_f_ij_m = relevant_f_ij_m.merge( filtered_network.rename(columns={'INPUT1': 'ISO1', 'OUTPUT1': 'ISO2'}), on=["ISO1", "ISO2"], how="left" ) return relevant_f_ij_m def get_ZNAele(self, isotopes): '''Gets the Z, N, A, and element name of isotopes. Parameters ---------- isotopes: string or np.array Isotope to get the parts of. Returns ------- Z: float or np.array Proton number in the order of request. N: float or np.array Neutron number in the order of request. A: float or np.array Mass number in order of request. element_part: string or np.array Element name in order of request. ''' if not isinstance(isotopes, list): isotopes = [isotopes] proton_num = np.arange(1, len(self.all_elements) + 1) element_part, mass_part = zip(*(iso.split('-') for iso in isotopes)) A = np.array([int(mass.replace('*', '')) for mass in mass_part]) Z = np.array([proton_num[np.where(ele == self.all_elements)][0] for ele in element_part]) N = A - Z return (Z[0], N[0], A[0], element_part[0]) if len(isotopes) == 1 else (Z, N, A, element_part) # TO-DO: probably already exists in NuGridPy def get_element(self, Z): '''Gets the element given a proton number. Parameters ---------- Z: float or list or np.array The proton number to get element for. Returns ------- ele: string or np.array The element names in order of request. ''' if not isinstance(Z, list): Z = [Z] if isinstance(Z, list): Z = np.array(Z) Z -= 1 # for indexing reasons ele = np.take(self.all_elements, Z) return ele[0] if len(Z) == 1 else ele # TO-DO: probably already exists in NuGridPy def is_stable(self, isotopes): '''Checks if an isotope is stable. Parameters ---------- isotopes: string or list or np.array The isotopes you want to check stability for. Returns ------- stability: boolean or np.array Whether the isotopes are stable in order of request. ''' if not isinstance(isotopes, list): isotopes = [isotopes] # Check stability using vectorized comparison stability = np.array([iso in self.all_stable_species for iso in isotopes]) return stability[0] if len(isotopes) == 1 else stability def round_to_nearest(self, arr, step): '''Rounds the maximum and minimum of an array to the closest "step". Parameters ---------- arr: list or np.array The data to get the rounded maximum and minimum. step: float The value to round to. Returns ------- floored_min: float The minimum rounded to the floor of the step (ie. if step == 10 and min is 3 then it rounds to 0). ceiled_max: float The maximum rounded to the ceiling of the step (ie. if step == 10 and max is 8 then it rounds to 10). ''' min_val = np.min(arr) max_val = np.max(arr) floored_min = np.floor(min_val / step) * step ceiled_max = np.ceil(max_val / step) * step return floored_min, ceiled_max #### TO-DO: Needs to be re-written to use get_relevant_fluxes probably. def calc_delta(self, cycle, current_isotope): '''Calculates the total flux across all reactions as a function of mass. Parameters ---------- cycle: float The cycle you want to calculate the total flux for. current_isotope: string The isotope you want to calculate the total flux for. Returns ------- delta: np.array The total flux an isotope experiences as a function of mass. ''' balanced_fluxes = self.get_balanced_fluxes(cycle) pairs = list(balanced_fluxes.keys()) fluxes = np.array(list(balanced_fluxes.values())) # (input, output) so if the isotope is being produced its pair[1] all_fluxes = np.array([ flux if current_isotope == pair[1] else -flux for pair, flux in zip(pairs, fluxes) if current_isotope in pair]) delta = np.sum(all_fluxes, axis=0) return delta #### IN DEVELOPMENT :: DO NOT USE def ingestion_flux(self, spe, Xi, Xi_prev, Dconv, mass, radius, rho, ttt='gM2s'): # There may be an issue in my implementation of this -- seems too high dzeit = 0.01 * u.s # ingestion is over the first 18 mass zones in this implementation dmIngest = mass[-1] - mass[-1-18] # there is a nuance here with indexing that i am missing from converting Fortran --> Python xingest=0.5*(self.dmdtIngest*dzeit/dmIngest) try: X_cshell = self.ingest_df[self.ingest_df.Isotope == spe].X.to_numpy()[0] except IndexError: expected_units = u.g * u.M_sun ** -2 * u.s if ttt == 'gM2s' else u.g * u.cm ** -2 * u.s return np.zeros(mass.size) * expected_units X_addition = xingest * (2* (X_cshell - Xi_prev[-1]) + (self.dmdtIngest*dzeit/dmIngest) * (Xi_prev[-1] - Xi_prev[-1-18]) ) X_ingest = np.ones(Xi.size) * X_addition X_step = Xi + X_addition X_ingest[X_step < 0.] -= X_step[X_step < 0.] # because the minimum is manually set to 0 in case of overflow X_ingest[X_step > 1.] -= (X_step[X_step > 1.] - 1.) # because the maximum is manually set to 1 in case of overflow X_ingest[:-18+1] = 0 # because we ingest from jhif to the top # I am going to say the ingestion flux is the diffusion flux but with this ingested mass fraction """ dm_dr = np.gradient(mass, radius) dX_dm = np.gradient(X_ingest, mass) dX_dr = dX_dm * dm_dr if ttt == 'gM2s': term2 = np.power(radius,2) / u.M_sun ** 2 else: term2 = 1 iv = -rho * Dconv * dX_dr * term2 if ttt == 'gM2s': iv = iv.to(u.g * u.s ** -1 * u.M_sun ** -2) else: iv = iv.to(u.g * u.s ** -1 * u.cm ** -2) """ dX_dm = np.gradient(X_ingest, mass) dm_dr = np.gradient(mass, radius) iv = ( - rho * Dconv * dX_dm * dm_dr ).to(u.g * u.cm**-2 * u.s**-1) return iv def convective_flux(self, Xi, Dconv, mass, radius, rho, ttt='gM2s'): # https://pages.mtu.edu/~fmorriso/cm3120/lectures/2021_CM3120_module3_12lectureIV.pdf # this is the mass fraction flux :: just need to double check that I can say w = X from the notes # If we want dX/dm instead of dX/dr then dX/dm * dm/dr # The reason we would want this is because we want to go from Eulerian to Lagrangian coordinates since r = r(t) """ dm_dr = np.gradient(mass, radius) dX_dm = np.gradient(Xi, mass) dX_dr = dX_dm * dm_dr if ttt == 'gM2s': term2 = np.power(radius,2) / u.M_sun ** 2 else: term2 = 1 jv = -rho * Dconv * dX_dr * term2 if ttt == 'gM2s': jv = jv.to(u.g * u.s ** -1 * u.M_sun ** -2) else: jv = jv.to(u.g * u.s ** -1 * u.cm ** -2) """ dX_dm = np.gradient(Xi, mass) dm_dr = np.gradient(mass, radius) jv = (- rho * Dconv * dX_dm * dm_dr ).to(u.g * u.cm**-2 * u.s**-1) return jv def nuclear_flux(self, Xi, dYdt, mass, radius, rho, ttt='gM2s'): # think about the units of reaction flux :: d(X/A)/dt = dY/dt -- is it mol/s? Think about it # nucleosynthetic flux = rho [g / cm^3] * dY/dt [1/s] = [g/s/cm^3] maybe but note that the units for convective flux is g/s/cm^2 :::: do I want it to be dY/dt * m * dr/dm? # https://chatgpt.com/share/6f7a4e62-26ee-48de-8820-c17dad75dee0 """ dr_dm = np.gradient(radius, mass) term = mass * dr_dm if ttt == 'gM2s': term2 = np.power(radius,2) / u.M_sun ** 2 else: term2 = 1 nv = rho * dYdt * term * term2 if ttt == 'gM2s': nv = nv.to(u.g * u.s ** -1 * u.M_sun ** -2) else: nv = nv.to(u.g * u.s ** -1 * u.cm ** -2) """ dX_dm = np.gradient(Xi, mass) dm_dr = np.gradient(mass, radius) # https://chatgpt.com/share/67379231-a5f8-8010-95a9-6f5ef821f266 # also will let me do dX/dt # might have to have the gradient of delta f nv = (- rho * np.pi * (radius ** 2) * dYdt * dX_dm * dm_dr).to(u.g * u.cm**-2 * u.s**-1) return nv def get_nvjviv(self, cycle, spe, ttt='gM2s'): flux_unit = 1/u.s dYdt = self.calc_delta(cycle, spe) * flux_unit Xi = self.get('out', cycle, spe) Dconv = self.get('out', cycle, 'dcoeff') mass = self.get('out', cycle, 'mass') radius = self.get('out', cycle, 'radius') rho = self.get('out', cycle, 'rho') # TO-DO :: needs to be thought about what makes sense. I have cycle by cycle resolution but should Xi_prev be -1, -5, -20, etc ? g Xi_prev = self.get('out', cycle-20, spe) nv = self.nuclear_flux(Xi, dYdt, mass, radius, rho, ttt) jv = self.convective_flux(Xi, Dconv, mass, radius, rho, ttt) iv = self.ingestion_flux(spe, Xi, Xi_prev, Dconv, mass, radius, rho, ttt) return nv, jv, iv