123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- #!/usr/bin/env python
- """
- Makes the Okta groups and group rules needed to support the Okta + AWS integration.
- A master group has a group rule associated with it. The group rule auto-assigns
- membership in several "subgroups" - each of which maps to a given role in a given
- AWS account. All of this is so that if we assign to you the "foo-role" group in
- Okta, you'll be able to assume "foo-role" in each of the many AWS accounts we have.
- """
- from __future__ import print_function
- import json
- import logging
- import os
- import sys
- import re
- import requests
- from requests.auth import AuthBase
# ---------------------------------------------------------------------------
# Configuration
#
# Kept inline rather than in a separate config file for now:
#   * MASTER_GROUPS lists the master groups that should exist in Okta
#   * each entry carries a regex that selects its "child" subgroups by name
# ---------------------------------------------------------------------------
LOGLEVEL = logging.DEBUG

# Base URL of the Okta org; every API path is appended to this.
API_URL = 'https://mdr-multipass.okta.com'

# API token read from the environment; None when OKTA_API_TOKEN is unset.
API_KEY = os.getenv('OKTA_API_TOKEN')

MASTER_GROUPS = [
    dict(
        group_name='AWS - MDR_Engineer-Readonly Role',
        subgroup_regex=r'^aws(?:-us-gov)?#[^#]+#mdr_engineer_readonly#\d+$',
    ),
    dict(
        group_name='AWS - Cyber Range / A&I',
        subgroup_regex=r'^aws(?:-us-gov)?#afs-mdr-common-services(?:-gov)?#mdr_developer_readonly#\d+$',
    ),
]
class OktaAuth(AuthBase):
    """
    Auth hook for requests: stamps every outgoing request with the
    "SSWS <api key>" Authorization header.
    """

    def __init__(self, api_key):
        # Stored as given; it is only formatted into the header when used.
        self.api_key = api_key

    def __call__(self, r):
        header_value = 'SSWS {0}'.format(self.api_key)
        r.headers['Authorization'] = header_value
        # Hand the (now authenticated) request back to the caller.
        return r
def main(args):
    """
    Script entry point: configure logging, then process every master group.

    args is accepted to keep the conventional main(argv) signature but is
    not used.

    Returns None (exit status 0) after every entry of MASTER_GROUPS has been
    handed to process_group(), or 1 when no API token is configured.
    """
    logging.basicConfig(stream=sys.stderr,
                        level=LOGLEVEL,
                        format='%(asctime)s %(levelname)s %(funcName)s %(message)s')
    log = logging.getLogger(__name__)

    # Without a token every request would be sent as "SSWS None" and the run
    # would fail part-way through; stop before touching the API instead.
    if not API_KEY:
        log.error("OKTA_API_TOKEN is not set; refusing to call the Okta API")
        return 1

    for group in MASTER_GROUPS:
        process_group(group)
def process_group(group):
    """
    Ensure one master group exists in Okta, then (re)build its group rule.

    group is a dict with two keys:
      * 'group_name'     - exact name of the master group
      * 'subgroup_regex' - pattern passed to find_subgroups() to pick the
                           subgroups the rule should assign

    The group is looked up by name and created if no exact match is found.
    The matching subgroups are then collected and handed, together with the
    master group's id, to create_subgroup_rule().

    Raises RuntimeError when the lookup does not return HTTP 200 or the
    creation does not return a 2xx status, so that a rule is never built
    against a group id that does not exist.
    """
    log = logging.getLogger(__name__)
    group_name = group.get('group_name')
    groups_url = '{0}/api/v1/groups'.format(API_URL)

    log.debug("Processing Group %s", group_name)
    r = requests.get(groups_url,
                     auth=OktaAuth(API_KEY),
                     params={'q': group_name})
    log.debug("Response code %d", r.status_code)
    if r.status_code != 200:
        # A failed lookup must not be mistaken for "group does not exist",
        # otherwise we would go on to try to create it.
        log.error("Response code %d trying to look up group %s",
                  r.status_code, group_name)
        raise RuntimeError(
            "Okta group lookup for {0!r} failed with HTTP {1}".format(
                group_name, r.status_code))

    # The search can return more than the group we asked for, so only an
    # exact name match counts. An empty result is a plain [] JSON document.
    my_group = None
    for rec in r.json() or []:
        if (rec.get('profile') or {}).get('name') == group_name:
            my_group = rec

    if my_group is None:
        # Our group does not exist, we need to make it
        log.info("Creating Group %s", group_name)
        payload = {
            'profile': {
                'name': group_name,
                'description': 'AWS SAML Role'
            }
        }
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
        r = requests.post(groups_url,
                          auth=OktaAuth(API_KEY),
                          data=json.dumps(payload),
                          headers=headers)
        log.debug("Response code: %d", r.status_code)
        if not 200 <= r.status_code < 300:
            # An error body has no usable 'id'; carrying on would create a
            # rule keyed on the literal string "None".
            log.error("Response code %d trying to create group %s",
                      r.status_code, group_name)
            raise RuntimeError(
                "Okta group creation for {0!r} failed with HTTP {1}".format(
                    group_name, r.status_code))
        my_group = r.json()

    log.info("Master group id=%s", my_group.get('id'))
    subgroups = find_subgroups(group.get('subgroup_regex'))
    log.info("Valid Subgroups = %s", repr(subgroups))
    create_subgroup_rule(group_name, my_group.get('id'), subgroups)
def create_subgroup_rule(rule_name, master_group_id, subgroups):
    """
    Create a rule to assign a user to all of the subgroups
    when they are assigned to the master group. If a rule
    by the same name exists, it will be destroyed and
    re-created. (Okta API artifact)

    rule_name       - name of the rule (callers pass the master group name)
    master_group_id - id of the master group used in the rule condition
    subgroups       - list of group ids the rule assigns users to

    Raises RuntimeError when looking up, creating or activating the rule
    returns an unexpected HTTP status. Removing an existing rule is delegated
    to remove_group_rule().
    """
    log = logging.getLogger(__name__)
    rules_url = '{0}/api/v1/groups/rules'.format(API_URL)
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }

    log.debug("Checking for existing rule named %s", rule_name)
    r = requests.get(rules_url,
                     auth=OktaAuth(API_KEY),
                     params={'q': rule_name})
    log.debug("Response code %d", r.status_code)
    if r.status_code != 200:
        log.error("Response code %d trying to look up group rule %s",
                  r.status_code, rule_name)
        raise RuntimeError(
            "Okta rule lookup for {0!r} failed with HTTP {1}".format(
                rule_name, r.status_code))

    # Only an exact name match identifies "our" rule.
    existing_rule_id = None
    for rule in r.json() or []:
        if rule.get('name') == rule_name:
            existing_rule_id = rule.get('id')

    # Need to remove the existing rule to add a new one
    if existing_rule_id is not None:
        remove_group_rule(existing_rule_id)

    # Now let's make a new rule
    new_rule = {
        'type': 'group_rule',
        'name': rule_name,
        'conditions': {
            'expression': {
                'type': 'urn:okta:expression:1.0',
                'value': 'isMemberOfAnyGroup("{}")'.format(master_group_id)
            }
        },
        'actions': {
            'assignUserToGroups': {
                'groupIds': subgroups
            }
        }
    }
    r = requests.post(rules_url,
                      auth=OktaAuth(API_KEY),
                      headers=headers,
                      data=json.dumps(new_rule))
    log.debug("Response code: %d", r.status_code)
    if r.status_code != 200:
        log.error("Response code %d trying to create group rule %s",
                  r.status_code, rule_name)
        raise RuntimeError(
            "Okta rule creation for {0!r} failed with HTTP {1}".format(
                rule_name, r.status_code))
    new_rule_id = r.json().get('id')

    # The freshly created rule still has to be activated explicitly.
    activate_url = '{0}/api/v1/groups/rules/{1}/lifecycle/activate'.format(
        API_URL,
        new_rule_id)
    r = requests.post(activate_url,
                      auth=OktaAuth(API_KEY),
                      headers=headers)
    log.debug("Response code: %d", r.status_code)
    if not 200 <= r.status_code < 300:
        log.error("Response code %d trying to activate group rule id=%s",
                  r.status_code, new_rule_id)
        raise RuntimeError(
            "Okta rule activation for id={0} failed with HTTP {1}".format(
                new_rule_id, r.status_code))
-
def remove_group_rule(rule_id):
    """
    Deactivate and then delete the group rule with the given id.

    The rule is deactivated first and deleted second. The deactivate call is
    expected to answer 204 and the delete call 202.

    Raises RuntimeError, naming the failing step and status code, when either
    call returns anything else.
    """
    log = logging.getLogger(__name__)
    rule_url = '{0}/api/v1/groups/rules/{1}'.format(API_URL, rule_id)
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }

    # First deactivate the rule, per Okta API
    r = requests.post('{0}/lifecycle/deactivate'.format(rule_url),
                      auth=OktaAuth(API_KEY),
                      headers=headers)
    log.debug("Response code: %d", r.status_code)
    if r.status_code != 204:
        log.error("Response code %d trying to deactivate group rule id=%s", r.status_code, rule_id)
        raise RuntimeError(
            "Okta rule deactivation for id={0} failed with HTTP {1}".format(
                rule_id, r.status_code))

    # Now let's try to delete it
    r = requests.delete(rule_url,
                        auth=OktaAuth(API_KEY),
                        headers=headers)
    log.debug("Response code: %d", r.status_code)
    if r.status_code != 202:
        log.error("Response code %d trying to delete group rule id=%s", r.status_code, rule_id)
        raise RuntimeError(
            "Okta rule deletion for id={0} failed with HTTP {1}".format(
                rule_id, r.status_code))
-
def find_subgroups(regex):
    """
    Finds all the groups matching a given regex, so that
    we can attach them to a rule.

    Groups are fetched with the search term 'aws' and each group's profile
    name is matched (anchored at the start, via re.match) against regex.

    Returns a list of the ids of the matching groups. The list is empty when
    nothing matches or when the listing does not return HTTP 200; the latter
    case is logged as a warning.
    """
    log = logging.getLogger(__name__)
    log.debug("Looking for groups matching regex '%s'", regex)
    r = requests.get('{0}/api/v1/groups'.format(API_URL),
                     auth=OktaAuth(API_KEY),
                     params={'q': 'aws'})
    log.debug("Response code %d", r.status_code)

    # we should "always" get a 200 even if we get an empty-ish response
    # a basic [ ] json doc.
    if r.status_code != 200:
        log.warning("Response code %d listing groups; no subgroups found",
                    r.status_code)
        return []

    # NOTE(review): only this single response is read; if the API pages its
    # results, groups beyond the first page are not seen - confirm.
    pattern = re.compile(regex)
    groups = []
    for rec in r.json() or []:
        name = (rec.get('profile') or {}).get('name')
        # A record without a name cannot match; skip it instead of crashing.
        if name is not None and pattern.match(name) is not None:
            groups.append(rec.get('id'))
    return groups
if __name__ == "__main__":
    # Run the script and use main()'s return value as the process exit status.
    exit_status = main(sys.argv)
    sys.exit(exit_status)
|