Implements parallel processing of satobs, where multiple-threads are available

About 2x faster on a 2-core system, and 3-4x faster on an 8-core system
A balance between regular user feedback and maximum usage of CPUs results
in less-than-fastest possible performance.

For fastest performance, increase chunk size to a value larger than cpu_count.
(Perhaps make this a parameter?)

process.py
 - Implements a multiprocess worker pool of size os.cpu_count()
 - Batches processes in size os.cpu_count()
 - Creates pixel_catalog file using source FITS filename, instead of "test.cat"  - Forces single-threaded astropy IERS table update, if needed
 - Receives all file and screen output from children, so that it can be output
   in the order of the files processed
 - Changes display order of FITS file being processed (first) followed by
   SATIDs within
 - Adds temporary notice of empty input queue
 - Adds description of screen color codes

astrometry.py
 - Adds fall-back method of shutil.copyfile if PermissionError from copy2

extract.py
 - Removes terminal output, and instead returns results to calling processes
 - Tightens up file/io loops

satellite.py
 - Saves hough3dlines output to unique tmp file instead of /tmp/hough.dat
 - Removes temp file after processing
 - Adds FIXME note to review for unnecessary code

stars.py
 - Generates star catalog directly to destination file
 - Updates sextractor call to reference destination instead of tmp file
pull/56/head
Chris Lewicki 2019-12-12 14:54:46 -08:00
parent f0578de10c
commit b7056d9329
5 changed files with 155 additions and 92 deletions

View File

@ -15,6 +15,8 @@ from stvid.extract import extract_tracks
import astropy.units as u import astropy.units as u
from astropy.utils.exceptions import AstropyWarning from astropy.utils.exceptions import AstropyWarning
from astropy.coordinates import EarthLocation from astropy.coordinates import EarthLocation
from astropy.time import Time # For getting IERS table in single-thread session
import multiprocessing as mp
import warnings import warnings
import configparser import configparser
import argparse import argparse
@ -24,6 +26,83 @@ import time
import shutil import shutil
import sys import sys
"""
process.py - Utility to process stvid/acquire.py FITS images to detect and
extract satellite positions and create IODs.
Terminal output Color Codes:
GREEN: Calibrated file
RED: Not calibrated file
YELLOW: Computing astrometric calibration
BLUE: Classified satellite
GREY: Catalog satellite
MAGENTA: UNID satellite
"""
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
def process_loop(fname):
"""
Thread to process satobs FourFrame FITS files in a multi-thread compatible manner
"""
# Generate star catalog
if not os.path.exists(fname + ".cat"):
pix_catalog = generate_star_catalog(fname)
else:
pix_catalog = pixel_catalog(fname + ".cat")
# Calibrate from reference
calibrate_from_reference(fname, "test.fits", pix_catalog)
# Store calibration
store_calibration(pix_catalog, fname + ".cal")
# Generate satellite predictions
generate_satellite_predictions(fname)
# Detect lines with 3D Hough transform
ids = find_hough3d_lines(fname, nhoughmin, houghrmin)
# Get properties
ff = fourframe(fname)
# Extract tracks
if is_calibrated(ff):
screenoutput_idents = extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, root_dir, results_dir)
# Stars available and used
nused = np.sum(pix_catalog.flag == 1)
nstars = pix_catalog.nstars
# Write output
screenoutput = "%s %10.6f %10.6f %4d/%4d %5.1f %5.1f %6.2f +- %6.2f" % (
ff.fname, ff.crval[0], ff.crval[1], nused, nstars,
3600.0 * ff.crres[0], 3600.0 * ff.crres[1], np.mean(
ff.zavg), np.std(ff.zavg))
if is_calibrated(ff):
screenoutput = colored(screenoutput, "green")
else:
screenoutput = colored(screenoutput, "red")
imgstat_output = ("%s,%.8lf,%.6f,%.6f,%.3f,%.3f,%.3f," + "%.3f,%d,%d\n") % (
(ff.fname, ff.mjd, ff.crval[0], ff.crval[1],
3600 * ff.crres[0], 3600 * ff.crres[1], np.mean(
ff.zavg), np.std(ff.zavg), nstars, nused))
# Move processed files
shutil.move(fname, os.path.join(processed_dir, fname))
shutil.move(fname + ".png", os.path.join(processed_dir, fname + ".png"))
shutil.move(fname + ".id", os.path.join(processed_dir, fname + ".id"))
shutil.move(fname + ".cat", os.path.join(processed_dir, fname + ".cat"))
shutil.move(fname + ".cal", os.path.join(processed_dir, fname + ".cal"))
return (screenoutput, imgstat_output, screenoutput_idents)
if __name__ == "__main__": if __name__ == "__main__":
# Read commandline options # Read commandline options
conf_parser = argparse.ArgumentParser(description='Process captured' + conf_parser = argparse.ArgumentParser(description='Process captured' +
@ -74,6 +153,10 @@ if __name__ == "__main__":
# Move to processing directory # Move to processing directory
os.chdir(args.file_dir) os.chdir(args.file_dir)
# Force single-threaded IERS table update, if needed
t = Time.now()
_ = t.ut1
# Statistics file # Statistics file
fstat = open("imgstat.csv", "w") fstat = open("imgstat.csv", "w")
fstat.write("fname,mjd,ra,de,rmsx,rmsy,mean,std,nstars,nused\n") fstat.write("fname,mjd,ra,de,rmsx,rmsy,mean,std,nstars,nused\n")
@ -95,12 +178,16 @@ if __name__ == "__main__":
if not os.path.exists(processed_dir): if not os.path.exists(processed_dir):
os.makedirs(processed_dir) os.makedirs(processed_dir)
cpu_count = os.cpu_count()
if (cpu_count > 1):
print("Processing with {} threads".format(cpu_count))
# Forever loop # Forever loop
while True: while True:
# Get files # Get files
fnames = sorted(glob.glob("2*.fits")) fnames = sorted(glob.glob("2*.fits"))
# Create reference calibration file # Create reference calibration file in single threaded environment
if not os.path.exists("test.fits"): if not os.path.exists("test.fits"):
solved = False solved = False
# Loop over files to find a suitable calibration file # Loop over files to find a suitable calibration file
@ -117,63 +204,36 @@ if __name__ == "__main__":
if solved: if solved:
break break
# Loop over files p = mp.Pool(processes=cpu_count)
for fname in fnames:
# Generate star catalog
if not os.path.exists(fname + ".cat"):
pix_catalog = generate_star_catalog(fname)
else:
pix_catalog = pixel_catalog(fname+".cat")
# Calibrate from reference try:
calibrate_from_reference(fname, "test.fits", pix_catalog) # Loop over files in parallel, print output every batch size of cpu_count
satobs_chunks = chunks(fnames,cpu_count)
for chk in satobs_chunks:
for result in p.map(process_loop, chk):
(screenoutput, fileoutput, screenoutput_idents) = result
# Store calibration fstat.write(fileoutput)
store_calibration(pix_catalog, fname + ".cal") print(screenoutput)
for [outfilename, iod_line, color] in screenoutput_idents:
print(colored(iod_line,color))
# Write iodline
with open(outfilename, "a") as fp:
fp.write("%s\n" % iod_line)
# Generate satellite predictions p.close()
generate_satellite_predictions(fname) p.join()
except KeyboardInterrupt:
# Detect lines with 3D Hough transform fstat.close()
ids = find_hough3d_lines(fname, nhoughmin, houghrmin) p.close()
sys.exit()
# Get properties
ff = fourframe(fname)
# Extract tracks
if is_calibrated(ff):
extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, root_dir, results_dir)
# Stars available and used
nused = np.sum(pix_catalog.flag == 1)
nstars = pix_catalog.nstars
# Write output
output = "%s %10.6f %10.6f %4d/%4d %5.1f %5.1f %6.2f +- %6.2f" % (
ff.fname, ff.crval[0], ff.crval[1], nused, nstars,
3600.0 * ff.crres[0], 3600.0 * ff.crres[1], np.mean(
ff.zavg), np.std(ff.zavg))
if is_calibrated(ff):
print(colored(output, "green"))
else:
print(colored(output, "red"))
fstat.write(
("%s,%.8lf,%.6f,%.6f,%.3f,%.3f,%.3f," + "%.3f,%d,%d\n") %
(ff.fname, ff.mjd, ff.crval[0], ff.crval[1],
3600 * ff.crres[0], 3600 * ff.crres[1], np.mean(
ff.zavg), np.std(ff.zavg), nstars, nused))
# Move processed files
shutil.move(fname, os.path.join(processed_dir, fname))
shutil.move(fname + ".png", os.path.join(processed_dir, fname + ".png"))
shutil.move(fname + ".id", os.path.join(processed_dir, fname + ".id"))
shutil.move(fname + ".cat", os.path.join(processed_dir, fname + ".cat"))
shutil.move(fname + ".cal", os.path.join(processed_dir, fname + ".cal"))
# Sleep # Sleep
try: try:
print("File queue empty, waiting for new files...\r", end = '')
time.sleep(10) time.sleep(10)
except KeyboardInterrupt: except KeyboardInterrupt:
fstat.close()
sys.exit() sys.exit()
# Close files # Close files

View File

@ -242,7 +242,10 @@ def is_calibrated(ff):
def generate_reference_with_anet(fname, cmd_args, reffname="test.fits", tempfroot="cal"): def generate_reference_with_anet(fname, cmd_args, reffname="test.fits", tempfroot="cal"):
# Copy file to generic name # Copy file to generic name
shutil.copy2(fname, tempfroot + ".fits") try:
shutil.copy2(fname, tempfroot + ".fits")
except PermissionError:
shutil.copyfile(fname, tempfroot + ".fits")
# Get center # Get center
hdu = fits.open(fname) hdu = fits.open(fname)

View File

@ -7,7 +7,6 @@ from stvid.astrometry import is_calibrated
import numpy as np import numpy as np
import ppgplot as ppg import ppgplot as ppg
from scipy import optimize, ndimage from scipy import optimize, ndimage
from termcolor import colored
import datetime import datetime
import astropy.units as u import astropy.units as u
from astropy.coordinates import SkyCoord from astropy.coordinates import SkyCoord
@ -163,9 +162,6 @@ def store_results(ident, fname, path, iod_line):
outfname = os.path.join(path, "unid/unid.dat") outfname = os.path.join(path, "unid/unid.dat")
color = "magenta" color = "magenta"
# Print iod line
print(colored(iod_line, color))
# Copy files # Copy files
pngfile = fname.replace(".fits", "_%05d.png" % ident.norad) pngfile = fname.replace(".fits", "_%05d.png" % ident.norad)
try: try:
@ -183,12 +179,8 @@ def store_results(ident, fname, path, iod_line):
if os.path.exists(pngfile): if os.path.exists(pngfile):
shutil.move(pngfile, os.path.join(dest, pngfile)) shutil.move(pngfile, os.path.join(dest, pngfile))
# Write iodline # Return IOD line for screen and file output
fp = open(outfname, "a") return (outfname, iod_line, color)
fp.write("%s\n" % iod_line)
fp.close()
return
def store_not_seen(ident, fname, path): def store_not_seen(ident, fname, path):
@ -256,6 +248,8 @@ def angular_velocity(ident, w, texp):
# Extract tracks # Extract tracks
def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, results_path): def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, results_path):
screenoutput_idents = []
# Read four frame # Read four frame
ff = fourframe(fname) ff = fourframe(fname)
@ -265,12 +259,10 @@ def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, resu
# Read satelite IDs # Read satelite IDs
try: try:
f = open(fname + ".id") with open(fname + ".id") as f:
lines = f.readlines()
except OSError: except OSError:
print("Cannot open", fname + ".id") print("Cannot open", fname + ".id")
else:
lines = f.readlines()
f.close()
tr = np.array([-0.5, 1.0, 0.0, -0.5, 0.0, 1.0]) tr = np.array([-0.5, 1.0, 0.0, -0.5, 0.0, 1.0])
@ -367,7 +359,8 @@ def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, resu
ppg.pgend() ppg.pgend()
# Store results # Store results
store_results(ident, fname, results_path, iod_line) outfilename, iod_line, color = store_results(ident, fname, results_path, iod_line)
screenoutput_idents.append([outfilename, iod_line, color])
elif ident.catalog.find("classfd.tle") > 0: elif ident.catalog.find("classfd.tle") > 0:
# Track and stack # Track and stack
@ -454,4 +447,8 @@ def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, resu
ppg.pgend() ppg.pgend()
# Store results # Store results
store_results(ident, fname, results_path, iod_line) outfilename, iod_line, color = store_results(ident, fname, results_path, iod_line)
screenoutput_idents.append([outfilename, iod_line, color])
# Return list of idents for screen output
return screenoutput_idents

View File

@ -3,7 +3,8 @@ from __future__ import print_function
import subprocess import subprocess
from stvid.stio import fourframe from stvid.stio import fourframe
from stvid.stio import satid from stvid.stio import satid
import os
import tempfile
def generate_satellite_predictions(fname): def generate_satellite_predictions(fname):
# Format command # Format command
@ -31,22 +32,27 @@ def find_hough3d_lines(fname, ntrkmin, trkrmin):
if len(t) < 2: if len(t) < 2:
return [] return []
# Save points to temporary file # Save points to a unique temporary file
with open("/tmp/hough.dat", "w") as f: (fd, tmpfile_path) = tempfile.mkstemp(prefix="hough_tmp",suffix=".dat")
for i in range(len(t)):
f.write("%f,%f,%f\n" % (x[i], y[i], z[i]))
f.close()
# Run 3D Hough line-finding algorithm
command = "hough3dlines -dx %d -minvotes %d %s" % (trkrmin, ntrkmin,
"/tmp/hough.dat")
try: try:
output = subprocess.check_output(command, with os.fdopen(fd, "w") as f:
shell=True, for i in range(len(t)):
stderr=subprocess.STDOUT) f.write("%f,%f,%f\n" % (x[i], y[i], z[i]))
except Exception:
return []
# Run 3D Hough line-finding algorithm
command = "hough3dlines -dx %d -minvotes %d %s" % (trkrmin, ntrkmin,
tmpfile_path)
try:
output = subprocess.check_output(command,
shell=True,
stderr=subprocess.STDOUT)
except Exception:
return []
finally:
os.remove(tmpfile_path)
# FIXME: Is this still needed? hough3dlines output does not appear to have these items -- interplanetarychris
# Clean output (a bit cluncky) # Clean output (a bit cluncky)
cleaned_output = output.decode("utf-8").replace("npoints=", "") cleaned_output = output.decode("utf-8").replace("npoints=", "")
cleaned_output = cleaned_output.replace(", a=(", " ") cleaned_output = cleaned_output.replace(", a=(", " ")
@ -76,9 +82,8 @@ def find_hough3d_lines(fname, ntrkmin, trkrmin):
lines.append(line) lines.append(line)
# Store identifications # Store identifications
fp = open(fname + ".id", "a") with open(fname + ".id", "a") as fp:
for line in lines: for line in lines:
fp.write(line) fp.write(line)
fp.close()
return [satid(line) for line in lines] return [satid(line) for line in lines]

View File

@ -31,23 +31,21 @@ class pixel_catalog:
def generate_star_catalog(fname): def generate_star_catalog(fname):
catalog_fname = fname + ".cat"
# Skip if file already exists # Skip if file already exists
if not os.path.exists(os.path.join(fname, ".cat")): if not os.path.exists(catalog_fname):
# Get sextractor location # Get sextractor location
env = os.getenv("ST_DATADIR") env = os.getenv("ST_DATADIR")
# Format command # Format command
command = "sextractor %s -c %s/sextractor/default.sex" % (fname, env) command = "sextractor %s -c %s/sextractor/default.sex -CATALOG_NAME %s " % (fname, env, catalog_fname)
# Run sextractor # Run sextractor
output = subprocess.check_output(command, shell=True, output = subprocess.check_output(command, shell=True,
stderr=subprocess.STDOUT) stderr=subprocess.STDOUT)
# Rename file return pixel_catalog(catalog_fname)
if os.path.exists("test.cat"):
os.rename("test.cat", fname+".cat")
return pixel_catalog(fname+".cat")
def store_calibration(pix_catalog, fname): def store_calibration(pix_catalog, fname):