import os
import urllib.parse
import urllib.request
import urllib.error
import random
import astropy.wcs as ap_wcs
import opihiexarata.library as library
import opihiexarata.library.error as error
import opihiexarata.library.hint as hint
[docs]
class AstrometryNetWebAPIEngine(library.engine.AstrometryEngine):
"""A python-based wrapper around the web API for astrometry.net.
This API does not have the full functionality of the default Python client
seen at https://github.com/dstndstn/astrometry.net/blob/master/net/client/client.py.
The point of this class is to be simple enough to be understood by others and
be specialized for OpihiExarata.
Attributes
----------
_ASTROMETRY_NET_API_BASE_URL : string
The base URL for the API which all other service URLs are derived from.
_apikey : string
The API key used to log in.
original_upload_filename : string
The original filename that was used to upload the data.
session : string
The session ID of this API connection to astrometry.net
"""
[docs]
def __init__(self, url=None, apikey: str = None, silent: bool = True) -> None:
"""The instantiation, connecting to the web API using the API key.
Parameters
----------
url : string, default = None
The base url which all other API URL links are derived from. This
should be used if the API is a self-hosted install or has a
different web source than nova.astrometry.net. Defaults to the
nova.astrometry.net api service.
apikey : string
The API key of the user.
silent : bool, default = True
Should there be printed messages as the processes are executed.
This is helpful for debugging or similar processes.
Returns
-------
None
"""
# Defining the URL.
self._ASTROMETRY_NET_API_BASE_URL = (
str(url)
if url is not None
else library.config.ASTROMETRYNET_ONLINE_NOVA_WEB_API_URL
)
# Base parameters.
# The default arguments for uploading files. In (key, value, type) form.
# Detailed is also their usage cases per
# https://astrometrynet.readthedocs.io/en/latest/net/api.html
self._DEFAULT_URL_ARGUMENTS = [
# These parameters are for licensing and distribution terms.
("allow_commercial_use", "d", str),
("allow_modifications", "d", str),
# For visibility by the general public.
("publicly_visible", "n", str),
# Image scaling parameters, if provided, when known, helps the
# processing a little.
("scale_units", "arcsecperpix", str),
("scale_type", "ul", str),
("scale_lower", 0.9, float),
("scale_upper", 1.0, float),
("scale_est", None, float),
("scale_err", None, float),
# These parameters allows for the establishment of an initial guess
# specified byt he centers, and its maximal deviation as specified
# by the radius parameter. (In degrees.)
("center_ra", None, float),
("center_dec", None, float),
("radius", None, float),
# Image properties, preprocessing it a little can help in its
# determination.
("parity", None, int),
("downsample_factor", None, int),
("positional_error", None, float),
("tweak_order", None, int),
("crpix_center", None, bool),
("invert", None, bool),
# These parameters are needed if being sent instead is an x,y list of
# source star positions.
("image_width", None, int),
("image_height", None, int),
("x", None, list),
("y", None, list),
("album", None, str),
]
# Use the API key to log in a derive a session key.
self.session = None
session_key = self.__login(apikey=apikey)
self._apikey = apikey
self.session = session_key
# Placeholder variables.
self.original_upload_filename = str()
self._image_return_results = {}
return None
def __login(self, apikey: str) -> str:
"""The method to log into the API system.
Parameters
----------
apikey : string
The API key for the web API service.
Returns
-------
session_key : string
The session key for this login session.
"""
# The key.
args = {"apikey": apikey}
result = self._send_web_request(service="login", args=args)
session = result.get("session", False)
# Check if the session works and that the API key given is valid.
if not session:
raise error.WebRequestError(
"The provided API key did not provide a valid session."
)
else:
# The session should be fine.
session_key = session
return session_key
def __get_submission_id(self) -> str:
"""Extract the submission ID from the image upload results."""
image_results = self._image_return_results
self.__submission_id = image_results.get("subid", None)
return self.__submission_id
def __set_submission_id(self, sub_id) -> None:
"""Assign the submission ID, it should only be done once when the
image is obtained."""
if self.__submission_id is None:
self.__submission_id = sub_id
else:
raise error.ReadOnlyError(
"The submission ID has already been set by obtaining it from the API"
" service."
)
return None
def __del_submission_id(self) -> None:
"""Remove the current submission ID association."""
self.__submission_id = None
return None
__doc_submission_id = (
"When file upload or table upload is sent to the API, the submission ID is"
" saved here."
)
__submission_id = None
submission_id = property(
__get_submission_id,
__set_submission_id,
__del_submission_id,
__doc_submission_id,
)
def __get_job_id(self) -> str:
"""Extract the job ID from the image upload results. It may be the
case that there is not job yet associated with this submission.
"""
# If the job ID already has been obtained, then there is no reason to
# call the API again.
if self.__job_id is not None:
return self.__job_id
# Call the API to get the job ID.
try:
submission_results = self.get_submission_results(
submission_id=self.submission_id
)
except error.WebRequestError:
# Make a more helpful error message for what is going on.
if self.submission_id is None:
raise error.WebRequestError(
"There cannot be a job id without there being a submission for that"
" job to operate on."
)
else:
# What happened is unknown.
raise error.UndiscoveredError("Why the web request failed is unknown.")
else:
job_id_list = submission_results.get("jobs", [])
# If there are no jobs, then it is likely still in queue.
if len(job_id_list) == 0:
self.__job_id = None
else:
self.__job_id = job_id_list[-1]
return self.__job_id
raise error.LogicFlowError
return None
def __set_job_id(self, job_id) -> None:
"""Assign the job ID, it should only be done once when the
image is obtained."""
if self.__job_id is None:
self.__job_id = job_id
else:
raise error.ReadOnlyError(
"The job ID has already been set by obtaining it from the API service."
)
return None
def __del_job_id(self) -> None:
"""Remove the current job ID association."""
self.__job_id = None
return None
__doc_job_id = (
"When file upload or table upload is sent to the API, the job ID of the"
" submission is saved here."
)
__job_id = None
job_id = property(__get_job_id, __set_job_id, __del_job_id, __doc_job_id)
[docs]
def _generate_service_url(self, service: str) -> str:
"""Generate the correct URL for the desired service. Because astrometry.net
uses a convention, we can follow it to obtain the desired service URL.
Parameters
----------
service : str
The service which the API URL for should be generated from.
Returns
-------
url : str
The URL for the service.
"""
url = self._ASTROMETRY_NET_API_BASE_URL + service
return url
[docs]
def _generate_upload_args(self, **kwargs) -> dict:
"""Generate the arguments for sending a request. This constructs the
needed arguments, replacing the defaults with user provided arguments
where desired.
Parameters
----------
**kwargs : dict
Arguments which would override the defaults.
Returns
-------
args : dict
The arguments which can be used to send the request.
"""
args = {}
for keydex, defaultdex, typedex in self._DEFAULT_URL_ARGUMENTS:
if keydex in kwargs:
new_value = kwargs.pop(keydex)
new_value = typedex(new_value)
args.update({keydex: new_value})
elif defaultdex is not None:
args.update({keydex: defaultdex})
return args
[docs]
def _send_web_request(
self, service: str, args: dict = {}, file_args: dict = None
) -> dict:
"""A wrapper function for sending a web request to the astrometry.net
API service. Returns the results as well.
Parameters
----------
service : string
The service which is being requested. The web URL is constructed
from this string.
args : dictionary, default = {}
The arguments being sent over the web request.
file_args : dictionary, default = None
If a file is being uploaded instead, special care must be taken to
sure it matches the upload specifications.
Returns
-------
results : dictionary
The results of the web request if it did not fail.
"""
# Obtain the session key derived when this class is instantiated and
# logged into. Use this session key for requests.
if self.session is not None:
args.update({"session": self.session})
# The API requires that the data format must be a JSON based datatype.
json_data = library.json.dictionary_to_json(dictionary=args)
# The URL which to send this request to, constructed from the service
# desired.
api_url = self._generate_service_url(service=service)
# If the request requires that a file be send, then it must be in the
# correct format. Namely, a multipart/form-data format.
if file_args is not None:
boundary_key = "".join([random.choice("0123456789") for __ in range(19)])
boundary = "==============={bkey}==".format(bkey=boundary_key)
headers = {
"Content-Type": 'multipart/form-data; boundary="{bd}"'.format(
bd=boundary
)
}
data_pre = str(
"--"
+ boundary
+ "\n"
+ "Content-Type: text/plain\r\n"
+ "MIME-Version: 1.0\r\n"
+ 'Content-disposition: form-data; name="request-json"\r\n'
+ "\r\n"
+ json_data
+ "\n"
+ "--"
+ boundary
+ "\n"
+ "Content-Type: application/octet-stream\r\n"
+ "MIME-Version: 1.0\r\n"
+ 'Content-disposition: form-data; name="file"; filename="{name}"'.format(
name=file_args["filename"]
)
+ "\r\n"
+ "\r\n"
)
data_post = "\n" + "--" + boundary + "--\n"
data = data_pre.encode() + file_args["data"] + data_post.encode()
else:
# Otherwise, the form should be standard encoded: x-www-form-encoded
headers = {}
data = {"request-json": json_data}
data = urllib.parse.urlencode(data)
data = data.encode("utf-8")
# Processing the request.
try:
# Finally send the request.
request = urllib.request.Request(url=api_url, headers=headers, data=data)
file = urllib.request.urlopen(
request, timeout=library.config.ASTROMETRYNET_WEBAPI_JOB_QUEUE_TIMEOUT
)
text = file.read()
result = library.json.json_to_dictionary(json_string=text)
# Check if the status of the request provided is a valid status.
status = result.get("status")
if status == "error":
error_message = result.get("errormessage", "(none)")
# Try to deduce what the error is.
if error_message == "bad apikey":
raise error.WebRequestError(
"The API key provided is not a valid key."
)
else:
raise error.WebRequestError(
"The server returned an error status message: \n {message}".format(
message=error_message
)
)
else:
return result
except urllib.error.HTTPError:
raise error.WebRequestError(
"The web request output cannot be properly processed. This is likely"
" from a bad web request."
)
# The logic should not flow beyond this point.
raise error.LogicFlowError
return None
[docs]
def get_job_results(self, job_id: str = None) -> dict:
"""Get the results of a job sent to the API service.
Parameters
----------
job_id : str, default = None
The ID of the job that the results should be obtained from. If not
provided, the ID determined by the file upload is used.
Returns
-------
results : dict
The results of the astrometry.net job. They are, in general: (If
the job has not finished yet, None is returned.)
- Status : The status of the job.
- Calibration : Calibration of the image uploaded.
- Tags : Known tagged objects in the image, people inputted.
- Machine Tags : Ditto for tags, but only via machine inputs.
- Objects in field : Known objects in the image field.
- Annotations : Known objects in the field, with annotations.
- Info : A collection of most everything above.
"""
job_id = job_id if job_id is not None else self.job_id
# Get the result of the job.
service_string = "jobs/{id}".format(id=job_id)
try:
job_result = self._send_web_request(service=service_string)
except error.WebRequestError:
# This error is likely because the job is still in queue.
return None
# Check that the service was successful.
status = job_result.get("status", False)
if status != "success":
raise error.WebRequestError(
"The job result request failed, check that the job ID is correct or try"
" again later."
)
else:
results = {}
# For the status.
results["status"] = status
# For the calibrations.
service_string = "jobs/{id}/calibration".format(id=job_id)
results["calibration"] = self._send_web_request(service=service_string)
# For the tags.
service_string = "jobs/{id}/tags".format(id=job_id)
results["tags"] = self._send_web_request(service=service_string)
# For the machine tags.
service_string = "jobs/{id}/machine_tags".format(id=job_id)
results["machine_tags"] = self._send_web_request(service=service_string)
# For the objects in field.
service_string = "jobs/{id}/objects_in_field".format(id=job_id)
results["objects_in_field"] = self._send_web_request(service=service_string)
# For the annotations.
service_string = "jobs/{id}/annotations".format(id=job_id)
results["annotations"] = self._send_web_request(service=service_string)
# For the info.
service_string = "jobs/{id}/info".format(id=job_id)
results["info"] = self._send_web_request(service=service_string)
# All done.
return results
[docs]
def get_job_status(self, job_id: str = None) -> str:
"""Get the status of a job specified by its ID.
Parameters
----------
job_id : str, default = None
The ID of the job that the results should be obtained from. If not
provided, the ID determined by the file upload is used.
Returns
-------
status : string
The status of the submission. If the job has not run yet, None is
returned instead.
"""
job_id = job_id if job_id is not None else self.job_id
# Get the result of the job.
service_string = "jobs/{id}".format(id=job_id)
status = None
try:
job_result = self._send_web_request(service=service_string)
except error.WebRequestError:
# This error is likely because the job is still in queue.
status = None
else:
# Check the job status.
status = job_result.get("status")
finally:
return status
# Should not get here.
raise error.LogicFlowError
return None
[docs]
def get_submission_results(self, submission_id: str = None) -> dict:
"""Get the results of a submission specified by its ID.
Parameters
----------
submission_id : str
The ID of the submission. If it is not passed, the ID determined
by the file upload is used.
Returns
-------
result : dict
The result of the submission.
"""
submission_id = (
submission_id if submission_id is not None else self.submission_id
)
service_string = "submissions/{sub_id}".format(sub_id=submission_id)
result = self._send_web_request(service=service_string)
return result
[docs]
def get_submission_status(self, submission_id: str = None) -> str:
"""Get the status of a submission specified by its ID.
Parameters
----------
submission_id : str, default = None
The ID of the submission. If it is not passed, the ID determined
by the file upload is used.
Returns
-------
status : string
The status of the submission.
"""
submission_id = (
submission_id if submission_id is not None else self.submission_id
)
results = self.get_submission_results(submission_id=submission_id)
status = results.get("status")
return status
[docs]
def get_reference_star_pixel_correlation(
self, job_id: str = None, temp_filename: str = None, delete_after: bool = True
) -> hint.Table:
"""This obtains the table that correlates the location of reference
stars and their pixel locations. It is obtained from the fits corr file
that is downloaded into a temporary directory.
Parameters
----------
job_id : string, default = None
The ID of the job that the results should be obtained from. If not
provided, the ID determined by the file upload is used.
temp_filename : string, default = None
The filename that the downloaded correlation file will be
downloaded as. The path is going to still be in the temporary
directory.
delete_after : bool, default = True
Delete the file after downloading it to extract its information.
Returns
-------
correlation_table : Table
The table which details the correlation between the coordinates of
the stars and their pixel locations.
"""
job_id = job_id if job_id is not None else self.job_id
# Download the correlation file to read into a data table.
upload_filename = library.path.get_filename_without_extension(
pathname=self.original_upload_filename
)
fits_table_filename = (
temp_filename if temp_filename is not None else upload_filename + "_corr"
)
# The full path of the filename derived from saving it in a temporary
# directory.
corr_filename = library.path.merge_pathname(
filename=fits_table_filename, extension="fits"
)
corr_pathname = library.temporary.make_temporary_directory_path(
filename=corr_filename
)
# Save the correlation file.
self.download_result_file(
filename=corr_pathname, file_type="corr", job_id=job_id
)
# Load the data from the file.
__, correlation_table = library.fits.read_fits_table_file(
filename=corr_pathname, extension=1
)
# Delete the temporary file after loading it if desired.
if delete_after:
os.remove(corr_pathname)
return correlation_table
[docs]
def get_wcs(
self, job_id: str = None, temp_filename: str = None, delete_after: bool = True
) -> hint.WCS:
"""This obtains the wcs header file and then computes World Coordinate
System solution from it. Because astrometry.net computes it for us,
we just extract it from the header file using Astropy.
Parameters
----------
job_id : string, default = None
The ID of the job that the results should be obtained from. If not
provided, the ID determined by the file upload is used.
temp_filename : string, default = None
The filename that the downloaded wcs file will be downloaded as.
The path is going to still be in the temporary directory.
delete_after : bool, default = True
Delete the file after downloading it to extract its information.
Returns
-------
wcs : Astropy WCS
The world coordinate solution class for the image provided.
"""
job_id = job_id if job_id is not None else self.job_id
# Download the correlation file to read into a data table.
upload_filename = library.path.get_filename_without_extension(
pathname=self.original_upload_filename
)
fits_table_filename = (
temp_filename if temp_filename is not None else upload_filename + "_wcs"
)
# The full path of the filename derived from saving it in a temporary
# directory.
corr_filename = library.path.merge_pathname(
filename=fits_table_filename, extension="fits"
)
corr_pathname = library.temporary.make_temporary_directory_path(
filename=corr_filename
)
# Save the correlation file.
self.download_result_file(
filename=corr_pathname, file_type="wcs", job_id=job_id
)
# Load the header from the file.
wcs_header = library.fits.read_fits_header(filename=corr_pathname)
wcs = ap_wcs.WCS(wcs_header)
# Delete the temporary file after loading it if desired.
if delete_after:
os.remove(corr_pathname)
return wcs
[docs]
def upload_file(self, pathname: str, **kwargs) -> dict:
"""A wrapper to allow for the uploading of files or images to the API.
This also determines the submission ID and the job ID for the uploaded
image and saves it.
Parameters
----------
pathname : str
The pathname of the file to open. The filename is extracted and
used as well.
Returns
-------
results : dict
The results of the API call to upload the image.
"""
# When uploading a new file, the submission and job IDs will change.
# They must be reset because of their read-only nature.
del self.submission_id, self.job_id
# Save the file information.
self.original_upload_filename = pathname
args = self._generate_upload_args(**kwargs)
# Process the file upload.
file_args = None
try:
file = open(pathname, "rb")
filename = library.path.get_filename_with_extension(pathname=pathname)
file_args = {"filename": filename, "data": file.read()}
except IOError:
raise error.FileError("File does not exist: {path}".format(path=pathname))
# Extract the submission id. This allows for easier
# association between this class instance and the uploaded file.
upload_results = self._send_web_request("upload", args, file_args)
self._image_return_results = upload_results
return upload_results
[docs]
def download_result_file(
self, filename: str, file_type: str, job_id: str = None
) -> None:
"""Downloads fits data table files which correspond to the job id.
Parameters
----------
filename : str
The filename of the file when it is downloaded and saved to disk.
file_type : str
The type of file to be downloaded from astrometry.net. It should
one of the following:
- `wcs`: The world coordinate data table file.
- `new_fits`, `new_image`: A new fits file, containing the
original image, annotations, and WCS header information.
- `rdls`: A table of reference stars nearby.
- `axy`: A table in of the location of stars detected in the
provided image.
- `corr`: A table of the correspondences between reference
stars location in the sky and in pixel space.
job_id : str, default = None
The ID of the job that the results should be obtained from. If not
provided, the ID determined by the file upload is used.
Returns
-------
None
"""
# Get the proper job ID.
job_id = job_id if job_id is not None else self.job_id
# Ensure that the type provided is a valid type which we can pull
# from the API service. Accommodating for capitalization.
file_type = str(file_type).lower()
valid_api_file_types = ("wcs", "new_fits", "rdls", "axy", "corr")
if file_type not in valid_api_file_types:
raise error.WebRequestError(
"The provided file type to be downloaded is not a valid type which can"
" be downloaded, it must be one of: {fty}".format(
fty=valid_api_file_types
)
)
# Construct the URL for the request. It is a little different from the
# normal API scheme so a new method is made.
def _construct_file_download_url(ftype: str, id: str) -> str:
"""Construct the file curl from the file type `ftype` and the
job id `id`."""
url = "http://nova.astrometry.net/{_type}_file/{_id}".format(
_type=ftype, _id=id
)
return url
file_download_url = _construct_file_download_url(ftype=file_type, id=job_id)
# Before downloading the file, check that the file actually exists.
if job_id is None:
raise error.WebRequestError("There is no job to download the file from.")
if library.http.get_http_status_code(url=file_download_url) != 200:
raise error.WebRequestError(
"The file download link is not giving an acceptable http status code."
" It is likely that the job is still processing and thus the data files"
" are not ready."
)
# Download the file.
library.http.download_file_from_url(
url=file_download_url, filename=filename, overwrite=True
)
return None
[docs]
class AstrometryNetHostAPIEngine(AstrometryNetWebAPIEngine):
"""This class is the same as the AstrometryNetWebAPIEngine, but the
default URL point is directed somewhere else to the self-hosted version.
For all documentation, please refer to the parent function.
"""
[docs]
def __init__(self, url=None, apikey: str = None, silent: bool = True):
"""Creating the host API engine."""
# Defining the URL and the overall class.
SELFHOST_URL = (
str(url)
if url is not None
else library.config.ASTROMETRYNET_SELFHOST_NOVA_WEB_API_URL
)
# Creating the class.
super().__init__(url=SELFHOST_URL, apikey=apikey, silent=silent)
# All done.
return None