Source code for pyquil.api._engagement_manager

##############################################################################
# Copyright 2016-2021 Rigetti Computing
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
##############################################################################
import threading
from datetime import datetime
from typing import Dict, NamedTuple, Optional, TYPE_CHECKING

from dateutil.parser import parse as parsedate
from dateutil.tz import tzutc
from qcs_api_client.client import QCSClientConfiguration
from qcs_api_client.models import EngagementWithCredentials, CreateEngagementRequest
from qcs_api_client.operations.sync import create_engagement
from qcs_api_client.types import UNSET
from qcs_api_client.util.errors import QCSHTTPStatusError

from pyquil.api._qcs_client import qcs_client

if TYPE_CHECKING:
    import httpx


class QPUUnavailableError(Exception):
    """
    Exception raised when a QPU is unavailable.

    The raw ``retry_after`` value passed to the constructor is echoed in the exception
    message; its integer form (when it has one) is exposed as :attr:`retry_after`.
    """

    retry_after: Optional[int]
    """The number of seconds after which to retry the engagement request, or ``None`` if unknown."""

    def __init__(self, retry_after: Optional[str]) -> None:
        """
        Build the error from the (optional) retry hint supplied by the caller.

        :param retry_after: Retry hint as a string, typically the value of a ``Retry-After``
            response header. ``None`` when no hint is available.
        """
        # Always populate the attribute so that ``err.retry_after`` never raises
        # AttributeError, regardless of which branch below builds the message.
        self.retry_after = self._parse_retry_after(retry_after)

        if retry_after is not None:
            super().__init__(f"QPU unavailable. Please retry after {retry_after}s.")
        else:
            super().__init__("QPU unavailable.")

    @staticmethod
    def _parse_retry_after(value: Optional[str]) -> Optional[int]:
        """Convert a retry hint to whole seconds; return ``None`` when absent or not an integer."""
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            # A hint that is not an integer (e.g. an HTTP-date) cannot be expressed as seconds here.
            return None


class EngagementCacheKey(NamedTuple):
    """
    Immutable, hashable pair identifying one cached engagement.
    """

    # ID of the quantum processor the engagement is for.
    quantum_processor_id: str

    # ID of the specific endpoint requested, or None when no endpoint was specified.
    endpoint_id: Optional[str]


class EngagementManager:
    """
    Fetches (and caches) engagements for use when accessing a QPU.
    """

    _lock: threading.Lock
    """Lock used to ensure that only one engagement request is in flight at once."""

    def __init__(self, *, client_configuration: QCSClientConfiguration) -> None:
        """
        Instantiate a new engagement manager.

        :param client_configuration: Client configuration, used for refreshing engagements.
        """
        self._client_configuration = client_configuration
        self._cached_engagements: Dict[EngagementCacheKey, EngagementWithCredentials] = {}
        self._lock = threading.Lock()

    def get_engagement(
        self, *, quantum_processor_id: str, request_timeout: float = 10.0, endpoint_id: Optional[str] = None
    ) -> EngagementWithCredentials:
        """
        Gets an engagement for the given quantum processor endpoint.

        If an engagement was already fetched previously and remains valid, it will be returned instead
        of creating a new engagement.

        :param quantum_processor_id: Quantum processor being engaged.
        :param request_timeout: Timeout for request, in seconds.
        :param endpoint_id: Optional ID of the endpoint to use for engagement. If provided, it must
            correspond to an endpoint serving the provided Quantum Processor. A falsy value
            (``None`` or an empty string) is sent to the API as unset.
        :return: Fetched or cached engagement.
        :raises QPUUnavailableError: raised when the QPU is unavailable (HTTP 503). The error is built
            from the response's ``Retry-After`` header, when present.
        :raises QCSHTTPStatusError: raised when creating an engagement fails for a reason that is not
            due to QPU unavailability.
        """
        key = EngagementCacheKey(quantum_processor_id, endpoint_id)

        # The lock is held for the whole check-then-fetch sequence, including the HTTP request,
        # so that only one engagement request is in flight at once.
        with self._lock:
            if not self._engagement_valid(self._cached_engagements.get(key)):
                self._cached_engagements[key] = self._create_engagement(
                    quantum_processor_id=quantum_processor_id,
                    request_timeout=request_timeout,
                    endpoint_id=endpoint_id,
                )
            return self._cached_engagements[key]

    def _create_engagement(
        self, *, quantum_processor_id: str, request_timeout: float, endpoint_id: Optional[str]
    ) -> EngagementWithCredentials:
        """
        Request a brand-new engagement from the QCS API; does not touch the cache.

        :param quantum_processor_id: Quantum processor being engaged.
        :param request_timeout: Timeout for request, in seconds.
        :param endpoint_id: Optional ID of the endpoint to engage; falsy values are sent as unset.
        :return: The engagement parsed from the API response.
        :raises QPUUnavailableError: when the response status is 503.
        :raises QCSHTTPStatusError: for any other error status.
        """
        with qcs_client(
            client_configuration=self._client_configuration, request_timeout=request_timeout
        ) as client:  # type: httpx.Client
            request = CreateEngagementRequest(
                quantum_processor_id=quantum_processor_id, endpoint_id=endpoint_id or UNSET
            )
            response = create_engagement(client=client, json_body=request)
            try:
                # Reading ``parsed`` is the step expected to raise QCSHTTPStatusError on an error status.
                return response.parsed
            except QCSHTTPStatusError as e:
                if response.status_code == 503:
                    raise QPUUnavailableError(retry_after=response.headers.get("Retry-After")) from e
                # Bare ``raise`` re-raises the active exception with its traceback intact.
                raise

    @staticmethod
    def _engagement_valid(engagement: Optional[EngagementWithCredentials]) -> bool:
        """
        Report whether a cached engagement can still be used.

        An engagement is usable when it exists, all three credential strings and the address are
        non-empty, and its expiry time is still in the future (compared against the current UTC time).

        :param engagement: Cached engagement, or ``None`` when nothing is cached for the key.
        :return: ``True`` if the engagement may be reused, ``False`` if a new one must be created.
        """
        if engagement is None:
            return False

        credentials = engagement.credentials
        # Cheap string checks first; the ``and`` chain short-circuits so the expiry timestamp is
        # only parsed when every other field already looks usable.
        return (
            credentials.client_public != ""
            and credentials.client_secret != ""
            and credentials.server_public != ""
            and engagement.address != ""
            and parsedate(engagement.expires_at) > datetime.now(tzutc())
        )