Source code for opihiexarata.ephemeris.jplhorizons

"""The different ephemeris engines which use JPL horizons as its backend."""

import requests
import numpy as np
import scipy.interpolate as sp_interpolate
import astropy.table as ap_table

import opihiexarata.library as library
import opihiexarata.library.error as error
import opihiexarata.library.hint as hint


[docs] class JPLHorizonsWebAPIEngine(library.engine.EphemerisEngine): """This obtains the ephemeris of an asteroid using JPL horizons provided the Keplerian orbital elements of the asteroid as determined by orbital solutions. Attributes ---------- semimajor_axis : float The semi-major axis of the orbit, in AU. eccentricity : float The eccentricity of the orbit. inclination : float The angle of inclination of the orbit, in degrees. longitude_ascending_node : float The longitude of the ascending node of the orbit, in degrees. argument_perihelion : float The argument of perihelion of the orbit, in degrees. mean_anomaly : float The mean anomaly of the orbit, in degrees. epoch : float The modified Julian date epoch of the osculating orbital elements. ra_velocity : float The right ascension angular velocity of the target, in degrees per second. dec_velocity : float The declination angular velocity of the target, in degrees per second. ra_acceleration : float The right ascension angular acceleration of the target, in degrees per second squared. dec_acceleration : float The declination angular acceleration of the target, in degrees per second squared. """
[docs] def __init__( self, semimajor_axis: float, eccentricity: float, inclination: float, longitude_ascending_node: float, argument_perihelion: float, mean_anomaly: float, epoch: float, ) -> None: """Creating the engine. Parameters ---------- semimajor_axis : float The semi-major axis of the orbit, in AU. eccentricity : float The eccentricity of the orbit. inclination : float The angle of inclination of the orbit, in degrees. longitude_ascending_node : float The longitude of the ascending node of the orbit, in degrees. argument_perihelion : float The argument of perihelion of the orbit, in degrees. mean_anomaly : float The mean anomaly of the orbit, in degrees. epoch : float The full Julian date epoch of these osculating orbital elements. Return ------ None """ # Storing the orbital elements for usage during the API calls. self.semimajor_axis = semimajor_axis self.eccentricity = eccentricity self.inclination = inclination self.longitude_ascending_node = longitude_ascending_node self.argument_perihelion = argument_perihelion self.mean_anomaly = mean_anomaly self.epoch = epoch # We adapt the internal time for accurate ephemeris, this is done so # we do not request too much data. Using sensible defaults. current_time = library.conversion.current_utc_to_julian_day() self.start_time = current_time - ((5 / 60) / 24) self.stop_time = current_time + ((30 / 60) / 24) self.time_step = 120 # Extracting the first pass ephemeris table. The ephemeris function # uses this table to properly compute the future locations. __ = self._refresh_ephemeris( start_time=self.start_time, stop_time=self.stop_time ) # The rates of the asteroid. We care only about the rates most # accurate to the current time and so we can use the rates calculated # from the close by default time. jpl_time = np.array(self.ephemeris_table["julian_day"]) jpl_ra_rate = np.array(self.ephemeris_table["ra_rate"]) jpl_dec_rate = np.array(self.ephemeris_table["dec_rate"]) # Converting the Julian day time to seconds as it is just easier for # the differentials. Using UNIX time as it is easy. unix_time = library.conversion.julian_day_to_unix_time(jd=jpl_time) # We derive the current velocity rates by the observations closest to # the current time to remove as much second order effects as we can. nearby_delta = (5 / 60) / 24 nearby_records = np.where( np.logical_and( (current_time - nearby_delta) < jpl_time, jpl_time < (current_time + nearby_delta), ), True, False, ) # The rates from JPL are in arcseconds per hour, but the ephemeris # parsing already handled the unit conversion. The average should be # fine for this case. self.ra_velocity = np.nanmean(jpl_ra_rate[nearby_records]) self.dec_velocity = np.nanmean(jpl_dec_rate[nearby_records]) # We derive the current acceleration rates by assuming constant # acceleration across the sky (ignoring 3rd order differential # effects). def _determining_average_acceleration( time: hint.array, velocity: hint.array ) -> float: """We just use a function here to better group processes and local variable names. We use central difference to 4th order error. We can afford the number of points needed.""" # Spacing, though we assume uniform, we want to make sure. uniform_spacing = np.mean(time[1:] - time[:-1]) # Splitting up into the different subsets of V_{n-2} ... V_{n+2}. # We do not need V_{n+0} for this method. vel_m2 = velocity[:-4] vel_m1 = velocity[1:-3] vel_p1 = velocity[3:-1] vel_p2 = velocity[4:] # Finding the derivative. def derivative_function( x_m2: float, x_m1: float, x_p1: float, x_p2: float, h: float ) -> float: """The constants for this function is well known for higher order finite difference methods. Here is the 4th order form. Arrays are also accepted.""" d_dv = ( (1 / 12) * x_m2 + (-2 / 3) * x_m1 + (2 / 3) * x_p1 + (-1 / 12) * x_p2 ) / (h) return d_dv # Determining the acceleration. accel_array = np.array( derivative_function( x_m2=vel_m2, x_m1=vel_m1, x_p1=vel_p1, x_p2=vel_p2, h=uniform_spacing, ) ) # Assuming constant acceleration. There may be spikes that we do # not want to deal with so a median is fine. There will also likely # be a slope because the jerk that we otherwise are ignoring. return np.median(accel_array) # Determining the acceleration rates from the entire velocity records. # We are assuming constant acceleration, and our time span is not that # large so this should be okay. self.ra_acceleration = _determining_average_acceleration( time=unix_time, velocity=jpl_ra_rate ) self.dec_acceleration = _determining_average_acceleration( time=unix_time, velocity=jpl_dec_rate ) # All done. return None
@staticmethod def __parse_jpl_horizons_output(response_text: str) -> hint.Table: """This function serves to parse the output from the JPL horizons. It is a text output that is human readable but some parsing is needed. We do that here, assuming the quantities in the original request. Parameters ---------- response_text : str The raw response from the JPL horizons web API service. Returns ------- """ # Using lines is a lot easier to manage. response_lines = response_text.split("\n") # The ephemeris lines are demarked by $$SOE and $$EOE tags, extracting # the ephemeris only as that is what we care about. soe_index = None eoe_index = None for index, linedex in enumerate(response_lines): if "$$SOE" in linedex: soe_index = index elif "$$EOE" in linedex: eoe_index = index else: continue # Something happened, the demarcations were not found. if soe_index is None or eoe_index is None: raise error.WebRequestError( "The demarcations for the ephemeris were not found, it is likely that" " the web request sent was incorrect. The response from the API: \n {r}".format( r=response_text ) ) # Using the demarcations to extract the ephemeris section of the # query lines. We do not need the demarcations themselves though. ephemeris_lines = response_lines[soe_index + 1 : eoe_index] # Parsing the lines individually. def _parse_jpl_horizons_ephemeris_line(line: str) -> dict: """We parse the line into the known parts based on the query.""" # Removing extra spaces which might cause issues. line = line.strip() # The solar presence is not really relevant and because of its # formatting, screws up with the delimitation as "nighttime" is # denoted by a space character. We remove it here. if len(line.split()) == 15: # There exists the state of the Sun, we do not need it. line_sun_split = line.split() line_split = line_sun_split[:2] + line_sun_split[3:] elif len(line.split()) == 14: # The sun information is hidden as consecutive spaces. line_split = line.split() else: raise error.UndiscoveredError( "The JPL response has more entries than accountable for." ) # ...and dealing with the parts. # The date of the observation location, the month is in text # form but it is easier to deal with numbers. date = line_split[0] year, month_name, day = date.split("-") year = int(year) month = library.conversion.string_month_to_number(month_name) day = int(day) # The time of the observation location. time = line_split[1] try: hour, minute, second = time.split(":") except ValueError: # Sometimes they omit the seconds even though we want them. hour, minute = time.split(":") second = 0 hour = int(hour) minute = int(minute) second = float(second) # The RA and DEC strings are split because of the deliminator # being spaces, combine them back to HMS and DMS. ra_sex = line_split[2] + ":" + line_split[3] + ":" + line_split[4] dec_sex = line_split[5] + ":" + line_split[6] + ":" + line_split[7] ra_deg, dec_deg = library.conversion.sexagesimal_ra_dec_to_degrees( ra_sex=ra_sex, dec_sex=dec_sex ) # The RA and DEC rates; these values are in arcsec/hr, but # convention in this software is deg/sec so convert. The flat # planar conversion is already done by Horizon. as_hr_to_dg_s = lambda ashr: ashr / (3600 * 3600) ra_rate = as_hr_to_dg_s(float(line_split[8])) dec_rate = as_hr_to_dg_s(float(line_split[9])) # The true anomaly. true_anomaly = float(line_split[10]) # The exact usage of these parameters are really not know yet # but they seem useful; not using any of that. sky_motion = float(line_split[11]) sky_position_angle = float(line_split[12]) sky_velocity_angle = float(line_split[13]) # From the current date and time, deriving the julian day. It # is useful for all of the other calculations. julian_day = library.conversion.full_date_to_julian_day( year=year, month=month, day=day, hour=hour, minute=minute, second=second ) # Compiling the dictionary. line_dict = { "year": year, "month": month, "day": day, "hour": hour, "minute": minute, "second": second, "julian_day": julian_day, "ra": ra_deg, "dec": dec_deg, "ra_rate": ra_rate, "dec_rate": dec_rate, "true_anomaly": true_anomaly, } return line_dict # Using the above abstracted function for parsing the individual lines. ephemeris_table = [] for linedex in ephemeris_lines: ephemeris_record = _parse_jpl_horizons_ephemeris_line(line=linedex) ephemeris_table.append(ephemeris_record) ephemeris_table = ap_table.Table(ephemeris_table) # All done. return ephemeris_table
[docs] def _refresh_ephemeris( self, start_time: float = None, stop_time: float = None, time_step: float = None ) -> None: """This function refreshes the calculations rederiving the solution table and the ephemeris function. Parameters ---------- start_time : float, default = None The time that the ephemeris should start at, in Julian days. If not provided, then the old start time will be used instead. stop_time : float, default = None The time that the ephemeris should stop at, in Julian days. If not provided, then the old stop time will be used instead. time_step : float, default = None The time step of the entries of the ephemeris calculation, in seconds. (Note, JPL does not accept anything less than a minute.) If not provided, then the old time step will be used instead. Returns ------- None """ # If the values provided do not really differ from what we have now, # then it would be a lot of work for nothing. if (start_time is None) and (stop_time is None) and (time_step is None): # Doing no work. return None else: # Assign refreshed values to the current instance to do the work. self.start_time = self.start_time if start_time is None else start_time self.stop_time = self.stop_time if stop_time is None else stop_time self.time_step = self.time_step if time_step is None else time_step # Requery the Horizons service. ephemeris_table = self._query_jpl_horizons( start_time=self.start_time, stop_time=self.stop_time, time_step=self.time_step, ) self.ephemeris_table = ephemeris_table # All done. return None
[docs] def _query_jpl_horizons( self, start_time: float, stop_time: float, time_step: float ) -> hint.Table: """This function queries JPL horizons. Using the current orbital elements, and provided a minimum time, maximum time, and time step, we can fetch the new table of ephemeris measurements. Parameters ---------- start_time : float The time that the ephemeris should start at, in Julian days. stop_time : float The time that the ephemeris should stop at, in Julian days. time_step : float The time step of the entries of the ephemeris calculation, in seconds. (Note, JPL does not accept anything less than a minute.) Returns ------- epidermis_table : Table The table of computed sky locations over time from JPL horizons. """ # The JPL service does not accept time steps in less than a second # and floors at a minute. Minutes are the best unit, so we round to # the nearest minute. time_step_minute = int(time_step / 60) # The entries for JPL Horizon API. The less needed escape # characters the better. Strings need to be wrapped for the URI call. query_parameters = { # Adding a dummy name as this call has no preconceived knowledge of # what this asteroid or object is. "OBJECT": "OpihiExarata-Small-Body", # The location of the observatory, we assume an observatory on # Earth. "CENTER": "{code}@399".format(code=library.config.MPC_OBSERVATORY_CODE), # Specifying to the JPL system that we are supplying elements, the # semicolon must be URI encoded else the system does not # recognize it. Also specifying that our ephemeris calculations # are for an observer. "COMMAND": ";", "EPHEM_TYPE": "OBSERVER", # Formatting the orbital elements and the current ecliptic that # we are using as our assumption for the angles. "A": self.semimajor_axis, "EC": self.eccentricity, "IN": self.inclination, "OM": self.longitude_ascending_node, "W": self.argument_perihelion, "MA": self.mean_anomaly, "EPOCH": self.epoch, "ECLIP": "J2000", # Formatting the time parameters. "START_TIME": "JD{start}".format(start=start_time), "STOP_TIME": "JD{stop}".format(stop=stop_time), "STEP_SIZE": "{min}m".format(min=time_step_minute), # The output quantities to be extracted from the JPL Horizons # service. It is specified by a set of flags. See: # https://ssd.jpl.nasa.gov/horizons/manual.html#output # We want the ephemeris (1), the on-sky rates (3), with optionally # the true anomaly (41), and vector-form sky motion (47). "QUANTITIES": "'1,3,41,47'", # The output of the time should include seconds, just in case given # the current parser assumes hardcoded column positions. "TIME_DIGITS": "FRACSEC", # The format that the output will be specified in. There is little # difference between JSON and text as it is all just text. "format": "text", } # Constructing the API call. The parameters are delimitated by # ampersands. The query character for query is added as well. The # requests package handles it well. BASE_JPL_HORIZONS_URL = "https://ssd.jpl.nasa.gov/api/horizons.api" # Sending the request, characters are properly encoded here. response = requests.get(BASE_JPL_HORIZONS_URL, params=query_parameters) result = response.text # Extracting from this result the needed results. ephemeris_table = self.__parse_jpl_horizons_output(response_text=result) return ephemeris_table
[docs] def forward_ephemeris(self, future_time: float) -> tuple[float, float]: """This allows the computation of future positions at a future time using the derived ephemeris. Parameters ---------- future_time : array-like The set of future times which to derive new RA and DEC coordinates. The time must be in Julian day time. Returns ------- future_ra : ndarray The set of right ascensions that corresponds to the future times, in degrees. future_dec : ndarray The set of declinations that corresponds to the future times, in degrees. """ # If any of the future time is outside of the current times provided # by the ephemeris table, then more data needs to be obtained. We # do not propagate or extrapolate. future_time = np.array(future_time) if not np.all( np.logical_and(self.start_time < future_time, future_time < self.stop_time) ): # New data to be queried. Establishing it so that the bounds are # set by the new time requested plus a buffer of a few minutes. buffer_time = 5 / (24 * 60) future_start_time = np.nanmin(future_time) - buffer_time future_stop_time = np.nanmax(future_time) + buffer_time # We do not want to reduce the current table we have, use the # previous values where needed. new_start_time = ( self.start_time if self.start_time < future_start_time else future_start_time ) new_stop_time = ( future_stop_time if self.stop_time < future_stop_time else self.stop_time ) # Refreshing the ephemeris data. self._refresh_ephemeris(start_time=new_start_time, stop_time=new_stop_time) # Attempt to compute the forward ephemeris data now with the # updated information. return self.forward_ephemeris(future_time=future_time) # It is unlikely that the ephemeris table has the exact data points, # so we interpolate. ra_interpolate = sp_interpolate.interp1d( self.ephemeris_table["julian_day"], self.ephemeris_table["ra"], kind="quadratic", bounds_error=True, ) dec_interpolate = sp_interpolate.interp1d( self.ephemeris_table["julian_day"], self.ephemeris_table["dec"], kind="quadratic", bounds_error=True, ) # Computing the interpolation. If for some reason it is outside of the # bounds again, something is off. As such, we make sure it is not # caught with other error catching systems. try: future_ra = ra_interpolate(future_time) future_dec = dec_interpolate(future_time) except ValueError: raise error.DevelopmentError( "The ephemeris interpolation functions are trying to interpolate" " outside of their defined range. However, this should have already" " been caught with the time bounds check and a new queried ephemeris" " table should have fixed this issue." ) else: # All done, seem to be fine. return future_ra, future_dec # The code should not reach here. raise error.LogicFlowError return None