"""This is where the automatic mode window is implemented."""
import sys
import os
import copy
import threading
import time
import random
import datetime
import zoneinfo
from PySide6 import QtCore, QtWidgets, QtGui
import opihiexarata
import opihiexarata.library as library
import opihiexarata.library.error as error
import opihiexarata.library.hint as hint
import opihiexarata.astrometry as astrometry
import opihiexarata.photometry as photometry
import opihiexarata.gui as gui
[docs]
class OpihiAutomaticWindow(QtWidgets.QMainWindow):
"""The GUI that is responsible for the implementation of the automatic mode
of Opihi, fetching images automatically based on time and solving for both
the astrometry and photometry.
Only non-GUI attributes are listed here.
Attributes
----------
fits_fetch_directory : string
The directory which the fits files should be automatically pulled from.
fetched_fits_filename : string
The filename of the fits file which has just most recently fetched.
working_fits_filename : string
The filename of the fits file which is being worked on, or will be
worked on.
results_fits_filename : string
The filename of the fits file which has already been solved. The
results of which is posted.
fetch_filename_record : list
A list of all of the files which were fetched before and so it creates
a record of files which not to do.
results_opihi_solution : OpihiSolution
The OpihiSolution of the current results fits file. The results fits
file solution, when determined to be solved, should be saved to disk
automatically.
preprocess_solution : OpihiPreprocessSolution
The preprocessing solution which is used to convert raw images to
preprocessed files.
zero_point_database : OpihiZeroPointDatabaseSolution
If a zero point database is going to be constructed, as per the
configuration file, this is the instance which manages the database.
loop_state : string
The loop state.
"""
[docs]
def __init__(self) -> None:
"""The automatic GUI window for OpihiExarata. This interacts with
the user with regards to the automatic solving mode of Opihi.
Parameters
----------
None
Returns
-------
None
"""
# Creating the GUI itself using the Qt framework and the converted
# Qt designer files.
super(OpihiAutomaticWindow, self).__init__()
self.ui = gui.qtui.Ui_AutomaticWindow()
self.ui.setupUi(self)
# Window icon, we use the default for now.
gui.functions.apply_window_icon(window=self, icon_path=None)
# Establishing the defaults for all of the relevant attributes.
self.fits_fetch_directory = None
self.fetch_fits_filename = None
self.working_fits_filename = None
self.results_fits_filename = None
self.fetch_filename_record = []
self.fetch_opihi_solution = None
self.results_opihi_solution = None
self.preprocess_solution = None
self.zero_point_database = None
self.loop_state = "None"
# The configuration file has a default fits fetch directory.
AF_DIR = library.config.GUI_AUTOMATIC_INITIAL_AUTOMATIC_IMAGE_FETCHING_DIRECTORY
if os.path.isdir(AF_DIR):
self.fits_fetch_directory = os.path.abspath(AF_DIR)
else:
self.fits_fetch_directory = None
# Preparing the buttons, GUI, and other functionality.
self.__init_gui_connections()
self.__init_preprocess_solution()
# Preparing the zero point database if the user desired the database
# to record observations.
if library.config.GUI_AUTOMATIC_DATABASE_SAVE_OBSERVATIONS:
database = opihiexarata.OpihiZeroPointDatabaseSolution(
database_directory=library.config.MONITOR_DATABASE_DIRECTORY
)
else:
database = None
self.zero_point_database = database
# Update all of the text.
self.refresh_window()
# All done.
return None
def __init_gui_connections(self) -> None:
"""Creating the function connections for the GUI interface.
Parameters
----------
None
Returns
-------
None
"""
# For the fetch directory.
self.ui.push_button_change_directory.clicked.connect(
self.__connect_push_button_change_directory
)
# For the start and stop buttons.
self.ui.push_button_start.clicked.connect(self.__connect_push_button_start)
self.ui.push_button_stop.clicked.connect(self.__connect_push_button_stop)
self.ui.push_button_trigger.clicked.connect(self.__connect_push_button_trigger)
# All done.
return None
def __init_preprocess_solution(self):
"""Initialize the preprocessing solution. The preprocessing files
should be specified in the configuration file.
Parameters
----------
None
Returns
-------
None
"""
# Using the configuration file to extract where the preprocessing
# filenames are to build the solution.
try:
preprocess = opihiexarata.OpihiPreprocessSolution(
mask_c_fits_filename=library.config.PREPROCESS_MASK_C_FITS_FILENAME,
mask_g_fits_filename=library.config.PREPROCESS_MASK_G_FITS_FILENAME,
mask_r_fits_filename=library.config.PREPROCESS_MASK_R_FITS_FILENAME,
mask_i_fits_filename=library.config.PREPROCESS_MASK_I_FITS_FILENAME,
mask_z_fits_filename=library.config.PREPROCESS_MASK_Z_FITS_FILENAME,
mask_1_fits_filename=library.config.PREPROCESS_MASK_1_FITS_FILENAME,
mask_2_fits_filename=library.config.PREPROCESS_MASK_2_FITS_FILENAME,
mask_b_fits_filename=library.config.PREPROCESS_MASK_B_FITS_FILENAME,
flat_c_fits_filename=library.config.PREPROCESS_FLAT_C_FITS_FILENAME,
flat_g_fits_filename=library.config.PREPROCESS_FLAT_G_FITS_FILENAME,
flat_r_fits_filename=library.config.PREPROCESS_FLAT_R_FITS_FILENAME,
flat_i_fits_filename=library.config.PREPROCESS_FLAT_I_FITS_FILENAME,
flat_z_fits_filename=library.config.PREPROCESS_FLAT_Z_FITS_FILENAME,
flat_1_fits_filename=library.config.PREPROCESS_FLAT_1_FITS_FILENAME,
flat_2_fits_filename=library.config.PREPROCESS_FLAT_2_FITS_FILENAME,
flat_b_fits_filename=library.config.PREPROCESS_FLAT_B_FITS_FILENAME,
bias_fits_filename=library.config.PREPROCESS_BIAS_FITS_FILENAME,
dark_current_fits_filename=library.config.PREPROCESS_DARK_CURRENT_FITS_FILENAME,
linearity_fits_filename=library.config.PREPROCESS_LINEARITY_FITS_FILENAME,
)
except Exception as err:
# Something failed with making the preprocess solution, a
# configuration file issue is likely the reason.
error.warn(
warn_class=error.UnknownWarning,
message=(
"We are not sure why the preprocess solution failed. {e}".format(
e=err
)
),
)
finally:
self.preprocess_solution = preprocess
# All done.
return None
def __connect_push_button_change_directory(self) -> None:
"""The connection for the button to change the automatic fetch
directory.
Parameters
----------
None
Returns
-------
None
"""
# Ask the user for the filename via a dialog.
# We start off from the current one for some semblance of consistency.
new_fetch_directory = QtWidgets.QFileDialog.getExistingDirectory(
parent=self,
caption="Select Opihi Fetch Directory",
dir=self.fits_fetch_directory,
options=QtWidgets.QFileDialog.ShowDirsOnly,
)
# If the user did not provide a file to enter, there is nothing to be
# changed.
if os.path.isdir(new_fetch_directory):
# Assign the new fits filename.
self.fits_fetch_directory = os.path.abspath(new_fetch_directory)
else:
# Nothing to do.
pass
# Refresh the GUI information.
self.refresh_window()
# All done.
return None
def __connect_push_button_trigger(self) -> None:
"""This does one process, fetching a single image and processing it as
normal. However, it does not trigger the automatic mode loop as it is
built for a single image only.
Parameters
----------
None
Returns
-------
None
"""
# As we instigated a manual trigger, update the GUI/status.
self.loop_state = "trigger"
self.refresh_window()
# We just call the trigger itself. We still thread it out as to not
# completely freeze the GUI.
self.threaded_trigger_opihi_image_solve()
return None
def __connect_push_button_start(self) -> None:
"""This enables the automatic active mode by changing the flag and
starting the process.
Parameters
----------
None
Returns
-------
None
"""
# Enable the flag.
self.loop_state = "running"
# Start the automatic loop.
self.threaded_automatic_opihi_image_solve()
self.refresh_window()
# All done.
return None
def __connect_push_button_stop(self) -> None:
"""This disables the automatic active mode by changing the flag. The
loop itself should detect that the flag has changed. It finishes the
current process but does not fetch any more.
Parameters
----------
None
Returns
-------
None
"""
# Enable the flag.
self.loop_state = "stopped"
self.refresh_window()
# All done.
return None
[docs]
def fetch_new_filename(self) -> str:
"""This function fetches a new fits filename based on the most recent
filename within the automatic fetching directory.
Parameters
----------
None
Returns
-------
fetched_filename : string
The filename that was fetched. It is the most recent file added to
the automatic fetching directory.
"""
# We are looking for only fits files.
try:
fits_extension = "fits"
fetched_filename = library.path.get_most_recent_filename_in_directory(
directory=self.fits_fetch_directory,
extension=fits_extension,
recursive=True,
exclude_opihiexarata_output_files=True,
)
except ValueError:
# There is likely no actual matching file in the directory.
fetched_filename = None
else:
# Absolute paths are generally much easier to work with.
fetched_filename = os.path.abspath(fetched_filename)
return fetched_filename
[docs]
def verify_new_filename(self, filename: str) -> bool:
"""This function verifies a filename. Basically, it checks that the
file exists and has not already been done before.
Parameters
----------
filename : string
The filename to verify.
Returns
-------
verification : bool
If the filename is good, it it True.
"""
# We assume the filename is good.
verification = True
# Initial check.
if filename is None:
# There is no filename.
verification = False
return verification
else:
# Absolute paths are easier to work with.
filename = os.path.abspath(str(filename))
# We first need to test if the file even exits.
if not os.path.isfile(filename):
# The file does not exist so it fails verification.
verification = False
# If the file is the same as the results file, it
# has already been done.
working_fits_filename = str(copy.deepcopy(self.working_fits_filename))
results_fits_filename = str(copy.deepcopy(self.results_fits_filename))
if os.path.isfile(working_fits_filename) and os.path.samefile(
filename, working_fits_filename
):
verification = False
if os.path.isfile(results_fits_filename) and os.path.samefile(
filename, results_fits_filename
):
verification = False
# If the file is already a processed file.
if library.config.PREPROCESS_DEFAULT_SAVING_SUFFIX in filename:
verification = False
if library.config.GUI_AUTOMATIC_DEFAULT_FITS_SAVING_SUFFIX in filename:
verification = False
# If there exists processed versions of the file.
file_dir, file_base, file_ext = library.path.split_pathname(pathname=filename)
proposed_preprocess_filename = library.path.merge_pathname(
directory=file_dir,
filename=file_base + library.config.PREPROCESS_DEFAULT_SAVING_SUFFIX,
extension=file_ext,
)
proposed_solved_filename = library.path.merge_pathname(
directory=file_dir,
filename=file_base
+ library.config.GUI_AUTOMATIC_DEFAULT_FITS_SAVING_SUFFIX,
extension=file_ext,
)
proposed_preprocess_solved_filename = library.path.merge_pathname(
directory=file_dir,
filename=file_base
+ library.config.PREPROCESS_DEFAULT_SAVING_SUFFIX
+ library.config.GUI_AUTOMATIC_DEFAULT_FITS_SAVING_SUFFIX,
extension=file_ext,
)
if os.path.isfile(proposed_preprocess_filename):
verification = False
if os.path.isfile(proposed_solved_filename):
verification = False
if os.path.isfile(proposed_preprocess_solved_filename):
verification = False
# We do an early exit check here to save processing time.
if not verification:
return verification
# Now we check based on all previously fetched files.
for filedex in copy.deepcopy(self.fetch_filename_record):
if filename == filedex:
verification = False
return bool(verification)
[docs]
def trigger_opihi_image_solve(self) -> None:
"""This function does a single instance of the automatic solving.
Parameters
----------
None
Returns
-------
None
"""
# It is always good to have a refresh of the information.
self.refresh_window()
# A new image is to be solved. We fetch the new image.
self.fetch_fits_filename = self.fetch_new_filename()
self.refresh_window()
# We need to verify the filename before doing anything.
if not self.verify_new_filename(filename=self.fetch_fits_filename):
# The file cannot be verified to be a good file so we do nothing.
return None
# The filename is officially a working filename now.
working_fits_filename = copy.deepcopy(self.fetch_fits_filename)
self.working_fits_filename = copy.deepcopy(working_fits_filename)
self.refresh_window()
# We add it to the record.
self.fetch_filename_record.append(working_fits_filename)
# We have a new file, however, in the unlikely event that this
# file is still being written and is locked under permissions because
# it is mid-write, we wait a little bit.
library.http.api_request_sleep(seconds=1)
# If we have a preprocessing solution, we can preprocess the data first.
if isinstance(self.preprocess_solution, opihiexarata.OpihiPreprocessSolution):
preprocess_filename = self.preprocess_opihi_image(
filename=working_fits_filename
)
else:
# No preprocessing done.
preprocess_filename = working_fits_filename
# We need to determine the engines which we will be using to solve
# this image.
astrometry_engine_name = self.ui.combo_box_astrometry_engine.currentText()
astrometry_engine_name = astrometry_engine_name.casefold()
photometry_engine_name = self.ui.combo_box_photometry_engine.currentText()
photometry_engine_name = photometry_engine_name.casefold()
astrometry_engine = opihiexarata.gui.functions.pick_engine_class_from_name(
engine_name=astrometry_engine_name,
engine_type=library.engine.AstrometryEngine,
)
photometry_engine = opihiexarata.gui.functions.pick_engine_class_from_name(
engine_name=photometry_engine_name,
engine_type=library.engine.PhotometryEngine,
)
# Now, we try and solve the image.
opihi_solution = self.solve_opihi_image(
filename=preprocess_filename,
astrometry_engine=astrometry_engine,
photometry_engine=photometry_engine,
)
# Refreshing any data.
self.refresh_window()
# We attempt to write a zero point record to the database, the wrapper
# writing function checks if writing to the database is a valid
# operation. We work on a copy of the solution just in case.
self.write_zero_point_record_to_database(opihi_solution=opihi_solution)
# Finally, we try and save the image.
# Extracting the entire path from the current name, we are saving it
# to the same location.
directory, basename, extension = library.path.split_pathname(
pathname=preprocess_filename
)
# We are just adding the suffix to the filename.
new_basename = (
basename + library.config.GUI_AUTOMATIC_DEFAULT_FITS_SAVING_SUFFIX
)
# Recombining the path.
saving_fits_filename = library.path.merge_pathname(
directory=directory, filename=new_basename, extension=extension
)
opihi_solution.save_to_fits_file(filename=saving_fits_filename, overwrite=True)
# The solution is now the most recent results solution.
self.results_fits_filename = copy.deepcopy(saving_fits_filename)
self.results_opihi_solution = copy.deepcopy(opihi_solution)
# Refreshing any data.
self.refresh_window()
# All done.
return None
[docs]
def threaded_trigger_opihi_image_solve(self) -> None:
"""This function is just a wrapper around the original function to
allow for threading.
Parameters
----------
None
Returns
-------
None
"""
# We just call the trigger itself. We still thread it out as to not
# completely freeze the GUI.
trigger_solving_thread = threading.Thread(target=self.trigger_opihi_image_solve)
trigger_solving_thread.start()
[docs]
def automatic_opihi_image_solve(self) -> None:
"""This function contains the loop which runs to do automatic solving.
We just model automatic mode as repeatedly clicking the trigger button.
Parameters
----------
None
Returns
-------
None
"""
# Just to check if there is already a reason to stop.
stop = self.check_automatic_stops()
self.refresh_window()
if stop:
# We should not even try to do the loop, we are stopped.
return None
# Automatic triggering is an infinite loop as we want to do it
# until stopped via the stop checks.
while not stop:
# See if we are to stop; if so, we stop.
stop = self.check_automatic_stops()
if stop:
break
# We take a little break to ensure that, in the case of no new
# file from the trigger, we are not hammering the disk too hard.
__ = library.http.api_request_sleep(
seconds=library.config.GUI_AUTOMATIC_SOLVE_LOOP_COOLDOWN_DELAY_SECONDS
)
# We attempt to do another trigger solve.
self.threaded_trigger_opihi_image_solve()
# Refreshing the window.
self.refresh_window()
# The loop has been broken and likely this is because the stop check
# signified to stop. Either way, the automatic loop is no longer
# active.
stop = True
self.loop_state = "stopped"
self.refresh_window()
# All done.
self.refresh_window()
[docs]
def threaded_automatic_opihi_image_solve(self) -> None:
"""This function is just a wrapper around the original function to
allow for threading.
Parameters
----------
None
Returns
-------
None
"""
# We just call the trigger itself. We still thread it out as to not
# completely freeze the GUI.
automatic_solving_thread = threading.Thread(
target=self.automatic_opihi_image_solve
)
automatic_solving_thread.start()
[docs]
def check_automatic_stops(self) -> bool:
"""This function checks for the stops to stop the automatic triggering
of the next image.
Parameters
----------
None
Returns
-------
stop : bool
This is the flag which signifies if the triggering should stop or
not. If True, the triggering should stop.
"""
try:
# Assume we keep on going.
stop = False
# Check if internally the GUI stop flag was made.
if self.loop_state == "running":
stop = False
else:
stop = True
# The automatic mode should not be running during the day. We set
# this time as a "good enough" always-daytime limit. This also serves
# to stop it.
# We get the timezone we are checking, this is important as the
# configuration values are local time.
if library.config.GUI_AUTOMATIC_DAYTIME_BREAK_TIMEZONE is None:
local_timezone = "Etc/UTC"
else:
local_timezone = library.config.GUI_AUTOMATIC_DAYTIME_BREAK_TIMEZONE
local_time = datetime.datetime.now(zoneinfo.ZoneInfo(local_timezone))
local_hour = local_time.hour
# Configuration based Hardcoded "daytime hours".
if (
library.config.GUI_AUTOMATIC_DAYTIME_BREAK_LOWER_HOUR
<= local_hour
<= library.config.GUI_AUTOMATIC_DAYTIME_BREAK_UPPER_HOUR
):
stop = True
# Check if a stop file was placed in the directory where the automatic
# files are being retrieved from. As this is a manual file intervention
# the program is considered halted rather than stopped.
stop_file_dir = self.fits_fetch_directory
stop_file_fname = "opihiexarata"
stop_file_ext = "stop"
stop_file_pathname = library.path.merge_pathname(
directory=stop_file_dir,
filename=stop_file_fname,
extension=stop_file_ext,
)
if os.path.exists(stop_file_pathname):
self.loop_state = "halted"
stop = True
except Exception as err:
# For some reason, one of the stop checks could not be done
# properly, we stop.
stop = True
# All done.
return stop
[docs]
def preprocess_opihi_image(self, filename: str) -> str:
"""This function preprocess an Opihi image, where available and
returns the filename of the preprocessed file.
Parameters
----------
filename : string
The filename of file which will be preprocessed.
Returns
-------
preprocess_filename : string
The filename of the file which has been preprocessed with the
preprocessed solution of this class instance.
"""
# We first need to check that we have a preprocess solution.
if not isinstance(
self.preprocess_solution, opihiexarata.OpihiPreprocessSolution
):
raise error.InputError(
"The preprocess solution does not exist, we cannot preprocess any data."
)
# We check if the file was already preprocessed.
header, __ = library.fits.read_fits_image_file(filename=filename)
is_preprocessed = header.get("OXM_REDU", False)
if is_preprocessed:
# The file is already preprocessed, nothing to do.
return filename
# Deriving the preprocessed filename.
file_dir, file_base, file_ext = library.path.split_pathname(pathname=filename)
preprocess_filename = library.path.merge_pathname(
directory=file_dir,
filename=file_base + library.config.PREPROCESS_DEFAULT_SAVING_SUFFIX,
extension=file_ext,
)
# Finally, we attempt to preprocess the data.
try:
self.preprocess_solution.preprocess_fits_file(
raw_filename=filename,
out_filename=preprocess_filename,
overwrite=True,
)
except Exception as err:
# Sending out a warning.
error.warn(
warn_class=error.UnknownWarning,
message=(
"The data could not be preprocessed, an error was thrown: {e}".format(
e=err
)
),
)
# For some reason, the preprocessing failed. Reverting.
preprocess_filename = filename
# All done.
return preprocess_filename
[docs]
@staticmethod
def solve_opihi_image(
filename: str,
astrometry_engine: hint.AstrometryEngine,
photometry_engine: hint.PhotometryEngine,
) -> hint.OpihiSolution:
"""This function solves the Opihi image provided by the filename.
We use a static method here to be a little more thread safe.
Parameters
----------
filename : string
The filename to load and solve.
astrometry_engine : AstrometryEngine
The astrometry engine to use.
photometry_engine : PhotometryEngine
The photometry engine to use.
Returns
-------
opihi_solution : OpihiSolution
The solution class of the Opihi image after it has been solved
(or at least attempted to be).
"""
# Extracting the header of this fits file to get the observing
# metadata from it.
header, __ = library.fits.read_fits_image_file(filename=filename)
# The filter which image is in, extracted from the fits file,
# assuming standard form.
filter_header_string = str(header["FWHL"])
filter_name = library.conversion.filter_header_string_to_filter_name(
header_string=filter_header_string
)
# The exposure time of the image, extracted from the fits file,
# assuming standard form.
exposure_time = float(header["ITIME"])
# Converting date to Julian day as the solution class requires it.
# We use the modified Julian day from the header file.
observing_time = library.conversion.modified_julian_day_to_julian_day(
mjd=header["MJD_OBS"]
)
# From this filename, create the Opihi solution. There is no asteroid
# information as the automatic mode does not take asteroids into
# account.
opihi_solution = opihiexarata.OpihiSolution(
fits_filename=filename,
filter_name=filter_name,
exposure_time=exposure_time,
observing_time=observing_time,
)
# Given the engines, solve for both the astrometry and photometry.
# We rely on the error handling of the OpihiSolution solving itself.
try:
__, __ = opihi_solution.solve_astrometry(
solver_engine=astrometry_engine,
overwrite=True,
raise_on_error=True,
vehicle_args={},
)
__, __ = opihi_solution.solve_photometry(
solver_engine=photometry_engine,
overwrite=True,
raise_on_error=True,
vehicle_args={},
)
except error.ExarataException as err:
# Something went wrong with the solving. We do nothing more.
error.warn(
warn_class=error.InputWarning,
message="The filename {f} failed to solve with the error {e}".format(
f=filename, e=err
),
)
# All done.
return opihi_solution
[docs]
def write_zero_point_record_to_database(
self, opihi_solution: hint.OpihiSolution
) -> None:
"""This function writes the zero point information assuming a solved
photometric solution. This function is so that threading this process
away is a lot easier.
Parameters
----------
opihi_solution : OpihiSolution
The solution class of the image.
Returns
-------
None
"""
# The photometry solution must exist and it must be properly solved.
if not isinstance(opihi_solution.photometrics, photometry.PhotometricSolution):
return None
if not opihi_solution.photometrics_status:
return None
# The database solution must exist to write to, and the user must
# actually want to write to it.
if not isinstance(
self.zero_point_database, opihiexarata.OpihiZeroPointDatabaseSolution
):
return None
if not library.config.GUI_AUTOMATIC_DATABASE_SAVE_OBSERVATIONS:
return None
# Because many files are being written to the database, we do not
# want to try and busy the database with cleaning itself up every time
# we want to write to it so we do it randomly.
CLEAN_RATE = library.config.GUI_AUTOMATIC_DATABASE_CLEAN_FILE_RATE
will_clean_record_file = random.random() <= CLEAN_RATE
# We write the record based on the information from the solution.
self.zero_point_database.write_zero_point_record_julian_day(
jd=opihi_solution.observing_time,
zero_point=opihi_solution.photometrics.zero_point,
zero_point_error=opihi_solution.photometrics.zero_point_error,
filter_name=opihi_solution.filter_name,
clean_file=will_clean_record_file,
)
# We additionally create a new figure for the monitoring webpage.
self.zero_point_database.create_plotly_zero_point_html_plot_via_configuration()
# All done.
return None
[docs]
def refresh_window(self) -> None:
"""Refreshes the GUI window with new information where available.
Parameters
----------
None
Returns
-------
None
"""
# Refreshing text.
self.__refresh_dynamic_label_text()
# All done.
return None
def __refresh_dynamic_label_text(self) -> None:
"""Refreshes the GUI window's dynamic text.
Parameters
----------
None
Returns
-------
None
"""
# Refreshing the directory text.
self.ui.label_dynamic_fits_directory.setText(self.fits_fetch_directory)
# Refreshing the filename text, we do not need the directory part.
# If the filenames have not been provided, then we just highlight that
# they have not been provided (which is different from the default
# filler names).
if isinstance(self.fetch_fits_filename, str):
fetch_basename = library.path.get_filename_with_extension(
pathname=self.fetch_fits_filename
)
self.ui.label_dynamic_fetch_filename.setText(fetch_basename)
else:
self.ui.label_dynamic_fetch_filename.setText("None")
if isinstance(self.working_fits_filename, str):
working_basename = library.path.get_filename_with_extension(
pathname=self.working_fits_filename
)
self.ui.label_dynamic_working_filename.setText(working_basename)
else:
self.ui.label_dynamic_working_filename.setText("None")
if isinstance(self.results_fits_filename, str):
results_basename = library.path.get_filename_with_extension(
pathname=self.results_fits_filename
)
self.ui.label_dynamic_results_filename.setText(results_basename)
else:
self.ui.label_dynamic_results_filename.setText("None")
# If there is no resulting Opihi solution, then there is no data to be
# extracted for results. The results solution should also be always
# solved so we do not need to check for it. We also need to make sure
# that the astrometric solution and photometric solution was valid.
if (
isinstance(self.results_opihi_solution, opihiexarata.OpihiSolution)
and self.results_opihi_solution.astrometrics_status
and self.results_opihi_solution.photometrics_status
):
# Obtaining the observing time.
observing_time_jd = self.results_opihi_solution.observing_time
(
year_int,
moth_int,
days_int,
hour_int,
mint_int,
secs_float,
) = library.conversion.julian_day_to_full_date(jd=observing_time_jd)
# Allowing for padded zeros for ISO 8601 (like) compatibility,
# because it is a more unambiguous format.
date_string = "{y:04d}-{m:02d}-{d:02d}".format(
y=year_int, m=moth_int, d=days_int
)
time_string = "{h:02d}:{m:02d}:{s:04.1f}".format(
h=hour_int, m=mint_int, s=secs_float
)
self.ui.label_dynamic_date.setText(date_string)
self.ui.label_dynamic_time.setText(time_string)
# Refreshing the astrometry results.
ra_deg = self.results_opihi_solution.astrometrics.ra
dec_deg = self.results_opihi_solution.astrometrics.dec
# To sexagesimal as it is easier to read, the results provided are
# in degrees.
ra_sex, dec_sex = library.conversion.degrees_to_sexagesimal_ra_dec(
ra_deg=ra_deg, dec_deg=dec_deg, precision=3
)
self.ui.label_dynamic_ra.setText(ra_sex)
self.ui.label_dynamic_dec.setText(dec_sex)
# Refreshing photometry results.
zero_point = self.results_opihi_solution.photometrics.zero_point
zero_point_error = self.results_opihi_solution.photometrics.zero_point_error
filter_name = self.results_opihi_solution.photometrics.filter_name
# Formatting the information as a string.
pm_sym = "\u00b1"
zero_point_str = "{zp:.3f} {pm} {zpe:.3f}".format(
zp=zero_point, pm=pm_sym, zpe=zero_point_error
)
filter_name = str(filter_name)
self.ui.label_dynamic_zero_point.setText(zero_point_str)
self.ui.label_dynamic_filter.setText(filter_name)
else:
# There is no OpihiSolution to pull information from.
pass
# Refreshing the operational status.
loop_state = self.loop_state.casefold()
if loop_state == "none":
loop_state_string = "None"
elif loop_state == "trigger":
loop_state_string = "Trigger"
elif loop_state == "running":
loop_state_string = "Running"
elif loop_state == "stopped":
loop_state_string = "Stopped"
elif loop_state == "halted":
loop_state_string = "Halted"
else:
loop_state_string = "Default"
raise error.DevelopmentError(
"The operational status flag is `{oflag}`. There is no string refresh"
" case built to handle this flag. It must be implemented."
)
# Setting the string.
self.ui.label_dynamic_operational_status.setText(loop_state_string)
# All done.
return None
[docs]
def reset_window(self) -> None:
"""This function resets the window to the default values or
parameters
Parameters
----------
None
Returns
-------
None
"""
# Resetting all of the text to their defaults.
# Directory.
self.ui.label_dynamic_fits_directory.setText("/path/to/fits/directory/")
# Filenames.
self.ui.label_dynamic_fetch_filename.setText(
"opi.20XXA999.YYMMDD.AAAAAAAAA.00001.a.fits"
)
self.ui.label_dynamic_working_filename.setText(
"opi.20XXA999.YYMMDD.AAAAAAAAA.00001.a.fits"
)
self.ui.label_dynamic_results_filename.setText(
"opi.20XXA999.YYMMDD.AAAAAAAAA.00001.b.fits"
)
# Astrometric coordinates and results.
self.ui.label_dynamic_ra.setText("RR:RR:RR.RRR")
self.ui.label_dynamic_dec.setText("+DD:DD:DD.DDD")
# Photometric results.
self.ui.label_dynamic_zero_point.setText("ZZZ.ZZZ + EE.EEE")
self.ui.label_dynamic_filter.setText("FF")
# Operational status.
self.ui.label_dynamic_operational_status.setText("Default")
# All done.
return None
[docs]
def closeEvent(self, event) -> None:
"""We override the original Qt close event to take into account the
automatic loop.
Parameters
----------
event : ?
The event that occurs.
Returns
-------
None
"""
# We just need to stop the loop, if it is going on.
self.loop_state = "halted"
# We can close now.
event.accept()
# All done.
return None
[docs]
def start_automatic_window() -> None:
"""This is the function to create the automatic window for usage.
Parameters
----------
None
Returns
-------
None
"""
# Creating the application and its infrastructure.
app = QtWidgets.QApplication([])
# The automatic GUI window.
automatic_window = OpihiAutomaticWindow()
automatic_window.show()
# Closing out of the window.
sys.exit(app.exec())
# All done.
return None
if __name__ == "__main__":
start_automatic_window()