Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ indent_style = space
insert_final_newline = true
trim_trailing_whitespace = true

[*.md]
indent_size = 2
indent_style = space

[LICENSE.txt]
insert_final_newline = false

[*.{diff,patch}]
trim_trailing_whitespace = false

[*.{json,yaml,yml}]
[*.{json,md,yaml,yml}]
indent_size = 2
indent_style = space

[.{prettierrc,yamllint}]
indent_size = 2
indent_style = space
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ repos:
- --allow-missing-credentials
- id: detect-private-key
- id: end-of-file-fixer
exclude: '.bumpversion.cfg'
exclude: ".bumpversion.cfg"
- id: mixed-line-ending
- id: name-tests-test
args:
Expand Down
16 changes: 14 additions & 2 deletions cert_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from ._helpers import PendingError
from .acme import ACMEAccount
from .admin import Admin
from .client import Client
from .client import Client, OAuth2Client
from .dcv import DomainControlValidation
from .domain import Domain
from .organization import Organization
from .person import Person
Expand All @@ -12,5 +13,16 @@
from .ssl import SSL

__all__ = [
"ACMEAccount", "Admin", "Client", "Domain", "Organization", "PendingError", "Person", "Report", "SMIME", "SSL"
"ACMEAccount",
"Admin",
"Client",
"Domain",
"DomainControlValidation",
"OAuth2Client",
"Organization",
"PendingError",
"Person",
"Report",
"SMIME",
"SSL",
]
28 changes: 14 additions & 14 deletions cert_manager/_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ def __init__(self, client, endpoint, api_version="v1"):
super().__init__(client=client, endpoint=endpoint, api_version=api_version)

# Set to None initially. Will be filled in by methods later.
self.__cert_types = None
self.__custom_fields = None
self.__reason_maxlen = 512
self._cert_types = None
self._custom_fields = None
self._reason_maxlen = 512

@property
def types(self):
Expand All @@ -51,19 +51,19 @@ def types(self):
"""
# Only go to the API if we haven't done the API call yet, or if someone
# specifically wants to refresh the internal cache
if not self.__cert_types:
if not self._cert_types:
url = self._url("/types")
result = self._client.get(url)

# Build a dictionary instead of a flat list of dictionaries
self.__cert_types = {}
self._cert_types = {}
for res in result.json():
name = res["name"]
self.__cert_types[name] = {}
self.__cert_types[name]["id"] = res["id"]
self.__cert_types[name]["terms"] = res["terms"]
self._cert_types[name] = {}
self._cert_types[name]["id"] = res["id"]
self._cert_types[name]["terms"] = res["terms"]

return self.__cert_types
return self._cert_types

@property
def custom_fields(self):
Expand All @@ -73,13 +73,13 @@ def custom_fields(self):
"""
# Only go to the API if we haven't done the API call yet, or if someone
# specifically wants to refresh the internal cache
if not self.__custom_fields:
if not self._custom_fields:
url = self._url("/customFields")
result = self._client.get(url)

self.__custom_fields = result.json()
self._custom_fields = result.json()

return self.__custom_fields
return self._custom_fields

def _validate_custom_fields(self, custom_fields):
"""Check the structure and contents of a list of custom fields dicts. Raise exceptions if validation fails.
Expand Down Expand Up @@ -230,8 +230,8 @@ def revoke(self, cert_id, reason=""):
url = self._url(f"/revoke/{cert_id}")

# Sectigo has a 512 character limit on the "reason" message, so catch that here.
if not reason or len(reason) >= self.__reason_maxlen:
raise ValueError(f"Sectigo limit: reason must be > 0 character and < {self.__reason_maxlen} characters")
if not reason or len(reason) >= self._reason_maxlen:
raise ValueError(f"Sectigo limit: reason must be > 0 character and < {self._reason_maxlen} characters")

data = {"reason": reason}

Expand Down
12 changes: 6 additions & 6 deletions cert_manager/acme.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(self, client, api_version="v1"):
"""
super().__init__(client=client, endpoint="/acme", api_version=api_version)
self._api_url = self._url("/account")
self.__acme_accounts = None
self._acme_accounts = None

def all(self, org_id, force=False):
"""Return a list of acme accounts from Sectigo.
Expand All @@ -45,15 +45,15 @@ def all(self, org_id, force=False):

:return list: A list of dictionaries representing the acme accounts
"""
if (self.__acme_accounts) and (not force):
return self.__acme_accounts
if (self._acme_accounts) and (not force):
return self._acme_accounts

self.__acme_accounts = []
self._acme_accounts = []
result = self.find(org_id)
for acct in result:
self.__acme_accounts.append(acct)
self._acme_accounts.append(acct)

return self.__acme_accounts
return self._acme_accounts

@paginate
def find(self, org_id, **kwargs):
Expand Down
10 changes: 5 additions & 5 deletions cert_manager/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, client, api_version="v1"):
"""
super().__init__(client=client, endpoint="/admin", api_version=api_version)

self.__admins = None
self._admins = None
self.all()

def all(self, force=False):
Expand All @@ -35,14 +35,14 @@ def all(self, force=False):

:return list: A list of dictionaries representing the admins
"""
if (self.__admins) and (not force):
return self.__admins
if (self._admins) and (not force):
return self._admins

result = self._client.get(self._api_url)

self.__admins = result.json()
self._admins = result.json()

return self.__admins
return self._admins

def create(self, login, email, forename, surname, password, credentials, **kwargs): # noqa: PLR0913
"""Create a new administrator.
Expand Down
110 changes: 74 additions & 36 deletions cert_manager/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,44 +37,44 @@ def __init__(self, **kwargs):
:param string user_key_file: The path to the key file if using client cert auth
"""
# These options are required, so raise a KeyError if they are not provided.
self.__login_uri = kwargs["login_uri"]
self.__username = kwargs["username"]
self._login_uri = kwargs["login_uri"]
self._username = kwargs["username"]

# Using get for consistency and to allow defaults to be easily set
self.__base_url = kwargs.get("base_url", "https://cert-manager.com/api")
self.__cert_auth = kwargs.get("cert_auth", False)
self.__session = requests.Session()
self._base_url = kwargs.get("base_url", "https://cert-manager.com/api")
self._cert_auth = kwargs.get("cert_auth", False)
self._session = requests.Session()

self.__user_crt_file = kwargs.get("user_crt_file")
self.__user_key_file = kwargs.get("user_key_file")
self._user_crt_file = kwargs.get("user_crt_file")
self._user_key_file = kwargs.get("user_key_file")

# Set the default HTTP headers
self.__headers = {
"login": self.__username,
"customerUri": self.__login_uri,
self._headers = {
"login": self._username,
"customerUri": self._login_uri,
"Accept": "application/json",
"User-Agent": self.user_agent,
}

# Setup the Session for certificate auth
if self.__cert_auth:
if self._cert_auth:
# Require keys if cert_auth is True or raise a KeyError
self.__user_crt_file = kwargs["user_crt_file"]
self.__user_key_file = kwargs["user_key_file"]
self.__session.cert = (self.__user_crt_file, self.__user_key_file)
self._user_crt_file = kwargs["user_crt_file"]
self._user_key_file = kwargs["user_key_file"]
self._session.cert = (self._user_crt_file, self._user_key_file)

# Warn about using /api instead of /private/api if doing certificate auth
if not re.search("/private", self.__base_url):
cert_uri = re.sub("/api", "/private/api", self.__base_url)
if not re.search("/private", self._base_url):
cert_uri = re.sub("/api", "/private/api", self._base_url)
LOGGER.warning("base URI should probably be %s due to certificate auth", cert_uri)

else:
# If we're not doing certificate auth, we need a password, so make sure an exception is raised if
# a password was not passed as an argument
self.__password = kwargs["password"]
self.__headers["password"] = self.__password
self._password = kwargs["password"]
self._headers["password"] = self._password

self.__session.headers.update(self.__headers)
self._session.headers.update(self._headers)

@property
def user_agent(self):
Expand All @@ -87,18 +87,18 @@ def user_agent(self):

@property
def base_url(self):
"""Return the internal __base_url value."""
return self.__base_url
"""Return the internal _base_url value."""
return self._base_url

@property
def headers(self):
"""Return the internal __headers value."""
return self.__headers
"""Return the internal _headers value."""
return self._headers

@property
def session(self):
"""Return the setup internal __session requests.Session object."""
return self.__session
"""Return the setup internal _session requests.Session object."""
return self._session

def add_headers(self, headers=None):
"""Add the provided headers to the internally stored headers.
Expand All @@ -109,10 +109,10 @@ def add_headers(self, headers=None):
:param dict headers: A dictionary where key is the header with its value being the setting for that header.
"""
if headers:
head = self.__headers.copy()
head = self._headers.copy()
head.update(headers)
self.__headers = head
self.__session.headers.update(self.__headers)
self._headers = head
self._session.headers.update(self._headers)

def remove_headers(self, headers=None):
"""Remove the requested header keys from the internally stored headers.
Expand All @@ -124,9 +124,9 @@ def remove_headers(self, headers=None):
"""
if headers:
for head in headers:
if head in self.__headers:
del self.__headers[head]
del self.__session.headers[head]
if head in self._headers:
del self._headers[head]
del self._session.headers[head]

@traffic_log(traffic_logger=LOGGER)
def head(self, url, headers=None, params=None):
Expand All @@ -137,7 +137,7 @@ def head(self, url, headers=None, params=None):
:param dict params: A dictionary with any parameters to add to the request URL
:return obj: A requests.Response object received as a response
"""
result = self.__session.head(url, headers=headers, params=params)
result = self._session.head(url, headers=headers, params=params)
# Raise an exception if the return code is in an error range
result.raise_for_status()

Expand All @@ -152,7 +152,7 @@ def get(self, url, headers=None, params=None):
:param dict params: A dictionary with any parameters to add to the request URL
:return obj: A requests.Response object received as a response
"""
result = self.__session.get(url, headers=headers, params=params)
result = self._session.get(url, headers=headers, params=params)
# Raise an exception if the return code is in an error range
result.raise_for_status()

Expand All @@ -167,7 +167,7 @@ def post(self, url, headers=None, data=None):
:param dict data: A dictionary with the data to use for the body of the POST
:return obj: A requests.Response object received as a response
"""
result = self.__session.post(url, json=data, headers=headers)
result = self._session.post(url, json=data, headers=headers)
# Raise an exception if the return code is in an error range
result.raise_for_status()

Expand All @@ -182,7 +182,7 @@ def put(self, url, headers=None, data=None):
:param dict data: A dictionary with the data to use for the body of the PUT
:return obj: A requests.Response object received as a response
"""
result = self.__session.put(url, json=data, headers=headers)
result = self._session.put(url, json=data, headers=headers)
# Raise an exception if the return code is in an error range
result.raise_for_status()

Expand All @@ -197,8 +197,46 @@ def delete(self, url, headers=None, data=None):
:param dict data: A dictionary with the data to use for the body of the DELETE
:return obj: A requests.Response object received as a response
"""
result = self.__session.delete(url, json=data, headers=headers)
result = self._session.delete(url, json=data, headers=headers)
# Raise an exception if the return code is in an error range
result.raise_for_status()

return result


class OAuth2Client(Client):
"""Serve as a Base class for calls to the Sectigo Cert Manager APIs using OAuth2."""

def __init__(self, client_id, client_secret, auth_url="https://auth.sso.sectigo.com/auth/realms/apiclients/protocol/openid-connect/token",
base_url="https://admin.enterprise.sectigo.com/api"):
"""Initialize the class.

:param string auth_url: The full URL to the Sectigo OAuth2 token endpoint; the default is "https://auth.sso.sectigo.com/auth/realms/apiclients/protocol/openid-connect/token"
:param string client_id: The Client ID to use for OAuth2 authentication
:param string client_secret: The Client Secret to use for OAuth2 authentication
:param string base_url: The base URL for the Sectigo API; the default is "https://admin.enterprise.sectigo.com/api"
"""
self._base_url = base_url
# Using get for consistency and to allow defaults to be easily set
self._session = requests.Session()

payload = {
"client_id": client_id,
"client_secret": client_secret,
"grant_type": "client_credentials"
}
headers = {
"accept": "application/json",
"content-type": "application/x-www-form-urlencoded"
}

response = requests.post(auth_url, data=payload, headers=headers)

# Set the default HTTP headers
self._headers = {
"Authorization": f"Bearer {response.json()['access_token']}",
"Accept": "application/json",
"User-Agent": self.user_agent,
}

self._session.headers.update(self._headers)
Loading