#!/usr/bin/env python3 import os import numpy as np from astropy.io import fits from astropy.time import Time from astropy import wcs from scipy import ndimage import subprocess import tempfile from astropy.io import ascii from astropy.coordinates import SkyCoord class ThreeDLine: """3D defined line""" def __init__(self, line, nx, ny, nz): p = line.split(" ") self.ax = float(p[0]) self.ay = float(p[1]) self.az = float(p[2]) self.bx = float(p[3]) self.by = float(p[4]) self.bz = float(p[5]) self.n = int(p[6]) self.nx = nx self.ny = ny self.nz = nz def extrema(self): fzmin, fzmax = -self.az / self.bz, (self.nz - self.az) / self.bz xmin, xmax = self.ax + fzmin * self.bx, self.ax + fzmax * self.bx ymin, ymax = self.ay + fzmin * self.by, self.ay + fzmax * self.by if xmin < 0: fzmin = -self.ax / self.bx if xmax >= self.nx: fzmax = (self.nx - self.ax) / self.bx if ymin < 0: fzmin = -self.ay / self.by if ymax >= self.ny: fzmax = (self.ny - self.ay) / self.by self.xmin, self.xmax = self.ax + fzmin * self.bx, self.ax + fzmax * self.bx self.ymin, self.ymax = self.ay + fzmin * self.by, self.ay + fzmax * self.by self.zmin, self.zmax = int(self.az + fzmin * self.bz), int(self.az + fzmax * self.bz) if self.zmin < 0: self.zmin = 0 if self.zmax > self.nz: self.zmax = self.nz self.fmin, self.fmax = fzmin, fzmax return self.fmin, self.fmax def __repr__(self): return f"{self.ax} {self.ay} {self.az} {self.bx} {self.by} {self.bz} {self.n}" class Prediction: """Prediction class""" def __init__(self, satno, mjd, ra, dec, x, y, state, tlefile, age): self.satno = satno self.age = age self.mjd = mjd self.t = 86400 * (self.mjd - self.mjd[0]) self.texp = self.t[-1] - self.t[0] self.ra = ra self.dec = dec self.x = x self.y = y self.state = state self.tlefile = tlefile class Observation: """Satellite observation""" def __init__(self, ff, mjd, x0, y0): """Define an observation""" # Store self.mjd = mjd self.x0 = x0 self.y0 = y0 # Get times self.nfd = Time(self.mjd, format="mjd", scale="utc").isot # Correct for rotation tobs = Time(ff.mjd + 0.5 * ff.texp / 86400.0, format="mjd", scale="utc") tobs.delta_ut1_utc = 0 hobs = tobs.sidereal_time("mean", longitude=0.0).degree tmid = Time(self.mjd, format="mjd", scale="utc") tmid.delta_ut1_utc = 0 hmid = tmid.sidereal_time("mean", longitude=0.0).degree # Compute ra/dec world = ff.w.wcs_pix2world(np.array([[self.x0, self.y0]]), 1) if ff.tracked: self.ra = world[0, 0] else: self.ra = world[0, 0] + hobs - hmid self.de = world[0, 1] class SatId: """Satellite identifications""" def __init__(self, line): s = line.split() self.nfd = s[0] self.x0 = float(s[1]) self.y0 = float(s[2]) self.t0 = 0.0 self.x1 = float(s[3]) self.y1 = float(s[4]) self.t1 = float(s[5]) self.norad = int(s[6]) self.catalog = s[7] self.state = s[8] self.dxdt = (self.x1 - self.x0) / (self.t1 - self.t0) self.dydt = (self.y1 - self.y0) / (self.t1 - self.t0) def __repr__(self): return "%s %f %f %f -> %f %f %f %d %s %s" % ( self.nfd, self.x0, self.y0, self.t0, self.x1, self.y1, self.t1, self.norad, self.catalog, self.state) class FourFrame: """Four Frame class""" def __init__(self, fname=None): if fname is None: # Initialize empty fourframe self.nx = 0 self.ny = 0 self.nz = 0 self.mjd = -1 self.nfd = None self.zavg = None self.zstd = None self.zmax = None self.znum = None self.dt = None self.site_id = 0 self.observer = None self.texp = 0.0 self.fname = None self.crpix = np.array([0.0, 0.0]) self.crval = np.array([0.0, 0.0]) self.cd = np.array([[1.0, 0.0], [0.0, 1.0]]) self.ctype = ["RA---TAN", "DEC--TAN"] self.cunit = np.array(["deg", "deg"]) self.crres = np.array([0.0, 0.0]) self.tracked = False else: # Read FITS file hdu = fits.open(fname) # Read image planes self.zavg, self.zstd, self.zmax, self.znum = hdu[0].data # Generate sigma frame self.zsig = (self.zmax - self.zavg) / (self.zstd + 1e-9) # Frame properties self.ny, self.nx = self.zavg.shape self.nz = hdu[0].header["NFRAMES"] # Read frame time oselfsets self.dt = np.array( [hdu[0].header["DT%04d" % i] for i in range(self.nz)]) # Read header self.mjd = hdu[0].header["MJD-OBS"] self.nfd = hdu[0].header["DATE-OBS"] self.site_id = hdu[0].header["COSPAR"] self.observer = hdu[0].header["OBSERVER"] self.texp = hdu[0].header["EXPTIME"] self.fname = fname # Astrometry keywords self.crpix = np.array( [hdu[0].header["CRPIX1"], hdu[0].header["CRPIX2"]]) self.crval = np.array( [hdu[0].header["CRVAL1"], hdu[0].header["CRVAL2"]]) self.cd = np.array( [[hdu[0].header["CD1_1"], hdu[0].header["CD1_2"]], [hdu[0].header["CD2_1"], hdu[0].header["CD2_2"]]]) self.ctype = [hdu[0].header["CTYPE1"], hdu[0].header["CTYPE2"]] self.cunit = [hdu[0].header["CUNIT1"], hdu[0].header["CUNIT2"]] self.crres = np.array( [hdu[0].header["CRRES1"], hdu[0].header["CRRES2"]]) # Check for sidereal tracking try: self.tracked = bool(hdu[0].header["TRACKED"]) except KeyError: self.tracked = False hdu.close() # Compute image properties self.sx = np.sqrt(self.cd[0, 0]**2 + self.cd[1, 0]**2) self.sy = np.sqrt(self.cd[0, 1]**2 + self.cd[1, 1]**2) self.wx = self.nx * self.sx self.wy = self.ny * self.sy self.zmaxmin = np.mean(self.zmax) - 2.0 * np.std(self.zmax) self.zmaxmax = np.mean(self.zmax) + 6.0 * np.std(self.zmax) self.zavgmin = np.mean(self.zavg) - 2.0 * np.std(self.zavg) self.zavgmax = np.mean(self.zavg) + 6.0 * np.std(self.zavg) self.zsigmin = 0 self.zsigmax = 10 # Setup WCS self.w = wcs.WCS(naxis=2) self.w.wcs.crpix = self.crpix self.w.wcs.crval = self.crval self.w.wcs.cd = self.cd self.w.wcs.ctype = self.ctype self.w.wcs.set_pv([(2, 1, 45.0)]) def mask(self, xmin, xmax, ymin, ymax): x, y = np.meshgrid(np.arange(self.nx), np.arange(self.ny)) c = (x >= xmin) & (x <= self.nx-xmax)\ & (y >= ymin)\ & (y <= self.ny-ymax) self.mask = np.ones_like(self.zavg) self.mask[~c] = 0.0 self.zavg *= self.mask self.zstd *= self.mask self.zmax *= self.mask self.znum *= self.mask self.zsig *= self.mask def selection_mask(self, sigma, zstd): """Create a selection mask""" c1 = ndimage.uniform_filter(self.znum, 3, mode="constant") c2 = ndimage.uniform_filter(self.znum * self.znum, 3, mode="constant") # Add epsilon to keep square root positive z = np.sqrt(c2 - c1 * c1 + 1e-9) # Standard deviation mask c = z < zstd m1 = np.zeros_like(self.zavg) m1[c] = 1.0 # Sigma mask c = self.zsig < sigma m2 = np.zeros_like(self.zavg) m2[~c] = 1.0 self.zsel = m1 * m2 # Generate points c = self.zsel == 1.0 xm, ym = np.meshgrid(np.arange(self.nx), np.arange(self.ny)) x, y = np.ravel(xm[c]), np.ravel(ym[c]) inum = np.ravel(self.znum[c]).astype("int") sig = np.ravel(self.zsig[c]) t = np.array([self.dt[i] for i in inum]) return x, y, inum, t, sig def significant_pixels_along_track(self, sigma, x0, y0, dxdt, dydt, rmin=10.0): """Extract significant pixels along a track""" # Generate sigma frame zsig = (self.zmax - self.zavg) / (self.zstd + 1e-9) # Select c = (zsig > sigma) # Positions xm, ym = np.meshgrid(np.arange(self.nx), np.arange(self.ny)) x, y = np.ravel(xm[c]), np.ravel(ym[c]) inum = np.ravel(self.znum[c]).astype("int") sig = np.ravel(zsig[c]) t = np.array([self.dt[i] for i in inum]) # Predicted positions xr = x0 + dxdt * t yr = y0 + dydt * t r = np.sqrt((x - xr)**2 + (y - yr)**2) c = r < rmin return x[c], y[c], t[c], sig[c] def significant_pixels(self, sigma): """Extract significant pixels""" # Generate sigma frame zsig = (self.zmax - self.zavg) / (self.zstd + 1e-9) # Select c = (zsig > sigma) # Positions xm, ym = np.meshgrid(np.arange(self.nx), np.arange(self.ny)) x, y = np.ravel(xm[c]), np.ravel(ym[c]) inum = np.ravel(self.znum[c]).astype("int") sig = np.ravel(zsig[c]) t = np.array([self.dt[i] for i in inum]) return x, y, t, sig def track(self, dxdt, dydt, tref): """Track and stack""" # Empty frame ztrk = np.zeros_like(self.zavg) # Loop over frames for i in range(self.nz): dx = int(np.round(dxdt * (self.dt[i] - tref))) dy = int(np.round(dydt * (self.dt[i] - tref))) # Skip if shift larger than image if np.abs(dx) >= self.nx: continue if np.abs(dy) >= self.ny: continue # Extract range if dx >= 0: i1min, i1max = dx, self.nx - 1 i2min, i2max = 0, self.nx - dx - 1 else: i1min, i1max = 0, self.nx + dx - 1 i2min, i2max = -dx, self.nx - 1 if dy >= 0: j1min, j1max = dy, self.ny - 1 j2min, j2max = 0, self.ny - dy - 1 else: j1min, j1max = 0, self.ny + dy - 1 j2min, j2max = -dy, self.ny - 1 zsel = np.where(self.znum == i, self.zmax, 0.0) ztrk[j2min:j2max, i2min:i2max] += zsel[j1min:j1max, i1min:i1max] return ztrk def in_frame(self, x, y): if (x >= 0) & (x <= self.nx) & (y >= 0) & (y <= self.ny): return True else: return False def generate_satellite_predictions(self, cfg): # Output file name outfname = f"{self.fname}.csv" # Run predictions if not os.path.exists(outfname): # Extract parameters nfd = self.nfd texp = self.texp nmjd = int(np.ceil(texp)) ra0, de0 = self.crval[0], self.crval[1] radius = np.sqrt(self.wx * self.wx + self.wy * self.wy) lat = cfg.getfloat("Observer", "latitude") lon = cfg.getfloat("Observer", "longitude") height = cfg.getfloat("Observer", "height") # Format command command = f"satpredict -t {nfd} -l {texp} -n {nmjd} -L {lon} -B {lat} -H {height} -o {outfname} -R {ra0} -D {de0} -r {radius}" for key, value in cfg.items("Elements"): if "tlefile" in key: command += f" -c {value}" # Run command output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT) # Read results d = ascii.read(outfname, format="csv") # Compute frame coordinates p = SkyCoord(ra=d["ra"], dec=d["dec"], unit="deg", frame="icrs") x, y = p.to_pixel(self.w, 0) # Loop over satnos satnos = np.unique(d["satno"]) predictions = [] for satno in satnos: c = d["satno"] == satno tlefile = np.unique(d["tlefile"][c])[0] age = np.unique(np.asarray(d["age"])[c])[0] p = Prediction(satno, np.asarray(d["mjd"])[c], np.asarray(d["ra"])[c], np.asarray(d["dec"])[c], x[c], y[c], np.array(d["state"])[c], tlefile, age) predictions.append(p) return predictions def find_lines(self, cfg): # Config settings sigma = cfg.getfloat("LineDetection", "trksig") trkrmin = cfg.getfloat("LineDetection", "trkrmin") ntrkmin = cfg.getfloat("LineDetection", "ntrkmin") # Find significant pixels (TODO: store in function?) c = self.zsig > sigma xm, ym = np.meshgrid(np.arange(self.nx), np.arange(self.ny)) x, y = np.ravel(xm[c]), np.ravel(ym[c]) z = np.ravel(self.znum[c]).astype("int") sig = np.ravel(self.zsig[c]) t = np.array([self.dt[i] for i in z]) # Skip if not enough points if len(t) < ntrkmin: return [] # Save points to temporary file (fd, tmpfile_path) = tempfile.mkstemp(prefix="hough_tmp", suffix=".dat") try: with os.fdopen(fd, "w") as f: for i in range(len(t)): f.write(f"{x[i]:f},{y[i]:f},{z[i]:f}\n") # Run 3D Hough line-finding algorithm command = f"hough3dlines -dx {trkrmin} -minvotes {ntrkmin} -raw {tmpfile_path}" try: output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT) except Exception: return [] finally: os.remove(tmpfile_path) # Decode output lines = [] for line in output.decode("utf-8").splitlines()[2:]: lines.append(ThreeDLine(line, self.nx, self.ny, self.nz)) return lines def find_tracks(self, cfg): # Config settings sigma = cfg.getfloat("LineDetection", "trksig") trkrmin = cfg.getfloat("LineDetection", "trkrmin") ntrkmin = cfg.getfloat("LineDetection", "ntrkmin") # Find significant pixels (TODO: store in function?) c = self.zsig > sigma xm, ym = np.meshgrid(np.arange(self.nx), np.arange(self.ny)) x, y = np.ravel(xm[c]), np.ravel(ym[c]) z = np.ravel(self.znum[c]).astype("int") sig = np.ravel(self.zsig[c]) t = np.array([self.dt[i] for i in z]) # Skip if not enough points if len(t) < ntrkmin: return [] # Save points to temporary file (fd, tmpfile_path) = tempfile.mkstemp(prefix="hough_tmp", suffix=".dat") try: with os.fdopen(fd, "w") as f: for i in range(len(t)): f.write(f"{x[i]:f},{y[i]:f},{z[i]:f}\n") # Run 3D Hough line-finding algorithm command = f"hough3dlines -dx {trkrmin} -minvotes {ntrkmin} -raw {tmpfile_path}" try: output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT) except Exception: return [] finally: os.remove(tmpfile_path) # Decode output lines = [] for line in output.decode("utf-8").splitlines()[2:]: lines.append(ThreeDLine(line, self.nx, self.ny, self.nz)) return lines def __repr__(self): return "%s %dx%dx%d %s %.3f %d %s" % (self.fname, self.nx, self.ny, self.nz, self.nfd, self.texp, self.site_id, self.observer)