From b7056d9329b9d8c1111dd9316e13731ca27d0bc4 Mon Sep 17 00:00:00 2001 From: Chris Lewicki Date: Thu, 12 Dec 2019 14:54:46 -0800 Subject: [PATCH] Implements parallel processing of satobs, where multiple-threads are available About 2x faster on a 2-core system, and 3-4x faster on an 8-core system A balance between regular user feedback and maximum usage of CPUs results in less-than-fastest possible performance. For fastest performance, increase chunk size to a value larger than cpu_count. (Perhaps make this a parameter?) process.py - Implements a multiprocess worker pool of size os.cpu_count() - Batches processes in size os.cpu_count() - Creates pixel_catalog file using source FITS filename, instead of "test.cat" - Forces single-threaded astropy IERS table update, if needed - Receives all file and screen output from children, so that it can be output in the order of the files processed - Changes display order of FITS file being processed (first) followed by SATIDs within - Adds temporary notice of empty input queue - Adds description of screen color codes astrometry.py - Adds fall-back method of shutil.copyfile if PermissionError from copy2 extract.py - Removes terminal output, and instead returns results to calling processes - Tightens up file/io loops satellite.py - Saves hough3dlines output to unique tmp file instead of /tmp/hough.dat - Removes temp file after processing - Adds FIXME note to review for unnecessary code stars.py - Generates star catalog directly to destination file - Updates sextractor call to reference destination instead of tmp file --- process.py | 160 ++++++++++++++++++++++++++++++-------------- stvid/astrometry.py | 5 +- stvid/extract.py | 29 ++++---- stvid/satellite.py | 41 +++++++----- stvid/stars.py | 12 ++-- 5 files changed, 155 insertions(+), 92 deletions(-) diff --git a/process.py b/process.py index 2bf4490..bf4711a 100755 --- a/process.py +++ b/process.py @@ -15,6 +15,8 @@ from stvid.extract import extract_tracks import astropy.units as u from astropy.utils.exceptions import AstropyWarning from astropy.coordinates import EarthLocation +from astropy.time import Time # For getting IERS table in single-thread session +import multiprocessing as mp import warnings import configparser import argparse @@ -24,6 +26,83 @@ import time import shutil import sys +""" +process.py - Utility to process stvid/acquire.py FITS images to detect and + extract satellite positions and create IODs. + +Terminal output Color Codes: + GREEN: Calibrated file + RED: Not calibrated file + YELLOW: Computing astrometric calibration + BLUE: Classified satellite + GREY: Catalog satellite + MAGENTA: UNID satellite +""" + +def chunks(lst, n): + """Yield successive n-sized chunks from lst.""" + for i in range(0, len(lst), n): + yield lst[i:i + n] + +def process_loop(fname): + """ + Thread to process satobs FourFrame FITS files in a multi-thread compatible manner + """ + + # Generate star catalog + if not os.path.exists(fname + ".cat"): + pix_catalog = generate_star_catalog(fname) + else: + pix_catalog = pixel_catalog(fname + ".cat") + + # Calibrate from reference + calibrate_from_reference(fname, "test.fits", pix_catalog) + + # Store calibration + store_calibration(pix_catalog, fname + ".cal") + + # Generate satellite predictions + generate_satellite_predictions(fname) + + # Detect lines with 3D Hough transform + ids = find_hough3d_lines(fname, nhoughmin, houghrmin) + + # Get properties + ff = fourframe(fname) + + # Extract tracks + if is_calibrated(ff): + screenoutput_idents = extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, root_dir, results_dir) + + # Stars available and used + nused = np.sum(pix_catalog.flag == 1) + nstars = pix_catalog.nstars + + # Write output + screenoutput = "%s %10.6f %10.6f %4d/%4d %5.1f %5.1f %6.2f +- %6.2f" % ( + ff.fname, ff.crval[0], ff.crval[1], nused, nstars, + 3600.0 * ff.crres[0], 3600.0 * ff.crres[1], np.mean( + ff.zavg), np.std(ff.zavg)) + + if is_calibrated(ff): + screenoutput = colored(screenoutput, "green") + else: + screenoutput = colored(screenoutput, "red") + + imgstat_output = ("%s,%.8lf,%.6f,%.6f,%.3f,%.3f,%.3f," + "%.3f,%d,%d\n") % ( + (ff.fname, ff.mjd, ff.crval[0], ff.crval[1], + 3600 * ff.crres[0], 3600 * ff.crres[1], np.mean( + ff.zavg), np.std(ff.zavg), nstars, nused)) + + # Move processed files + shutil.move(fname, os.path.join(processed_dir, fname)) + shutil.move(fname + ".png", os.path.join(processed_dir, fname + ".png")) + shutil.move(fname + ".id", os.path.join(processed_dir, fname + ".id")) + shutil.move(fname + ".cat", os.path.join(processed_dir, fname + ".cat")) + shutil.move(fname + ".cal", os.path.join(processed_dir, fname + ".cal")) + + return (screenoutput, imgstat_output, screenoutput_idents) + if __name__ == "__main__": # Read commandline options conf_parser = argparse.ArgumentParser(description='Process captured' + @@ -74,6 +153,10 @@ if __name__ == "__main__": # Move to processing directory os.chdir(args.file_dir) + # Force single-threaded IERS table update, if needed + t = Time.now() + _ = t.ut1 + # Statistics file fstat = open("imgstat.csv", "w") fstat.write("fname,mjd,ra,de,rmsx,rmsy,mean,std,nstars,nused\n") @@ -95,12 +178,16 @@ if __name__ == "__main__": if not os.path.exists(processed_dir): os.makedirs(processed_dir) + cpu_count = os.cpu_count() + if (cpu_count > 1): + print("Processing with {} threads".format(cpu_count)) + # Forever loop while True: # Get files fnames = sorted(glob.glob("2*.fits")) - # Create reference calibration file + # Create reference calibration file in single threaded environment if not os.path.exists("test.fits"): solved = False # Loop over files to find a suitable calibration file @@ -117,63 +204,36 @@ if __name__ == "__main__": if solved: break - # Loop over files - for fname in fnames: - # Generate star catalog - if not os.path.exists(fname + ".cat"): - pix_catalog = generate_star_catalog(fname) - else: - pix_catalog = pixel_catalog(fname+".cat") + p = mp.Pool(processes=cpu_count) - # Calibrate from reference - calibrate_from_reference(fname, "test.fits", pix_catalog) + try: + # Loop over files in parallel, print output every batch size of cpu_count + satobs_chunks = chunks(fnames,cpu_count) + for chk in satobs_chunks: + for result in p.map(process_loop, chk): + (screenoutput, fileoutput, screenoutput_idents) = result - # Store calibration - store_calibration(pix_catalog, fname + ".cal") + fstat.write(fileoutput) + print(screenoutput) + for [outfilename, iod_line, color] in screenoutput_idents: + print(colored(iod_line,color)) + # Write iodline + with open(outfilename, "a") as fp: + fp.write("%s\n" % iod_line) - # Generate satellite predictions - generate_satellite_predictions(fname) - - # Detect lines with 3D Hough transform - ids = find_hough3d_lines(fname, nhoughmin, houghrmin) - - # Get properties - ff = fourframe(fname) - - # Extract tracks - if is_calibrated(ff): - extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, root_dir, results_dir) - - # Stars available and used - nused = np.sum(pix_catalog.flag == 1) - nstars = pix_catalog.nstars - - # Write output - output = "%s %10.6f %10.6f %4d/%4d %5.1f %5.1f %6.2f +- %6.2f" % ( - ff.fname, ff.crval[0], ff.crval[1], nused, nstars, - 3600.0 * ff.crres[0], 3600.0 * ff.crres[1], np.mean( - ff.zavg), np.std(ff.zavg)) - if is_calibrated(ff): - print(colored(output, "green")) - else: - print(colored(output, "red")) - fstat.write( - ("%s,%.8lf,%.6f,%.6f,%.3f,%.3f,%.3f," + "%.3f,%d,%d\n") % - (ff.fname, ff.mjd, ff.crval[0], ff.crval[1], - 3600 * ff.crres[0], 3600 * ff.crres[1], np.mean( - ff.zavg), np.std(ff.zavg), nstars, nused)) - - # Move processed files - shutil.move(fname, os.path.join(processed_dir, fname)) - shutil.move(fname + ".png", os.path.join(processed_dir, fname + ".png")) - shutil.move(fname + ".id", os.path.join(processed_dir, fname + ".id")) - shutil.move(fname + ".cat", os.path.join(processed_dir, fname + ".cat")) - shutil.move(fname + ".cal", os.path.join(processed_dir, fname + ".cal")) + p.close() + p.join() + except KeyboardInterrupt: + fstat.close() + p.close() + sys.exit() # Sleep try: + print("File queue empty, waiting for new files...\r", end = '') time.sleep(10) except KeyboardInterrupt: + fstat.close() sys.exit() # Close files diff --git a/stvid/astrometry.py b/stvid/astrometry.py index 05e1e34..52a789f 100644 --- a/stvid/astrometry.py +++ b/stvid/astrometry.py @@ -242,7 +242,10 @@ def is_calibrated(ff): def generate_reference_with_anet(fname, cmd_args, reffname="test.fits", tempfroot="cal"): # Copy file to generic name - shutil.copy2(fname, tempfroot + ".fits") + try: + shutil.copy2(fname, tempfroot + ".fits") + except PermissionError: + shutil.copyfile(fname, tempfroot + ".fits") # Get center hdu = fits.open(fname) diff --git a/stvid/extract.py b/stvid/extract.py index 67319b2..8e77696 100644 --- a/stvid/extract.py +++ b/stvid/extract.py @@ -7,7 +7,6 @@ from stvid.astrometry import is_calibrated import numpy as np import ppgplot as ppg from scipy import optimize, ndimage -from termcolor import colored import datetime import astropy.units as u from astropy.coordinates import SkyCoord @@ -163,9 +162,6 @@ def store_results(ident, fname, path, iod_line): outfname = os.path.join(path, "unid/unid.dat") color = "magenta" - # Print iod line - print(colored(iod_line, color)) - # Copy files pngfile = fname.replace(".fits", "_%05d.png" % ident.norad) try: @@ -183,12 +179,8 @@ def store_results(ident, fname, path, iod_line): if os.path.exists(pngfile): shutil.move(pngfile, os.path.join(dest, pngfile)) - # Write iodline - fp = open(outfname, "a") - fp.write("%s\n" % iod_line) - fp.close() - - return + # Return IOD line for screen and file output + return (outfname, iod_line, color) def store_not_seen(ident, fname, path): @@ -256,6 +248,8 @@ def angular_velocity(ident, w, texp): # Extract tracks def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, results_path): + screenoutput_idents = [] + # Read four frame ff = fourframe(fname) @@ -265,12 +259,10 @@ def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, resu # Read satelite IDs try: - f = open(fname + ".id") + with open(fname + ".id") as f: + lines = f.readlines() except OSError: print("Cannot open", fname + ".id") - else: - lines = f.readlines() - f.close() tr = np.array([-0.5, 1.0, 0.0, -0.5, 0.0, 1.0]) @@ -367,7 +359,8 @@ def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, resu ppg.pgend() # Store results - store_results(ident, fname, results_path, iod_line) + outfilename, iod_line, color = store_results(ident, fname, results_path, iod_line) + screenoutput_idents.append([outfilename, iod_line, color]) elif ident.catalog.find("classfd.tle") > 0: # Track and stack @@ -454,4 +447,8 @@ def extract_tracks(fname, trkrmin, drdtmin, drdtmax, trksig, ntrkmin, path, resu ppg.pgend() # Store results - store_results(ident, fname, results_path, iod_line) + outfilename, iod_line, color = store_results(ident, fname, results_path, iod_line) + screenoutput_idents.append([outfilename, iod_line, color]) + + # Return list of idents for screen output + return screenoutput_idents diff --git a/stvid/satellite.py b/stvid/satellite.py index 978341f..5cf1252 100644 --- a/stvid/satellite.py +++ b/stvid/satellite.py @@ -3,7 +3,8 @@ from __future__ import print_function import subprocess from stvid.stio import fourframe from stvid.stio import satid - +import os +import tempfile def generate_satellite_predictions(fname): # Format command @@ -31,22 +32,27 @@ def find_hough3d_lines(fname, ntrkmin, trkrmin): if len(t) < 2: return [] - # Save points to temporary file - with open("/tmp/hough.dat", "w") as f: - for i in range(len(t)): - f.write("%f,%f,%f\n" % (x[i], y[i], z[i])) - f.close() + # Save points to a unique temporary file + (fd, tmpfile_path) = tempfile.mkstemp(prefix="hough_tmp",suffix=".dat") - # Run 3D Hough line-finding algorithm - command = "hough3dlines -dx %d -minvotes %d %s" % (trkrmin, ntrkmin, - "/tmp/hough.dat") try: - output = subprocess.check_output(command, - shell=True, - stderr=subprocess.STDOUT) - except Exception: - return [] + with os.fdopen(fd, "w") as f: + for i in range(len(t)): + f.write("%f,%f,%f\n" % (x[i], y[i], z[i])) + # Run 3D Hough line-finding algorithm + command = "hough3dlines -dx %d -minvotes %d %s" % (trkrmin, ntrkmin, + tmpfile_path) + try: + output = subprocess.check_output(command, + shell=True, + stderr=subprocess.STDOUT) + except Exception: + return [] + finally: + os.remove(tmpfile_path) + + # FIXME: Is this still needed? hough3dlines output does not appear to have these items -- interplanetarychris # Clean output (a bit cluncky) cleaned_output = output.decode("utf-8").replace("npoints=", "") cleaned_output = cleaned_output.replace(", a=(", " ") @@ -76,9 +82,8 @@ def find_hough3d_lines(fname, ntrkmin, trkrmin): lines.append(line) # Store identifications - fp = open(fname + ".id", "a") - for line in lines: - fp.write(line) - fp.close() + with open(fname + ".id", "a") as fp: + for line in lines: + fp.write(line) return [satid(line) for line in lines] diff --git a/stvid/stars.py b/stvid/stars.py index 40ab324..623b8a4 100644 --- a/stvid/stars.py +++ b/stvid/stars.py @@ -31,23 +31,21 @@ class pixel_catalog: def generate_star_catalog(fname): + catalog_fname = fname + ".cat" + # Skip if file already exists - if not os.path.exists(os.path.join(fname, ".cat")): + if not os.path.exists(catalog_fname): # Get sextractor location env = os.getenv("ST_DATADIR") # Format command - command = "sextractor %s -c %s/sextractor/default.sex" % (fname, env) + command = "sextractor %s -c %s/sextractor/default.sex -CATALOG_NAME %s " % (fname, env, catalog_fname) # Run sextractor output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT) - # Rename file - if os.path.exists("test.cat"): - os.rename("test.cat", fname+".cat") - - return pixel_catalog(fname+".cat") + return pixel_catalog(catalog_fname) def store_calibration(pix_catalog, fname):