Source code for em

""" Simulation of a lightning EMP using FDTD. """

import sys, os
from os.path import splitext
import argparse

from numpy import *
import scipy.constants as co
import h5py

from param import ParamContainer, param, positive, contained_in

from langevin import CylindricalLangevin
from fdtd import CylBox, CylDimensions, avg
from contexttimer import ContextTimer
R, Z_, PHI = 0, 1, 2


[docs]class Parameters(ParamContainer): @param(default=os.path.expanduser("~/projects/em/")) def input_dir(s): """ The directory that contains extra input files. """ return s @param(default='') def electron_density_file(s): """ A file to read the electron density from in h[km] n[cm^3]. """ return s @param(default='') def gas_density_file(s): """ A file to read the gas density from in h[km] n[cm^3]. """ return s @param(default='') def ionization_rate_file(s): """ A file to read the ionization rate from in E/n[Td] k[m^3 s^-1]. """ return s @param(positive, default=0.1 * co.micro) def dt(s): """ The time step in seconds. """ return float(s) @param(positive, default=60 * co.micro) def output_dt(s): """ The time between savings in seconds. """ return float(s) @param(positive, default=60 * co.micro) def end_t(s): """ The final simulation time.""" return float(s) @param(positive, default=1e3) def Q(s): """ The Charge transferred in C.""" return float(s) @param(positive, default=50 * co.micro) def tau_r(s): """ Rise time of the stroke in seconds""" return float(s) @param(positive, default=500 * co.micro) def tau_f(s): """ Fall (decay) time of the stroke in seconds""" return float(s) @param(positive, default=3000) def r(s): """ Radius of the simulation domain in km. """ return float(s) @param(default=-1000) def z0(s): """ Lower boundary of the simulation in km. """ return float(s) @param(default=1100) def z1(s): """ Upperr boundary of the simulation in km. """ return float(s) @param(positive, default=300) def r_cells(s): """ Number of cells in the r direction.""" return int(s) @param(positive, default=110) def z_cells(s): """ Number of cells in the z direction.""" return int(s) @param(positive, default=1.2e24) def mu_N(s): """ Mobility times N in SI units. """ return float(s) @param(positive, default=10) def r_source(s): """ With of the source in km. """ return float(s) @param(default=-160) def z0_source(s): """ Lower edge of the source in km. """ return float(s) @param(default=-160) def z1_source(s): """ Upper edge of the source in km. """ return float(s) @param(positive, default=10) def ncpml(s): """ Number of cells in the convoluted perfectly matching layers.""" return int(s) @param(default=20) def dens_update_lower(s): """ Densities will not be updated below this threshold in km. This is to avoid the effects of high fields close to the source. """ return int(s) @param(default=0) def lower_boundary(s): """ Lower boundary condition. Use 0 for a cpml, != 0 for an electrode . """ return int(s) @param(default=[]) def track_r(s): """ List of r-values to track. """ return array([float(x) for x in s]) @param(default=[]) def track_z(s): """ List of z-values to track. """ return array([float(x) for x in s]) @param(default=[]) def plugins(s): """ List of plugins for the simulation. """ return list(s)
def j_source(t, j0, tau_r, tau_f): if t < tau_r: return j0 * t / tau_r else: return j0 * exp(-(t - tau_r)**2 / tau_f**2)
[docs]def peak(t, A, tau, m): return m * A / (tau * (m - 1)) * (exp(-t / tau) - exp(-m * t / tau))
[docs]def import_object(s): """ Tanslates module.object into __import__("module").getattr("object") """ splitted = s.split('.') mod, obj = '.'.join(splitted[:-1]), splitted[-1] plugins = __import__('plugins.%s' % mod) return getattr(getattr(plugins, mod), obj)
[docs]def main(): # == READ PARAMETERS == parser = argparse.ArgumentParser() parser.add_argument("input", help="Parameter input file") parser.add_argument("-o", "--output", help="Output file", default=None) args = parser.parse_args() ofile = args.output or (splitext(args.input)[0] + '.h5') params = Parameters() params.file_load(args.input) plugins = [import_object(s)(params) for s in params.plugins] # == INIT THE SIMULATION INSTANCE == box = CylBox(0, params.r * co.kilo, params.z0 * co.kilo, params.z1 * co.kilo) dim = CylDimensions(params.r_cells, params.z_cells) z = linspace(params.z0 * co.kilo, params.z1 * co.kilo, dim.nz + 1) sim = CylindricalLangevin(box, dim) # == LOAD FILES == sim.load_ngas(os.path.join(params.input_dir, params.gas_density_file)) sim.load_ne(os.path.join(params.input_dir, params.electron_density_file)) sim.load_ionization(os.path.join(params.input_dir, params.ionization_rate_file)) sim.set_mun(params.mu_N) # == SET BOUNDARY CONDITIONS == flt = ones((2, 2)) if params.lower_boundary != 0: # Set the lower boundary as a perfect conductor: flt[Z_, 0] = False # We also have to set a Neumann bc in hphi for a conductor sim.hphi.neumann_axes.append((Z_, 0, 1)) flt[R, 0] = False sim.add_cpml_boundaries(params.ncpml, filter=flt) # == SET SOURCE == # We set the sources by using a filter in x and y rsource = params.r_source * co.kilo z0source = params.z0_source * co.kilo z1source = params.z1_source * co.kilo r2 = sim.rf**2 source_s = pi * sim.rf[r2 <= rsource**2][-1]**2 source_zflt = logical_and(sim.zf[newaxis, :] <= z1source, sim.zf[newaxis, :] >= z0source) si0, si1 = [nonzero(source_zflt)[1][i] for i in 0, -1] m = params.tau_f / params.tau_r # == PREPARE H5DF OUTPUT == fp = h5py.File(ofile, 'w') params.h5_dump(fp) sim.save_global(fp) gsteps = fp.create_group('steps') gtrack = fp.create_group('track') # == PREPARE THE MAIN LOOP == sim.set_dt(params.dt) sim.dens_update_lower = params.dens_update_lower * co.kilo insteps = 10 nsave = int(params.output_dt / (insteps * params.dt)) t = 0 for p in plugins: p.initialize(sim) # == THE MAIN LOOP == for i in xrange(int(params.end_t / (insteps * params.dt))): with ContextTimer("t = %f ms" % (t / co.milli)): for j in xrange(insteps): sim.update_e() for p in plugins: p.update_e(sim) sim.update_h() sim.j[:, si0:si1 + 1, Z_] = \ (exp(-r2/rsource**2) * peak(sim.th, params.Q / source_s, params.tau_f, m))[:, newaxis] for p in plugins: p.update_h(sim) t += params.dt # Save a coarser grid with higher time resolution if len(params.track_r) > 0 and len(params.track_z) > 0: step = "%.6d" % i g = gtrack.create_group(step) sim.track(g, params.track_r * co.kilo, params.track_z * co.kilo) fp.flush() if 0 == ((i - 1) % nsave): with ContextTimer("Saving step %d" % (i / nsave)): step = "%.4d" % (i / nsave) g = gsteps.create_group(step) sim.save(g) for p in plugins: p.save(sim, g) fp.flush()
if __name__ == '__main__': main()