import os import subprocess import csv import itertools import numpy as np from datetime import datetime # from wolframclient.evaluation import WolframLanguageSession # from wolframclient.language import wl, wlexpr nmesh_exe = "/home/wyj/Code/NR/nmesh/exe/nmesh" def timestr(): now = datetime.now() return now.strftime('%y-%m-%d %H:%M:%S') class nmesh_tester: def __init__(self, a=1e-8, m=0, N=1, l=1, type="out", ntheta=16, nphi=32, interp="yes", n=20, luni=0, rlist="", rmodes=[], imodes=[], rpsi4s=[], ipsi4s=[], outlist="0.05 0.08 0.1 -0.4 0.42 0.33 1.4 0.2 0.35"): self.a = a self.m = m self.N = N self.l = l self.type = type self.ntheta = ntheta self.nphi = nphi self.interp = interp self.n = n self.luni = luni self.rlist = rlist self.rarray = list(map(float, rlist.split())) self.outlist = outlist self.outarray = np.array(list(map(float, outlist.split()))).reshape(-1,3) self.rmodes = rmodes self.imodes = imodes self.rpsi4s = rpsi4s self.ipsi4s = ipsi4s self.mode_file = f"results/result_modes_a{a}-m{m}-N{N}-l{l}-{type}.csv" self.psi4_file = f"results/result_psi4_a{a}-m{m}-N{N}-l{l}-{type}.csv" self.l2_file = f"results/result_l2_a{a}-m{m}-N{N}-l{l}-{type}.csv" self.normarray = np.concatenate((self.rarray, np.linalg.norm(self.outarray, axis=1))) if np.max(self.normarray) < 2.0: self.mesh_type = "CubedSpheres" else: self.mesh_type = "Shell" print(f"[{timestr()}]Start a now tester.") def run_test(self): parfile = self.write_parfile() basename = parfile[:-4] print(f"[{timestr()}]Written to parfile {parfile}") print(f"[{timestr()}]Starting running nmesh") if os.path.exists(basename): print(f"[{timestr()}]\tHave already runed {parfile}, skipping.") else: print(f"[{timestr()}]running: {nmesh_exe} {parfile}") self.run_nmesh(parfile) print(f"[{timestr()}]Finished running nmesh") self.collect_result(basename) print(f"[{timestr()}]Finished result analysis.") def write_parfile(self): filename = f"parfiles/Inv-a{self.a}-m{self.m}-N{self.N}-l{self.l}-{self.type}-"+\ f"nt{self.ntheta}-np{self.nphi}-{self.interp}-"+\ f"n{self.n}-{self.luni}-{self.rlist}.par" preparfile =\ '''################################################################### # modules we use physics = Invariants ADM Teukolsky ################################################################### # parameters for ADM ADM_Set1stDerivsInADMconstraints = yes ''' if self.mesh_type == "Shell": mesh_parfile =\ f'''################################################################### # initial mesh amr_mesh_type = Shell amr_Shell_rin = {np.floor(np.min(self.normarray))} amr_Shell_rout = {np.ceil(np.max(self.normarray))} amr_Shell_r1 = 0 amr_n0 = {self.n} amr_n1 = {self.n} amr_n2 = {self.n} amr_luni = {self.luni} ''' else: mesh_parfile =\ f'''################################################################### # initial mesh amr_mesh_type = 7 CubedSpheres amr_n0 = {self.n} amr_n1 = {self.n} amr_n2 = {self.n} amr_luni = {self.luni} ''' teukolsky_parfile =\ f'''################################################################### # parameters for Teukolsky Teukolsky_waves_amplitude = {self.a} Teukolsky_wave_width = {self.l} Teukolsky_wave_mm = {self.m} Teukolsky_wave_N = {self.N} Teukolsky_wave_type = {self.type} ''' invariants_parfile =\ f'''################################################################### # parameters for Invariants Invariants_ntheta = {self.ntheta} Invariants_nphi = {self.nphi} Invariants_modes_lmin = 1 Invariants_modes_lmax = 8 Invariants_modes_test = no Invariants_compute_modes = yes Invariants_compute_modes_just_interp = {self.interp} Invariants_modes_r = {self.rlist} Invariants_psi4_test_0doutput = yes Invariants_psi4_test_0dout_coordinates = {self.outlist} ''' output_parfile =\ '''################################################################### # output logfile_creation = yes 0douttime = 0.1 0doutput = Invariants_rpsi4 Invariants_ipsi4 Invariants_Teukolsky_rpsi4_err Invariants_Teukolsky_ipsi4_err 1douttime = 0.1 1doutput = Invariants_rpsi4 Invariants_ipsi4 2douttime = 0.1 2doutput = Invariants_Teukolsky_rpsi4_err Invariants_Teukolsky_ipsi4_err ''' with open(filename, "w") as f: f.write(preparfile) f.write(mesh_parfile) f.write(teukolsky_parfile) f.write(invariants_parfile) f.write(output_parfile) return filename def run_nmesh(self, parfile): subprocess.run([nmesh_exe, parfile]) def collect_result(self, basename): print(f"[{timestr()}]Starting collect and analysis result...") if not os.path.exists(self.mode_file): with open(self.mode_file, "w") as f: csv_writer = csv.writer(f) csv_writer.writerow(["nt", "np", "interp", "mesh", "n", "luni", "r", "l", "m", "re_mode", "im_mode", "re_mode_wl", "im_mode_wl", "re_mode_err", "im_mode_err", "re_mode_rel_error", "im_mode_rel_err"]) if not os.path.exists(self.psi4_file): with open(self.psi4_file, "w") as f: csv_writer = csv.writer(f) csv_writer.writerow(["mesh", "n", "luni", "x", "y", "z", "rpsi4", "ipsi4", "rpsi4_wl", "ipsi4_wl", "rpsi4_err", "ipsi4_err", "rpsi4_rel_err", "ipsi4_rel_err"]) if not os.path.exists(self.l2_file): with open(self.l2_file, "w") as f: csv_writer = csv.writer(f) csv_writer.writerow(["nt", "np", "interp", "mesh", "n", "luni", "r", "l2-norm_wl", "l2-distance", "l2-rel_err"]) print(f"[{timestr()}]\tAnalysing modes...") with open(self.mode_file, "a") as mode_fp: csv_writer = csv.writer(mode_fp) ind = 0 for r in self.rarray: l2norm = 0 l2distance = 0 for l in range(1,9): for m in range(-l, l+1): output_line = [self.ntheta, self.nphi, self.interp, self.mesh_type,\ self.n, self.luni, r, l, m] mode_file = f'{basename}/modes/psi4_r{r}_l{l}_m{m}.t' with open(mode_file, 'r') as f: lines = f.readlines() output_line.extend(map(float,lines[-1].split()[1:])) rmode = self.rmodes[ind] imode = self.imodes[ind] rmode_error = output_line[-2] - rmode imode_error = output_line[-1] - imode rmode_rel_error = 'nan' if rmode==0 else rmode_error/rmode imode_rel_error = 'nan' if imode==0 else imode_error/imode l2norm += rmode**2 + imode**2 l2distance += rmode_error**2 + imode_error**2 output_line.extend([rmode, imode, rmode_error, imode_error, rmode_rel_error, imode_rel_error]) ind += 1 csv_writer.writerow(output_line) with open(self.l2_file, "a") as l2_fp: l2_writer = csv.writer(l2_fp) l2_writer.writerow([self.ntheta, self.nphi, self.interp, self.mesh_type, self.n, self.luni, r, l2norm, l2distance, l2distance/l2norm]) print(f"[{timestr()}]\tAnalysing 0doutput...") with open(self.psi4_file, "a") as psi4_fp: csv_writer = csv.writer(psi4_fp) n_points = len(self.outarray)//3 for i in range(n_points): output_line = [self.mesh_type, self.n, self.luni] psi4_files = [f"{basename}/Invariants_rpsi4_pt{i}.t", f"{basename}/Invariants_ipsi4_pt{i}.t"] with open(psi4_files[0]) as f: line = f.readline() line_list = line.split() Nrpsi4 = float(line_list[1]) output_line.extend(line_list[-3:]) output_line.append(Nrpsi4) with open(psi4_files[1]) as f: line = f.readline() Nipsi4 = float(line.split()[1]) output_line.append(Nipsi4) rpsi4 = self.rpsi4s[i] ipsi4 = self.ipsi4s[i] rpsi4_error = Nrpsi4 - rpsi4 ipsi4_error = Nipsi4 - ipsi4 rpsi4_rel_error = 'nan' if rpsi4==0 else rpsi4_error/rpsi4 ipsi4_rel_error = 'nan' if ipsi4==0 else ipsi4_error/ipsi4 output_line.extend([rpsi4, ipsi4, rpsi4_error, ipsi4_error, rpsi4_rel_error, ipsi4_rel_error]) csv_writer.writerow(output_line) rlist = [1.4, 1.8, 2.5] pointlist = [[-0.4, 1.42, 0.33], [1.4, 0.2, 0.35], [1.01, 0.02, 1.48]] rlist_str = ' '.join(str(r) for r in rlist) outlist = ' '.join(str(coord) for point in pointlist for coord in point) for a in [1e-8, 1e-10, 1e-12]: for m in [0, 1, -1, 2, -2]: for N in [1, 2, 3, 4]: for l in [0.5, 1, 2]: for type in ["in", "out"]: psi4_analytical = f"psi4_analytical/psi4-m{m}-a{a}-L{l}-N{N}.txt" if not os.path.exists(psi4_analytical): print(f"[{timestr()}]Staring calculate psi4 analytically...") subprocess.run(["wolframscript", "-file", "Teuk_psi4.wl", f"{m}", f"{a}", f"{l}", f"{N}", f"{type}", f"{psi4_analytical}"]) print(f"[{timestr()}]Finished calculate psi4 and saved in {psi4_analytical}.") rmodes = [] imodes = [] for r in rlist: rmodes_file = f"modes_wolfram/rmode_a{a}-m{m}-N{N}-l{l}-r{r}.txt" imodes_file = f"modes_wolfram/imode_a{a}-m{m}-N{N}-l{l}-r{r}.txt" if (not os.path.exists(rmodes_file)) or \ (not os.path.exists(imodes_file)): print(f"[{timestr()}]Starting calculate modes at r={r}...") subprocess.run(["wolframscript", "-file", "calculate_modes.wl", f"{psi4_analytical}", f"{r}", f"{rmodes_file}", f"{imodes_file}"]) print(f"[{timestr()}]Finished calculate modes and saved in \ {rmodes_file} and {imodes_file}.") with open(rmodes_file) as f: rmodes.extend([float(line.strip()) for line in f.readlines()]) with open(imodes_file) as f: imodes.extend([float(line.strip()) for line in f.readlines()]) rpsi4s = [] ipsi4s = [] for point in pointlist: print(f"[{timestr()}]Starting calculate psi4 at {point}") result = subprocess.run(["wolframscript", "-file", "psi4_value.wl", f"{psi4_analytical}", "{"+ f"{point[0]}, {point[1]}, {point[2]}"+ "}"], capture_output=True, text=True) try: output = result.stdout.strip().split() rpsi4s.append(float(output[0].replace("*^", "e"))) ipsi4s.append(float(output[1].replace("*^", "e"))) except: print(result, output) raise SystemExit("wrong in wolframscript.") for ntheta in [16, 32, 64]: nphi = 2*ntheta for interp in ["yes", "no"]: for n in [10, 20, 40]: for luni in [0, 1, 2]: tester = nmesh_tester(a, m, N, l, type, ntheta, nphi, interp, n, luni, rlist_str, rmodes, imodes, rpsi4s, ipsi4s, outlist) tester.run_test()