import os
from typing import Any, Dict, Optional, Union

import httpx

import litellm
from litellm._logging import verbose_logger
from litellm.caching import InMemoryCache
from litellm.constants import SECRET_MANAGER_REFRESH_INTERVAL
from litellm.llms.custom_httpx.http_handler import (
    _get_httpx_client,
    get_async_httpx_client,
    httpxSpecialProvider,
)
from litellm.proxy._types import KeyManagementSystem

from .base_secret_manager import BaseSecretManager


class HashicorpSecretManager(BaseSecretManager):
    def __init__(self):
        from litellm.proxy.proxy_server import CommonProxyErrors, premium_user

        # Vault-specific config
        self.vault_addr = os.getenv("HCP_VAULT_ADDR", "http://127.0.0.1:8200")
        self.vault_token = os.getenv("HCP_VAULT_TOKEN", "")
        # Vault namespace (for X-Vault-Namespace header)
        self.vault_namespace = os.getenv("HCP_VAULT_NAMESPACE", None)
        # KV engine mount name (default: "secret")
        # If your KV engine is mounted somewhere other than "secret", set HCP_VAULT_MOUNT_NAME
        self.vault_mount_name = os.getenv("HCP_VAULT_MOUNT_NAME", "secret")
        # Optional path prefix for secrets (e.g., "myapp" -> secret/data/myapp/{secret_name})
        self.vault_path_prefix = os.getenv("HCP_VAULT_PATH_PREFIX", None)

        # Optional config for TLS cert auth
        self.tls_cert_path = os.getenv("HCP_VAULT_CLIENT_CERT", "")
        self.tls_key_path = os.getenv("HCP_VAULT_CLIENT_KEY", "")
        self.vault_cert_role = os.getenv("HCP_VAULT_CERT_ROLE", None)

        # Optional config for AppRole auth
        self.approle_role_id = os.getenv("HCP_VAULT_APPROLE_ROLE_ID", "")
        self.approle_secret_id = os.getenv("HCP_VAULT_APPROLE_SECRET_ID", "")
        self.approle_mount_path = os.getenv("HCP_VAULT_APPROLE_MOUNT_PATH", "approle")

        self._verify_required_credentials_exist()

        litellm.secret_manager_client = self
        litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT
        _refresh_interval = os.environ.get(
            "HCP_VAULT_REFRESH_INTERVAL", SECRET_MANAGER_REFRESH_INTERVAL
        )
        _refresh_interval = (
            int(_refresh_interval)
            if _refresh_interval
            else SECRET_MANAGER_REFRESH_INTERVAL
        )
        self.cache = InMemoryCache(
            default_ttl=_refresh_interval
        )  # store in memory for 1 day

        if premium_user is not True:
            raise ValueError(
                f"Hashicorp secret manager is only available for premium users. {CommonProxyErrors.not_premium_user.value}"
            )

    def _verify_required_credentials_exist(self) -> None:
        """
        Validate that at least one authentication method is configured.
        
        Raises:
            ValueError: If no valid authentication credentials are provided
        """
        if not self.vault_token and not (self.approle_role_id and self.approle_secret_id):
            raise ValueError(
                "Missing Vault authentication credentials. Please set either:\n"
                "  - HCP_VAULT_TOKEN for token-based auth, or\n"
                "  - HCP_VAULT_APPROLE_ROLE_ID and HCP_VAULT_APPROLE_SECRET_ID for AppRole auth"
            )

    def _auth_via_approle(self) -> str:
        """
        Authenticate to Vault using AppRole auth method.
        
        Ref: https://developer.hashicorp.com/vault/api-docs/auth/approle

        Request:
        ```
        curl \
            --request POST \
            --header "X-Vault-Namespace: mynamespace/" \
            --data '{"role_id": "...", "secret_id": "..."}' \
            http://127.0.0.1:8200/v1/auth/approle/login
        ```

        Response:
        ```
        {
            "auth": {
                "client_token": "hvs.CAESI...",
                "accessor": "hmac-sha256...",
                "policies": ["default", "dev-policy"],
                "token_policies": ["default", "dev-policy"],
                "lease_duration": 2764800,
                "renewable": true
            }
        }
        ```
        """
        verbose_logger.debug("Using AppRole auth for Hashicorp Vault")
        
        # Check cache first
        cached_token = self.cache.get_cache(key="hcp_vault_approle_token")
        if cached_token:
            verbose_logger.debug("Using cached Vault token from AppRole auth")
            return cached_token
        
        # Vault endpoint for AppRole login
        login_url = f"{self.vault_addr}/v1/auth/{self.approle_mount_path}/login"

        headers = {}
        if hasattr(self, "vault_namespace") and self.vault_namespace:
            headers["X-Vault-Namespace"] = self.vault_namespace
        
        try:
            client = _get_httpx_client()
            resp = client.post(
                url=login_url,
                headers=headers,
                json={
                    "role_id": self.approle_role_id,
                    "secret_id": self.approle_secret_id,
                },
            )
            resp.raise_for_status()
            
            auth_data = resp.json()["auth"]
            token = auth_data["client_token"]
            _lease_duration = auth_data["lease_duration"]
            
            verbose_logger.debug(
                f"Successfully obtained Vault token via AppRole auth. Lease duration: {_lease_duration}s"
            )
            
            # Cache the token with its lease duration
            self.cache.set_cache(
                key="hcp_vault_approle_token", value=token, ttl=_lease_duration
            )
            return token
        except Exception as e:
            raise RuntimeError(f"Could not authenticate to Vault via AppRole: {e}")

    def _auth_via_tls_cert(self) -> str:
        """
        Ref: https://developer.hashicorp.com/vault/api-docs/auth/cert

        Request:
        ```
        curl \
            --request POST \
            --cacert vault-ca.pem \
            --cert cert.pem \
            --key key.pem \
            --header "X-Vault-Namespace: mynamespace/" \
            --data '{"name": "my-cert-role"}' \
            https://127.0.0.1:8200/v1/auth/cert/login
        ```

        Response:
        ```
        {
            "auth": {
                "client_token": "cf95f87d-f95b-47ff-b1f5-ba7bff850425",
                "policies": ["web", "stage"],
                "lease_duration": 3600,
                "renewable": true
            }
        }
        ```
        """
        verbose_logger.debug("Using TLS cert auth for Hashicorp Vault")
        # Vault endpoint for cert-based login, e.g. '/v1/auth/cert/login'
        login_url = f"{self.vault_addr}/v1/auth/cert/login"

        # Include your Vault namespace in the header if you're using namespaces.
        # E.g. self.vault_namespace = 'mynamespace/'
        # If you only have root namespace, you can omit this header entirely.
        headers = {}
        if hasattr(self, "vault_namespace") and self.vault_namespace:
            headers["X-Vault-Namespace"] = self.vault_namespace
        try:
            # We use the client cert and key for mutual TLS
            client = httpx.Client(cert=(self.tls_cert_path, self.tls_key_path))
            resp = client.post(
                login_url,
                headers=headers,
                json=self._get_tls_cert_auth_body(),
            )
            resp.raise_for_status()
            token = resp.json()["auth"]["client_token"]
            _lease_duration = resp.json()["auth"]["lease_duration"]
            verbose_logger.debug("Successfully obtained Vault token via TLS cert auth.")
            self.cache.set_cache(
                key="hcp_vault_token", value=token, ttl=_lease_duration
            )
            return token
        except Exception as e:
            raise RuntimeError(f"Could not authenticate to Vault via TLS cert: {e}")

    def _get_tls_cert_auth_body(self) -> dict:
        return {"name": self.vault_cert_role}

    def get_url(self, secret_name: str) -> str:
        """
        Constructs the Vault URL for KV v2 secrets.
        
        Format: {VAULT_ADDR}/v1/{NAMESPACE}/{MOUNT_NAME}/data/{PATH_PREFIX}/{SECRET_NAME}
        
        Examples:
        - Default: http://127.0.0.1:8200/v1/secret/data/mykey
        - With namespace: http://127.0.0.1:8200/v1/mynamespace/secret/data/mykey
        - With custom mount: http://127.0.0.1:8200/v1/kv/data/mykey
        - With path prefix: http://127.0.0.1:8200/v1/secret/data/myapp/mykey
        """
        _url = f"{self.vault_addr}/v1/"
        if self.vault_namespace:
            _url += f"{self.vault_namespace}/"
        _url += f"{self.vault_mount_name}/data/"
        if self.vault_path_prefix:
            _url += f"{self.vault_path_prefix}/"
        _url += secret_name
        return _url

    def _get_request_headers(self) -> dict:
        """
        Get the headers for Vault API requests.
        
        Authentication priority:
        1. AppRole (if role_id and secret_id are configured)
        2. TLS Certificate (if cert paths are configured)
        3. Direct token (if HCP_VAULT_TOKEN is set)
        """
        # Priority 1: AppRole auth
        if self.approle_role_id and self.approle_secret_id:
            return {"X-Vault-Token": self._auth_via_approle()}
        
        # Priority 2: TLS cert auth
        if self.tls_cert_path and self.tls_key_path:
            return {"X-Vault-Token": self._auth_via_tls_cert()}
        
        # Priority 3: Direct token
        return {"X-Vault-Token": self.vault_token}

    async def async_read_secret(
        self,
        secret_name: str,
        optional_params: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
    ) -> Optional[str]:
        """
        Reads a secret from Vault KV v2 using an async HTTPX client.
        secret_name is just the path inside the KV mount (e.g., 'myapp/config').
        Returns the entire data dict from data.data, or None on failure.
        """
        if self.cache.get_cache(secret_name) is not None:
            return self.cache.get_cache(secret_name)
        async_client = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.SecretManager,
        )
        try:
            # For KV v2: /v1/<mount>/data/<path>
            # Example: http://127.0.0.1:8200/v1/secret/data/myapp/config
            _url = self.get_url(secret_name)
            url = _url

            response = await async_client.get(url, headers=self._get_request_headers())
            response.raise_for_status()

            # For KV v2, the secret is in response.json()["data"]["data"]
            json_resp = response.json()
            _value = self._get_secret_value_from_json_response(json_resp)
            self.cache.set_cache(secret_name, _value)
            return _value

        except Exception as e:
            verbose_logger.exception(f"Error reading secret from Hashicorp Vault: {e}")
            return None

    def sync_read_secret(
        self,
        secret_name: str,
        optional_params: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
    ) -> Optional[str]:
        """
        Reads a secret from Vault KV v2 using a sync HTTPX client.
        secret_name is just the path inside the KV mount (e.g., 'myapp/config').
        Returns the entire data dict from data.data, or None on failure.
        """
        if self.cache.get_cache(secret_name) is not None:
            return self.cache.get_cache(secret_name)
        sync_client = _get_httpx_client()
        try:
            # For KV v2: /v1/<mount>/data/<path>
            url = self.get_url(secret_name)

            response = sync_client.get(url, headers=self._get_request_headers())
            response.raise_for_status()

            # For KV v2, the secret is in response.json()["data"]["data"]
            json_resp = response.json()
            _value = self._get_secret_value_from_json_response(json_resp)
            self.cache.set_cache(secret_name, _value)
            return _value

        except Exception as e:
            verbose_logger.exception(f"Error reading secret from Hashicorp Vault: {e}")
            return None

    async def async_write_secret(
        self,
        secret_name: str,
        secret_value: str,
        description: Optional[str] = None,
        optional_params: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        tags: Optional[Union[dict, list]] = None
    ) -> Dict[str, Any]:
        """
        Writes a secret to Vault KV v2 using an async HTTPX client.

        Args:
            secret_name: Path inside the KV mount (e.g., 'myapp/config')
            secret_value: Value to store
            description: Optional description for the secret
            optional_params: Additional parameters to include in the secret data
            timeout: Request timeout

        Returns:
            dict: Response containing status and details of the operation
        """
        async_client = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.SecretManager,
            params={"timeout": timeout},
        )

        try:
            url = self.get_url(secret_name)

            # Prepare the secret data
            data = {"data": {"key": secret_value}}

            if description:
                data["data"]["description"] = description

            response = await async_client.post(
                url=url, headers=self._get_request_headers(), json=data
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            verbose_logger.exception(f"Error writing secret to Hashicorp Vault: {e}")
            return {"status": "error", "message": str(e)}

    async def async_rotate_secret(
        self,
        current_secret_name: str,
        new_secret_name: str,
        new_secret_value: str,
        optional_params: Dict | None = None,
        timeout: float | httpx.Timeout | None = None,
    ) -> Dict:
        raise NotImplementedError("Hashicorp does not support secret rotation")

    async def async_delete_secret(
        self,
        secret_name: str,
        recovery_window_in_days: Optional[int] = 7,
        optional_params: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
    ) -> dict:
        """
        Async function to delete a secret from Hashicorp Vault.
        In KV v2, this marks the latest version of the secret as deleted.

        Args:
            secret_name: Name of the secret to delete
            recovery_window_in_days: Not used for Vault (Vault handles this internally)
            optional_params: Additional parameters specific to the secret manager
            timeout: Request timeout

        Returns:
            dict: Response containing status and details of the operation
        """
        async_client = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.SecretManager,
            params={"timeout": timeout},
        )

        try:
            # For KV v2 delete: /v1/<mount>/data/<path>
            url = self.get_url(secret_name)

            response = await async_client.delete(
                url=url, headers=self._get_request_headers()
            )
            response.raise_for_status()

            # Clear the cache for this secret
            self.cache.delete_cache(secret_name)

            return {
                "status": "success",
                "message": f"Secret {secret_name} deleted successfully",
            }
        except Exception as e:
            verbose_logger.exception(f"Error deleting secret from Hashicorp Vault: {e}")
            return {"status": "error", "message": str(e)}

    def _get_secret_value_from_json_response(
        self, json_resp: Optional[dict]
    ) -> Optional[str]:
        """
        Get the secret value from the JSON response

        Json response from hashicorp vault is of the form:

        {
            "request_id":"036ba77c-018b-31dd-047b-323bcd0cd332",
            "lease_id":"",
            "renewable":false,
            "lease_duration":0,
            "data":
                {"data":
                    {"key":"Vault Is The Way"},
                    "metadata":{"created_time":"2025-01-01T22:13:50.93942388Z","custom_metadata":null,"deletion_time":"","destroyed":false,"version":1}
                },
            "wrap_info":null,
            "warnings":null,
            "auth":null,
            "mount_type":"kv"
        }

        Note: LiteLLM assumes that all secrets are stored as under the key "key"
        """
        if json_resp is None:
            return None
        return json_resp.get("data", {}).get("data", {}).get("key", None)
