Inv_test/test_Inv_modes.py
2024-02-18 00:01:12 -05:00

302 lines
12 KiB
Python

import os
import subprocess
import csv
import itertools
import numpy as np
from datetime import datetime
# from wolframclient.evaluation import WolframLanguageSession
# from wolframclient.language import wl, wlexpr
nmesh_exe = "/home/wyj/Code/NR/nmesh/exe/nmesh"
def timestr():
now = datetime.now()
return now.strftime('%y-%m-%d %H:%M:%S')
class nmesh_tester:
def __init__(self, a=1e-8, m=0, N=1, l=1, type="out", ntheta=16, nphi=32,
interp="yes", n=20, luni=0, rlist="", rmodes=[], imodes=[],
rpsi4s=[], ipsi4s=[],
outlist="0.05 0.08 0.1 -0.4 0.42 0.33 1.4 0.2 0.35"):
self.a = a
self.m = m
self.N = N
self.l = l
self.type = type
self.ntheta = ntheta
self.nphi = nphi
self.interp = interp
self.n = n
self.luni = luni
self.rlist = rlist
self.rarray = list(map(float, rlist.split()))
self.outlist = outlist
self.outarray = np.array(list(map(float, outlist.split()))).reshape(-1,3)
self.rmodes = rmodes
self.imodes = imodes
self.rpsi4s = rpsi4s
self.ipsi4s = ipsi4s
self.mode_file = f"results/result_modes_a{a}-m{m}-N{N}-l{l}-{type}.csv"
self.psi4_file = f"results/result_psi4_a{a}-m{m}-N{N}-l{l}-{type}.csv"
self.l2_file = f"results/result_l2_a{a}-m{m}-N{N}-l{l}-{type}.csv"
self.normarray = np.concatenate((self.rarray, np.linalg.norm(self.outarray, axis=1)))
if np.max(self.normarray) < 2.0:
self.mesh_type = "CubedSpheres"
else:
self.mesh_type = "Shell"
print(f"[{timestr()}]Start a now tester.")
def run_test(self):
parfile = self.write_parfile()
basename = parfile[:-4]
print(f"[{timestr()}]Written to parfile {parfile}")
print(f"[{timestr()}]Starting running nmesh")
if os.path.exists(basename):
print(f"[{timestr()}]\tHave already runed {parfile}, skipping.")
else:
print(f"[{timestr()}]running: {nmesh_exe} {parfile}")
self.run_nmesh(parfile)
print(f"[{timestr()}]Finished running nmesh")
self.collect_result(basename)
print(f"[{timestr()}]Finished result analysis.")
def write_parfile(self):
filename = f"parfiles/Inv-a{self.a}-m{self.m}-N{self.N}-l{self.l}-{self.type}-"+\
f"nt{self.ntheta}-np{self.nphi}-{self.interp}-"+\
f"n{self.n}-{self.luni}-{self.rlist}.par"
preparfile =\
'''###################################################################
# modules we use
physics = Invariants ADM Teukolsky
###################################################################
# parameters for ADM
ADM_Set1stDerivsInADMconstraints = yes
'''
if self.mesh_type == "Shell":
mesh_parfile =\
f'''###################################################################
# initial mesh
amr_mesh_type = Shell
amr_Shell_rin = {np.floor(np.min(self.normarray))}
amr_Shell_rout = {np.ceil(np.max(self.normarray))}
amr_Shell_r1 = 0
amr_n0 = {self.n}
amr_n1 = {self.n}
amr_n2 = {self.n}
amr_luni = {self.luni}
'''
else:
mesh_parfile =\
f'''###################################################################
# initial mesh
amr_mesh_type = 7 CubedSpheres
amr_n0 = {self.n}
amr_n1 = {self.n}
amr_n2 = {self.n}
amr_luni = {self.luni}
'''
teukolsky_parfile =\
f'''###################################################################
# parameters for Teukolsky
Teukolsky_waves_amplitude = {self.a}
Teukolsky_wave_width = {self.l}
Teukolsky_wave_mm = {self.m}
Teukolsky_wave_N = {self.N}
Teukolsky_wave_type = {self.type}
'''
invariants_parfile =\
f'''###################################################################
# parameters for Invariants
Invariants_ntheta = {self.ntheta}
Invariants_nphi = {self.nphi}
Invariants_modes_lmin = 1
Invariants_modes_lmax = 8
Invariants_modes_test = no
Invariants_compute_modes = yes
Invariants_compute_modes_just_interp = {self.interp}
Invariants_modes_r = {self.rlist}
Invariants_psi4_test_0doutput = yes
Invariants_psi4_test_0dout_coordinates = {self.outlist}
'''
output_parfile =\
'''###################################################################
# output
logfile_creation = yes
0douttime = 0.1
0doutput = Invariants_rpsi4 Invariants_ipsi4 Invariants_Teukolsky_rpsi4_err Invariants_Teukolsky_ipsi4_err
1douttime = 0.1
1doutput = Invariants_rpsi4 Invariants_ipsi4
2douttime = 0.1
2doutput = Invariants_Teukolsky_rpsi4_err Invariants_Teukolsky_ipsi4_err
'''
with open(filename, "w") as f:
f.write(preparfile)
f.write(mesh_parfile)
f.write(teukolsky_parfile)
f.write(invariants_parfile)
f.write(output_parfile)
return filename
def run_nmesh(self, parfile):
subprocess.run([nmesh_exe, parfile])
def collect_result(self, basename):
print(f"[{timestr()}]Starting collect and analysis result...")
if not os.path.exists(self.mode_file):
with open(self.mode_file, "w") as f:
csv_writer = csv.writer(f)
csv_writer.writerow(["nt", "np", "interp",
"mesh", "n", "luni", "r", "l", "m", "re_mode",
"im_mode", "re_mode_wl", "im_mode_wl",
"re_mode_err", "im_mode_err",
"re_mode_rel_error", "im_mode_rel_err"])
if not os.path.exists(self.psi4_file):
with open(self.psi4_file, "w") as f:
csv_writer = csv.writer(f)
csv_writer.writerow(["mesh", "n", "luni",
"x", "y", "z", "rpsi4", "ipsi4", "rpsi4_wl",
"ipsi4_wl", "rpsi4_err", "ipsi4_err",
"rpsi4_rel_err", "ipsi4_rel_err"])
if not os.path.exists(self.l2_file):
with open(self.l2_file, "w") as f:
csv_writer = csv.writer(f)
csv_writer.writerow(["nt", "np", "interp",
"mesh", "n", "luni", "r", "l2-norm_wl",
"l2-distance", "l2-rel_err"])
print(f"[{timestr()}]\tAnalysing modes...")
with open(self.mode_file, "a") as mode_fp:
csv_writer = csv.writer(mode_fp)
ind = 0
for r in self.rarray:
l2norm = 0
l2distance = 0
for l in range(1,9):
for m in range(-l, l+1):
output_line = [self.ntheta, self.nphi, self.interp, self.mesh_type,\
self.n, self.luni, r, l, m]
mode_file = f'{basename}/modes/psi4_r{r}_l{l}_m{m}.t'
with open(mode_file, 'r') as f:
lines = f.readlines()
output_line.extend(map(float,lines[-1].split()[1:]))
rmode = self.rmodes[ind]
imode = self.imodes[ind]
rmode_error = output_line[-2] - rmode
imode_error = output_line[-1] - imode
rmode_rel_error = 'nan' if rmode==0 else rmode_error/rmode
imode_rel_error = 'nan' if imode==0 else imode_error/imode
l2norm += rmode**2 + imode**2
l2distance += rmode_error**2 + imode_error**2
output_line.extend([rmode, imode, rmode_error, imode_error,
rmode_rel_error, imode_rel_error])
ind += 1
csv_writer.writerow(output_line)
with open(self.l2_file, "a") as l2_fp:
l2_writer = csv.writer(l2_fp)
l2_writer.writerow([self.ntheta, self.nphi, self.interp,
self.mesh_type, self.n, self.luni, r, l2norm,
l2distance, l2distance/l2norm])
print(f"[{timestr()}]\tAnalysing 0doutput...")
with open(self.psi4_file, "a") as psi4_fp:
csv_writer = csv.writer(psi4_fp)
n_points = len(self.outarray)//3
for i in range(n_points):
output_line = [self.mesh_type, self.n, self.luni]
psi4_files = [f"{basename}/Invariants_rpsi4_pt{i}.t",
f"{basename}/Invariants_ipsi4_pt{i}.t"]
with open(psi4_files[0]) as f:
line = f.readline()
line_list = line.split()
Nrpsi4 = float(line_list[1])
output_line.extend(line_list[-3:])
output_line.append(Nrpsi4)
with open(psi4_files[1]) as f:
line = f.readline()
Nipsi4 = float(line.split()[1])
output_line.append(Nipsi4)
rpsi4 = self.rpsi4s[i]
ipsi4 = self.ipsi4s[i]
rpsi4_error = Nrpsi4 - rpsi4
ipsi4_error = Nipsi4 - ipsi4
rpsi4_rel_error = 'nan' if rpsi4==0 else rpsi4_error/rpsi4
ipsi4_rel_error = 'nan' if ipsi4==0 else ipsi4_error/ipsi4
output_line.extend([rpsi4, ipsi4, rpsi4_error, ipsi4_error,
rpsi4_rel_error, ipsi4_rel_error])
csv_writer.writerow(output_line)
rlist = [1.4, 1.8, 2.5]
pointlist = [[-0.4, 1.42, 0.33], [1.4, 0.2, 0.35], [1.01, 0.02, 1.48]]
rlist_str = ' '.join(str(r) for r in rlist)
outlist = ' '.join(str(coord) for point in pointlist for coord in point)
for a in [1e-8, 1e-10, 1e-12]:
for m in [0, 1, -1, 2, -2]:
for N in [1, 2, 3, 4]:
for l in [0.5, 1, 2]:
for type in ["in", "out"]:
psi4_analytical = f"psi4_analytical/psi4-m{m}-a{a}-L{l}-N{N}.txt"
if not os.path.exists(psi4_analytical):
print(f"[{timestr()}]Staring calculate psi4 analytically...")
subprocess.run(["wolframscript", "-file", "Teuk_psi4.wl", f"{m}",
f"{a}", f"{l}", f"{N}", f"{type}",
f"{psi4_analytical}"])
print(f"[{timestr()}]Finished calculate psi4 and saved in {psi4_analytical}.")
rmodes = []
imodes = []
for r in rlist:
rmodes_file = f"modes_wolfram/rmode_a{a}-m{m}-N{N}-l{l}-r{r}.txt"
imodes_file = f"modes_wolfram/imode_a{a}-m{m}-N{N}-l{l}-r{r}.txt"
if (not os.path.exists(rmodes_file)) or \
(not os.path.exists(imodes_file)):
print(f"[{timestr()}]Starting calculate modes at r={r}...")
subprocess.run(["wolframscript", "-file", "calculate_modes.wl",
f"{psi4_analytical}", f"{r}",
f"{rmodes_file}", f"{imodes_file}"])
print(f"[{timestr()}]Finished calculate modes and saved in \
{rmodes_file} and {imodes_file}.")
with open(rmodes_file) as f:
rmodes.extend([float(line.strip()) for line in f.readlines()])
with open(imodes_file) as f:
imodes.extend([float(line.strip()) for line in f.readlines()])
rpsi4s = []
ipsi4s = []
for point in pointlist:
print(f"[{timestr()}]Starting calculate psi4 at {point}")
result = subprocess.run(["wolframscript", "-file",
"psi4_value.wl", f"{psi4_analytical}",
"{"+
f"{point[0]}, {point[1]}, {point[2]}"+
"}"],
capture_output=True, text=True)
try:
output = result.stdout.strip().split()
rpsi4s.append(float(output[0].replace("*^", "e")))
ipsi4s.append(float(output[1].replace("*^", "e")))
except:
print(result, output)
raise SystemExit("wrong in wolframscript.")
for ntheta in [16, 32, 64]:
nphi = 2*ntheta
for interp in ["yes", "no"]:
for n in [10, 20, 40]:
for luni in [0, 1, 2]:
tester = nmesh_tester(a, m, N, l, type, ntheta, nphi,
interp, n, luni, rlist_str, rmodes,
imodes, rpsi4s, ipsi4s, outlist)
tester.run_test()