Update Security Groups Automatically Using AWS Lambda

Update Security Groups Automatically Using AWS Lambda


Article content

Reference : https://explore.skillbuilder.aws/learn/course/1105/play/25537/update-security-groups-automatically-using-aws-lambda

  • Create Amazon VPC security groups

1.     In the AWS Management Console, use the AWS search bar to search for 

EC2

 and then choose the service from the list.

2.     In the left navigation pane, click Security Groups.


Article content

3.      Click Create security group then configure:

Security group name: AutoUpdateSecurityGroup-Global-A

Description: AutoUpdateSecurityGroup-Global

VPC: Lab VPC

4.      Click Add new tag and configure:

Key: Name

Value: cloudfront_g

 (for global rules)

Click Add new tag again and enter:

Key: AutoUpdate

Value: true

Click Add new tag again and enter:

Key: Protocol

Value: http


Article content

5.      Click Create security group .

6.       Repeat the preceding steps to create a security group named 

AutoUpdateSecurityGroup-Global-B. All of the other entries remain the same.

7.      Click Create security group then configure:

Security group name: AutoUpdateSecurityGroup-Region

Description: AutoUpdateSecurityGroup-Region

VPC: Lab VPC

Scroll down to the Tags - optional section.

 Note: Tags are case sensitive, so use copy and paste to enter the actual values from the following instructions.

8.      Click Add new tag and configure:

Key: Name

Value: cloudfront_r

 (for regional rules)

Click Add new tag again and enter:

Key: AutoUpdate

Value: true

Click Add new tag again and enter:

Key: Protocol

Value: http

9.      Click Create security group .

 

 

Article content

  • Create an AWS Lambda function

1. In the AWS Management Console, use the AWS search bar to search for 

Lambda

 and then choose the service from the list.

2. Click Create function 

3. Choose Author from scratch

Configure the following:

Function name: SecurityGroupAutoUpdate

Runtime: Python 3.11


Article content

4. Expand Change default execution role 

Execution role: Select Use an existing role

Existing role: Choose lambda-role

5. Click Create function .

6. In the Code section, delete all of the code that appears in the Code source lambda_function.py file.

7. Copy and paste the following code into the code editor:

import boto3
import hashlib
import json
import urllib3

http = urllib3.PoolManager()

# Name of the service, as seen in the ip-groups.json file, to extract information for
SERVICE = "CLOUDFRONT"
# Ports your application uses that need inbound permissions from the service for
INGRESS_PORTS = {'Http': 80}
# Tags which identify the security groups you want to update
SECURITY_GROUP_TAG_FOR_GLOBAL_HTTP = {'Name': 'cloudfront_g', 'AutoUpdate': 'true', 'Protocol': 'http'}
SECURITY_GROUP_TAG_FOR_REGION_HTTP = {'Name': 'cloudfront_r', 'AutoUpdate': 'true', 'Protocol': 'http'}


SG_RULES_LIMIT = 60


def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    message = json.loads(event['Records'][0]['Sns']['Message'])

    # Load the ip ranges from the url
    ip_ranges = json.loads(get_ip_groups_json(message['url'], message['md5']))

    # extract the service ranges
    global_cf_ranges = get_ranges_for_service(ip_ranges, SERVICE, "GLOBAL")
    region_cf_ranges = get_ranges_for_service(ip_ranges, SERVICE, "REGION")

    # split global ip list if more than soft limit.
    if (len(global_cf_ranges)) > SG_RULES_LIMIT:
        global_cf_ranges_1 = (global_cf_ranges[:SG_RULES_LIMIT])
        global_cf_ranges_2 = (global_cf_ranges[SG_RULES_LIMIT:])
        global_cf_ranges = [global_cf_ranges_1, global_cf_ranges_2]
    #limit regional ip list if more than soft limit.
    if (len(region_cf_ranges)) > SG_RULES_LIMIT:
        region_cf_ranges = (region_cf_ranges[:SG_RULES_LIMIT])

    ip_ranges = {"GLOBAL": global_cf_ranges, "REGION": region_cf_ranges}

    result = update_security_groups(ip_ranges)

    return result


def get_ip_groups_json(url, expected_hash):
    print("Updating from " + url)
    response = http.request('GET', url)
    ip_json = response.data

    return ip_json


def get_ranges_for_service(ranges, service, subset):
    service_ranges = list()
    for prefix in ranges['prefixes']:
        if prefix['service'] == service and ((subset == prefix['region'] and subset == "GLOBAL") or (
                subset != 'GLOBAL' and prefix['region'] != 'GLOBAL')):
            print('Found ' + service + ' region: ' + prefix['region'] + ' range: ' + prefix['ip_prefix'])
            service_ranges.append(prefix['ip_prefix'])

    return service_ranges


def update_security_groups(new_ranges):
    client = boto3.client('ec2')

    global_http_group = get_security_groups_for_update(client, SECURITY_GROUP_TAG_FOR_GLOBAL_HTTP)
    region_http_group = get_security_groups_for_update(client, SECURITY_GROUP_TAG_FOR_REGION_HTTP)

    print('Found ' + str(len(global_http_group)) + ' CloudFront_g HttpSecurityGroups to update')
    print('Found ' + str(len(region_http_group)) + ' CloudFront_r HttpSecurityGroups to update')

    result = list()
    global_http_updated = 0
    region_http_updated = 0

    for i in range(len(global_http_group)):
        if update_security_group(client, global_http_group[i], new_ranges["GLOBAL"][i], INGRESS_PORTS['Http']):
            global_http_updated += 1
            result.append('Updated ' + global_http_group[i]['GroupId'])

    for group in region_http_group:
        if update_security_group(client, group, new_ranges["REGION"], INGRESS_PORTS['Http']):
            region_http_updated += 1
            result.append('Updated ' + group['GroupId'])

    result.append('Updated ' + str(global_http_updated) + ' of ' + str(
        len(global_http_group)) + ' CloudFront_g HttpSecurityGroups')

    result.append('Updated ' + str(region_http_updated) + ' of ' + str(
        len(region_http_group)) + ' CloudFront_r HttpSecurityGroups')

    return result

def update_security_group(client, group, new_ranges, port):
    added = 0
    removed = 0

    if len(group['IpPermissions']) > 0:
        for permission in group['IpPermissions']:
            if permission['FromPort'] <= port and permission['ToPort'] >= port:
                old_prefixes = list()
                to_revoke = list()
                to_add = list()
                for range in permission['IpRanges']:
                    cidr = range['CidrIp']
                    old_prefixes.append(cidr)
                    if new_ranges.count(cidr) == 0:
                        to_revoke.append(range)
                        print(group['GroupId'] + ": Revoking " + cidr + ":" + str(permission['ToPort']))

                for range in new_ranges:
                    if old_prefixes.count(range) == 0:
                        to_add.append({'CidrIp': range})
                        print(group['GroupId'] + ": Adding " + range + ":" + str(permission['ToPort']))

                removed += revoke_permissions(client, group, permission, to_revoke)
                added += add_permissions(client, group, permission, to_add)
    else:
        to_add = list()
        for range in new_ranges:
            to_add.append({'CidrIp': range})
            print(group['GroupId'] + ": Adding " + range + ":" + str(port))
        permission = {'ToPort': port, 'FromPort': port, 'IpProtocol': 'tcp'}
        added += add_permissions(client, group, permission, to_add)

    print(group['GroupId'] + ": Added " + str(added) + ", Revoked " + str(removed))
    return added > 0 or removed > 0


def revoke_permissions(client, group, permission, to_revoke):
    if len(to_revoke) > 0:
        revoke_params = {
            'ToPort': permission['ToPort'],
            'FromPort': permission['FromPort'],
            'IpRanges': to_revoke,
            'IpProtocol': permission['IpProtocol']
        }

        client.revoke_security_group_ingress(GroupId=group['GroupId'], IpPermissions=[revoke_params])

    return len(to_revoke)


def add_permissions(client, group, permission, to_add):
    if len(to_add) > 0:
        add_params = {
            'ToPort': permission['ToPort'],
            'FromPort': permission['FromPort'],
            'IpRanges': to_add,
            'IpProtocol': permission['IpProtocol']
        }

        client.authorize_security_group_ingress(GroupId=group['GroupId'], IpPermissions=[add_params])

    return len(to_add)


def get_security_groups_for_update(client, security_group_tag):
    filters = list()
    for key, value in security_group_tag.items():
        filters.extend(
            [
                {'Name': "tag:" + key, 'Values': [value]}
            ]
        )

    response = client.describe_security_groups(Filters=filters)

    return response['SecurityGroups']
        

Examine the code. It performs the following steps: Get the latest, updated IP ranges from the URL given in the SNS notification Filter the IP ranges for CloudFront and AWS WAF IP ranges Update any VPC security groups that are tagged with “Name”: “cloudfront_g/r” and “AutoUpdate”: “true”

8.      Click Deploy

9.      Click the Configuration tab, in the General configuration section, click Edit and configure:

Description: Automatically update Security Group

Timeout: 1 min 0 sec

10.  Click Save

11.  Scroll up to the Function overview section.

12.  Click  Add trigger then configure:

Select a trigger: SNS

SNS topic: 

arn:aws:sns:us-east-1:806199016981:AmazonIpSpaceChanged


Article content

13.  Click Add .


Article content


Article content

Test a Lambda function with sample events

1.      Click Test tab, and configure Test event, which you can use to simulate a Lambda trigger. In this case, you simulate an update of the CloudFront IP address ranges.

2.      Select Create new event.

3.      For Event name, enter 

Notification

.

4.      Delete the existing test code.

5.      Copy and paste the following test code into the test event window:

{
  "Records": [
    {
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:EXAMPLE",
      "EventSource": "aws:sns",
      "Sns": {
        "SignatureVersion": "1",
        "Timestamp": "1970-01-01T00:00:00.000Z",
        "Signature": "EXAMPLE",
        "SigningCertUrl": "EXAMPLE",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "Message": "{\"create-time\": \"yyyy-mm-ddThh:mm:ss+00:00\", \"synctoken\": \"0123456789\", \"md5\": \"7fd59f5c7f5cf643036cbd4443ad3e4b\", \"url\": \"https://ip-ranges.amazonaws.com/ip-ranges.json\"}",
        "Type": "Notification",
        "UnsubscribeUrl": "EXAMPLE",
        "TopicArn": "arn:aws:sns:EXAMPLE",
        "Subject": "TestInvoke"
      }
    }
  ]
}
        

6.      Click Save .

7.      Click Test

8.     Expand  Details to view the output of the Lambda function.

 

Article content

 

Verify Security Group Update

1.      In the AWS Management Console, use the AWS search bar to search for 

EC2

 and then choose the service from the list.

2.      In the left navigation pane, click Security Groups.

3.      Select one of the  cloudfront_g security groups.

4.      In the bottom pane, click the Inbound rules tab.

Note: To see more content in this pane, drag the pane upward.

When you scroll through the list of rules, you now see the CloudFront IP ranges added as allowed inbound traffic.

The AutoUpdateSecurityGroup-Region security group also contains a set of rules.



Article content

 

 

 

To view or add a comment, sign in

More articles by Raghunath Erumal

Others also viewed

Explore content categories