#!/usr/bin/env python """ Makes the Okta groups and group rules needed to support the Okta + AWS integration. A master group has a group rule associated with it. The group rule auto-assigns membership in several "subgroups" - each of which maps to a given role in a given AWS account. All of this is so that if we assign to you the "foo-role" group in Okta, you'll be able to assume "foo-role" in each of the many AWS accounts we have. """ from __future__ import print_function import json import logging import os import sys import re import requests from requests.auth import AuthBase # Configuration: # * a list of groups that should exist # * a regex for matching up 'child groups' # Maybe this should be a configuration file? Perhaps one day # but for now I'm happy with it in here. LOGLEVEL = logging.DEBUG API_URL = 'https://mdr-multipass.okta.com' API_KEY = os.environ.get('OKTA_API_TOKEN') MASTER_GROUPS = [ { 'group_name': 'AWS - MDR_Engineer-Readonly Role', 'subgroup_regex': r'^aws(?:-us-gov)?#[^#]+#mdr_engineer_readonly#\d+$' }, { 'group_name': 'AWS - Cyber Range / A&I', 'subgroup_regex': r'^aws(?:-us-gov)?#afs-mdr-common-services(?:-gov)?#mdr_developer_readonly#\d+$' } ] class OktaAuth(AuthBase): """ Adds Okta API expected auth header """ def __init__(self, api_key): self.api_key = api_key def __call__(self, r): r.headers['Authorization'] = 'SSWS {0}'.format(self.api_key) return r def main(args): """ The main """ logging.basicConfig(stream=sys.stderr, level=LOGLEVEL, format='%(asctime)s %(levelname)s %(funcName)s %(message)s') for group in MASTER_GROUPS: process_group(group) def process_group(group): """ Process a group obviously """ log = logging.getLogger(__name__) payload = { 'q' : group.get('group_name') } log.debug("Processing Group %s", group.get('group_name')) r = requests.get('{0}/api/v1/groups'.format(API_URL), auth=OktaAuth(API_KEY), params=payload) log.debug("Response code %d", r.status_code) my_group = None # we should "always" get a 200 even if we get an empty-ish response # a basic [ ] json doc. if r.status_code == 200: data = r.json() if data: for rec in data: if rec.get('profile').get('name') == group.get('group_name'): my_group = rec # Our group does not exist, we need to make it payload = { 'profile': { 'name' : group.get('group_name'), 'description': 'AWS SAML Role' } } headers = { 'Accept': 'application/json', 'Content-Type': 'application/json' } if my_group is None: log.info("Creating Group %s", group.get('group_name')) r = requests.post('{0}/api/v1/groups'.format(API_URL), auth=OktaAuth(API_KEY), data=json.dumps(payload), headers=headers) log.debug("Response code: %d", r.status_code) my_group = r.json() log.info("Master group id=%s", my_group.get('id')) subgroups = find_subgroups(group.get('subgroup_regex')) log.info("Valid Subgroups = %s", repr(subgroups)) create_subgroup_rule(group.get('group_name'), my_group.get('id'), subgroups) def create_subgroup_rule(rule_name, master_group_id, subgroups): """ Create a rule to assign a user to all of the subgroups when they are assigned to the master group. If a rule by the same name exists, it will be destroyed and re-created. (Okta API artifact) """ log = logging.getLogger(__name__) log.debug("Checking for existing rule named %s", rule_name) payload = { 'q': rule_name } existing_rule_id = None r = requests.get('{0}/api/v1/groups/rules'.format(API_URL), auth=OktaAuth(API_KEY), params=payload) log.debug("Response code %d", r.status_code) if r.status_code == 200: rules = r.json() for rule in rules: if rule.get('name') == rule_name: existing_rule_id = rule.get('id') else: raise # Need to remove the existing rule to add a new one if existing_rule_id is not None: remove_group_rule(existing_rule_id) # Now let's make a new rule whee new_rule = { 'type' : 'group_rule', 'name' : rule_name, 'conditions': { 'expression': { 'type' : 'urn:okta:expression:1.0', 'value': 'isMemberOfAnyGroup("{}")'.format(master_group_id) } }, 'actions' : { 'assignUserToGroups': { 'groupIds': subgroups } } } # First deactivate the rule, per Okta API url = '{0}/api/v1/groups/rules'.format(API_URL) headers = { 'Accept': 'application/json', 'Content-Type': 'application/json' } r = requests.post(url, auth=OktaAuth(API_KEY), headers=headers, data=json.dumps(new_rule)) log.debug("Response code: %d", r.status_code) if r.status_code != 200: log.error("Response code %d trying to create group rule id=%s", r.rule_name) raise new_rule_response=r.json() url = '{0}/api/v1/groups/rules/{1}/lifecycle/activate'.format( API_URL, new_rule_response.get('id')) r = requests.post(url, auth=OktaAuth(API_KEY), headers=headers, data=json.dumps(new_rule)) log.debug("Response code: %d", r.status_code) def remove_group_rule(rule_id): log = logging.getLogger(__name__) # First deactivate the rule, per Okta API url = '{0}/api/v1/groups/rules/{1}/lifecycle/deactivate'.format( API_URL, rule_id) headers = { 'Accept': 'application/json', 'Content-Type': 'application/json' } r = requests.post(url, auth=OktaAuth(API_KEY), headers=headers) log.debug("Response code: %d", r.status_code) if r.status_code != 204: log.error("Response code %d trying to deactivate group rule id=%s", r.status_code, rule_id) raise # Now let's try to delete it url = '{0}/api/v1/groups/rules/{1}'.format( API_URL, rule_id) headers = { 'Accept': 'application/json', 'Content-Type': 'application/json' } r = requests.delete(url, auth=OktaAuth(API_KEY), headers=headers) log.debug("Response code: %d", r.status_code) if r.status_code != 202: log.error("Response code %d trying to delete group rule id=%s", r.status_code, rule_id) raise def find_subgroups(regex): """ Finds all the groups matching a given regex, so that we can attach them to a rule. """ log = logging.getLogger(__name__) log.debug("Looking for groups matching regex '%s'", regex) payload = { 'q' : 'aws' } r = requests.get('{0}/api/v1/groups'.format(API_URL), auth=OktaAuth(API_KEY), params=payload) log.debug("Response code %d", r.status_code) groups = [] # we should "always" get a 200 even if we get an empty-ish response # a basic [ ] json doc. if r.status_code == 200: data = r.json() if data: for rec in data: name = rec.get('profile').get('name') groupid = rec.get('id') if re.match(regex, name) is not None: groups.append(groupid) return groups if __name__ == "__main__": sys.exit(main(sys.argv))